Skip to main content

Best Practices for Solana gRPC Streams

When building applications with Bitquery's Solana gRPC streams, follow these best practices to ensure your application is reliable, efficient, and handles data effectively. This guide covers filtering, connection management, error handling, and monitoring.

Filter Configuration​

Use specific filters that cover your usecase to reduce amount of data consumed. There is no limit on the number of filters you can use.

# config.yaml
server:
address: "corecast.bitquery.io"
authorization: "<your_api_token>"
insecure: false

stream:
type: "dex_trades"

filters:
# Filter by specific programs
programs:
- "9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM" # Raydium

# Filter by specific tokens
tokens:
- "So11111111111111111111111111111111111111112" # WSOL

# Filter by minimum trade amount
min_amount: 1000000 # 1 USDC (6 decimals)

Connection Management​

Automatic Reconnection​

Always implement automatic reconnection with exponential backoff to handle network interruptions:

let currentStream = null;
let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 10;
const INITIAL_RECONNECT_DELAY = 1000; // 1 second
const MAX_RECONNECT_DELAY = 60000; // 60 seconds

function calculateReconnectDelay(attempt) {
const delay = Math.min(
INITIAL_RECONNECT_DELAY * Math.pow(2, attempt),
MAX_RECONNECT_DELAY
);
return delay + Math.random() * 1000; // Add jitter
}

function attemptReconnection() {
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
console.error("Max reconnection attempts reached");
return;
}

reconnectAttempts++;
const delay = calculateReconnectDelay(reconnectAttempts - 1);

setTimeout(() => {
console.log(`Reconnecting... (attempt ${reconnectAttempts})`);
listenToStream();
}, delay);
}

Error Handling​

Handle different types of gRPC errors appropriately:

currentStream.on("error", (error) => {
console.error("Stream error:", error);

// Handle connection drops (code 14)
if (error.code === 14 || error.details === "Connection dropped") {
console.log("Connection dropped, attempting to reconnect...");
attemptReconnection();
} else {
console.error("Non-recoverable error:", error);
process.exit(1);
}
});

currentStream.on("end", () => {
console.log("Stream ended");
if (!isReconnecting) {
attemptReconnection();
}
});

Performance Optimization​

Efficient Message Processing​

Process messages efficiently to avoid blocking the event loop:

let messageCount = 0;
const BATCH_SIZE = 100;
let messageBatch = [];

currentStream.on("data", (message) => {
messageBatch.push(message);
messageCount++;

// Process in batches to avoid blocking
if (messageBatch.length >= BATCH_SIZE) {
processBatch(messageBatch);
messageBatch = [];
}
});

function processBatch(batch) {
// Process messages asynchronously
setImmediate(() => {
batch.forEach((message) => {
if (message.Trade) {
handleTrade(message.Trade);
} else if (message.Transaction) {
handleTransaction(message.Transaction);
}
// Handle other message types...
});
});
}

Memory Management​

Implement proper cleanup to prevent memory leaks:

function cleanupStream() {
if (currentStream) {
try {
currentStream.cancel();
} catch (error) {
// Stream might already be closed
}
currentStream = null;
}
}

// Handle graceful shutdown
process.on("SIGINT", () => {
cleanupStream();
process.exit(0);
});

Client Configuration​

Use appropriate gRPC client options for production:

const client = new solanaCorecast.CoreCast(
config.server.address,
grpc.credentials.createSsl(),
{
// Keep-alive settings
"grpc.keepalive_time_ms": 30000,
"grpc.keepalive_timeout_ms": 5000,
"grpc.keepalive_permit_without_calls": true,

// Message size limits
"grpc.max_receive_message_length": 4 * 1024 * 1024, // 4MB
"grpc.max_send_message_length": 4 * 1024 * 1024, // 4MB

// Connection management
"grpc.enable_retries": 1,
"grpc.max_connection_idle_ms": 30000,
}
);

Monitoring and Logging​

Performance Monitoring​

Track key metrics for monitoring:

let stats = {
messagesProcessed: 0,
errors: 0,
lastMessageTime: null,
startTime: Date.now(),
};

function logStats() {
const uptime = Date.now() - stats.startTime;
const rate = stats.messagesProcessed / (uptime / 1000);

console.log(
`Stats: ${stats.messagesProcessed} messages, ${rate.toFixed(2)} msg/sec`
);
}

// Log stats every 30 seconds
setInterval(logStats, 30000);

Error Tracking​

Implement comprehensive error tracking:

function trackError(error, context) {
console.error(`Error in ${context}:`, {
message: error.message,
code: error.code,
details: error.details,
timestamp: new Date().toISOString(),
});

stats.errors++;
}

Health Checks​

Implement health checks for monitoring:

function healthCheck() {
return {
status: currentStream ? "connected" : "disconnected",
reconnectAttempts: reconnectAttempts,
messagesProcessed: stats.messagesProcessed,
uptime: Date.now() - stats.startTime,
};
}

Common Pitfalls​

  1. Not handling connection drops: Always implement reconnection logic
  2. Blocking the event loop: Process messages asynchronously
  3. Memory leaks: Clean up streams and timers properly
  4. Missing error handling: Handle all gRPC error codes
  5. Inefficient filtering: Use server-side filters to reduce bandwidth
  6. No monitoring: Track key metrics for production deployments