Skip to main content

Setting up Google Pub-Sub

This tutorial walks through a process of subscribing to the Bitquery Streaming API and publishing its data to a Google Cloud Pub/Sub topic.

1. Set Up Google Cloud Pub/Sub

  1. Create a Pub/Sub Topic:

    • Go to the Google Cloud Console.
    • Navigate to Pub/Sub > Topics.
    • Create a new topic named bitquery-data-stream( this is an example).

  1. Create a Subscription:

    • Click on the topic and create a subscription (e.g., test1d).
    • Choose Pull or Push, depending on your architecture.
  1. Service Account Configuration:
    • Create a service account with the role Pub/Sub Publisher.
    • Download the service account key as key.json.

2. Writing Code - Install Required Libraries

Now we will setup the code to publish messages to the subscribers. Install the Python libraries needed for WebSocket communication and Pub/Sub interaction:

pip install websockets google-cloud-pubsub

3. Write the Script

Imports and Setup

Set up the necessary imports and environment variables:

import asyncio
import json
import websockets
from google.cloud import pubsub_v1
import os

# Set Google Application Credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "key.json"

Bitquery WebSocket API Details

Configure the WebSocket URL and the query to fetch Pumpfun DEX trades. To learn how to generate a token to use with the url, go here

url = "wss://streaming.bitquery.io/eap?token=<YOUR_TOKEN>"
query = """
subscription MyQuery {
Solana {
DEXTrades(
where: {
Trade: { Dex: { ProtocolName: { is: "pump" } } }
Transaction: { Result: { Success: true } }
}
) {
Trade {
Dex {
ProtocolFamily
ProtocolName
}
Buy {
Amount
Account {
Address
}
}
Sell {
Amount
Account {
Address
}
}
}
Transaction {
Signature
}
}
}
}
"""

Google Pub/Sub Configuration

Set up the Pub/Sub publisher:

project_id = "your project id"
topic_id = "bitquery-data-stream"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

Fetch and Publish Function

Handle WebSocket communication and Pub/Sub publishing:

async def fetch_and_publish():
async with websockets.connect(url, subprotocols=["graphql-ws"]) as websocket:
# Step 1: Initialize connection
await websocket.send(json.dumps({"type": "connection_init"}))

# Wait for connection acknowledgment
while True:
response = await websocket.recv()
response_data = json.loads(response)
if response_data.get("type") == "connection_ack":
print("Connection acknowledged.")
break

# Step 2: Send subscription query
await websocket.send(json.dumps({"type": "start", "id": "1", "payload": {"query": query}}))

# Step 3: Listen and publish messages to Pub/Sub
while True:
response = await websocket.recv()
data = json.loads(response)

# Process pumpfun data
if data.get("type") == "data" and "payload" in data:
trades = data['payload']['data'].get('Solana', {}).get('DEXTrades', [])

for trade in trades:
message = {
"protocol_family": trade['Trade']['Dex']['ProtocolFamily'],
"protocol_name": trade['Trade']['Dex']['ProtocolName'],
"buy_amount": trade['Trade']['Buy']['Amount'],
"buy_account": trade['Trade']['Buy']['Account']['Address'],
"sell_amount": trade['Trade']['Sell']['Amount'],
"sell_account": trade['Trade']['Sell']['Account']['Address'],
"transaction_signature": trade['Transaction']['Signature']
}
await publish_to_pubsub(message)

Publish to Pub/Sub

Create a function to publish messages to Pub/Sub:

async def publish_to_pubsub(message):
print(f"Publishing message: {message}")
future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
future.result() # Wait for the message to be successfully published
print("Message published.")

Main Function

Handle errors and run the WebSocket fetch:

async def main():
try:
await fetch_and_publish()
except Exception as e:
print(f"Error occurred: {e}")

# Run the main function
asyncio.run(main())

4. Part 2 - Setup Data Write to Bigquery

Next, we will create a subscriber to write this data to a Bigquery Table

5. Architecture Overview

  • WebSocket Client: Fetches live data from Bitquery.
  • Google Pub/Sub: Acts as a message bus for downstream consumers.
  • Downstream Processing: Consume and process data from Pub/Sub using Bigquery, Dataflow, or other analytics tools.

6. Debugging Tips

  • Use print() statements or logging to debug errors.
  • Ensure your WebSocket token and Pub/Sub credentials are valid.
  • Test Pub/Sub message flow using the Pub/Sub console.