Skip to main content

Python Tutorial to Consume and Traverse Solana Protobuf Messages from Kafka

This tutorial explains how to consume Solana transaction messages in protobuf format from Kafka using Python, and print them efficiently with decoded bytes fields in base58 format.

You can read more about Bitquery Protobuf Streams here:
Bitquery Kafka Streaming Concepts - Protobuf Streams.

Prerequisites

Before you begin, install the required Python package that includes the compiled .pb2 files:

pip install bitquery-pb2-kafka-package

You’ll also need your Kafka username/password provided by the Bitquery team.

1. Setup Kafka Consumer Configuration

import uuid
import base58
from confluent_kafka import Consumer, KafkaError, KafkaException
from google.protobuf.message import DecodeError
from google.protobuf.descriptor import FieldDescriptor
from solana import parsed_idl_block_message_pb2 # Bitquery's compiled protobuf schema

Generate a unique Kafka consumer group ID:

group_id_suffix = uuid.uuid4().hex

Define your Kafka config:

You can use either SSL:

'bootstrap.servers': 'rpk0.bitquery.io:9093,rpk1.bitquery.io:9093,rpk2.bitquery.io:9093',
'security.protocol': 'SASL_SSL',

Or non-SSL:

'bootstrap.servers': 'rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092',
'security.protocol': 'SASL_PLAINTEXT',

Full example:

conf = {
'bootstrap.servers': 'rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092',
'group.id': f'username-group-{group_id_suffix}',
'session.timeout.ms': 30000,
'security.protocol': 'SASL_PLAINTEXT',
'ssl.endpoint.identification.algorithm': 'none',
'sasl.mechanisms': 'SCRAM-SHA-512',
'sasl.username': 'your-username',
'sasl.password': 'your-password',
'auto.offset.reset': 'latest',
}

consumer = Consumer(conf)
topic = 'solana.transactions.proto'
consumer.subscribe([topic])

2. Define a Protobuf Traversal Print Function

This function recursively walks through any protobuf message and prints all its fields, converting bytes to base58.

def convert_bytes(value, encoding='base58'):
if encoding == 'base58':
return base58.b58encode(value).decode()
return value.hex()

def print_protobuf_message(msg, indent=0, encoding='base58'):
prefix = ' ' * indent
for field in msg.DESCRIPTOR.fields:
value = getattr(msg, field.name)

if field.label == FieldDescriptor.LABEL_REPEATED: # The field is a repeated (i.e. array/list) field.
if not value:
continue
print(f"{prefix}{field.name} (repeated):")
for idx, item in enumerate(value):
if field.type == FieldDescriptor.TYPE_MESSAGE:
print(f"{prefix} [{idx}]:")
print_protobuf_message(item, indent + 4, encoding)
elif field.type == FieldDescriptor.TYPE_BYTES:
print(f"{prefix} [{idx}]: {convert_bytes(item, encoding)}")
else:
print(f"{prefix} [{idx}]: {item}")

elif field.type == FieldDescriptor.TYPE_MESSAGE: # The field is a nested protobuf message.
if msg.HasField(field.name):
print(f"{prefix}{field.name}:")
print_protobuf_message(value, indent + 4, encoding)

elif field.type == FieldDescriptor.TYPE_BYTES:
print(f"{prefix}{field.name}: {convert_bytes(value, encoding)}")

elif field.containing_oneof:
if msg.WhichOneof(field.containing_oneof.name) == field.name:
print(f"{prefix}{field.name} (oneof): {value}")

else:
print(f"{prefix}{field.name}: {value}")


3. Process Messages From Kafka

This function decodes the raw Protobuf message and passes it to our traversal printer.

def process_message(message):
try:
buffer = message.value()

tx_block = parsed_idl_block_message_pb2.ParsedIdlBlockMessage()
tx_block.ParseFromString(buffer)

print("\nNew ParsedIdlBlockMessage received:\n")
print_protobuf_message(tx_block, encoding='base58')

except DecodeError as err:
print(f"Protobuf decoding error: {err}")
except Exception as err:
print(f"Error processing message: {err}")

Note

You can easily adapt this script to any topic and Protobuf message type — simply change the topic name and the corresponding message class in process_message(). The rest of the logic remains the same!

For example, if you're consuming a different topic with a different .proto schema:

# Change the topic name
topic = 'other_topic.proto'

# Change the message class in process_message
your_message = your_other_pb2.YourOtherMessage()
your_message.ParseFromString(buffer)

4. Poll and Print Messages in Real Time

This is the main loop for consuming Kafka messages and printing parsed protobuf content.

try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
process_message(msg)

except KeyboardInterrupt:
print("Stopping consumer...")

finally:
consumer.close()