Skip to main content

Go Example to Use Kafka Protobuf Streams for Real-time Data

This guide explains how to consume Bitquery Kafka topics from Go, receive Protocol Buffers payloads, and decode Solana blocks as ParsedIdlBlockMessage. The reference implementation you run is the minimal app in bitquery/kafka-streams-examples-usecases (go-consumer-example/): one process, .env configuration, Poll loop, stdout for decoded data and stderr for logs.

Read the platform overview in Kafka streaming concepts — Protobuf streams. .proto sources and generated Go types live under Bitquery Streaming Protobuf (Solana tree: solana/); this sample imports github.com/bitquery/streaming_protobuf/v2/solana/messages.

Default wire security: SASL (SCRAM-SHA-512) over Kafka without TLS on port 9092 (SASL_PLAINTEXT). Optional TLS is SASL_SSL on 9093 with PEM files—see SSL (SASL_SSL). The minimal example does not enable TLS until you extend kafka.ConfigMap.

Scaling: The repository consumer is intentionally small. For production throughput, use parallel partition readers, queues, and/or multiple consumer instances in the same group, per Bitquery’s partition guidance in Kafka streaming concepts. A larger Go reference (YAML, partitioned consumers, worker-style processing) remains stream_protobuf_example—a different layout than go-consumer-example.

Prerequisites

#Requirement
1Bitquery Kafka access — username and password for streams (access).
2Authorized topic — default solana.transactions.proto; your contract must include the topic you set.
3Go 1.23+ — see go.mod.
4confluent-kafka-go/v2 with CGO and system librdkafka (e.g. macOS: brew install librdkafka pkg-config; Debian/Ubuntu: librdkafka-dev, pkg-config, gcc).
5Git — to clone the examples repository.

You need separate Kafka credentials. Please contact sales on our official telegram channel or fill out the form on our website.

Key components (this repository)

PieceRole
main.goLoads .env, builds kafka.ConfigMap, Subscribe, Poll loop, proto.Unmarshal into ParsedIdlBlockMessage, prints tree to stdout, logs to stderr.
printproto.goWalks protoreflect; encodes bytes as base58 (Solana-style).
.env / .env.exampleKAFKA_USERNAME, KAFKA_PASSWORD, optional topic, bootstrap, group id, offset reset.

Step by step

1. Clone the Go example

git clone https://github.com/bitquery/kafka-streams-examples-usecases.git
cd kafka-streams-examples-usecases/go-consumer-example

2. Install modules

go mod tidy

3. Configure environment

cp .env.example .env

Set at minimum:

You need separate Kafka credentials. Please contact sales on our official telegram channel or fill out the form on our website.

KAFKA_USERNAME=your_kafka_username
KAFKA_PASSWORD=your_kafka_password

Optional (defaults match the Python and Node baselines in the same repository):

# KAFKA_TOPIC=solana.transactions.proto
# KAFKA_BOOTSTRAP_SERVERS=rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092
# KAFKA_GROUP_ID=my-username-stable-group
# KAFKA_AUTO_OFFSET_RESET=latest

If KAFKA_GROUP_ID is omitted, the program generates {username}-group-{uuid} (see loadConfigFromEnv in main.go). Bitquery expects group.id to start with your Kafka username when you choose a stable id.

4. Run

go run .

Stop with Ctrl+C (signal.NotifyContext).

5. Configuration map (as built in code)

The following keys are set in main.go (values from env where noted):

cm := kafka.ConfigMap{
"bootstrap.servers": cfg.bootstrap,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
"sasl.username": cfg.username,
"sasl.password": cfg.password,
"group.id": cfg.groupID,
"session.timeout.ms": 30_000,
"enable.auto.commit": false,
"ssl.endpoint.identification.algorithm": "none",
"auto.offset.reset": cfg.autoOffset,
}

Output and bytes fields

  • Stdout: decoded protobuf tree only (no partition/offset prefix).
  • Stderr: subscribe line, Kafka errors, decode errors, shutdown.

Solana vs EVM bytes

This example prints bytes as base58, which matches typical Solana address / signature style. If you point the decoder at EVM protobuf types later, adjust printproto.go (or an equivalent printer) so bytes render as hex (commonly 0x-prefixed) instead of base58.

Changing topic or message type

Updating KAFKA_TOPIC only works when the topic still decodes as ParsedIdlBlockMessage. Otherwise change the import and proto.Unmarshal target in main.go to the type that matches the topic schema (pkg.go.dev / streaming_protobuf/v2).

TLS (optional)

Follow SASL_SSL and extend kafka.ConfigMap (and broker list, usually 9093). PEM filenames and fetch commands are summarized in the examples repository README.md and kafka-consumer-example.

Troubleshooting

SymptomCheck
Consumer create / load failurelibrdkafka, CGO_ENABLED=1, pkg-config.
SASL / auth errorsCredentials, topic entitlement, reachability of 9092 (or 9093 if using TLS).
Protobuf unmarshal errorsMessage type does not match topic schema.
Little or no stdoutOffset policy (latest vs earliest) and group id; see Bitquery retention and offset docs.

See also