Getting started with Apache Pulsar using Python

What is Pulsar?

Apache Pulsar is a cloud-native, multi-tenant, high-performance solution for server-to-server messaging and queuing built on the publish-subscribe (pub-sub) pattern.

3 Concepts

Messages

Messages are the basic unit of Pulsar. A message has the following components:

| Component | Description |
| --- | --- |
| Payload | The data carried by the message. |
| Key | Messages are optionally tagged with keys. |
| Topic name | The name of the topic that the message is published to. |
| Message ID | The message ID of a message is assigned by bookies as soon as the message is persistently stored. |
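The topic name listed above is a structured string of the form `{persistent|non-persistent}://tenant/namespace/topic`, which is why the demo in this post uses `persistent://public/default/sample`. As a small sketch (the `TopicName` tuple and `parse_topic_name` helper are illustrative names, not part of the Pulsar client), the pieces can be pulled apart like this:

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic_name(name: str) -> TopicName:
    """Split a fully qualified Pulsar topic name into its parts."""
    domain, sep, rest = name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {name!r}")
    parts = rest.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic in: {name!r}")
    return TopicName(domain, *parts)


print(parse_topic_name("persistent://public/default/sample"))
# TopicName(domain='persistent', tenant='public', namespace='default', topic='sample')
```

Here `public` is the tenant and `default` is the namespace that a standalone Pulsar instance provides out of the box.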

Producer

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker. These messages are processed by the broker.

There are 3 access modes for producers on a topic:

  1. Shared (default) - More than one producer can publish to the topic.
  2. Exclusive - Only one producer can publish to the topic.
  3. WaitForExclusive - If another producer is already connected, producer creation stays pending until this producer gets exclusive access.
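To make the difference between the three modes concrete, here is a toy in-memory model. It is only a sketch of the rules described above: `ToyTopic`, `AccessMode`, and the method names are made up for illustration and are not the Pulsar client API, and the real broker handles more cases than this.

```python
from enum import Enum


class AccessMode(Enum):
    SHARED = "Shared"
    EXCLUSIVE = "Exclusive"
    WAIT_FOR_EXCLUSIVE = "WaitForExclusive"


class ToyTopic:
    """In-memory stand-in for a topic; NOT the Pulsar client API."""

    def __init__(self):
        self.active = []    # (producer name, mode) pairs currently attached
        self.waiting = []   # WaitForExclusive producers whose creation is pending

    def add_producer(self, name, mode=AccessMode.SHARED):
        if mode is AccessMode.SHARED:
            # A shared producer cannot join a topic that is held exclusively
            if any(m is not AccessMode.SHARED for _, m in self.active):
                raise RuntimeError(f"{name}: topic is held exclusively")
            self.active.append((name, mode))
            return "connected"
        # Exclusive and WaitForExclusive both need the topic to themselves
        if not self.active:
            self.active.append((name, mode))
            return "connected"
        if mode is AccessMode.EXCLUSIVE:
            raise RuntimeError(f"{name}: another producer is already connected")
        self.waiting.append(name)
        return "pending"

    def remove_producer(self, name):
        self.active = [(n, m) for n, m in self.active if n != name]
        # Promote the first pending producer once the topic is free
        if not self.active and self.waiting:
            self.active.append((self.waiting.pop(0), AccessMode.WAIT_FOR_EXCLUSIVE))


topic = ToyTopic()
print(topic.add_producer("p1", AccessMode.EXCLUSIVE))           # connected
print(topic.add_producer("p2", AccessMode.WAIT_FOR_EXCLUSIVE))  # pending
topic.remove_producer("p1")
print([name for name, _ in topic.active])                       # ['p2']
```

In this model an Exclusive request fails immediately when the topic is busy, while a WaitForExclusive request queues up and takes over once the current producer leaves.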

Consumer

A consumer is a process that attaches to a topic via a subscription and then receives messages.

In Pulsar, there are 4 types of subscription: exclusive, shared, failover, and key_shared.

[Image: the four subscription types. Image source: pulsar.apache.org/docs/concepts-messaging#s..]
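The four subscription types (exclusive, shared, failover, and key_shared) differ in how messages are spread across the consumers attached to one subscription. The following toy dispatcher sketches that idea without a broker. The `dispatch` function is hypothetical, and the key hashing is a simplification chosen for this example, not the broker's actual algorithm.

```python
import zlib
from collections import defaultdict


def dispatch(sub_type, consumers, messages):
    """Toy model: return {consumer: [payloads]} for one subscription.

    `messages` is a list of (key, payload) pairs. This mimics the routing
    idea only; it is not how the broker is implemented.
    """
    if not consumers:
        raise ValueError("at least one consumer is required")
    received = defaultdict(list)
    if sub_type == "exclusive":
        if len(consumers) > 1:
            raise ValueError("exclusive allows a single consumer")
        received[consumers[0]] = [p for _, p in messages]
    elif sub_type == "failover":
        # Only the active (first) consumer receives; the others are standbys
        received[consumers[0]] = [p for _, p in messages]
    elif sub_type == "shared":
        # Round-robin across all consumers
        for i, (_, payload) in enumerate(messages):
            received[consumers[i % len(consumers)]].append(payload)
    elif sub_type == "key_shared":
        # Messages with the same key always land on the same consumer
        for key, payload in messages:
            slot = zlib.crc32(key.encode("utf-8")) % len(consumers)
            received[consumers[slot]].append(payload)
    else:
        raise ValueError(f"unknown subscription type: {sub_type}")
    return dict(received)


msgs = [("a", "m1"), ("b", "m2"), ("a", "m3"), ("b", "m4")]
print(dispatch("shared", ["c1", "c2"], msgs))    # {'c1': ['m1', 'm3'], 'c2': ['m2', 'm4']}
print(dispatch("failover", ["c1", "c2"], msgs))  # {'c1': ['m1', 'm2', 'm3', 'm4']}
print(dispatch("key_shared", ["c1", "c2"], msgs))
```

The consumer in the demo below does not set a subscription type, so it uses the client's default.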

Demo

Setting up Pulsar using docker-compose

version: "3"
services:
  pulsar:
    image: apachepulsar/pulsar:latest
    command: bin/pulsar standalone
    hostname: pulsar
    container_name: pulsar
    ports:
      - "8090:8080"
      - "6650:6650"
    restart: always
    volumes:
      - "./data:/pulsar/data"

Run the docker-compose:

docker-compose up -d

To check the logs use:

docker logs -f pulsar
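The standalone broker can take a little while to start, so a script that connects right after `docker-compose up -d` may fail. One possible way to wait is to poll the binary protocol port (6650 in the compose file above) until it accepts TCP connections. The `wait_for_port` helper below is a hypothetical name, and an open port only shows that something is listening, not that the broker has finished initializing.

```python
import socket
import time


def wait_for_port(host="localhost", port=6650, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, else False."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_port()` at the top of a script gives the container up to a minute to come up before the client connects.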

Requirements

requirements.txt

pulsar-client[all]==2.10.1

Install the dependencies:

pip3 install -r requirements.txt

Producer

producer.py

import random

import pulsar

uri = 'pulsar://localhost:6650'
topic = 'persistent://public/default/sample'

client = pulsar.Client(uri)
producer = client.create_producer(topic)

try:
    for _ in range(10):
        # Payloads must be bytes, so encode the string before sending
        data = 'message-%d' % random.randint(10, 100)
        print('data:', data)
        producer.send(data.encode('utf-8'))
finally:
    # Release the connection even if a send fails
    client.close()
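The producer sends raw bytes, which is why the string is encoded with `data.encode('utf-8')` before `producer.send(...)`. For structured data, one option is to serialize a dictionary to JSON first. The helpers below are a minimal sketch with hypothetical names (`encode_event`, `decode_event`), and the event fields are invented for the example.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize a dict to compact UTF-8 JSON bytes, ready to send."""
    return json.dumps(event, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_event(payload: bytes) -> dict:
    """Inverse of encode_event, for use on the consumer side."""
    return json.loads(payload.decode("utf-8"))


payload = encode_event({"id": 42, "status": "created"})
print(payload)                # b'{"id":42,"status":"created"}'
print(decode_event(payload))  # {'id': 42, 'status': 'created'}
```

In the producer, the result of `encode_event(...)` would be passed to `producer.send(...)`, and the consumer would call `decode_event(msg.data())`.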

Consumer

consumer.py

import pulsar

uri = 'pulsar://localhost:6650'
topic = 'persistent://public/default/sample'
subscription_name = 'topic-sub-1'

client = pulsar.Client(uri)
consumer = client.subscribe(topic, subscription_name=subscription_name)

try:
    while True:
        msg = consumer.receive()
        try:
            print('Timestamp: %d, Message ID: %s, Payload: %s'
                  % (msg.publish_timestamp(), msg.message_id(), msg.data()))
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed; ask the broker to redeliver it
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Stop on Ctrl+C
    pass
finally:
    # Close the client once, after the receive loop ends
    client.close()
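The acknowledge / negative-acknowledge pattern in the consumer can be pulled into a small helper, which makes it easy to try out without a running broker. This is only a sketch: `handle_message`, `FakeConsumer`, and `reject_empty` are hypothetical names, and the fake consumer implements just the two methods the consumer script calls.

```python
def handle_message(consumer, msg, handler):
    """Run handler(msg); acknowledge on success, negative-acknowledge on failure."""
    try:
        handler(msg)
    except Exception:
        consumer.negative_acknowledge(msg)
        return False
    consumer.acknowledge(msg)
    return True


class FakeConsumer:
    """Stand-in that records calls, so the sketch runs without a broker."""

    def __init__(self):
        self.acked, self.nacked = [], []

    def acknowledge(self, msg):
        self.acked.append(msg)

    def negative_acknowledge(self, msg):
        self.nacked.append(msg)


def reject_empty(msg):
    # Example handler: treat an empty payload as a processing failure
    if not msg:
        raise ValueError("empty payload")


fake = FakeConsumer()
handle_message(fake, b"message-96", reject_empty)
handle_message(fake, b"", reject_empty)
print(fake.acked, fake.nacked)  # [b'message-96'] [b'']
```

With a real consumer, a negatively acknowledged message is redelivered later, so the handler gets another chance to process it.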

Run

python producer.py

Output:

data: message-96
data: message-17
data: message-74
data: message-94
data: message-40
data: message-28
data: message-81
data: message-82
data: message-73
data: message-57

python consumer.py

Output:

Timestamp: 1668181403573, Message ID: (837,39,-1,-1), Payload: b'message-96'
Timestamp: 1668181403638, Message ID: (837,40,-1,-1), Payload: b'message-17'
Timestamp: 1668181403653, Message ID: (837,41,-1,-1), Payload: b'message-74'
Timestamp: 1668181403668, Message ID: (837,42,-1,-1), Payload: b'message-94'
Timestamp: 1668181403688, Message ID: (837,43,-1,-1), Payload: b'message-40'
Timestamp: 1668181403698, Message ID: (837,44,-1,-1), Payload: b'message-28'
Timestamp: 1668181403711, Message ID: (837,45,-1,-1), Payload: b'message-81'
Timestamp: 1668181403726, Message ID: (837,46,-1,-1), Payload: b'message-82'
Timestamp: 1668181403742, Message ID: (837,47,-1,-1), Payload: b'message-73'
Timestamp: 1668181403754, Message ID: (837,48,-1,-1), Payload: b'message-57'
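The timestamps in this output are publish times in milliseconds since the Unix epoch. A short sketch for turning one into a readable UTC time (`to_utc` is a hypothetical helper name):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_utc(publish_ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=publish_ms)


print(to_utc(1668181403573).isoformat(timespec="milliseconds"))
# 2022-11-11T15:43:23.573+00:00
```

Adding a `timedelta` to the epoch keeps the millisecond part exact, which a float division by 1000 would not guarantee.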

Note: Run the consumer before the producer. This creates the subscription first, so the consumer receives the messages in real time as they are published.