
Python Tutorial: Consuming Solana Shreds from Kafka

This tutorial explains how to consume Solana transaction protobuf messages from Bitquery Kafka using Python and print them with bytes fields encoded in base58. Decoded messages go to stdout and logs go to stderr, so the output can be piped into other tools.

Background: Kafka streaming concepts (Protobuf streams).

Runnable project: bitquery/kafka-streams-examples-usecases, folder python-consumer-example/ (files consumer.py, settings.py, protobuf_print.py).

Scaling: This sample is a single process. For high throughput, add parallel partition consumption and/or worker pools behind the poll loop, following Bitquery’s Kafka guidance in Kafka streaming concepts.
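One way to put a worker pool behind the poll loop is sketched below. It is not part of the sample project: the function name pooled and the max_in_flight bound are hypothetical. The poll loop would feed it a generator yielding msg.value() for each non-error message, and handle returns a value instead of printing, so results can be written to stdout in arrival order.

```python
import collections
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def pooled(payloads: Iterable[bytes], handle: Callable[[bytes], T],
           max_workers: int = 4, max_in_flight: int = 64) -> Iterator[T]:
    """Run handle() on a thread pool and yield results in arrival order.

    Hypothetical helper. At most max_in_flight payloads are pending at once,
    so a fast poll loop cannot build an unbounded backlog in memory.
    """
    in_flight: Deque[Future] = collections.deque()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for raw in payloads:
            in_flight.append(pool.submit(handle, raw))
            if len(in_flight) >= max_in_flight:
                # Block on the oldest task; this applies backpressure to polling.
                yield in_flight.popleft().result()
        # Input exhausted: drain whatever is still pending, oldest first.
        while in_flight:
            yield in_flight.popleft().result()
```

Protobuf parsing and printing are mostly CPU-bound, so threads help chiefly when handle also does I/O; ProcessPoolExecutor is a drop-in alternative if handle is a picklable top-level function. For parallel partition consumption, run several copies of the process with the same group.id: Kafka assigns each partition to one member of the group.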

Prerequisites

Install dependencies from requirements.txt (pinned for compatibility with generated protobuf code):

```bash
pip install -r requirements.txt
```

Typical packages (see file for exact versions):

  • confluent-kafka
  • bitquery-pb2-kafka-package (Solana ParsedIdlBlockMessage and related generated code)
  • protobuf, base58, python-dotenv

You also need a Kafka username and password from Bitquery to access the streams. Kafka credentials are issued separately: contact sales on our official Telegram channel or fill out the form on our website.

1. Set up the Kafka consumer configuration

The configuration is not hard-coded as a large literal in the tutorial source. Instead, settings.load_settings() builds it from environment variables after calling load_dotenv().

Conceptually, the consumer uses non-TLS Bitquery brokers by default:

  • bootstrap.servers: rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092 (overridable)
  • security.protocol: SASL_PLAINTEXT
  • sasl.mechanisms: SCRAM-SHA-512
  • enable.auto.commit: False
  • auto.offset.reset: latest or earliest (from KAFKA_AUTO_OFFSET_RESET)

Full key list: settings.py.

Environment variables

| Variable | Required | Notes |
|---|---|---|
| KAFKA_USERNAME | Yes | |
| KAFKA_PASSWORD | Yes | |
| KAFKA_TOPIC | No | Default: solana.transactions.proto |
| KAFKA_BOOTSTRAP_SERVERS | No | Default: Bitquery rpk0, rpk1, rpk2 brokers on port 9092 |
| KAFKA_GROUP_ID | No | If unset: {username}-group-{uuid} |
| KAFKA_AUTO_OFFSET_RESET | No | latest or earliest |
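The mapping from environment variables to the consumer configuration can be sketched as follows. This is a hypothetical stand-in for settings.load_settings(), not its actual code: the function name build_consumer_conf, its (topic, conf) return shape, and the "latest" fallback for KAFKA_AUTO_OFFSET_RESET are assumptions. The remaining keys are standard librdkafka settings with the values listed above.

```python
import os
import uuid
from typing import Dict, Mapping, Optional, Tuple

DEFAULT_BOOTSTRAP = "rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092"
DEFAULT_TOPIC = "solana.transactions.proto"


def build_consumer_conf(env: Optional[Mapping[str, str]] = None) -> Tuple[str, Dict[str, object]]:
    """Hypothetical sketch: return (topic, conf) built from environment variables."""
    env = os.environ if env is None else env
    username = env.get("KAFKA_USERNAME")
    password = env.get("KAFKA_PASSWORD")
    if not username or not password:
        raise RuntimeError("KAFKA_USERNAME and KAFKA_PASSWORD must be set")

    # Assumption: fall back to "latest" when KAFKA_AUTO_OFFSET_RESET is unset.
    reset = env.get("KAFKA_AUTO_OFFSET_RESET", "latest")
    if reset not in ("latest", "earliest"):
        raise ValueError(f"KAFKA_AUTO_OFFSET_RESET must be latest or earliest, got {reset!r}")

    conf: Dict[str, object] = {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP),
        # A unique group id per run unless one is configured explicitly.
        "group.id": env.get("KAFKA_GROUP_ID") or f"{username}-group-{uuid.uuid4()}",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        "enable.auto.commit": False,
        "auto.offset.reset": reset,
    }
    return env.get("KAFKA_TOPIC", DEFAULT_TOPIC), conf
```

In a script you would call load_dotenv() from python-dotenv first, then pass the result to confluent_kafka: consumer = Consumer(conf) followed by consumer.subscribe([topic]).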


2. Use the protobuf print helper

The runnable project implements the traversal in protobuf_print.py (print_protobuf_message) rather than reproducing a long snippet in these docs. The helper walks protobuf fields recursively and renders bytes fields in base58 when encoding='base58'.
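The recursive walk can be sketched as below. This is an illustration of the technique, not the contents of protobuf_print.py: format_message is a hypothetical name, it returns lines instead of printing, and bytes are rendered by the encode_bytes callable, which defaults to bytes.hex to keep the sketch dependency-free. For Solana output you could pass lambda b: base58.b58encode(b).decode() from the base58 package. Map fields are not handled.

```python
from typing import Callable, List

from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message


def _is_repeated(field: FieldDescriptor) -> bool:
    # Newer protobuf runtimes expose is_repeated; older ones only have label.
    attr = getattr(field, "is_repeated", None)
    if attr is None:
        return field.label == FieldDescriptor.LABEL_REPEATED
    return attr() if callable(attr) else bool(attr)


def format_message(msg: Message, indent: int = 0,
                   encode_bytes: Callable[[bytes], str] = bytes.hex) -> List[str]:
    """Hypothetical sketch: one output line per populated field, recursing into sub-messages."""
    pad = "  " * indent
    lines: List[str] = []
    # ListFields() returns only the fields that are set, as (descriptor, value) pairs.
    for field, value in msg.ListFields():
        items = list(value) if _is_repeated(field) else [value]
        for item in items:
            if field.type == FieldDescriptor.TYPE_MESSAGE:
                lines.append(f"{pad}{field.name}:")
                lines.extend(format_message(item, indent + 1, encode_bytes))
            elif field.type == FieldDescriptor.TYPE_BYTES:
                lines.append(f"{pad}{field.name}: {encode_bytes(item)}")
            else:
                lines.append(f"{pad}{field.name}: {item}")
    return lines
```

Returning lines rather than printing makes the walk easy to test; a caller can write "\n".join(format_message(block)) to stdout.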

Solana vs EVM bytes

  • Solana: base58 for typical addresses / signatures (this tutorial default).
  • EVM (Ethereum, BSC, Polygon, …): prefer hex (often 0x + hex).

If you later switch the consumer to EVM protobuf types, set print_protobuf_message(..., encoding=...) and any convert_bytes logic for that chain instead of keeping the Solana base58 default.
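To show the difference between the two conventions, here is a small chain-aware formatter. It is a hypothetical helper, not code from the project. Base58 is implemented with the standard library using the Bitcoin alphabet, which Solana also uses, so it should match base58.b58encode(data).decode() from the base58 package. EVM-style output is 0x followed by lowercase hex.

```python
_B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def encode_bytes(data: bytes, encoding: str = "base58") -> str:
    """Hypothetical helper: render bytes as base58 (Solana) or 0x-hex (EVM)."""
    if encoding == "hex":
        return "0x" + data.hex()
    if encoding != "base58":
        raise ValueError(f"unsupported encoding {encoding!r}")
    # Treat the bytes as one big-endian integer and convert it to base 58.
    n = int.from_bytes(data, "big")
    digits = []
    while n:
        n, rem = divmod(n, 58)
        digits.append(_B58_ALPHABET[rem])
    # Each leading zero byte is written as "1", the alphabet's zero digit.
    leading_zeros = len(data) - len(data.lstrip(b"\x00"))
    return "1" * leading_zeros + "".join(reversed(digits))
```

For example, encode_bytes(b"\x00\x00\x01") gives "112", while encode_bytes(b"\xde\xad", "hex") gives "0xdead".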

(Implementation detail: protobuf_print.py uses field.is_repeated() where available so it stays compatible with modern protobuf runtimes pinned in requirements.txt.)

3. Process messages from Kafka

In consumer.py, process_payload parses the wire bytes:

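```python
from solana import parsed_idl_block_message_pb2  # from bitquery-pb2-kafka-package (check consumer.py)
from protobuf_print import print_protobuf_message  # helper in this project

block = parsed_idl_block_message_pb2.ParsedIdlBlockMessage()
block.ParseFromString(raw)  # raw: the Kafka message value (bytes)
print_protobuf_message(block, indent=0, encoding="base58")
```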

Adapting to another topic

To consume a different topic, change KAFKA_TOPIC and swap the imported message class (the object you call ParseFromString on) for the generated type that matches that topic's schema. The rest of the plumbing (Kafka configuration, polling) can stay the same as in this sample.
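If one script needs to serve several topics, a small topic-to-message-class registry keeps the switch in one place. This is a hypothetical sketch: only the solana.transactions.proto entry comes from this tutorial, and its module path "solana.parsed_idl_block_message_pb2" is an assumption to verify against the imports in consumer.py. Entries for other topics must come from the schemas Bitquery documents for them.

```python
import importlib
from typing import Callable, Dict, Mapping, Tuple

# Hypothetical registry: topic -> (module path, generated message class name).
TOPIC_SCHEMAS: Dict[str, Tuple[str, str]] = {
    "solana.transactions.proto": ("solana.parsed_idl_block_message_pb2", "ParsedIdlBlockMessage"),
}


def parser_for(topic: str,
               schemas: Mapping[str, Tuple[str, str]] = TOPIC_SCHEMAS) -> Callable[[bytes], object]:
    """Return a function that parses raw Kafka values into the topic's message type."""
    try:
        module_path, class_name = schemas[topic]
    except KeyError:
        raise KeyError(f"no message class registered for topic {topic!r}") from None
    # Import lazily so unused schemas do not need to be installed.
    message_cls = getattr(importlib.import_module(module_path), class_name)

    def parse(raw: bytes) -> object:
        msg = message_cls()
        msg.ParseFromString(raw)
        return msg

    return parse
```

Resolving the parser once, before the poll loop starts, means a misconfigured topic fails at startup rather than on the first message.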

4. Poll and shut down cleanly

The main loop in consumer.py polls until it receives SIGINT or SIGTERM, writes logs to stderr, and closes the consumer in a finally block.
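The shape of such a loop is sketched below. It is not the project's code: run and install_stop_handlers are hypothetical names, and the stop flag is a threading.Event so the loop can also be stopped from tests. The consumer argument is expected to behave like confluent_kafka.Consumer, where poll(timeout) returns a message or None, message.error() reports per-message errors, and close() leaves the group. This sketch does not commit offsets.

```python
import signal
import sys
import threading
from typing import Callable


def install_stop_handlers(stop: threading.Event) -> None:
    """Set the stop flag on SIGINT/SIGTERM (must be called from the main thread)."""
    def _on_signal(signum, _frame):
        print(f"received signal {signum}, shutting down", file=sys.stderr)
        stop.set()

    signal.signal(signal.SIGINT, _on_signal)
    signal.signal(signal.SIGTERM, _on_signal)


def run(consumer, handle: Callable[[bytes], None], stop: threading.Event,
        timeout: float = 1.0) -> int:
    """Poll until stop is set; return the number of payloads handled."""
    handled = 0
    try:
        while not stop.is_set():
            msg = consumer.poll(timeout)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                print(f"consumer error: {msg.error()}", file=sys.stderr)
                continue
            handle(msg.value())
            handled += 1
    finally:
        consumer.close()  # always release the group membership
    return handled
```

Checking the flag between short polls keeps shutdown responsive without raising exceptions from inside the signal handler.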

Clone and run (quick reference)


```bash
git clone https://github.com/bitquery/kafka-streams-examples-usecases.git
cd kafka-streams-examples-usecases/python-consumer-example
python3 -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env
# edit KAFKA_USERNAME, KAFKA_PASSWORD
python consumer.py
```

TLS (optional)

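To connect over TLS, extend the conf dict from settings.py with Bitquery's SASL_SSL snippet (brokers on port 9093, plus paths to the PEM certificate files). The examples repo README.md summarizes the settings and includes the curl commands for downloading the PEM files.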
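A minimal sketch of that extension follows, assuming the snippet uses the standard librdkafka keys ssl.ca.location, ssl.certificate.location and ssl.key.location. The helper name with_tls and the PEM file names in the example are hypothetical; take the exact keys and files from Bitquery's snippet and the README.

```python
from typing import Dict


def with_tls(conf: Dict[str, object], ca_path: str, cert_path: str,
             key_path: str) -> Dict[str, object]:
    """Hypothetical helper: copy a plaintext conf and switch it to SASL_SSL on port 9093."""
    tls = dict(conf)  # leave the original plaintext conf untouched
    servers = str(conf["bootstrap.servers"]).split(",")
    # Keep each broker host, replace its port with the TLS port.
    tls["bootstrap.servers"] = ",".join(
        s.strip().rsplit(":", 1)[0] + ":9093" for s in servers
    )
    tls.update({
        "security.protocol": "SASL_SSL",
        "ssl.ca.location": ca_path,
        "ssl.certificate.location": cert_path,
        "ssl.key.location": key_path,
    })
    return tls
```

SASL settings such as sasl.mechanisms and the credentials carry over unchanged; only the transport and broker ports differ.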

Troubleshooting

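| Issue | Action |
|---|---|
| Missing env vars | Run cp .env.example .env and set the credentials |
| KafkaException / auth errors | Check credentials, confirm the topic is enabled for your account, and allow outbound connections on port 9092 |
| DecodeError | The topic's schema is not ParsedIdlBlockMessage; parse with the message class that matches the topic |
| Protobuf reflection errors after an upgrade | Keep the requirements.txt pins aligned with bitquery-pb2-kafka-package |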

See also