Skip to main content

How to efficiently track millions of Solana Wallets Balance with BQ Kafka Streams

In the high-stakes world of cryptocurrency, real-time wallet balance data isn't just a nice-to-have — it's mission-critical infrastructure. Whether you're running an exchange processing customer deposits, a DeFi protocol managing liquidations, or an analytics platform tracking whale movements, being able to monitor millions of wallet balances in real-time can be the difference between success and failure. This article showcases an implementation for monitoring Solana wallet balances using Bitquery's Kafka Streams, with code that demonstrates the core concepts needed to track wallet balances at scale. To get basic understanding of Kafka Streams, read Kafka Streaming Concepts page.

Who Needs Real-Time Wallet Tracking?

  • Exchanges: Detect deposits, monitor wallets, verify withdrawals

  • DeFi: Track collateral, trigger liquidations, verify transactions

  • Wallets: Show live balances, send alerts, log history

  • Analytics: Spot whale moves, analyze trends, assess risks

note

The code presented here is a proof-of-concept intended to demonstrate core concepts. Production implementations for exchanges and financial institutions will require additional engineering as briefly outlined later in this article.

How is Balance Calculated in Solana?

Solana handles balances in a unique way that's important to understand for accurate tracking:

What is PostBalance?

In Solana transaction data, each account involvement includes two important balance values:

  • PreBalance: The account's balance before the transaction execution
  • PostBalance: The account's balance after the transaction execution

These are in the smallest token units (lamports for SOL, where 1 SOL = 1,000,000,000 lamports).

How We Use PostBalance

We use PostBalance because it's:

  1. The final account state after all operations
  2. Blockchain-verified (more reliable than manual calculations)

Our implementation:

# Extract PostBalance from Kafka stream
post_balance = balance_update.PostBalance

# Convert to human-readable format
human_balance = raw_balance / (10 ** decimals)

This ensures accurate balance tracking that matches wallet and explorer displays.

Quick Start Guide

GitHub Repository: https://github.com/akshat-cs/solana-wallet-tracker

Prerequisites

  1. Python
  2. Access to Bitquery Kafka Streams (reach out to the Bitquery team on Telegram for credentials) is a completely free trial

Installation & Setup

  1. Clone the repository:
git clone https://github.com/akshat-cs/solana-wallet-tracker.git
cd solana-wallet-tracker
  1. Install dependencies:
pip install confluent-kafka protobuf base58 bitquery-pb2-kafka-package python-dotenv
  1. Configure your credentials:

Set these variables in a newly created .env file with the credentials you got from BQ support TG channel.

# Kafka credentials
KAFKA_USERNAME = <username>
KAFKA_PASSWORD = <password>
  1. Run the wallet tracker:
python wallet_balance_extractor.py

Understanding the Kafka Stream Data

Before diving into the implementation, let's look at what the actual data from the Bitquery Kafka stream looks like. This will help you understand the rich information available to work with:

Full Message Details:
Header:
Slot: 333862663
Hash:
ParentSlot: 0
Height: 0
Timestamp: 0
ParentHash:
Transactions (repeated):
[0]:
Index: 190
Signature: 5SLabhmtAh8Bynx4Ee9aPUd8g7pWSBCMSvEoWJaKN9aGRzpxgUS83oqyvPYje4rHLBjH2h6tCttNFC6RmsMyLFMM
Status:
Success: False
ErrorMessage: Error processing Instruction 2: custom program error: 0x1771
Header:
Fee: 5625
FeePayer: CtxJeBMW3kGg3yG4MPacVxFNp76pieVrWXVe86VEcrfP
RecentBlockhash: EB3zVPGyV3ia6sDVxi2MHvu2B9KVGRuzUm2biggwBnbw
Signer: CtxJeBMW3kGg3yG4MPacVxFNp76pieVrWXVe86VEcrfP
Transfers (repeated):
[0]:
InstructionIndex: 4
Amount: 1691000000
Sender:
Address: AkG8tvC27HbMLunku4GLSEXbmmTwgX1nDEGoBjyHTJFJ
IsSigner: False
IsWritable: True
Token:
Mint: So11111111111111111111111111111111111111112
Owner: CtxJeBMW3kGg3yG4MPacVxFNp76pieVrWXVe86VEcrfP
Decimals: 9
ProgramId: TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
Receiver:
Address: DrVWWu8y13x5ZdSX7JTNH3XJJwptzh8xJ8nUphJZxY94
IsSigner: False
IsWritable: True
Token:
Mint: So11111111111111111111111111111111111111112
Owner: 8wJymmvXgo7eK7kGgvwU3GaKaTN9nvZV2S36WC9gbrq4
Decimals: 9
ProgramId: TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
Currency:
Name: Wrapped Solana
Decimals: 9
Uri:
Symbol: WSOL
Native: False
Wrapped: True
Fungible: True
MintAddress: So11111111111111111111111111111111111111112
TokenBalanceUpdates (repeated):
[0]:
PreBalance: 6012783083
PostBalance: 4321783083
AccountIndex: 5
[1]:
PreBalance: 1172836026228
PostBalance: 1174527026228
AccountIndex: 7
BalanceUpdates (repeated):
[0]:
BalanceUpdate:
PreBalance: 120566794
PostBalance: 120561169
AccountIndex: 0
Currency:
Name: Solana
Decimals: 9
Symbol: SOL
Native: True
Wrapped: False
Fungible: True
MintAddress: 11111111111111111111111111111111

This is just a small fraction of a single message - the actual messages can be several thousand lines long, containing detailed information about all token transfers, balance updates, and transaction details in a block.

Complete Code Walkthrough

Let's examine the entire implementation in detail:

Imports

import uuid
import base58
import json
import os
from dotenv import load_dotenv
import time
from datetime import datetime
from confluent_kafka import Consumer, KafkaError, KafkaException
from google.protobuf.message import DecodeError
from solana import token_block_message_pb2

Class Structure: IndexedWalletTracker

The IndexedWalletTracker class is the heart of our implementation. It manages the in-memory wallet index and processes Kafka messages to update balances.

Initialization

def __init__(self):
# Create output directory
self.output_dir = "wallet_balances"
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)

# Timestamp for file naming
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# In-memory index of wallets: {address: {token: {balance, decimals, symbol, last_updated}}}
self.wallet_index = {}

# Token metadata cache: {token_mint: {symbol, decimals, name}}
self.token_metadata = {}

# Statistics tracking
self.stats = {
'messages_processed': 0,
'balance_updates_received': 0,
'unique_addresses': 0,
'unique_tokens': 0,
'start_time': time.time(),
'last_export_time': time.time()
}

# Export settings
self.export_interval = 30 # seconds between exports

This initializes:

  • A directory for storing exported data
  • The in-memory wallet index (nested dictionaries for fast lookups)
  • A token metadata cache (to avoid redundant processing)
  • Statistics tracking
  • Export interval settings

Utility Functions

def convert_bytes(self, value):
"""Convert bytes to base58 string"""
if isinstance(value, bytes):
return base58.b58encode(value).decode()
return str(value)

This simple utility converts byte values (like addresses) to Solana's base58 format.

def calculate_human_balance(self, raw_balance, decimals):
"""Calculate human-readable balance based on token decimals"""
if decimals > 0:
return raw_balance / (10 ** decimals)
return raw_balance

This function converts raw token amounts (like 1000000000 lamports) to human-readable values (1 SOL) based on the token's decimal places.

Extracting Data from Messages

def extract_token_metadata(self, currency):
"""Extract token metadata from Currency object"""
token_mint = "UNKNOWN"
token_symbol = "UNKNOWN"
token_decimals = 0
token_name = "UNKNOWN"

if hasattr(currency, 'MintAddress') and currency.MintAddress:
token_mint = self.convert_bytes(currency.MintAddress)

if hasattr(currency, 'Symbol') and currency.Symbol:
token_symbol = currency.Symbol

if hasattr(currency, 'Decimals'):
token_decimals = currency.Decimals

if hasattr(currency, 'Name') and currency.Name:
token_name = currency.Name
else:
token_name = token_symbol

# Update token metadata cache
if token_mint != "UNKNOWN":
self.token_metadata[token_mint] = {
"symbol": token_symbol,
"decimals": token_decimals,
"name": token_name
}

# Update unique tokens count
if len(self.token_metadata) > self.stats['unique_tokens']:
self.stats['unique_tokens'] = len(self.token_metadata)

return {
"mint": token_mint,
"symbol": token_symbol,
"decimals": token_decimals,
"name": token_name
}

This function extracts token metadata (mint address, symbol, decimals, name) from the Currency objects in the Kafka messages. It also updates the token metadata cache for future reference.

def extract_address(self, tx, account_index):
"""Extract address from transaction using account index"""
address = None

# Try Header.Accounts
if hasattr(tx, 'Header') and hasattr(tx.Header, 'Accounts'):
accounts = tx.Header.Accounts
if account_index < len(accounts):
account = accounts[account_index]
if hasattr(account, 'Address'):
address = self.convert_bytes(account.Address)

# Try Accounts directly
if address is None and hasattr(tx, 'Accounts'):
accounts = tx.Accounts
if account_index < len(accounts):
account = accounts[account_index]
if hasattr(account, 'Address'):
address = self.convert_bytes(account.Address)

return address

This function extracts a wallet address from a transaction given an account index. It tries multiple locations because the Solana message structure can vary.

Updating Wallet Balances

def update_wallet_balance(self, address, token_mint, token_info, raw_balance):
"""Update the wallet balance index with the latest balance"""
if address == "UNKNOWN" or address is None:
return False # Skip unknown addresses

# Initialize address in index if needed
if address not in self.wallet_index:
self.wallet_index[address] = {}

# Update unique addresses count
self.stats['unique_addresses'] = len(self.wallet_index)

# Calculate human-readable balance
human_balance = self.calculate_human_balance(raw_balance, token_info['decimals'])

# Update token balance for this address
self.wallet_index[address][token_mint] = {
'raw_balance': raw_balance,
'human_balance': human_balance,
'symbol': token_info['symbol'],
'decimals': token_info['decimals'],
'last_updated': int(time.time())
}

return True

This is the core function that updates our in-memory index with the latest wallet balance. It also calculates a human-readable balance value and tracks when the balance was last updated.

def process_balance_update(self, address, token_info, raw_balance):
"""Process a balance update and update the index"""
if address and token_info['mint'] != "UNKNOWN":
success = self.update_wallet_balance(
address,
token_info['mint'],
token_info,
raw_balance
)

if success:
self.stats['balance_updates_received'] += 1
return True

return False

This is a wrapper around update_wallet_balance that also updates our statistics tracking.

Exporting Balances

def export_balances(self, force=False):
"""Export current balances to file if interval has passed or forced"""
current_time = time.time()
elapsed = current_time - self.stats['last_export_time']

if force or elapsed >= self.export_interval:
# Create export file with timestamp
export_file = os.path.join(self.output_dir, f"balances_{self.timestamp}_latest.json")

# Prepare export data
export_data = {
'timestamp': datetime.now().isoformat(),
'stats': self.stats.copy(),
'wallets': self.wallet_index
}

# Add elapsed time and rate stats
total_elapsed = current_time - self.stats['start_time']
export_data['stats']['elapsed_seconds'] = total_elapsed
export_data['stats']['updates_per_second'] = (
self.stats['balance_updates_received'] / total_elapsed
if total_elapsed > 0 else 0
)

# Write to file
with open(export_file, 'w') as f:
json.dump(export_data, f, indent=2)

# Also create a CSV version for easy viewing
csv_file = os.path.join(self.output_dir, f"balances_{self.timestamp}_latest.csv")
with open(csv_file, 'w') as f:
# Write header
f.write("Address,Token,Symbol,HumanReadableBalance,RawBalance,Decimals,LastUpdated\n")

# Write each wallet balance
for address, tokens in self.wallet_index.items():
for token_mint, data in tokens.items():
f.write(f"{address},{token_mint},{data['symbol']}," +
f"{data['human_balance']},{data['raw_balance']}," +
f"{data['decimals']},{data['last_updated']}\n")

# Update last export time
self.stats['last_export_time'] = current_time

print(f"Exported {len(self.wallet_index)} wallets with " +
f"{self.stats['balance_updates_received']} balance records to {export_file}")
print(f"CSV export available at: {csv_file}")

return True

return False

This function exports our in-memory wallet balances to both JSON and CSV files. It only exports if the export interval has passed or if forced (e.g., at shutdown).

Processing Messages

def process_message(self, token_block):
"""Process a token block message"""
self.stats['messages_processed'] += 1

# Process balance updates at block level
if hasattr(token_block, 'BalanceUpdates'):
for update in token_block.BalanceUpdates:
if hasattr(update, 'BalanceUpdate') and hasattr(update, 'Currency'):
balance_update = update.BalanceUpdate
currency = update.Currency

# Extract token metadata
token_info = self.extract_token_metadata(currency)

# Extract balance and account
if hasattr(balance_update, 'PostBalance') and hasattr(balance_update, 'AccountIndex'):
post_balance = balance_update.PostBalance
account_index = balance_update.AccountIndex

# For block level updates, we often can't resolve the address
# But we can try to look in transactions
address = None
if hasattr(token_block, 'Transactions'):
for tx in token_block.Transactions:
addr = self.extract_address(tx, account_index)
if addr:
address = addr
break

if address:
self.process_balance_update(address, token_info, post_balance)

This is the first part of the process_message method, which handles block-level balance updates. The method is quite long, as it needs to handle multiple data sources within each message.

   # Process transactions
if hasattr(token_block, 'Transactions'):
for tx in token_block.Transactions:
# Process transfers - these usually have the best address information
if hasattr(tx, 'Transfers'):
for transfer in tx.Transfers:
# Extract token metadata
token_info = {"mint": "UNKNOWN", "symbol": "UNKNOWN", "decimals": 0, "name": "UNKNOWN"}
if hasattr(transfer, 'Currency'):
token_info = self.extract_token_metadata(transfer.Currency)

# Get sender and receiver addresses
sender_address = None
if hasattr(transfer, 'Sender') and hasattr(transfer.Sender, 'Address'):
sender_address = self.convert_bytes(transfer.Sender.Address)

receiver_address = None
if hasattr(transfer, 'Receiver') and hasattr(transfer.Receiver, 'Address'):
receiver_address = self.convert_bytes(transfer.Receiver.Address)

# Process balance updates in instruction
if hasattr(transfer, 'Instruction') and hasattr(transfer.Instruction, 'TokenBalanceUpdates'):
for balance_update in transfer.Instruction.TokenBalanceUpdates:
if hasattr(balance_update, 'PostBalance') and hasattr(balance_update, 'AccountIndex'):
post_balance = balance_update.PostBalance
account_index = balance_update.AccountIndex

# Determine address based on account index
address = None
if account_index == 0 and sender_address:
address = sender_address
elif account_index == 2 and receiver_address:
address = receiver_address

if address:
self.process_balance_update(address, token_info, post_balance)

The next segment processes transfers, which provide the most accurate and direct information about wallet balance changes. It extracts the token metadata, sender and receiver addresses, and processes the balance updates for each.

           # Process transaction-level balance updates
if hasattr(tx, 'BalanceUpdates'):
for update in tx.BalanceUpdates:
if hasattr(update, 'BalanceUpdate') and hasattr(update, 'Currency'):
balance_update = update.BalanceUpdate
currency = update.Currency

# Extract token metadata
token_info = self.extract_token_metadata(currency)

# Extract balance and account
if hasattr(balance_update, 'PostBalance') and hasattr(balance_update, 'AccountIndex'):
post_balance = balance_update.PostBalance
account_index = balance_update.AccountIndex

# Get address from transaction
address = self.extract_address(tx, account_index)

if address:
self.process_balance_update(address, token_info, post_balance)

This segment processes transaction-level balance updates, which provide additional balance information that might not be captured in transfers.

           # Process token balance updates
if hasattr(tx, 'TokenBalanceUpdates'):
for update in tx.TokenBalanceUpdates:
if hasattr(update, 'PostBalance') and hasattr(update, 'AccountIndex'):
post_balance = update.PostBalance
account_index = update.AccountIndex

# Get address from transaction
address = self.extract_address(tx, account_index)

# Get token info from account
token_info = {"mint": "UNKNOWN", "symbol": "UNKNOWN", "decimals": 0, "name": "UNKNOWN"}

if hasattr(tx, 'Accounts') or (hasattr(tx, 'Header') and hasattr(tx.Header, 'Accounts')):
accounts = tx.Accounts if hasattr(tx, 'Accounts') else tx.Header.Accounts
if account_index < len(accounts):
account = accounts[account_index]
if hasattr(account, 'Token'):
token = account.Token
if hasattr(token, 'Mint'):
token_info["mint"] = self.convert_bytes(token.Mint)
if hasattr(token, 'Decimals'):
token_info["decimals"] = token.Decimals

# Use token metadata from cache if available
if token_info["mint"] != "UNKNOWN" and token_info["mint"] in self.token_metadata:
cached_data = self.token_metadata[token_info["mint"]]
token_info["symbol"] = cached_data["symbol"]
token_info["name"] = cached_data["name"]

if address:
self.process_balance_update(address, token_info, post_balance)

Finally, the method processes token balance updates, which provide yet another source of balance information. This multi-layered approach ensures we capture all possible balance changes.

   # Export balances if it's time
self.export_balances()

# Periodically print stats
if self.stats['messages_processed'] % 10 == 0:
self.print_stats()

After processing all the balance updates, the method checks if it's time to export the balances and prints statistics periodically.

Statistics Reporting

   def print_stats(self):
"""Print tracker statistics"""
elapsed = time.time() - self.stats['start_time']
updates_per_sec = (
self.stats['balance_updates_received'] / elapsed
if elapsed > 0 else 0
)

print(f"\n--- Wallet Balance Tracker Stats ---")
print(f"Runtime: {elapsed:.2f} seconds")
print(f"Messages processed: {self.stats['messages_processed']}")
print(f"Balance updates received: {self.stats['balance_updates_received']} ({updates_per_sec:.2f}/sec)")
print(f"Unique addresses tracked: {self.stats['unique_addresses']}")
print(f"Unique tokens tracked: {self.stats['unique_tokens']}")

This method prints statistics about the tracker's performance, including runtime, messages processed, balance updates received, and the number of unique addresses and tokens tracked.

Main Consumer Function

def run_consumer():
"""Run the Kafka consumer with the indexed wallet tracker"""
# Load environment variables from .env file
load_dotenv()

# Get credentials from environment variables
kafka_username = os.getenv("KAFKA_USERNAME")
kafka_password = os.getenv("KAFKA_PASSWORD")
# Kafka configuration
group_id_suffix = uuid.uuid4().hex
conf = {
'bootstrap.servers': 'rpk0.bitquery.io:9092,rpk1.bitquery.io:9092,rpk2.bitquery.io:9092',
'group.id': f'{kafka_username}-group-{group_id_suffix}',
'session.timeout.ms': 30000,
'security.protocol': 'SASL_PLAINTEXT',
'ssl.endpoint.identification.algorithm': 'none',
'sasl.mechanisms': 'SCRAM-SHA-512',
'sasl.username': kafka_username,
'sasl.password': kafka_password,
'auto.offset.reset': 'latest',
}

# Initialize consumer
consumer = Consumer(conf)
topic = 'solana.tokens.proto'
consumer.subscribe([topic])

# Initialize wallet tracker
tracker = IndexedWalletTracker()

print(f"Starting indexed wallet balance tracker on topic: {topic}")
print("Press Ctrl+C to stop...")

This function initializes the Kafka consumer with the appropriate configuration and sets up the wallet tracker.

  try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())

try:
# Parse the message
buffer = msg.value()
token_block = token_block_message_pb2.TokenBlockMessage()
token_block.ParseFromString(buffer)

# Process the message
tracker.process_message(token_block)

except DecodeError as err:
print(f"Protobuf decoding error: {err}")
except Exception as err:
print(f"Error processing message: {err}")
import traceback
traceback.print_exc()

The main processing loop polls for messages from Kafka, parses them using the protobuf definition, and passes them to the tracker for processing. It includes error handling to catch and report any issues.

   except KeyboardInterrupt:
print("\nStopping wallet balance tracker...")

finally:
# Export final balances
tracker.export_balances(force=True)
consumer.close()
print("Consumer closed.")

The function concludes with cleanup code that exports the final balances and closes the consumer when the program is stopped.

Data Flow Through the System

To understand how data flows through this system:

  1. Kafka Consumer retrieves messages from Bitquery's Solana token stream
  2. Each message is parsed into a protobuf TokenBlockMessage object
  3. The process_message method extracts:
    • Block-level balance updates
    • Transfers and their associated balance updates
    • Transaction-level balance updates
    • Token balance updates
  4. For each balance update, the system:
    • Extracts token metadata (mint, symbol, decimals)
    • Finds the associated wallet address
    • Updates the in-memory wallet index with the latest balance
  5. Periodically, the system exports the in-memory balances to JSON and CSV files
  6. Statistics are printed to the console to monitor performance

Scaling for Production Use

While this proof-of-concept demonstrates the core concepts, a production implementation would require several additional components:

  • Database integration: Replace file exports with a proper database system
  • Distributed processing: Run multiple consumer instances with the same group ID
  • Caching strategy: Implement memory management for less active wallets
  • Error recovery: Add robust error handling and consumer position checkpointing
  • Monitoring and alerting: Add systems to detect processing lags or failures
  • API layer: Create endpoints for querying balance data

For exchanges, custodial services, and other financial institutions, additional considerations around security, high availability, and compliance would also be necessary.

Conclusion: Beyond RPC Polling

Traditional RPC-based balance monitoring simply can't keep up with the demands of modern blockchain applications. By leveraging Bitquery's Kafka Streams, you can:

  • Reduce infrastructure costs by up to 90% compared to RPC polling
  • Improve detection latency from seconds to milliseconds
  • Scale to millions of wallets without performance degradation
  • Achieve 100% accuracy with no missed transactions

Ready to explore how Bitquery's Kafka Streams could transform your wallet monitoring capabilities? Clone the repository for a starting point and reach out to the Bitquery team on Telegram to discuss your specific requirements and get access to the streams.


This article provides a technical overview and implementation guidance. The code presented is a proof-of-concept that demonstrates core principles but would require additional engineering for production use. Always conduct thorough testing and security reviews before deploying systems handling financial data.