Using Bitquery Subscriptions to Load On-chain Data into S3
In this tutorial we will use Bitquery subscription queries to fetch the latest Pumpfun trade data on Solana and upload it to an S3 bucket.
1. Prerequisites
Before diving into the tutorial, ensure you have:
- AWS Account: With access to an S3 bucket and write permissions configured.
- Bitquery Account: For generating a token to access the Streaming APIs; see Bitquery's token-generation guide for the steps.
- Python Environment: With required libraries installed.
Install the required libraries if not already done (asyncio is part of the Python standard library, so only websockets and boto3 need installing):
pip install websockets boto3
2. Setting Up AWS S3 Configuration
The code initializes the AWS S3 client to upload JSON data:
import boto3

# AWS S3 configuration
s3_client = boto3.client(
    's3',
    aws_access_key_id='your_aws_access_key',
    aws_secret_access_key='your_aws_secret_key',
    region_name='your_aws_region'
)
bucket_name = 'your_s3_bucket_name'
- Replace your_aws_access_key, your_aws_secret_key, your_aws_region, and your_s3_bucket_name with your AWS credentials and bucket name.
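Hardcoding keys is convenient for a quick test, but boto3 can also resolve credentials through its default chain (environment variables, ~/.aws/credentials, or an IAM role). A minimal sketch of that variant:
# Alternative: omit the keys and let boto3 use its default credential chain
# (environment variables, ~/.aws/credentials, or an attached IAM role).
s3_client = boto3.client('s3', region_name='your_aws_region')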
3. Setting Up the WebSocket Connection
The WebSocket connection is established with the Bitquery API:
import asyncio
import json
import websockets
# Bitquery WebSocket API details
url = "wss://streaming.bitquery.io/eap?token=your_bitquery_token"
- Replace your_bitquery_token with the token generated from Bitquery (see the token-generation guide linked in the prerequisites).
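The snippets in section 5 assume an open WebSocket. A minimal sketch of how that connection might be opened (the open_stream name and the "graphql-ws" subprotocol are assumptions, inferred from the connection_init/start message types used below):
# Sketch: open the stream that the section-5 snippets operate on.
async def open_stream():
    async with websockets.connect(url, subprotocols=["graphql-ws"]) as websocket:
        ...  # handshake, subscription, and message loop (section 5) go here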
4. Writing the Subscription Query
The subscription query streams real-time Pumpfun DEX trades:
query = """
subscription MyQuery {
Solana {
DEXTrades(
where: {
Trade: { Dex: { ProtocolName: { is: "pump" } } }
Transaction: { Result: { Success: true } }
}
) {
Trade {
Dex {
ProtocolFamily
ProtocolName
}
Buy {
Amount
Account {
Address
}
}
Sell {
Amount
Account {
Address
}
}
}
Transaction {
Signature
}
}
}
}
"""
You can find more queries here: Solana Pump Fun API.
5. Fetching and Uploading Data
The fetch_and_upload function manages the WebSocket connection, listens for messages, and uploads them to S3.
a. Initialize Connection
await websocket.send(json.dumps({"type": "connection_init"}))
The WebSocket connection is initialized by sending a connection_init message. It then waits for an acknowledgment (connection_ack).
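A minimal sketch of that wait, assuming it runs inside fetch_and_upload right after connection_init is sent (the full script's handling may differ):
# Block until the server acknowledges the connection.
while True:
    message = json.loads(await websocket.recv())
    if message.get("type") == "connection_ack":
        break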
b. Send Subscription Query
await websocket.send(json.dumps({"type": "start", "id": "1", "payload": {"query": query}}))
After acknowledgment, the subscription query is sent.
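The id ("1" here) ties subsequent data messages to this subscription. In the legacy graphql-ws protocol these message types belong to, a stop message with the same id ends the subscription; a sketch:
# End the subscription cleanly when done (same id as the start message).
await websocket.send(json.dumps({"type": "stop", "id": "1"}))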
c. Listen for Messages
while True:
    response = await websocket.recv()
    data = json.loads(response)
    if data.get("type") == "data" and "payload" in data:
        trades = data['payload']['data'].get('Solana', {}).get('DEXTrades', [])
The WebSocket listens continuously for messages and processes subscription data.
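Streams using this protocol also emit periodic keep-alive frames (type "ka"); anything that is not a data message can simply be skipped, which the if check above already does implicitly. To handle it explicitly, a check can be added inside the receive loop:
# Skip keep-alive frames before processing subscription data.
if data.get("type") == "ka":
    continue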
d. Upload Data to S3
def upload_to_s3(data):
    s3_key = f"data/{data['transaction_signature']}.json"
    s3_client.put_object(Body=json.dumps(data), Bucket=bucket_name, Key=s3_key)
    print(f"Uploaded message to S3: {s3_key}")
For each message, a JSON file is created with a unique key (the transaction_signature) and uploaded to S3.
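The full script builds that record from each trade returned in the receive loop of step (c). A sketch of the glue, assuming one uploaded object per trade (the exact record layout in the original script may differ):
# Shape each trade from the subscription payload into the record
# upload_to_s3 expects; this field layout is an assumption.
for trade in trades:
    record = {
        "transaction_signature": trade["Transaction"]["Signature"],
        "trade": trade["Trade"],
    }
    upload_to_s3(record)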
6. Error Handling
The code includes error handling for the WebSocket and S3 uploads:
try:
    await fetch_and_upload()
except Exception as e:
    print(f"Error occurred: {e}")
This catches errors from the WebSocket connection and the S3 uploads and prints them instead of letting an unhandled exception crash the script with a raw traceback. As written, the script still exits after an error; to keep streaming, wrap the call in a reconnect loop.
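A minimal sketch of such a loop (not part of the original script; the run_forever name and the 5-second back-off are arbitrary choices):
# Reconnect after a short back-off so transient drops don't end the stream.
async def run_forever():
    while True:
        try:
            await fetch_and_upload()
        except Exception as e:
            print(f"Error occurred, reconnecting in 5s: {e}")
            await asyncio.sleep(5)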
7. Running the Script
The asyncio.run(main()) call starts the asynchronous process:
async def main():
    try:
        await fetch_and_upload()
    except Exception as e:
        print(f"Error occurred: {e}")

asyncio.run(main())
Execution Steps
- Replace placeholder values (your_aws_access_key, your_bitquery_token, etc.) with your credentials.
- Save the script as bitquery_s3_upload.py.
- Run the script:
python bitquery_s3_upload.py
- Check your S3 bucket for uploaded JSON files.
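To verify the uploads from the command line (assuming the AWS CLI is installed and configured with the same credentials):
aws s3 ls s3://your_s3_bucket_name/data/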