Skip to main content

Building with WebSockets: Code Sample in Rust

In this section we will see how to use bitquery subscriptions in Rust. The final output will look something like this

You can find the complete code here

1. Set Up Your Rust Project

First, ensure you have a Rust project set up. If not, you can create a new one:

cargo new bitquery_realtime
cd bitquery_realtime

2. Add Dependencies

Open your Cargo.toml file and add the necessary dependencies. For real-time data fetching and processing, you'll likely need dependencies like tokio, reqwest, serde, and serde_json.

[dependencies]
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = "0.3"
graphql-client = "0.10.0" # Adjust version as necessary

3. Implement the Logic

This is the basic outline of what we will do

  • Establish a WebSocket connection to Bitquery's streaming endpoint. Use use /eap instead of /graphql if you are using chains on EAP endpoint.
  • Set up up a GraphQL subscription to receive real-time DEX trades data.
  • The subscribe function handles the connection setup and starts the streaming operation.
  • The main function initializes the subscription and processes the incoming data.

This Rust code sets up a WebSocket client to subscribe to real-time data from Bitquery Solana APIs using the GraphQL over WebSocket protocol.

How Everything Connects

  1. Tokio (Async Runtime): Handles asynchronous operations.
  2. WebSockets (async_tungstenite): Maintains a real-time connection with Bitquery’s API.
  3. GraphQL Queries (queries.rs): Defines how we request and handle blockchain data.
  4. Tokio Task Spawner (tokio_spawner.rs): Helps manage background tasks.
  5. Main Function (main.rs): Sets up and runs our application.

TokioSpawner Module

Let's create a TokioSpawner module responsible for handling task execution within the Tokio runtime. It allows tasks to be spawned asynchronously within the WebSocket client.

Key Features:

  • Encapsulates Tokio's Runtime Handle:
    It holds a reference to the Tokio runtime, ensuring tasks are spawned properly.
  • Implements futures::task::Spawn:
    This trait allows it to be used as a task spawner for the GraphQL WebSocket client.

Implementation Details:

pub struct TokioSpawner(tokio::runtime::Handle);

impl TokioSpawner {
pub fn new(handle: tokio::runtime::Handle) -> Self {
TokioSpawner(handle)
}

pub fn current() -> Self {
TokioSpawner::new(tokio::runtime::Handle::current())
}
}

impl futures::task::Spawn for TokioSpawner {
fn spawn_obj(
&self,
object: futures::task::FutureObj<'static, ()>,
) -> Result<(), futures::task::SpawnError> {
self.0.spawn(object);
Ok(())
}
}

How It Works:

  1. TokioSpawner::current() retrieves the current Tokio runtime handle.
  2. It implements the Spawn trait from futures::task, allowing it to schedule tasks asynchronously.
  3. This spawner is passed into the WebSocket client for handling GraphQL subscription events efficiently.

GraphQL Queries Module (queries.rs)

Create a queries.rs file that defines custom data types and the GraphQL subscription structure for fetching real-time DEX trades.

Key Features:

  • Defines Custom Scalar Types:
    GraphQL often uses custom scalar types that don’t directly map to Rust’s built-in types. This file defines wrappers for common data types like Decimal, BigInt, Timestamp, and DateTime.
  • Implements GraphQL Query with graphql_client:
    The DexTrades struct is automatically generated by the graphql_client crate and represents the structure of the GraphQL query.

Custom Scalar Type Implementations

#[derive(Debug, Clone, Deserialize)]
pub struct Decimal(pub String);

impl Decimal {
pub fn new(decimal: String) -> Self {
Self(decimal)
}
}

impl From<String> for Decimal {
fn from(item: String) -> Self {
Self::new(item)
}
}

  • The Decimal struct ensures that decimal numbers returned as strings from GraphQL are handled properly.
  • Similar wrappers exist for:
    • BigInt (large integers stored as strings)
    • Timestamp (Unix timestamps)
    • DateTime (ISO 8601 formatted date-time strings)

GraphQL Query Definition

#[derive(GraphQLQuery, Debug, Clone, Deserialize)]
#[graphql(
schema_path = "graphql/schema.graphql",
query_path = "graphql/subscriptions/dextrades.graphql",
response_derives = "Debug, Clone"
)]
pub struct DexTrades;

This defines the GraphQL subscription structure for DEX trades.

subscribe Function

pub async fn subscribe<T: GraphQLQuery + Send + Sync + Unpin + 'static>(
oauth_token: &str,
variables: T::Variables,
) -> Result<(
AsyncWebsocketClient<GraphQLClient, Message>,
SubscriptionStream<GraphQLClient, StreamingOperation<T>>,
)>
where
<T as GraphQLQuery>::Variables: Send + Sync + Unpin,
<T as GraphQLQuery>::ResponseData: std::fmt::Debug,
{
let mut request = "wss://streaming.bitquery.io/eap".into_client_request()?;
request.headers_mut().insert(
header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_str("graphql-transport-ws")?,
);
request
.headers_mut()
.insert("Authorization", HeaderValue::from_str(format!("Bearer {}", oauth_token).as_str())?);

let (connection, _) = async_tungstenite::tokio::connect_async(request).await?;

let (sink, stream) = connection.split::<Message>();

let mut client = GraphQLClientClientBuilder::new()
.build(stream, sink, TokioSpawner::current())
.await?;

let stream = client
.streaming_operation(StreamingOperation::<T>::new(variables))
.await?;

Ok((client, stream))
}

This function sets up a WebSocket connection to the Bitquery streaming endpoint.

  1. Create WebSocket Request:

    • Create a WebSocket request to the Bitquery streaming endpoint.
    • Add headers for the WebSocket protocol and authorization.
  2. Connect to WebSocket:

    • Establish an asynchronous WebSocket connection using async_tungstenite.
  3. Split Connection:

    • Split the connection into a sink (for sending messages) and a stream (for receiving messages).
  4. Build GraphQL Client:

    • Create a GraphQLClient using GraphQLClientClientBuilder.
  5. Start Streaming Operation:

    • Start a streaming GraphQL operation using the provided variables.
  6. Return Client and Stream:

    • Return the client and the stream of data.

Main Function

In this function we will pass the OAuth token. The best practise would be to include it as an environment variable, but for the sake of this tutorial it has been hard coded. You can generate a token here

Imports

use async_tungstenite::tungstenite::{
client::IntoClientRequest,
http::{header, HeaderValue},
Message,
};
use eyre::Result;
use futures::StreamExt;
use graphql_client::GraphQLQuery;
use graphql_ws_client::{
graphql::{GraphQLClient, StreamingOperation},
AsyncWebsocketClient, GraphQLClientClientBuilder, SubscriptionStream,
};

pub mod queries;
mod tokio_spawner;

use tokio_spawner::TokioSpawner;
  • async_tungstenite: Used for WebSocket communication.
  • eyre::Result: A result type for error handling.
  • futures::StreamExt: Provides extensions for working with streams.
  • graphql_client::GraphQLQuery: Defines the GraphQL query structure.
  • graphql_ws_client: Provides WebSocket client functionality for GraphQL.
  • queries: Contains the GraphQL queries.
  • tokio_spawner: Contains the task spawner implementation.

Type Definitions

pub type DexTradesQuery = queries::DexTrades;
pub type DexTradesVariables = queries::dex_trades::Variables;

Defines type aliases for the GraphQL query and variables.

#[tokio::main]
async fn main() -> Result<()> {
dotenv::dotenv().ok();
let token = "YOUR_HARDCODED_TOKEN";
let (_client, mut stream) = subscribe::<DexTradesQuery>(
token,
DexTradesVariables {
program_id: "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P".to_string(),
},
)
.await?;

while let Some(response) = stream.next().await {
dbg!(&response);
}

Ok(())
}

This is the entry point of the application.

  1. Load Environment Variables:

    • Load environment variables from a .env file if it exists.
  2. Subscribe to Data:

    • Call the subscribe function with the hardcoded API key and query variables.
  3. Process Incoming Data:

    • Iterate over the stream of incoming responses.
    • Print each response using dbg!.

4. Run the Project

With everything set up, you can now build and run your project:

cargo run

This should start your application and begin fetching real-time data from Bitquery, printing the results to the console.