Python Example to Use Kafka Streams for Real-time Data


This guide explains how to implement a Python Kafka consumer to receive onchain data streams from Bitquery in real-time using the Confluent Kafka library. The consumer is secured with SSL and uses certificates for authentication, subscribing to a Kafka topic and logging messages to the console.

The complete code is available here.


Ensure that you have the following components in place before running the code:

  1. Kafka Cluster: Accessible Kafka brokers from Bitquery.
  2. Username and Password: For authentication with the Kafka brokers.
  3. Topic name(s) to subscribe to.
  4. Python: Version >= 3.7.
  5. Confluent Kafka Python Client: Kafka client library for Python.


The script relies on one third-party dependency, which must be installed using pip:

pip install confluent_kafka
  • confluent_kafka: A Python client for Apache Kafka.
  • ssl and pathlib: Standard Python libraries for SSL certificates and file path handling. They ship with Python and need no installation.
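Before running the consumer, it can help to confirm that the environment meets the requirements listed above. The following is a minimal sketch, not part of the original script; the helper names python_version_ok and client_installed are hypothetical and chosen only for illustration.

```python
import importlib.util
import sys

MIN_PYTHON = (3, 7)  # minimum version stated in the prerequisites


def python_version_ok(version_info=sys.version_info):
    """Return True when the interpreter meets the minimum version."""
    return tuple(version_info[:2]) >= MIN_PYTHON


def client_installed(module_name="confluent_kafka"):
    """Return True when the named top-level module can be located for import."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    print("Python >= 3.7:", python_version_ok())
    print("confluent_kafka installed:", client_installed())
```

If the second check prints False, the pip command above has most likely not been run in the active environment.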

Kafka Client Initialization

The Kafka client is initialized using the Consumer class from the confluent_kafka library. The client is configured with SSL to authenticate communication with the Kafka brokers.

from confluent_kafka import Consumer, KafkaError, KafkaException
import ssl
from pathlib import Path

# Kafka consumer configuration
conf = {
    'bootstrap.servers': ',,',  # comma-separated broker addresses supplied by Bitquery
    'group.id': 'trontest1-group-1',  # the group id has to start with the username
    'session.timeout.ms': 30000,
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': 'server.cer.pem',
    'ssl.key.location': 'client.key.pem',
    'ssl.certificate.location': 'client.cer.pem',
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': 'username',
    'sasl.password': 'password',
    'auto.offset.reset': 'latest'
}
  • group.id: A unique identifier for the consumer group. It has to start with the username that was shared with you, for example trontest1-group-3 or trontest1-group-5.
  • bootstrap.servers: Comma-separated list of Kafka broker addresses.
  • SSL configuration: Paths to the CA, key, and certificate files are provided for the encrypted connection.
  • SASL configuration: The username and password authenticate the client using the SCRAM-SHA-512 mechanism.
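The settings described above can also be assembled by a small helper, which keeps credentials out of the source file and enforces the rule that the group id starts with the username. This is a sketch under stated assumptions: the function names, the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD, and the broker address broker-1.example:9093 are all hypothetical placeholders, not values from Bitquery.

```python
import os


def build_consumer_conf(username, password, group_suffix, brokers):
    """Assemble a consumer configuration dict for SASL_SSL access."""
    if not brokers:
        raise ValueError("at least one broker address is required")
    return {
        'bootstrap.servers': ','.join(brokers),
        # The group id has to start with the username.
        'group.id': f'{username}-{group_suffix}',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': 'server.cer.pem',
        'ssl.key.location': 'client.key.pem',
        'ssl.certificate.location': 'client.cer.pem',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': username,
        'sasl.password': password,
        'auto.offset.reset': 'latest',
    }


def group_id_is_valid(conf):
    """Check the rule that group.id starts with the SASL username."""
    return conf['group.id'].startswith(conf['sasl.username'])


if __name__ == "__main__":
    # Hypothetical environment variable names and broker address.
    conf = build_consumer_conf(
        username=os.environ.get("KAFKA_USERNAME", "trontest1"),
        password=os.environ.get("KAFKA_PASSWORD", ""),
        group_suffix="group-1",
        brokers=["broker-1.example:9093"],
    )
    print(conf['group.id'], group_id_is_valid(conf))
```

The returned dict has the same shape as conf in the script, so it could be passed to Consumer in its place.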

Kafka Consumer Setup

The Kafka consumer is initialized to consume messages from a specified topic. In this case, the consumer listens to the tron.broadcasted.transactions topic.

consumer = Consumer(conf)
topic = 'tron.broadcasted.transactions'

Message Processing

The process_message function handles each incoming message. It first attempts to decode the message value as UTF-8 text, then prints the partition, offset, and decoded content.

def process_message(message):
    try:
        buffer = message.value()
        decompressed_value = None

        try:
            # Attempt to decompress frame (here: plain UTF-8 decoding)
            decompressed_value = buffer.decode('utf-8')
        except Exception as err:
            print(f'Decompression failed: {err}')

        # Log message data
        log_entry = {
            'partition': message.partition(),
            'offset': message.offset(),
            'value': decompressed_value
        }
        print(log_entry)

    except Exception as err:
        print(f'Error processing message: {err}')
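  • Decompression: The script labels this step decompression, but it only decodes the raw message bytes as UTF-8. If decoding fails, the error is printed and the value stays None.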
  • Logging: The partition, offset, and message content are printed to the console.
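The decoding and logging steps can be tried without a broker by passing in a stand-in object that exposes the same three accessors. The sketch below is illustrative only: FakeMessage and build_log_entry are hypothetical names, and FakeMessage merely imitates the value(), partition(), and offset() methods that the script calls on a real message.

```python
class FakeMessage:
    """Hypothetical stand-in exposing the three accessors used below."""

    def __init__(self, value, partition=0, offset=0):
        self._value = value
        self._partition = partition
        self._offset = offset

    def value(self):
        return self._value

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset


def build_log_entry(message):
    """Decode the payload as UTF-8 and return the entry to log."""
    raw = message.value()
    try:
        # A message value may be None, so guard before decoding.
        text = raw.decode('utf-8') if raw is not None else None
    except UnicodeDecodeError as err:
        print(f'Decoding failed: {err}')
        text = None
    return {
        'partition': message.partition(),
        'offset': message.offset(),
        'value': text,
    }


if __name__ == "__main__":
    print(build_log_entry(FakeMessage(b'hello', partition=2, offset=7)))
```

Returning the entry instead of printing it inside the function makes the behaviour easy to assert on in a unit test.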

Subscribing and Polling

The consumer subscribes to the topic and polls for new messages. Messages are processed in a loop until interrupted.

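# Subscribe to the topic
consumer.subscribe([topic])

# Poll messages and process them
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # no message arrived within the timeout
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # end of partition is not a failure
            else:
                raise KafkaException(msg.error())
        process_message(msg)

except KeyboardInterrupt:
    pass

finally:
    # Close down consumer
    consumer.close()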
  • Polling: The consumer polls the Kafka broker for new messages. If a message is available, it is processed by process_message().
  • Error Handling: Errors raised while processing a message are printed by process_message(). Broker errors other than end-of-partition are raised as a KafkaException. The consumer shuts down gracefully on keyboard interruption.
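For testing, a bounded variant of the polling loop is sometimes convenient, since it stops after a fixed number of messages instead of running until interrupted. The following is a simplified sketch rather than the script's own loop: consume, FakeConsumer, and FakeMessage are hypothetical names, the fakes only imitate the poll(), close(), value(), and error() methods, and any message error is raised as a plain RuntimeError instead of being inspected.

```python
class FakeMessage:
    """Hypothetical stand-in for a polled message."""

    def __init__(self, value, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


class FakeConsumer:
    """Hypothetical stand-in that returns queued items from poll()."""

    def __init__(self, items):
        self._items = list(items)
        self.closed = False

    def poll(self, timeout=None):
        # None mimics a poll that timed out without a message.
        return self._items.pop(0) if self._items else None

    def close(self):
        self.closed = True


def consume(consumer, handler, max_messages):
    """Poll until max_messages have been handled, then close the consumer."""
    handled = 0
    try:
        while handled < max_messages:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                raise RuntimeError(msg.error())
            handler(msg)
            handled += 1
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()  # runs on success, error, and interruption
    return handled


if __name__ == "__main__":
    fake = FakeConsumer([None, FakeMessage(b'a'), FakeMessage(b'b')])
    count = consume(fake, lambda m: print(m.value()), max_messages=2)
    print(count, fake.closed)
```

Because consume only relies on poll() and close(), a real consumer object could be passed in place of the fake, under the assumption that it has already subscribed to a topic.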

Execution Workflow

The following sequence of operations occurs when the script runs:

  1. Kafka Client Initialization: The Kafka client is initialized with SSL and SASL configurations.
  2. Group ID Assignment: A unique group.id is used to ensure independent message processing.
  3. Kafka Consumer Connection: The consumer subscribes to a Kafka topic.
  4. Message Processing:
    • Polling: The consumer polls messages from Kafka and decodes each message value as UTF-8.
    • Logging: The partition, offset, and message content are logged.
  5. Graceful Shutdown: The consumer closes cleanly upon termination.