What is Pulsar?
Apache Pulsar is a cloud-native, multi-tenant, high-performance solution for server-to-server messaging and queuing, built on the publish-subscribe (pub-sub) pattern.
3 Concepts
Messages
Messages are the basic unit of Pulsar. A message has the following components:
Component | Description |
Payload | The data carried by the message. |
Key | An optional key that a message can be tagged with. |
Topic name | The name of the topic that the message is published to. |
Message ID | Assigned by bookies (the BookKeeper storage nodes) as soon as the message is persistently stored. |
Producer
A producer is a process that attaches to a topic and publishes messages to a Pulsar broker. These messages are processed by the broker.
There are 3 access modes that a producer can use on a topic:
- Shared (default) - More than one producer can publish to the topic.
- Exclusive - Only one producer can publish to the topic.
- WaitForExclusive - If another producer is already connected, producer creation stays pending until the producer gets Exclusive access.
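To make the three access modes concrete, here is a minimal sketch of the admission rules in plain Python. It is a toy model, not Pulsar code: `ToyTopic`, `attach`, and `detach` are hypothetical names, and the rule that a Shared producer is rejected while an exclusive producer holds the topic is a simplifying assumption.

```python
from enum import Enum


class AccessMode(Enum):
    SHARED = "Shared"
    EXCLUSIVE = "Exclusive"
    WAIT_FOR_EXCLUSIVE = "WaitForExclusive"


class ToyTopic:
    """Hypothetical in-memory stand-in for a topic; not part of Pulsar."""

    def __init__(self):
        self.producers = []     # names of attached producers
        self.exclusive = False  # True while an exclusive producer holds the topic
        self.waiting = []       # producers queued in WaitForExclusive mode

    def attach(self, name, mode):
        """Return 'attached', 'pending', or 'rejected' for a new producer."""
        if mode is AccessMode.SHARED:
            if self.exclusive:
                return "rejected"
            self.producers.append(name)
            return "attached"
        # Both exclusive modes need the topic to be free of other producers.
        if not self.producers:
            self.producers.append(name)
            self.exclusive = True
            return "attached"
        if mode is AccessMode.WAIT_FOR_EXCLUSIVE:
            self.waiting.append(name)
            return "pending"
        return "rejected"

    def detach(self, name):
        """Remove a producer and promote the first waiting one if the topic is free."""
        self.producers.remove(name)
        if not self.producers:
            self.exclusive = False
            if self.waiting:
                self.attach(self.waiting.pop(0), AccessMode.EXCLUSIVE)


topic = ToyTopic()
results = [
    topic.attach("p1", AccessMode.EXCLUSIVE),
    topic.attach("p2", AccessMode.EXCLUSIVE),
    topic.attach("p3", AccessMode.WAIT_FOR_EXCLUSIVE),
]
print(results)          # ['attached', 'rejected', 'pending']
topic.detach("p1")
print(topic.producers)  # ['p3']
```

Here `p2` fails immediately, while `p3` waits and takes over once `p1` disconnects, which is the difference between Exclusive and WaitForExclusive.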
Consumer
A consumer is a process that attaches to a topic via a subscription and then receives messages.
In Pulsar, there are 4 types of subscription: exclusive, shared, failover, and key_shared.
Image source: pulsar.apache.org/docs/concepts-messaging#s..
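The difference between the subscription types is mostly about which consumer gets which message. The sketch below is a rough approximation in plain Python, not Pulsar's dispatcher: `dispatch` is a hypothetical helper, shared delivery is modelled as simple round-robin, and key_shared is modelled as a CRC32 hash of the key modulo the number of consumers, both of which are assumptions made for illustration.

```python
import zlib


def dispatch(messages, consumers, sub_type):
    """Map each consumer name to the payloads it would receive.

    messages is a list of (key, payload) tuples, consumers a list of names.
    """
    if sub_type == "exclusive" and len(consumers) > 1:
        raise ValueError("an exclusive subscription allows only one consumer")
    received = {name: [] for name in consumers}
    for index, (key, payload) in enumerate(messages):
        if sub_type in ("exclusive", "failover"):
            # One active consumer receives everything.
            target = consumers[0]
        elif sub_type == "shared":
            # Messages are spread across consumers (round-robin here).
            target = consumers[index % len(consumers)]
        elif sub_type == "key_shared":
            # Messages with the same key always go to the same consumer.
            target = consumers[zlib.crc32(key.encode("utf-8")) % len(consumers)]
        else:
            raise ValueError(f"unknown subscription type: {sub_type}")
        received[target].append(payload)
    return received


messages = [("k1", "m1"), ("k2", "m2"), ("k1", "m3"), ("k2", "m4")]
consumers = ["c1", "c2"]
for sub_type in ("failover", "shared", "key_shared"):
    print(sub_type, dispatch(messages, consumers, sub_type))
```

With failover, the second consumer stays idle and would only take over if the first one disconnected. With key_shared, `m1` and `m3` always end up on the same consumer because they share the key `k1`.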
Demo
Setting up Pulsar using docker-compose
version: "3"
services:
  pulsar:
    image: apachepulsar/pulsar:latest
    command: bin/pulsar standalone
    hostname: pulsar
    container_name: pulsar
    ports:
      - "8090:8080"
      - "6650:6650"
    restart: always
    volumes:
      - "./data:/pulsar/data"
Run the docker-compose:
docker-compose up -d
To check the logs use:
docker logs -f pulsar
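The standalone broker can take a little while to start, so a client launched right after `docker-compose up -d` may fail to connect. As an alternative to watching the logs, here is a small sketch that polls the published port until it accepts a TCP connection. `wait_for_port` is a hypothetical helper using only the standard library, and an open port is only a rough readiness signal, not a guarantee that the broker is fully initialised.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_port("localhost", 6650)` before creating the Pulsar client would wait for the binary protocol port mapped in the compose file above.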
Requirements
requirements.txt
pulsar-client[all]==2.10.1
Install the requirements:
pip3 install -r requirements.txt
Producer
import random

import pulsar

uri = 'pulsar://localhost:6650'
topic = 'persistent://public/default/sample'

client = pulsar.Client(uri)
producer = client.create_producer(topic)

# Publish 10 messages, each with a random numeric suffix.
for _ in range(10):
    data = 'message-%d' % random.randint(10, 100)
    print('data:', data)
    # The payload must be bytes, so encode the string first.
    producer.send(data.encode('utf-8'))

# Close the client and release its connections.
client.close()
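The topic string used in the producer is a fully qualified topic name of the form `{persistent|non-persistent}://tenant/namespace/topic`. As a quick illustration, the sketch below splits such a name into its parts. `parse_topic` and `TopicName` are hypothetical helpers written for this post, not part of `pulsar-client`.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str     # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(name):
    """Split a fully qualified topic name into its four parts."""
    domain, sep, rest = name.partition("://")
    parts = rest.split("/")
    valid_domain = domain in ("persistent", "non-persistent")
    if not sep or not valid_domain or len(parts) != 3 or not all(parts):
        raise ValueError(f"not a fully qualified topic name: {name!r}")
    return TopicName(domain, *parts)


print(parse_topic("persistent://public/default/sample"))
# TopicName(domain='persistent', tenant='public', namespace='default', topic='sample')
```

So the demo publishes to the topic `sample` in the `default` namespace of the `public` tenant.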
Consumer
import pulsar

uri = 'pulsar://localhost:6650'
topic = 'persistent://public/default/sample'
subscription_name = 'topic-sub-1'

client = pulsar.Client(uri)
consumer = client.subscribe(topic, subscription_name=subscription_name)

try:
    while True:
        msg = consumer.receive()  # blocks until a message arrives
        try:
            print('Timestamp: %s, Message ID: %s, Payload: %s'
                  % (msg.publish_timestamp(), msg.message_id(), msg.data()))
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed; ask the broker to redeliver it
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    pass  # Ctrl+C ends the loop
finally:
    client.close()
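The consumer loop above acknowledges a message when processing succeeds and negatively acknowledges it when processing fails, which asks for the message to be delivered again. The sketch below imitates that flow with an in-memory queue so it can run without a broker. It is only a toy model: `consume_all` and `flaky` are hypothetical names, and the `max_deliveries` cap is an assumption added so the example always terminates.

```python
from collections import deque


def consume_all(payloads, handler, max_deliveries=3):
    """Deliver payloads to handler, requeueing failures like a negative ack.

    Returns (acked, dropped): payloads that were processed, and payloads
    that still failed after max_deliveries attempts.
    """
    queue = deque((payload, 1) for payload in payloads)
    acked, dropped = [], []
    while queue:
        payload, attempt = queue.popleft()
        try:
            handler(payload)
            acked.append(payload)  # plays the role of consumer.acknowledge
        except Exception:
            if attempt < max_deliveries:
                # Plays the role of consumer.negative_acknowledge: try again later.
                queue.append((payload, attempt + 1))
            else:
                dropped.append(payload)
    return acked, dropped


seen = set()


def flaky(payload):
    # Fails the first time it sees "message-17", succeeds afterwards.
    if payload == "message-17" and payload not in seen:
        seen.add(payload)
        raise RuntimeError("temporary failure")


print(consume_all(["message-96", "message-17", "message-74"], flaky))
# (['message-96', 'message-74', 'message-17'], [])
```

The failed message is processed last because it goes back to the end of the queue, which also shows that redelivery can change the order in which messages are handled.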
Run
python producer.py
output
data: message-96
data: message-17
data: message-74
data: message-94
data: message-40
data: message-28
data: message-81
data: message-82
data: message-73
data: message-57
python consumer.py
output
Timestamp: 1668181403573, Message ID: (837,39,-1,-1), Payload: b'message-96'
Timestamp: 1668181403638, Message ID: (837,40,-1,-1), Payload: b'message-17'
Timestamp: 1668181403653, Message ID: (837,41,-1,-1), Payload: b'message-74'
Timestamp: 1668181403668, Message ID: (837,42,-1,-1), Payload: b'message-94'
Timestamp: 1668181403688, Message ID: (837,43,-1,-1), Payload: b'message-40'
Timestamp: 1668181403698, Message ID: (837,44,-1,-1), Payload: b'message-28'
Timestamp: 1668181403711, Message ID: (837,45,-1,-1), Payload: b'message-81'
Timestamp: 1668181403726, Message ID: (837,46,-1,-1), Payload: b'message-82'
Timestamp: 1668181403742, Message ID: (837,47,-1,-1), Payload: b'message-73'
Timestamp: 1668181403754, Message ID: (837,48,-1,-1), Payload: b'message-57'
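Note: Run the consumer before the producer to see the messages being consumed in real time. By default, a new subscription starts from the latest position, so messages published before the subscription was created are not delivered to it.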
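The Timestamp values in the consumer output are publish times in milliseconds since the Unix epoch, which are not easy to read at a glance. A minimal sketch for turning one into a readable UTC time, using only the standard library (`publish_time_utc` is a hypothetical helper name):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def publish_time_utc(timestamp_ms):
    """Convert a millisecond Unix timestamp to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=timestamp_ms)


print(publish_time_utc(1668181403573).isoformat())
# 2022-11-11T15:43:23.573000+00:00
```

In the consumer, the same helper could be applied to `msg.publish_timestamp()` before printing.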