Build Trading Indicators for Crypto Data

Technical indicators are vital tools for cryptocurrency traders: they help identify trends, gauge likely price movements, and support informed trading decisions.

Our new Crypto Price Stream delivers real-time crypto market data at 1-second intervals, with the Simple Moving Average (SMA), Exponential Moving Average (EMA), and Weighted Simple Moving Average (WSMA) already precalculated.

In this tutorial we will stream these values and use them to calculate more advanced trading indicators. The code is also available as a ready-made Python package here

The indicators covered in this tutorial (the first three arrive precalculated from the stream; RSI and VWAP are computed locally):

  • Simple Moving Average (SMA)
  • Exponential Moving Average (EMA)
  • Weighted Simple Moving Average (WSMA)
  • Relative Strength Index (RSI)
  • Volume Weighted Average Price (VWAP)
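For reference, the two most common moving averages in the list above can be sketched in a few lines. This is a minimal illustration using the textbook definitions; the function names sma and ema are hypothetical, and the window length and weighting used by the stream's precalculated values are not stated here, so local results will not necessarily match them.

```python
def sma(values, period):
    """Arithmetic mean of the last `period` values."""
    if period <= 0 or len(values) < period:
        raise ValueError("need a positive period and at least `period` values")
    window = values[-period:]
    return sum(window) / period


def ema(values, period):
    """Exponential moving average seeded with the first value.

    Assumes the common smoothing factor alpha = 2 / (period + 1).
    """
    if period <= 0 or not values:
        raise ValueError("need a positive period and at least one value")
    alpha = 2 / (period + 1)
    result = values[0]
    for v in values[1:]:
        # Each new value pulls the average toward itself by a factor alpha
        result = alpha * v + (1 - alpha) * result
    return result


closes = [10.0, 11.0, 12.0, 13.0]
print(sma(closes, 3))  # 12.0
print(ema(closes, 3))  # 12.125
```

The EMA reacts faster than the SMA because recent closes carry more weight.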

Step 1: Setup WebSocket Connection

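from gql import gql
from gql.transport.websockets import WebsocketsTransport
import config  # local config.py containing: oauth_token = "YOUR_TOKEN"

transport = WebsocketsTransport(
    url=f"wss://streaming.bitquery.io/graphql?token={config.oauth_token}",
    headers={"Sec-WebSocket-Protocol": "graphql-ws"},
)

# Top-level await works in a notebook or in `python -m asyncio`.
# In a regular script, run these lines inside an async function
# started with asyncio.run().
await transport.connect()
print("Connected")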

Remember: Replace YOUR_TOKEN with an OAuth token from your Bitquery account. You can generate it here

Step 2: Subscribe to the Price Feed

We subscribe to the Trading Tokens stream for Solana tokens and request, for each interval, the closing price, the traded volume (base, quote, and USD), the interval end time, and the precalculated SMA, EMA, and WSMA. The Duration filter selects the interval length; the value 1 used here corresponds to the 1-second interval mentioned above, and you can change it to aggregate over longer periods.

query = gql("""
subscription {
  Trading {
    Tokens(
      where: {
        Token: { Network: { is: "Solana" } },
        Interval: { Time: { Duration: { eq: 1 } } }
      }
    ) {
      Token { Address Symbol }
      Interval { Time { End } }
      Volume { Base Quote Usd }
      Price {
        Ohlc { Close }
        Average { SimpleMoving ExponentialMoving WeightedSimpleMoving }
      }
    }
  }
}
""")

# transport.subscribe() yields result objects; the payload dict is in .data
async for result in transport.subscribe(query):
    data = result.data
    if not data:
        continue  # skip messages that carry no payload
    for t in data["Trading"]["Tokens"]:
        close = t["Price"]["Ohlc"]["Close"]
        sma = t["Price"]["Average"]["SimpleMoving"]
        ema = t["Price"]["Average"]["ExponentialMoving"]
        wsma = t["Price"]["Average"]["WeightedSimpleMoving"]
        # Choose a volume basis suitable for your use case.
        # Falls back to 1.0 (equal weighting) when no volume is reported.
        vol = t.get("Volume") or {}
        volume = vol.get("Base") or vol.get("Quote") or vol.get("Usd") or 1.0
        print(t["Token"]["Address"], sma, ema, wsma, close, volume)
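The field lookups above assume every field is present and already numeric. As a more defensive variant, the sketch below flattens one token entry into a small record and converts values with float(). The names Tick and parse_token are hypothetical helpers, not part of any Bitquery package, and the sample payload is made up to show the shape requested by the query. Unlike the chained "or" in the loop above, this version accepts a reported volume of 0 rather than skipping to the next field.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Tick:
    address: Optional[str]
    close: float
    sma: Optional[float]
    ema: Optional[float]
    wsma: Optional[float]
    volume: Optional[float]


def _to_float(value):
    """Convert numbers or numeric strings to float; None stays None."""
    return None if value is None else float(value)


def parse_token(t):
    """Flatten one entry of data["Trading"]["Tokens"]; None if no close price."""
    price = t.get("Price") or {}
    avg = price.get("Average") or {}
    vol = t.get("Volume") or {}
    close = _to_float((price.get("Ohlc") or {}).get("Close"))
    if close is None:
        return None  # nothing to compute without a close price
    # First volume field that is present, in the order Base, Quote, Usd
    volume = next(
        (_to_float(vol[k]) for k in ("Base", "Quote", "Usd") if vol.get(k) is not None),
        None,
    )
    return Tick(
        address=(t.get("Token") or {}).get("Address"),
        close=close,
        sma=_to_float(avg.get("SimpleMoving")),
        ema=_to_float(avg.get("ExponentialMoving")),
        wsma=_to_float(avg.get("WeightedSimpleMoving")),
        volume=volume,
    )


# Made-up payload for illustration only
sample = {
    "Token": {"Address": "ABC", "Symbol": "X"},
    "Volume": {"Base": None, "Quote": "2.5", "Usd": 10},
    "Price": {
        "Ohlc": {"Close": "1.25"},
        "Average": {"SimpleMoving": 1.2, "ExponentialMoving": None,
                    "WeightedSimpleMoving": "1.3"},
    },
}
print(parse_token(sample))
```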

Step 3: Calculate RSI in Real Time

We’ll keep RSI state in memory for each token and update it with Wilder’s smoothing.

rsi_period = 14
state = {}  # address → RSI state


def update_rsi(address, close):
    close = float(close)  # accept numeric strings as well as numbers
    s = state.setdefault(address, {"prev_close": None, "avg_gain": 0.0, "avg_loss": 0.0,
                                   "count": 0, "initialized": False})
    # The first close only seeds prev_close; there is no delta yet
    if s["prev_close"] is None:
        s["prev_close"] = close
        return None

    delta = close - s["prev_close"]
    gain, loss = max(delta, 0), max(-delta, 0)

    if not s["initialized"]:
        # Warm-up: accumulate the first rsi_period deltas, then average them
        s["avg_gain"] += gain
        s["avg_loss"] += loss
        s["count"] += 1
        if s["count"] >= rsi_period:
            s["avg_gain"] /= rsi_period
            s["avg_loss"] /= rsi_period
            s["initialized"] = True
    else:
        # Wilder's smoothing: weight (period - 1) on the old average, 1 on the new value
        s["avg_gain"] = ((s["avg_gain"] * (rsi_period - 1)) + gain) / rsi_period
        s["avg_loss"] = ((s["avg_loss"] * (rsi_period - 1)) + loss) / rsi_period

    s["prev_close"] = close
    if not s["initialized"]:
        return None
    # No losses in the window means RSI is pinned at 100
    return 100 if s["avg_loss"] == 0 else 100 - (100 / (1 + s["avg_gain"] / s["avg_loss"]))
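To sanity-check a streaming RSI, it helps to compare it against a batch calculation over a stored list of closes. The sketch below follows the same scheme described above (simple average over the first period deltas, then Wilder's smoothing); the function name wilder_rsi is hypothetical, and the short period in the example is chosen only so the arithmetic can be followed by hand.

```python
def wilder_rsi(closes, period=14):
    """RSI at the last close using Wilder's smoothing, or None if too few closes."""
    if len(closes) < period + 1:
        return None  # need `period` deltas, hence period + 1 closes
    deltas = [b - a for a, b in zip(closes, closes[1:])]
    gains = [max(d, 0.0) for d in deltas]
    losses = [max(-d, 0.0) for d in deltas]

    # Seed with the simple average of the first `period` gains and losses
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    # Smooth the remaining deltas one at a time
    for g, l in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + g) / period
        avg_loss = (avg_loss * (period - 1) + l) / period

    if avg_loss == 0:
        return 100.0
    return 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)


# Deltas are +1, -1, +1. The seed averages are 0.5 and 0.5 (RSI 50);
# the third delta smooths them to 0.75 and 0.25, so RS = 3.
print(wilder_rsi([10, 11, 10, 11], period=2))  # 75.0
```

Feeding the same closes one by one into a streaming implementation with the same period should give the same final value.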

Step 4: Calculate VWAP (rolling window)

We maintain a small rolling window of prices and volumes per token for a simple, streaming VWAP.

from collections import defaultdict

vwap_period = 20
vwap_state = defaultdict(lambda: {"prices": [], "volumes": [], "pv_sum": 0.0, "vol_sum": 0.0})


def update_vwap(address, price, volume):
    s = vwap_state[address]
    s["prices"].append(price)
    s["volumes"].append(volume)
    # Drop the oldest sample once the window is full and remove it from the running sums
    if len(s["prices"]) > vwap_period:
        rp = s["prices"].pop(0)
        rv = s["volumes"].pop(0)
        s["pv_sum"] -= rp * rv
        s["vol_sum"] -= rv
    s["pv_sum"] += price * volume
    s["vol_sum"] += volume
    # VWAP = sum(price * volume) / sum(volume); fall back to price if there is no volume
    return s["pv_sum"] / s["vol_sum"] if s["vol_sum"] > 0 else price
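The rolling VWAP is the sum of price times volume over the window divided by the sum of volume over the same window. As an alternative sketch, the class below keeps the window in a deque with a fixed maximum length and recomputes both sums on every update. The name RollingVWAP is hypothetical. Recomputing costs O(period) per update, which is negligible for small windows, and avoids the floating-point drift that can build up when running sums are repeatedly incremented and decremented.

```python
from collections import deque


class RollingVWAP:
    """Volume-weighted average price over the last `period` samples."""

    def __init__(self, period=20):
        # deque(maxlen=...) discards the oldest sample automatically
        self.window = deque(maxlen=period)

    def update(self, price, volume):
        self.window.append((price, volume))
        vol_sum = sum(v for _, v in self.window)
        if vol_sum <= 0:
            return price  # no volume in the window: fall back to the latest price
        return sum(p * v for p, v in self.window) / vol_sum


vwap = RollingVWAP(period=2)
print(vwap.update(10.0, 1.0))  # 10.0
print(vwap.update(20.0, 3.0))  # 17.5  -> (10*1 + 20*3) / 4
print(vwap.update(30.0, 1.0))  # 22.5  -> (20*3 + 30*1) / 4, first sample dropped
```

For per-token state, one instance per address can be kept in a dictionary, mirroring the per-address state used above.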

Step 5: Putting It Together

Inside your subscription loop:

# `data` is the payload dict of one subscription message
for t in data["Trading"]["Tokens"]:
    address = t["Token"]["Address"]
    close = float(t["Price"]["Ohlc"]["Close"])  # convert once, use for RSI and VWAP
    sma = t["Price"]["Average"]["SimpleMoving"]
    ema = t["Price"]["Average"]["ExponentialMoving"]
    wsma = t["Price"]["Average"]["WeightedSimpleMoving"]
    vol = t.get("Volume") or {}
    volume = float(vol.get("Base") or vol.get("Quote") or vol.get("Usd") or 1.0)

    rsi = update_rsi(address, close)
    vwap = update_vwap(address, close, volume)
    # RSI is None until rsi_period price changes have been seen for this token
    if rsi is not None:
        print(f"{address} SMA:{sma} EMA:{ema} WSMA:{wsma} VWAP:{vwap:.6f} RSI:{rsi:.2f} Close:{close}")
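To run the loop from a regular script rather than a notebook, it needs to live inside a coroutine. The sketch below shows one way to structure that, assuming the transport exposes async connect(), subscribe(), and close() methods and yields result objects with a .data attribute, as gql's WebsocketsTransport does. The names run_stream, FakeTransport, and FakeResult are hypothetical; the fake classes exist only so the example can be run without a network connection or token.

```python
import asyncio


async def run_stream(transport, query, handle_token):
    """Connect, pass every token entry to handle_token, and always close."""
    await transport.connect()
    try:
        async for result in transport.subscribe(query):
            data = getattr(result, "data", None)
            if not data:
                continue  # skip messages without a payload
            for t in data["Trading"]["Tokens"]:
                handle_token(t)
    finally:
        # Runs on normal completion, errors, and cancellation alike
        await transport.close()


class FakeResult:
    def __init__(self, data):
        self.data = data


class FakeTransport:
    """Hypothetical stand-in for a real transport, used only for a dry run."""

    def __init__(self, payloads):
        self.payloads = payloads
        self.closed = False

    async def connect(self):
        pass

    async def subscribe(self, query):
        for payload in self.payloads:
            yield FakeResult(payload)

    async def close(self):
        self.closed = True


fake = FakeTransport([{"Trading": {"Tokens": [{"Token": {"Address": "ABC"}}]}}, None])
seen = []
asyncio.run(run_stream(fake, "subscription { ... }", seen.append))
print(len(seen), fake.closed)  # 1 True
```

With a real transport and query, the call would be asyncio.run(run_stream(transport, query, handler)), where handler is a function containing the per-token logic shown above.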

Advantages of this approach:

  • No polling — prices are pushed to you as soon as they’re available.
  • Built-in indicators — SMA/EMA/WSMA come directly from Bitquery.
  • Custom logic — you can add advanced indicators like RSI without losing speed.