Go Example to Use Kafka Protobuf Streams for Real-time Data
This guide walks through a Kafka consumer written in Go that subscribes to a Kafka topic and processes on-chain data streams from Bitquery in real time. The consumer authenticates to the Kafka brokers with SASL (SCRAM-SHA-512) over SSL and handles incoming messages in Protobuf format.
The schema is available here.
The complete code is available here.
Protobuf vs JSON
To understand why Protobuf is better than JSON for these streams, read the comparison here.
Ensure you have the following components set up before running the Go Kafka consumer:
- Bitquery Kafka Server Access: Access to Bitquery Kafka brokers.
- Username and Password: For authentication with the Kafka brokers.
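- Topic name(s) to subscribe to, such as solana.dextrades.proto (the topic used in the sample config below).
- Go: version 1.18 or later (the processor uses generics through the expirable LRU cache).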
- Confluent Kafka Go Client: Kafka client library for Go.
Key Components
- Kafka Consumer: Listens for incoming messages from a Kafka topic.
- Processor: Handles messages and processes them based on the topic.
- Configuration: config.yml holds the settings for Kafka, the consumer, and the processor.
- Main Entry Point: initializes and runs the consumer and processor.
Step by Step Code
- Kafka Config
Purpose: Sets up the servers, login credentials, reconnection, and other configuration details.
Below is a sample config; modify it according to your requirements.
```yaml
# Passed as-is to the Kafka client (kafka.ConfigMap).
kafka:
  bootstrap.servers: "rpk0.bitquery.io:9093,rpk1.bitquery.io:9093,rpk2.bitquery.io:9093"
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.username: "<your_username_here>"
  sasl.password: "<your_password_here>"
  group.id: "<username_group-number>"
  ssl.key.location: "ssl/client.key.pem"
  ssl.certificate.location: "ssl/client.cer.pem"
  ssl.ca.location: "ssl/server.cer.pem"
  ssl.endpoint.identification.algorithm: "none"
  enable.auto.commit: false
# ConsumerConfig: which topic to read and whether to use one consumer per partition.
consumer:
  topic: solana.dextrades.proto
  partitioned: true
# ProcessorConfig: queue size and number of worker goroutines.
processor:
  buffer: 100
  workers: 8
log_level: "debug"
```
- Kafka Consumer
Purpose: Connects to Kafka, subscribes to a topic, and receives messages.
Types of Consumers
- SimpleConsumer: Single consumer instance
- PartitionedConsumer: Multiple consumers for high throughput
Key Functions
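- newConsumer(): Creates a Kafka consumer (simple or partitioned, depending on the config).
- waitMessages(): Continuously reads messages and hands them to the processor.
- newPartitionedConsumer(): Handles multiple Kafka partitions by creating one consumer per partition.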
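```go
package main

import (
	"context"
	"fmt"
	"time"

	// confluent-kafka-go v2 import path assumed; v1 uses github.com/confluentinc/confluent-kafka-go/kafka
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"golang.org/x/sync/errgroup"
)

// Consumer reads messages from Kafka and hands them to a Listener.
type Consumer interface {
	waitMessages(ctx context.Context, listener Listener)
	close()
}

// SimpleConsumer reads the whole topic with a single Kafka consumer.
type SimpleConsumer struct {
	kafkaConsumer *kafka.Consumer
}

// ConsumerConfig holds the "consumer" section of config.yml.
type ConsumerConfig struct {
	Topic       string
	Partitioned bool
}

// PartitionedConsumer reads the topic with one Kafka consumer per partition.
type PartitionedConsumer struct {
	kafkaConsumers []*kafka.Consumer
}

// newConsumer picks the consumer type based on consumer.partitioned.
func newConsumer(config *Config) (Consumer, error) {
	if config.Consumer.Partitioned {
		return newPartitionedConsumer(config)
	}
	return newSimpleConsumer(config)
}

func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
	kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
	if err != nil {
		return nil, err
	}
	err = kafkaConsumer.Subscribe(config.Consumer.Topic, nil)
	if err != nil {
		return nil, err
	}
	return &SimpleConsumer{
		kafkaConsumer: kafkaConsumer,
	}, nil
}

func (consumer *SimpleConsumer) close() {
	if err := consumer.kafkaConsumer.Close(); err != nil {
		fmt.Println("Error closing consumer: " + err.Error())
	}
}

func (consumer *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
	err := consumerWaitMessages(ctx, consumer.kafkaConsumer, listener)
	if err != nil {
		fmt.Println("error waiting messages:", err)
	}
}

// consumerWaitMessages polls one Kafka consumer until ctx is cancelled and
// passes every message to the listener.
func consumerWaitMessages(ctx context.Context, consumer *kafka.Consumer, listener Listener) error {
	fmt.Println("Running consumer " + consumer.String())
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Done, exiting consumer loop")
			return nil
		default:
		}

		message, err := consumer.ReadMessage(time.Second * 60)
		if err != nil {
			// A timeout only means no message arrived in the last 60s; keep polling.
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.IsTimeout() {
				continue
			}
			return err
		}
		listener.enqueue(message)
	}
}

func newPartitionedConsumer(config *Config) (Consumer, error) {
	// Use an admin client to look up the topic's partitions.
	kafkaAdmin, err := kafka.NewAdminClient(&config.Kafka)
	if err != nil {
		return nil, err
	}
	defer kafkaAdmin.Close()

	metadata, err := kafkaAdmin.GetMetadata(&config.Consumer.Topic, false, 10000)
	if err != nil {
		return nil, err
	}
	kafkaErr := metadata.Topics[config.Consumer.Topic].Error
	if kafkaErr.Code() != kafka.ErrNoError {
		return nil, kafkaErr
	}

	partitions := metadata.Topics[config.Consumer.Topic].Partitions
	fmt.Printf("Subscribing to topic %s %d partitions: %v...\n", config.Consumer.Topic, len(partitions), partitions)

	consumers := make([]*kafka.Consumer, len(partitions))
	for i, partition := range partitions {
		consumer, err := kafka.NewConsumer(&config.Kafka)
		if err != nil {
			return nil, err
		}
		// Pin this consumer to one partition. Subscribe is deliberately not
		// called afterwards: it would hand partition selection back to the
		// consumer-group rebalance and override this manual assignment.
		err = consumer.Assign([]kafka.TopicPartition{{
			Topic:     &config.Consumer.Topic,
			Partition: partition.ID,
			Offset:    kafka.OffsetStored,
		}})
		if err != nil {
			return nil, err
		}
		consumers[i] = consumer
	}
	fmt.Printf("Assigned %d consumers to %s topic\n", len(consumers), config.Consumer.Topic)

	return &PartitionedConsumer{
		kafkaConsumers: consumers,
	}, nil
}

func (consumer *PartitionedConsumer) close() {
	for _, c := range consumer.kafkaConsumers {
		err := c.Close()
		if err != nil {
			fmt.Println("Error closing consumer: " + err.Error())
		}
	}
}

// waitMessages runs one read loop per partition and waits for all of them.
func (consumer *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
	var wg errgroup.Group
	for _, c := range consumer.kafkaConsumers {
		c := c // capture the loop variable (needed before Go 1.22)
		wg.Go(func() error {
			return consumerWaitMessages(ctx, c, listener)
		})
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("error waiting messages:", err)
	}
}
```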
- Message Processor
Purpose: Processes Kafka messages using a pool of worker goroutines.
Key Features
- Uses multiple workers to process messages in parallel.
- Different handlers for different Protobuf topics.
- Reports statistics every 100 messages.
- Deduplication Check (isDuplicated function)
Key Functions
- newProcessor(): Initializes the message queue and selects the handler for the topic.
- enqueue(): Adds messages to the processing queue.
- start(): Spins up worker goroutines to process messages.
- close(): Waits for all workers to finish.
- isDuplicated(): Prevents the same message from being processed more than once.
How Deduplication Works
- Each message is identified by a key built from its slot and index (the key has the form slot-index).
- An expirable Least Recently Used (LRU) cache, capped at 100,000 entries, stores the keys of recently processed messages.
- A message is discarded if its key is already in the cache.
- Entries expire after 240 seconds, which keeps memory usage bounded.
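```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"golang.org/x/sync/errgroup"
)

// ProcessorConfig holds the "processor" section of config.yml.
type ProcessorConfig struct {
	Buffer  int
	Workers int
}

// Listener receives messages from a Consumer.
type Listener interface {
	enqueue(message *kafka.Message)
}

// dedupCache remembers the keys of recently processed messages.
type dedupCache struct {
	cache *expirable.LRU[string, bool]
	mu    sync.Mutex
}

type processFn func(context.Context, *kafka.Message, int, *dedupCache) error

type Processor struct {
	queue     chan *kafka.Message
	wg        errgroup.Group
	config    ProcessorConfig
	processFn processFn
	stat      *Statistics // Statistics and the message handlers are in the complete code
	dedup     *dedupCache
}

func newProcessor(config *Config) (*Processor, error) {
	processor := &Processor{
		queue:  make(chan *kafka.Message, config.Processor.Buffer),
		config: config.Processor,
		stat:   newStatistics(),
		dedup: &dedupCache{
			// Up to 100,000 keys, each expiring 240 seconds after it was added.
			cache: expirable.NewLRU[string, bool](100000, nil, 240*time.Second),
		},
	}

	// Choose the handler for the topic's message format; other topics are treated as JSON.
	var processFn processFn
	switch config.Consumer.Topic {
	case "solana.dextrades.proto":
		processFn = processor.dexTradesMessageHandler
	case "solana.transactions.proto":
		processFn = processor.transactionsMessageHandler
	case "solana.tokens.proto":
		processFn = processor.tokensMessageHandler
	default:
		processFn = processor.jsonMessageHandler
	}
	processor.processFn = processFn
	return processor, nil
}

func (processor *Processor) enqueue(message *kafka.Message) {
	processor.queue <- message
}

func (processor *Processor) start(ctx context.Context) {
	var counter int64 // shared by all workers, so updated atomically
	for i := 0; i < processor.config.Workers; i++ {
		i := i // give each worker its own copy of the loop index
		processor.wg.Go(func() error {
			fmt.Println("Starting worker ", i)
			for {
				select {
				case <-ctx.Done():
					fmt.Println("Done, exiting processor loop worker ", i)
					return nil
				case message := <-processor.queue:
					err := processor.processFn(ctx, message, i, processor.dedup)
					if err != nil {
						fmt.Println("Error processing message", err)
					}
					if atomic.AddInt64(&counter, 1)%100 == 0 {
						// Report processor.stat here (see Statistics in the complete code).
					}
				}
			}
		})
	}
}

// close waits for all workers to exit; cancel the context passed to start first.
func (processor *Processor) close() {
	fmt.Println("Shutting down processor...")
	if err := processor.wg.Wait(); err != nil {
		fmt.Println("Processor error:", err)
	}
	fmt.Println("Processor stopped")
}

// isDuplicated reports whether slot-index was seen recently and records it if not.
// The lock makes the Contains/Add pair atomic across workers.
func (dedup *dedupCache) isDuplicated(slot uint64, index uint32) bool {
	key := fmt.Sprintf("%d-%d", slot, index)
	dedup.mu.Lock()
	defer dedup.mu.Unlock()
	if dedup.cache.Contains(key) {
		return true
	}
	dedup.cache.Add(key, true)
	return false
}
```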
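The switch in newProcessor routes each Protobuf topic to its own handler and treats every other topic as JSON. The same routing can live in a lookup table, so adding a topic means adding one map entry. In this sketch the handler type and the four handler functions are hypothetical placeholders that only return a label; in the example the handlers are Processor methods that decode the payload.

```go
package main

import "fmt"

// handler is a simplified stand-in for the example's processFn type.
type handler func(payload []byte) (string, error)

func dexTrades(p []byte) (string, error)    { return fmt.Sprintf("dextrades: %d bytes", len(p)), nil }
func transactions(p []byte) (string, error) { return fmt.Sprintf("transactions: %d bytes", len(p)), nil }
func tokens(p []byte) (string, error)       { return fmt.Sprintf("tokens: %d bytes", len(p)), nil }
func jsonFallback(p []byte) (string, error) { return fmt.Sprintf("json: %d bytes", len(p)), nil }

// protoHandlers lists the Protobuf topics handled in the example.
var protoHandlers = map[string]handler{
	"solana.dextrades.proto":    dexTrades,
	"solana.transactions.proto": transactions,
	"solana.tokens.proto":       tokens,
}

// handlerFor returns the Protobuf handler for topic, or the JSON handler for
// any topic not in the table (the same fallback as the switch's default case).
func handlerFor(topic string) handler {
	if h, ok := protoHandlers[topic]; ok {
		return h
	}
	return jsonFallback
}
```

The choice is made once at startup, as in the example, so workers call a single function per message without re-checking the topic.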
- Entry Point
Purpose: Loads configuration, initializes the consumer & processor, and starts them.
Key Steps
- Read configuration (config.yml)
- Initialize Kafka consumer (newConsumer)
- Initialize message processor (newProcessor)
- Start processing messages (processor.start)
- Handle graceful shutdown (Ctrl+C)
```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"gopkg.in/yaml.v3" // YAML library assumed; keys map to lower-cased field names
)

type Config struct {
	Kafka     kafka.ConfigMap
	Consumer  ConsumerConfig
	Processor ProcessorConfig
}

func main() {
	file, err := os.Open("config.yml")
	if err != nil {
		panic(fmt.Errorf("error opening config file: %v, copy original file config_example.yml to config.yml and edit it", err))
	}
	defer file.Close()
	bytes, err := io.ReadAll(file)
	if err != nil {
		panic(err)
	}
	var config Config
	if err = yaml.Unmarshal(bytes, &config); err != nil {
		panic(err)
	}

	consumer, err := newConsumer(&config)
	if err != nil {
		panic(err)
	}
	defer consumer.close()
	processor, err := newProcessor(&config)
	if err != nil {
		panic(err)
	}
	defer processor.close()

	// Defers run in reverse: cancel first, then wait for workers, then close consumers.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	processor.start(ctx)

	fmt.Println("press Ctrl-C to exit")
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-signalCh
		fmt.Println("received Ctrl-C, finishing jobs...")
		cancel()
	}()
	consumer.waitMessages(ctx, processor)
}
```
Running the Application
After setting up the config file, start the consumer & processor: