How to Backfill Data After a WebSocket Disconnection in Python
In this section, we'll discuss approaches for backfilling data when a stream disconnects. We will write code that receives live data, handles errors and disconnections, backfills any data missed during the downtime, and closes connections gracefully.
Overview
In this tutorial, we'll build a system that:
- Subscribes to a live data stream via WebSocket.
- Handles errors and detects disconnections.
- Backfills any missing data during downtime by querying historical data.
- Ensures the WebSocket connection is gracefully closed when necessary.
We'll use Python's asyncio for asynchronous operations, the gql library for GraphQL interactions, and the requests library for HTTP requests.
Prerequisites
Before diving into the implementation, ensure you have the following:
- Python 3.7 or higher installed.
- Familiarity with WebSockets and GraphQL.
- API access to Bitquery (an OAuth token).
Installing Required Libraries
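Use pip to install the necessary Python libraries. asyncio ships with the Python standard library and should not be installed from PyPI, and gql needs its websockets extra for WebsocketsTransport:
pip install "gql[websockets]" requests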
System Architecture
The system's workflow can be summarized as follows:
- Live Subscription: Connect to the Bitquery Ethereum Subscription to receive real-time data.
- Error Handling: Monitor the connection for any issues or disconnections.
- Backfilling: Upon detecting a disconnection, query historical data to fill in any gaps.
- Graceful Closure: Properly close the WebSocket connection when done or when an error occurs.
Let's delve into each component in detail.
Step-by-Step Implementation
1. Live Subscription
Objective: Establish a live connection to the WebSocket API to receive real-time data.
Implementation Details:
- WebsocketsTransport: A class from the gql library that handles the WebSocket connection.
- GraphQL Subscription: Defines the query used to subscribe to live data.
Code Explanation:
import asyncio
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport
import requests
import datetime
We start by importing the necessary libraries: asyncio facilitates asynchronous operations, gql handles GraphQL queries and subscriptions, requests is used for HTTP requests during backfilling, and datetime manages time-related operations.
# Bitquery streaming URL and OAuth token
url = "https://streaming.bitquery.io/graphql"
token = "ory_at_...4"
Set up the streaming URL and your OAuth token. Replace "ory_at_...4" with your actual Bitquery token.
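Hardcoding the token is fine for a quick test, but you may prefer to keep it out of the source file. A minimal sketch, assuming a hypothetical environment variable named BITQUERY_TOKEN (the name is our choice, not something Bitquery requires); build_ws_url simply reproduces the WebSocket URL format used in the subscription code:

```python
import os


def load_token(var_name="BITQUERY_TOKEN"):
    """Read the Bitquery OAuth token from an environment variable.

    BITQUERY_TOKEN is a hypothetical variable name chosen for this example.
    """
    value = os.environ.get(var_name, "").strip()
    if not value:
        raise RuntimeError(f"Set the {var_name} environment variable to your Bitquery token")
    return value


def build_ws_url(token):
    # Same format as the subscription code: the token is passed as a query parameter
    return "wss://streaming.bitquery.io/graphql?token=" + token
```

With this in place, token = load_token() replaces the hardcoded string.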
Live Subscription Function:
async def subscribe():
    transport = WebsocketsTransport(
        url="wss://streaming.bitquery.io/graphql?token=" + token,
        headers={"Sec-WebSocket-Protocol": "graphql-ws"}
    )
    await transport.connect()
    print("connected")
    try:
        async for result in transport.subscribe(
            gql("""
            subscription MyQuery {
              EVM(network: eth) {
                DEXTrades {
                  Block {
                    Time
                  }
                  Transaction {
                    Hash
                  }
                  Trade {
                    Buy {
                      Buyer
                      AmountInUSD
                      Amount
                      Seller
                      PriceInUSD
                      Price
                      Currency {
                        Name
                        Symbol
                        SmartContract
                      }
                    }
                    Dex {
                      SmartContract
                      ProtocolName
                      ProtocolVersion
                    }
                    Sell {
                      Buyer
                      AmountInUSD
                      Amount
                      Seller
                      PriceInUSD
                      Price
                      Currency {
                        Name
                        Symbol
                        SmartContract
                      }
                    }
                  }
                }
              }
            }
            """)
        ):
            # Processing incoming data
            print("Live Data:", result)
    except Exception as e:
        print(f"Error during subscription: {e}")
        # Re-raise so the caller (main) can detect the disconnection and backfill
        raise
    finally:
        await transport.close()
Explanation:
Establish Connection: Create a WebsocketsTransport instance with the WebSocket URL and the necessary headers, then connect to the WebSocket using await transport.connect().
Subscription: Use transport.subscribe() with a GraphQL subscription query to listen for live data (DEXTrades in this case).
Data Handling: As live data comes in, it's printed out. In a real-world scenario, you'd process or store this data as needed.
Error Handling: If any exception occurs during the subscription, it's caught and printed.
Graceful Closure: Regardless of success or failure, the WebSocket connection is closed using await transport.close().
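Instead of printing each raw result, you will usually pull out the fields you need. transport.subscribe() yields GraphQL execution results whose data attribute holds the response dictionary. The sketch below assumes that dictionary has the shape implied by the subscription query (EVM, then a DEXTrades list); flatten_trades is a hypothetical helper, not part of gql or Bitquery:

```python
def flatten_trades(data):
    """Turn the data dict of one DEXTrades result into flat rows.

    Assumes the shape requested by the subscription query:
    {"EVM": {"DEXTrades": [{"Block": {...}, "Transaction": {...}, "Trade": {...}}]}}
    """
    rows = []
    for trade in (data or {}).get("EVM", {}).get("DEXTrades", []):
        buy = trade["Trade"]["Buy"]
        sell = trade["Trade"]["Sell"]
        rows.append({
            "time": trade["Block"]["Time"],
            "tx_hash": trade["Transaction"]["Hash"],
            "protocol": trade["Trade"]["Dex"]["ProtocolName"],
            "buy_symbol": buy["Currency"]["Symbol"],
            "buy_amount": buy["Amount"],
            "sell_symbol": sell["Currency"]["Symbol"],
            "sell_amount": sell["Amount"],
        })
    return rows


# Inside subscribe(), the print could become:
#     for row in flatten_trades(result.data):
#         print(row["time"], row["tx_hash"], row["buy_symbol"], row["sell_symbol"])
```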
2. Error Handling
Objective: Detect and handle any network issues or exceptions that may disrupt the WebSocket connection.
Implementation Details:
- Try-Except Blocks: Used to catch and handle exceptions during the subscription.
- Logging Errors: Errors are printed for debugging purposes.
Code Explanation:
In the subscribe function, the try-except-finally block ensures that any exceptions during the subscription process are caught. If an error occurs, it's printed, and the connection is closed gracefully in the finally block.
Additionally, the main function is designed to catch exceptions from the subscribe function and trigger the backfilling process.
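def handle_disconnection(start_time, end_time):
    print(f"Connection lost. Backfilling data from {start_time} to {end_time}.")
    # asyncio.run() starts a new event loop, so this function must be called
    # when no event loop is running (asyncio.run() cannot be nested)
    asyncio.run(backfill_data(start_time, end_time))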
The handle_disconnection function is invoked when a disconnection is detected. It logs the disconnection and triggers the backfilling process for the time range between start_time and end_time.
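The code in this tutorial stops after the first disconnection. If you want the stream to resume, one common pattern (not part of the original code) is to retry the connection with exponential backoff. A minimal sketch, assuming connect_once is any coroutine function such as subscribe; run_with_retries and its parameters are hypothetical names:

```python
import asyncio


async def run_with_retries(connect_once, max_retries=5, base_delay=1.0, max_delay=30.0):
    """Call connect_once() until it returns normally or retries run out.

    Waits base_delay, 2*base_delay, 4*base_delay, ... (capped at max_delay)
    between attempts. Returns the number of failed attempts before success.
    """
    failures = 0
    while True:
        try:
            await connect_once()
            return failures
        except Exception as e:
            failures += 1
            if failures > max_retries:
                raise  # give up and let the caller handle the error
            delay = min(base_delay * 2 ** (failures - 1), max_delay)
            print(f"Attempt {failures} failed ({e}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
```

In a fuller system you would also backfill the gap before each retry, so that no trades are lost between attempts.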
3. Backfilling Missing Data
Objective: Retrieve any data that was missed during the disconnection period to ensure data completeness.
Implementation Details:
- Historical Data Query: The backfill query requests the same fields as the live subscription, but it is sent as a one-off query rather than a subscription, so it returns historical data instead of real-time updates. It also includes a time filter that specifies the period to retrieve.
- Time Filter: The query uses a where clause to filter data by time. The since and till parameters restrict the results to the disconnected period, so only the window that needs backfilling is requested.
- Time Range Calculation: start_time and end_time bound the missing period and are passed into the backfill query to retrieve the missing trades. In the main function used in this tutorial, start_time is the moment the subscription started and end_time is the moment the disconnection was detected, so the window covers everything up to the disconnection, including trades that were already received live.
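The backfill_data function inserts the timestamps into the query text with % formatting. An alternative is to pass them as GraphQL variables, which keeps the query text fixed and avoids quoting problems. A sketch under assumptions: the DateTime variable type is assumed from the Bitquery schema rather than confirmed here, the field list is shortened, and build_backfill_payload is a hypothetical helper:

```python
import datetime

# Shortened field selection; copy the remaining Trade fields from the full
# backfill query as needed. The DateTime variable type is an assumption.
BACKFILL_QUERY = """
query Backfill($since: DateTime, $till: DateTime) {
  EVM(network: eth) {
    DEXTrades(where: {Block: {Time: {since: $since, till: $till}}}) {
      Block { Time }
      Transaction { Hash }
    }
  }
}
"""


def build_backfill_payload(start, end):
    """Return the JSON body for requests.post(url, json=...).

    start and end are timezone-aware datetimes; they are converted to UTC
    ISO 8601 strings, the same format the tutorial's main() produces.
    """
    if end <= start:
        raise ValueError("end must be later than start")
    utc = datetime.timezone.utc
    return {
        "query": BACKFILL_QUERY,
        "variables": {
            "since": start.astimezone(utc).isoformat(),
            "till": end.astimezone(utc).isoformat(),
        },
    }
```

The payload would then be sent with requests.post(url, json=build_backfill_payload(start, end), headers=headers).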
Code Explanation:
# Function to backfill data for a given time range
async def backfill_data(start_time, end_time):
    query = """
    {
      EVM(network: eth) {
        DEXTrades(where: {Block: {Time: {since: "%s", till: "%s"}}}) {
          Block {
            Time
          }
          Transaction {
            Hash
          }
          Trade {
            Buy {
              Buyer
              AmountInUSD
              Amount
              Seller
              PriceInUSD
              Price
              Currency {
                Name
                Symbol
                SmartContract
              }
            }
            Dex {
              SmartContract
              ProtocolName
              ProtocolVersion
            }
            Sell {
              Buyer
              AmountInUSD
              Amount
              Seller
              PriceInUSD
              Price
              Currency {
                Name
                Symbol
                SmartContract
              }
            }
          }
        }
      }
    }
    """ % (start_time, end_time)
    # Make an HTTP request to backfill missing data
    headers = {
        'Content-Type': 'application/json',
        'Authorization': "Bearer " + token
    }
    # The timeout stops the request from hanging indefinitely
    response = requests.post(url, json={"query": query}, headers=headers, timeout=60)
    if response.status_code == 200:
        result = response.json()
        print("Backfilled Data:", result)
    else:
        print("Failed to backfill data:", response.status_code, response.text)
Explanation:
GraphQL Query: The query fetches DEXTrades within the specified start_time and end_time.
HTTP Request: Sends the query to the Bitquery API in an HTTP POST request with the necessary headers, including the authorization token.
Response Handling: If the request succeeds (status_code == 200), the backfilled data is printed. Otherwise, an error message with the status code and response text is displayed.
Note: In a production environment, instead of printing the data, you'd likely store it in a database or process it further.
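Because the backfill window can overlap trades that were already received live, storing records under a uniqueness constraint keeps the database free of duplicates. A minimal sketch using Python's built-in sqlite3; the table layout and the use of a hash over the whole trade record as the key are assumptions for illustration (two distinct trades with identical fields would be stored once):

```python
import hashlib
import json
import sqlite3


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    # record_id is a hash of the full trade record, so the same trade arriving
    # from both the live stream and a backfill is stored only once
    conn.execute(
        "CREATE TABLE IF NOT EXISTS trades ("
        " record_id TEXT PRIMARY KEY,"
        " block_time TEXT,"
        " tx_hash TEXT,"
        " payload TEXT)"
    )
    return conn


def store_trades(conn, trades):
    """Insert DEXTrades records; returns how many of them were new."""
    inserted = 0
    with conn:  # commits on success, rolls back on error
        for trade in trades:
            payload = json.dumps(trade, sort_keys=True)
            record_id = hashlib.sha256(payload.encode()).hexdigest()
            cur = conn.execute(
                "INSERT OR IGNORE INTO trades VALUES (?, ?, ?, ?)",
                (record_id, trade["Block"]["Time"], trade["Transaction"]["Hash"], payload),
            )
            inserted += cur.rowcount  # 0 when the row already existed
    return inserted
```

In backfill_data, the print could be replaced by store_trades(conn, result["data"]["EVM"]["DEXTrades"]), and the live loop can call the same function.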
4. Graceful Closure
Objective: Ensure that the WebSocket connection is properly closed when it's no longer needed or when an error occurs.
Implementation Details:
- Finally Block: Ensures the connection is closed regardless of success or failure.
- Explicit Closure: Uses await transport.close() to terminate the WebSocket connection.
Code Explanation:
Within the subscribe function, the finally block guarantees that the WebSocket connection is closed even if an error occurs during data streaming. This prevents resource leaks and keeps the connection state cleanly managed.
    finally:
        await transport.close()
Additionally, the handle_disconnection function ensures that any necessary cleanup or data retrieval occurs when a disconnection is detected.
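The finally block also runs when the task executing the stream is cancelled, for example during shutdown. The sketch below demonstrates this with a stand-in transport; FakeTransport, stream, and demo are hypothetical names used only so the example runs without a network connection:

```python
import asyncio


class FakeTransport:
    """Stand-in for WebsocketsTransport that records whether close() was called."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def stream(transport):
    try:
        while True:
            await asyncio.sleep(0.01)  # pretend to wait for live data
    finally:
        # Runs on normal exit, on errors, and on cancellation
        await transport.close()


async def demo():
    transport = FakeTransport()
    task = asyncio.create_task(stream(transport))
    await asyncio.sleep(0.05)  # let it "stream" for a moment
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation is the expected way to stop the stream here
    return transport.closed
```

Running asyncio.run(demo()) returns True: cancelling the task still closes the transport.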
Putting It All Together
Main Function:
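# Main function to run the subscription and detect disconnection.
# Returns the (start_time, end_time) range to backfill, or None.
async def main():
    start_time = datetime.datetime.now(datetime.timezone.utc)
    try:
        await subscribe()
    except Exception as e:
        print(f"Disconnection detected: {e}")
        end_time = datetime.datetime.now(datetime.timezone.utc)
        return start_time.isoformat(), end_time.isoformat()
    return None
Explanation:
Start Time: Records the current UTC time before initiating the subscription.
Subscription: Awaits the subscribe function to start receiving live data.
Exception Handling: If an exception occurs (indicating a disconnection), main captures the current time as end_time and returns the time range between start_time and end_time. It does not call handle_disconnection itself, because handle_disconnection uses asyncio.run(), which raises a RuntimeError when called from inside an event loop that is already running, as main's is.
Running the Event Loop:
# Run the asyncio event loop
gap = asyncio.run(main())
# Backfill after the loop has finished, so handle_disconnection can start its own loop
if gap is not None:
    # Call backfill function to handle missing data
    handle_disconnection(*gap)
This starts the asynchronous event loop and executes the main function. If main reports a disconnection, handle_disconnection then backfills the data for that time range.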
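Using the moment the subscription started as start_time re-fetches everything received before the outage. A tighter window starts from the Block.Time of the last trade actually received. A sketch, assuming Block.Time values arrive as ISO 8601 strings such as "2024-10-14T12:00:00Z"; GapTracker is a hypothetical helper you would feed from inside subscribe():

```python
import datetime


def parse_block_time(value):
    # fromisoformat() before Python 3.11 rejects a trailing "Z", so normalise it
    return datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))


class GapTracker:
    """Remembers the newest Block.Time seen on the live stream."""

    def __init__(self, session_start):
        # Fallback if the stream drops before any trade arrives
        self.last_seen = session_start

    def observe(self, data):
        for trade in (data or {}).get("EVM", {}).get("DEXTrades", []):
            ts = parse_block_time(trade["Block"]["Time"])
            if ts > self.last_seen:
                self.last_seen = ts

    def window(self, now):
        """Return (since, till) ISO strings for the backfill query."""
        return self.last_seen.isoformat(), now.isoformat()
```

In subscribe(), you would call tracker.observe(result.data) for each result, and on disconnection use tracker.window(datetime.datetime.now(datetime.timezone.utc)) as the backfill range. The last block is re-fetched because it may have been only partly received, so store results with de-duplication.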
Complete Code
Here's the complete code assembled from the components discussed:
import asyncio
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport
import requests
import datetime

# Bitquery streaming URL and API token
url = "https://streaming.bitquery.io/graphql"
token = "ory_at_...4"

# Function to backfill data for a given time range
async def backfill_data(start_time, end_time):
    query = """
    {
      EVM(network: eth) {
        DEXTrades(where: {Block: {Time: {since: "%s", till: "%s"}}}) {
          Block {
            Time
          }
          Transaction {
            Hash
          }
          Trade {
            Buy {
              Buyer
              AmountInUSD
              Amount
              Seller
              PriceInUSD
              Price
              Currency {
                Name
                Symbol
                SmartContract
              }
            }
            Dex {
              SmartContract
              ProtocolName
              ProtocolVersion
            }
            Sell {
              Buyer
              AmountInUSD
              Amount
              Seller
              PriceInUSD
              Price
              Currency {
                Name
                Symbol
                SmartContract
              }
            }
          }
        }
      }
    }
    """ % (start_time, end_time)
    # Make an HTTP request to backfill missing data
    headers = {
        'Content-Type': 'application/json',
        'Authorization': "Bearer " + token
    }
    # The timeout stops the request from hanging indefinitely
    response = requests.post(url, json={"query": query}, headers=headers, timeout=60)
    if response.status_code == 200:
        result = response.json()
        print("Backfilled Data:", result)
    else:
        print("Failed to backfill data:", response.status_code, response.text)

# Function to subscribe to live data stream
async def subscribe():
    transport = WebsocketsTransport(
        url="wss://streaming.bitquery.io/graphql?token=" + token,
        headers={"Sec-WebSocket-Protocol": "graphql-ws"}
    )
    await transport.connect()
    print("connected")
    try:
        async for result in transport.subscribe(
            gql("""
            subscription MyQuery {
              EVM(network: eth) {
                DEXTrades {
                  Block {
                    Time
                  }
                  Transaction {
                    Hash
                  }
                  Trade {
                    Buy {
                      Buyer
                      AmountInUSD
                      Amount
                      Seller
                      PriceInUSD
                      Price
                      Currency {
                        Name
                        Symbol
                        SmartContract
                      }
                    }
                    Dex {
                      SmartContract
                      ProtocolName
                      ProtocolVersion
                    }
                    Sell {
                      Buyer
                      AmountInUSD
                      Amount
                      Seller
                      PriceInUSD
                      Price
                      Currency {
                        Name
                        Symbol
                        SmartContract
                      }
                    }
                  }
                }
              }
            }
            """)
        ):
            # Processing incoming data
            print("Live Data:", result)
    except Exception as e:
        print(f"Error during subscription: {e}")
        # Re-raise so main() can detect the disconnection and backfill
        raise
    finally:
        await transport.close()

# Function to log the disconnection and trigger backfill.
# Call it only when no event loop is running: asyncio.run() cannot be nested.
def handle_disconnection(start_time, end_time):
    print(f"Connection lost. Backfilling data from {start_time} to {end_time}.")
    asyncio.run(backfill_data(start_time, end_time))

# Main function to run the subscription and detect disconnection.
# Returns the (start_time, end_time) range to backfill, or None.
async def main():
    start_time = datetime.datetime.now(datetime.timezone.utc)
    try:
        await subscribe()
    except Exception as e:
        print(f"Disconnection detected: {e}")
        end_time = datetime.datetime.now(datetime.timezone.utc)
        return start_time.isoformat(), end_time.isoformat()
    return None

# Run the asyncio event loop
gap = asyncio.run(main())
# Backfill after the loop has finished, so handle_disconnection can start its own loop
if gap is not None:
    # Call backfill function to handle missing data
    handle_disconnection(*gap)
Running the System
- Configure OAuth Token: Replace "ory_at_...4" with your actual Bitquery token.
- Execute the Script: Run the script using Python:
python your_script_name.py
- Monitor Output: The console will display messages indicating successful connections, live data, any errors, and backfilled data as needed.
Example Output:
connected
Live Data: {...} # Real-time data received
Error during subscription: Connection closed unexpectedly
Disconnection detected: Connection closed unexpectedly
Connection lost. Backfilling data from 2024-10-14T12:00:00+00:00 to 2024-10-14T12:05:00+00:00.
Backfilled Data: {...} # Historical data fetched to fill the gap
Alternative Approach: Using Block Heights to Backfill Data
You can implement an alternative approach that uses block heights instead of relying on time periods.
- Track the Last Processed Block Height: Continuously update and save the latest block height received from the live data stream.
- Detect Disconnections: Monitor the WebSocket connection for any interruptions.
- Backfill Missing Data: Run a historical query that starts from the last saved block height to retrieve data from the missed blocks.
- Update the Last Processed Block Height: After successful backfilling, update the saved block height to reflect the latest processed block.
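The steps above can be sketched as follows. This is a minimal illustration under assumptions: the live query would also need to request Block { Number }, the checkpoint is kept in a local JSON file, and the Block: {Number: {ge: ...}} filter in the generated query is assumed rather than verified against the Bitquery schema. BlockCheckpoint and build_block_backfill_query are hypothetical names:

```python
import json
import os


class BlockCheckpoint:
    """Persists the highest block number processed so far in a JSON file."""

    def __init__(self, path):
        self.path = path
        self.last_block = None
        if os.path.exists(path):
            with open(path) as f:
                self.last_block = json.load(f).get("last_block")

    def update(self, block_number):
        """Record a processed block; only ever moves the checkpoint forward."""
        block_number = int(block_number)
        if self.last_block is not None and block_number <= self.last_block:
            return
        self.last_block = block_number
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump({"last_block": block_number}, f)
        # os.replace swaps the file in one step (atomic on POSIX), so a crash
        # mid-write does not leave a half-written checkpoint behind
        os.replace(tmp_path, self.path)


def build_block_backfill_query(last_block):
    """Query trades from the last processed block onwards.

    The last block is re-fetched because it may have been only partly
    received; store results with de-duplication. The Number/ge filter
    syntax is an assumption about the Bitquery schema.
    """
    return """
{
  EVM(network: eth) {
    DEXTrades(where: {Block: {Number: {ge: "%d"}}}) {
      Block { Number Time }
      Transaction { Hash }
    }
  }
}
""" % last_block
```

The live loop would call checkpoint.update(...) with each trade's block number, and after a reconnection the backfill would run build_block_backfill_query(checkpoint.last_block) before resuming the subscription.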
Conclusion
In this tutorial, we built a resilient real-time data streaming system using Python. The system effectively manages live data subscriptions, handles errors and disconnections, backfills any missing data to maintain data integrity, and ensures that connections are gracefully closed when necessary.