Calculating Real-time OHLC
In this section, we create a real-time subscription to DEX trade data from the Solana API over WebSocket, process incoming trades, and calculate OHLC (Open, High, Low, Close) data for one-minute intervals. To learn more about streaming Solana data via subscriptions, go here.
Imports and Constants
import { createClient } from "graphql-ws";
- createClient: The graphql-ws library is used to create a WebSocket client for subscribing to real-time GraphQL data.
let client;
let lastBar = null;
let dataBuffer = [];
const BUFFER_TIMEOUT = 60000; // 1 minute interval for OHLC calculation
const BITQUERY_ENDPOINT =
"wss://streaming.bitquery.io/eap?token=ory_at_...";
- client: Stores the WebSocket client instance.
- lastBar: Stores the latest OHLC bar being calculated for the current minute.
- dataBuffer: Holds the finalized OHLC bars to be processed at regular intervals.
- BUFFER_TIMEOUT: Defines the time interval (one minute) for processing the buffer and calculating OHLC.
- BITQUERY_ENDPOINT: The WebSocket endpoint for Bitquery's real-time data API. The token is required for authentication. You can use the same token generated previously.
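Hard-coding the token in source makes it easy to leak. A minimal sketch, assuming a Node.js environment and a hypothetical environment variable named BITQUERY_TOKEN (neither the variable name nor the buildEndpoint helper appears in the original file), of assembling the endpoint at start-up:

```javascript
// Base URL taken from the endpoint shown above.
const BITQUERY_BASE_URL = "wss://streaming.bitquery.io/eap";

// Hypothetical helper: builds the streaming URL from an access token.
function buildEndpoint(token) {
  if (!token) {
    throw new Error("Missing Bitquery access token");
  }
  // encodeURIComponent guards against characters that are unsafe in a query string
  return `${BITQUERY_BASE_URL}?token=${encodeURIComponent(token)}`;
}

// BITQUERY_TOKEN is an assumed environment variable name.
const envToken = process.env.BITQUERY_TOKEN;
const endpoint = envToken ? buildEndpoint(envToken) : null;
```

The resulting endpoint value can then be passed as the url option in place of the hard-coded constant.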
GraphQL Subscription Query
const subscriptionQuery = `
  subscription {
    Solana {
      DEXTrades(
        where: {Trade: {Buy: {Currency: {MintAddress: {is: "4Yx39Hkci49fdtyUGmrkDqTnVei9tmzPK9aac952xniv"}}}, Sell: {Currency: {MintAddress: {is: "So11111111111111111111111111111111111111112"}}}}}
      ) {
        Trade {
          Buy {
            Price
          }
        }
        Block {
          Time
        }
      }
    }
  }
`;
- subscriptionQuery: This GraphQL subscription listens to real-time DEX trades on the Solana blockchain between two specific tokens (identified by their mint addresses). It captures the price of the buy-side trades and the block time of each trade.
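To chart a different pair, only the two mint addresses need to change. The following is a sketch of a hypothetical buildSubscriptionQuery helper (not part of the original file) that produces the same subscription for caller-supplied addresses and rejects anything that is not a base58 string:

```javascript
// Hypothetical helper: returns the subscription above with the two mint
// addresses supplied by the caller.
function buildSubscriptionQuery(buyMint, sellMint) {
  // Solana mint addresses are base58 strings; rejecting anything else keeps
  // a stray quote from breaking out of the GraphQL string literal.
  const base58 = /^[1-9A-HJ-NP-Za-km-z]+$/;
  for (const mint of [buyMint, sellMint]) {
    if (typeof mint !== "string" || !base58.test(mint)) {
      throw new Error(`Invalid mint address: ${mint}`);
    }
  }
  return `
    subscription {
      Solana {
        DEXTrades(
          where: {Trade: {Buy: {Currency: {MintAddress: {is: "${buyMint}"}}}, Sell: {Currency: {MintAddress: {is: "${sellMint}"}}}}}
        ) {
          Trade { Buy { Price } }
          Block { Time }
        }
      }
    }
  `;
}

// Same pair as the query above.
const pairQuery = buildSubscriptionQuery(
  "4Yx39Hkci49fdtyUGmrkDqTnVei9tmzPK9aac952xniv",
  "So11111111111111111111111111111111111111112"
);
```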
processBuffer Function
function processBuffer(callback) {
  if (lastBar) {
    callback(lastBar);
    lastBar = null; // Reset after processing
  }
}
- processBuffer: This function processes the lastBar and sends it to the callback function (typically an update to the chart). After processing, it resets lastBar to prepare for the next minute's data.
subscribeToWebSocket Function
export function subscribeToWebSocket(onRealtimeCallback) {
  client = createClient({
    url: BITQUERY_ENDPOINT
  });
- subscribeToWebSocket: This function creates a WebSocket client and subscribes to real-time trade data. The onRealtimeCallback is passed as an argument to handle the emission of finalized OHLC bars.
Handling Incoming Trades
  const onNext = (data) => {
    const trade = data.data.Solana.DEXTrades[0];
    const tradeTime = new Date(trade.Block.Time).getTime();
    const price = parseFloat(trade.Trade.Buy.Price);
    // Round the time down to the start of its minute
    const roundedTime = Math.floor(tradeTime / 60000) * 60000;
- onNext: This callback processes each trade received via the WebSocket. It extracts the trade's price and block time, then rounds the time down to the start of its minute so that all trades within the same 1-minute interval map to the same OHLC bar.
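The bucketing step can be checked in isolation. This is a small sketch with a hypothetical floorToMinute helper and made-up timestamps, showing that two trades in the same minute share a bucket while a trade at the next minute boundary does not:

```javascript
const MINUTE_MS = 60000;

// Returns the timestamp (ms since epoch) of the start of the minute containing `ms`.
function floorToMinute(ms) {
  return Math.floor(ms / MINUTE_MS) * MINUTE_MS;
}

// Illustrative timestamps, not real trade data.
const early = Date.parse("2024-01-01T12:34:05Z");
const late = Date.parse("2024-01-01T12:34:59Z");
const next = Date.parse("2024-01-01T12:35:00Z");

console.log(floorToMinute(early) === floorToMinute(late)); // true: same bar
console.log(floorToMinute(late) === floorToMinute(next)); // false: a new bar starts
console.log(new Date(floorToMinute(late)).toISOString()); // 2024-01-01T12:34:00.000Z
```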
Calculating OHLC for the Current Minute
    // If it's a new minute, finalize the last bar and start a new one
    if (!lastBar || lastBar.time !== roundedTime) {
      if (lastBar) {
        dataBuffer.push(lastBar); // Push the finalized bar to the buffer
      }
      lastBar = {
        time: roundedTime,
        open: price,
        high: price,
        low: price,
        close: price,
        volume: 1, // Can modify to include volume data if available
      };
      console.log("lastBar", lastBar)
    } else {
      // Update the OHLC data for the current minute
      lastBar.high = Math.max(lastBar.high, price);
      lastBar.low = Math.min(lastBar.low, price);
      lastBar.close = price;
      lastBar.volume += 1; // Increment trade count (or add volume if applicable)
    }
  };
- OHLC Calculation:
  - If a new minute starts, the previous lastBar is pushed into the dataBuffer, and a new lastBar is created with the current trade's price as the open, high, low, and close values.
  - If the trade occurs within the same minute, the OHLC values are updated accordingly:
    - high: The highest price observed.
    - low: The lowest price observed.
    - close: The latest price.
    - volume: The number of trades (or volume, if available).
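The same rules can be written as a pure function, which makes them easy to test without a live stream. This is a sketch only: aggregateTrades and the sample trades are hypothetical, and each trade is assumed to be already reduced to a millisecond timestamp and a price.

```javascript
// Hypothetical pure helper: folds a time-ordered list of trades into 1-minute bars.
function aggregateTrades(trades) {
  const bars = [];
  let current = null;
  for (const { time, price } of trades) {
    const bucket = Math.floor(time / 60000) * 60000;
    if (!current || current.time !== bucket) {
      // New minute: finalize the previous bar and open a new one
      if (current) bars.push(current);
      current = { time: bucket, open: price, high: price, low: price, close: price, volume: 1 };
    } else {
      // Same minute: extend the current bar
      current.high = Math.max(current.high, price);
      current.low = Math.min(current.low, price);
      current.close = price;
      current.volume += 1; // trade count, as in the code above
    }
  }
  if (current) bars.push(current); // include the bar still in progress
  return bars;
}

// Illustrative input: three trades in the first minute, one in the second.
const sampleBars = aggregateTrades([
  { time: 0, price: 10 },
  { time: 20000, price: 12 },
  { time: 40000, price: 9 },
  { time: 65000, price: 11 },
]);
console.log(sampleBars);
// [ { time: 0, open: 10, high: 12, low: 9, close: 9, volume: 3 },
//   { time: 60000, open: 11, high: 11, low: 11, close: 11, volume: 1 } ]
```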
WebSocket Subscription
  client.subscribe(
    { query: subscriptionQuery },
    { next: onNext, error: console.error }
  );
- client.subscribe: This initiates the subscription to the real-time data stream, using the onNext callback to process each incoming trade. Errors are logged using console.error.
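The onNext handler above reads only the first entry of DEXTrades. If a single message can carry several trades (the array shape suggests it can, though this section does not confirm it), a defensive variant would walk the whole array and skip malformed entries. The extractTrades helper and the sample message below are hypothetical:

```javascript
// Hypothetical helper: pulls every usable trade out of one subscription message.
function extractTrades(message) {
  const trades = message?.data?.Solana?.DEXTrades ?? [];
  return trades
    .map((t) => ({
      time: new Date(t?.Block?.Time).getTime(),
      price: parseFloat(t?.Trade?.Buy?.Price),
    }))
    // Drop entries whose time or price could not be parsed
    .filter((t) => Number.isFinite(t.time) && Number.isFinite(t.price));
}

// Made-up message: two valid trades and one entry with no price.
const sampleMessage = {
  data: {
    Solana: {
      DEXTrades: [
        { Trade: { Buy: { Price: "0.0012" } }, Block: { Time: "2024-01-01T12:34:05Z" } },
        { Trade: { Buy: { Price: "0.0013" } }, Block: { Time: "2024-01-01T12:34:06Z" } },
        { Trade: { Buy: {} }, Block: { Time: "2024-01-01T12:34:07Z" } },
      ],
    },
  },
};

console.log(extractTrades(sampleMessage).length); // 2
```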
Periodic Buffer Processing
  // Process buffer every minute to emit the finalized OHLC bar
  setInterval(() => {
    if (dataBuffer.length > 0) {
      processBuffer(onRealtimeCallback);
      dataBuffer = lastBar ? [lastBar] : []; // Clear the buffer
    }
  }, BUFFER_TIMEOUT);
}
- setInterval: Every minute (BUFFER_TIMEOUT), if the buffer holds at least one bar, processBuffer passes the current lastBar to the onRealtimeCallback function. After processing, the buffer is cleared to prepare for the next interval.
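One thing the timer above does not provide is a way to stop it. The sketch below is a variation, not the original design: a hypothetical createBarFlusher keeps its own buffer, emits every buffered bar on each tick, and returns a stop function that clears the interval.

```javascript
// Hypothetical helper: buffers bars and emits them to `callback` on a timer.
function createBarFlusher(callback, intervalMs) {
  const buffer = [];
  const flush = () => {
    // splice(0) empties the buffer and returns the removed bars in order
    for (const bar of buffer.splice(0)) callback(bar);
  };
  const timer = setInterval(flush, intervalMs);
  return {
    push: (bar) => buffer.push(bar),
    flush,
    stop: () => clearInterval(timer),
  };
}

const emitted = [];
const flusher = createBarFlusher((bar) => emitted.push(bar), 60000);
flusher.push({ time: 0, open: 1, high: 2, low: 1, close: 2, volume: 3 });
flusher.flush(); // emit immediately instead of waiting for the timer
flusher.stop(); // clear the interval so nothing keeps firing
console.log(emitted.length); // 1
```

Calling stop from the cleanup path keeps the callback from firing after the chart has been torn down.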
unsubscribeFromWebSocket Function
export function unsubscribeFromWebSocket() {
  if (client) {
    client.dispose();
  }
}
- unsubscribeFromWebSocket: This function disposes of the WebSocket client, stopping the real-time data stream when no longer needed.
Summary
The websocketOHLC.js file is responsible for:
- Subscribing to a WebSocket data stream for real-time Solana DEX trades using Bitquery's Solana API.
- Calculating OHLC (Open, High, Low, Close) values in 1-minute intervals.
- Buffering and emitting the finalized OHLC bars at the end of each minute.
- Handling WebSocket subscription and cleanup.