Monitoring the Solana Blockchain in Real Time - Easy Tutorial

Monitoring blockchain activities is crucial for developers, investors, and regulatory bodies. Bitquery offers the infrastructure to monitor blockchain activities effectively. In this tutorial, we'll use Bitquery Solana real-time data and Python to build a real-time Solana DEX monitoring dashboard.

We will create a dashboard with two views: trades across all Solana DEXs, and trades on Raydium only. Here is a step-by-step guide to building it. The code can be found here on GitHub.

Prerequisites

Ensure you have Python installed and pip configured correctly. Install the required libraries (asyncio is part of Python's standard library, so it does not need to be installed separately):

pip install pandas streamlit "gql[websockets]"

Step 1: Import Necessary Libraries

Import the libraries the script needs: asyncio for asynchronous programming, pandas for data manipulation, Streamlit for the web interface, and gql for GraphQL subscriptions over WebSockets.

import asyncio
import pandas as pd
import streamlit as st
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

Step 2: Set Up the WebSocket Connection

To generate a token to run the subscription, check the guide here.

Define the asynchronous function run_subscription to establish and manage the WebSocket connection with Bitquery. This function will handle subscribing to the real-time data streams.

async def run_subscription():
    # Setup WebSocket connection
    transport = WebsocketsTransport(
        url="wss://streaming.bitquery.io/eap?token=<YOUR TOKEN HERE>",
        headers={"Sec-WebSocket-Protocol": "graphql-ws"}
    )

    # Establish the connection
    await transport.connect()
    print("Connected to WebSocket")
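
Hard-coding the token in the URL makes it easy to leak into version control. Below is a minimal sketch of one alternative, assuming the token is kept in an environment variable. The variable name BITQUERY_TOKEN and the helpers build_stream_url and url_from_environment are illustrative names, not part of Bitquery's or gql's API.

```python
import os
from urllib.parse import quote

# Endpoint taken from the tutorial's transport setup.
STREAM_ENDPOINT = "wss://streaming.bitquery.io/eap"


def build_stream_url(token: str) -> str:
    """Return the streaming URL with the token as a query parameter."""
    if not token:
        raise ValueError("A Bitquery access token is required")
    # Percent-encode the token so unexpected characters cannot break the URL.
    return f"{STREAM_ENDPOINT}?token={quote(token, safe='')}"


def url_from_environment(var_name: str = "BITQUERY_TOKEN") -> str:
    """Read the token from an environment variable (hypothetical name)."""
    return build_stream_url(os.environ.get(var_name, ""))
```

The result of url_from_environment() could then be passed as the url argument of WebsocketsTransport in place of the literal string.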

Step 3: Handle User Input

Use Streamlit's sidebar to let users choose between the two datasets, 'General' and 'Raydium'. The try block opened here wraps the rest of the function so that the connection can be closed in Step 5:

    try:
        page = st.sidebar.radio("Select Page", ["General", "Raydium"])
        general_df = pd.DataFrame()
        raydium_df = pd.DataFrame()

Step 4: Subscribe to GraphQL Queries

Based on user selection, subscribe to the respective GraphQL queries to receive real-time data. Process the incoming data and update the dashboard accordingly.

All DEX Trades:

        if page == "General":
            st.subheader("General Table")
            table = st.table(general_df)
            while True:
                async for result in transport.subscribe(
                    gql("""
                        subscription {
                          Solana {
                            General: DEXTradeByTokens {
                              Block { Time }
                              Trade {
                                Amount
                                Price
                                Currency { Symbol Name }
                                Side { Amount Currency { Symbol Name MetadataAddress } }
                                Dex { ProgramAddress ProtocolFamily ProtocolName }
                                Market { MarketAddress }
                                Order { LimitAmount LimitPrice OrderId }
                                PriceInUSD
                              }
                            }
                          }
                        }
                    """)
                ):
                    if result.data:
                        new_data = pd.json_normalize(result.data['Solana']['General'])
                        general_df = pd.concat([general_df, new_data], ignore_index=True)
                        with st.spinner('Updating data...'):
                            table.table(general_df)

Raydium DEX Trades:

        elif page == "Raydium":
            st.subheader("Raydium Table")
            table = st.table(raydium_df)
            while True:
                async for result in transport.subscribe(
                    gql("""
                        subscription {
                          Solana {
                            Raydium: DEXTrades(
                              where: {Trade: {Dex: {ProgramAddress: {is: "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"}}}}
                            ) {
                              Trade {
                                Dex { ProgramAddress ProtocolName }
                                Buy {
                                  Account { Address }
                                  Amount
                                  Currency { Symbol Name }
                                  PriceInUSD
                                }
                                Sell {
                                  Account { Address }
                                  Amount
                                  Currency { Symbol Name }
                                  PriceInUSD
                                }
                              }
                              Block { Time Height }
                              Transaction { Signature }
                            }
                          }
                        }
                    """)
                ):
                    if result.data:
                        new_data = pd.json_normalize(result.data['Solana']['Raydium'])
                        raydium_df = pd.concat([raydium_df, new_data], ignore_index=True)
                        with st.spinner('Updating data...'):
                            table.table(raydium_df)
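
Each subscription message carries nested objects, and pd.json_normalize flattens them into columns with dot-separated names such as Trade.Currency.Symbol. The sketch below reproduces that flattening in plain Python to show which column names to expect, and keeps only the most recent rows so the table does not grow without bound. The sample payload, the flatten helper, and MAX_ROWS are illustrative assumptions, not data or names from Bitquery.

```python
from collections import deque

MAX_ROWS = 3  # hypothetical cap; a real dashboard might keep a few hundred rows


def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into dot-separated keys, as pd.json_normalize does by default."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


# Made-up message shaped like one entry of the "General" subscription result.
sample = {
    "Block": {"Time": "2024-01-01T00:00:00Z"},
    "Trade": {"Amount": "1.5", "Currency": {"Symbol": "SOL", "Name": "Solana"}},
}

rows = deque(maxlen=MAX_ROWS)  # oldest rows are dropped automatically
for _ in range(5):
    rows.append(flatten(sample))

print(sorted(rows[0]))  # column names after flattening
print(len(rows))        # never exceeds MAX_ROWS
```

With pandas, a comparable cap could be applied after pd.concat by keeping only the tail of the DataFrame, for example general_df.tail(MAX_ROWS).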

Step 5: Clean Up and Close Connection

Once the subscription ends or the user closes the dashboard, close the WebSocket connection to free up resources. The finally clause belongs to a try block that wraps the code from Steps 3 and 4 inside run_subscription:

    finally:
        await transport.close()
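
The try/finally pattern can be checked in isolation. The sketch below uses a stand-in FakeTransport class (hypothetical, not part of gql) to show that close() still runs when the subscription loop raises an exception.

```python
import asyncio


class FakeTransport:
    """Stand-in for a WebSocket transport; records whether close() ran."""

    def __init__(self):
        self.closed = False

    async def connect(self):
        print("Connected")

    async def close(self):
        self.closed = True


async def run_with_cleanup(transport, fail: bool = False):
    await transport.connect()
    try:
        if fail:
            # Simulate the stream dropping partway through.
            raise ConnectionError("stream interrupted")
    finally:
        # Runs on normal exit and when an exception propagates.
        await transport.close()


transport = FakeTransport()
try:
    asyncio.run(run_with_cleanup(transport, fail=True))
except ConnectionError:
    pass
print(transport.closed)
```

The same structure applies to the real transport: whatever happens inside the try block, the connection is released before the function returns or the exception propagates.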

Step 6: Run the Streamlit App

Define the main entry point for your Streamlit application, then start the dashboard with streamlit run followed by the script's file name:

def main():
    st.title("Solana DEX General & Raydium Data Dashboard")
    asyncio.run(run_subscription())

if __name__ == "__main__":
    main()

The Final Result