Real-time price data from DEXs is the foundation of arbitrage bots, portfolio trackers, and DeFi dashboards. But most tutorials show you how to poll a REST API — which introduces latency and API rate limits. The right approach: listen directly to blockchain events via WebSocket.
This post builds a complete DEX price monitor from scratch: Ethereum node WebSocket connection → Uniswap V2 event decoding → price calculation → client broadcast. All in TypeScript.
Architecture Overview
Ethereum Node (WebSocket)
    │
    │  eth_subscribe('logs', filter)
    ▼
Event Listener (Node.js/TypeScript)
    │
    │  Decode Swap event → calculate price
    ▼
Price Aggregator (per-pair state)
    │
    │  Broadcast via WebSocket
    ▼
Connected Clients (browser, bots, dashboards)

No polling. No REST API rate limits. Events flow in real-time as transactions are mined.
Prerequisites
npm init -y
npm install ethers ws express
npm install -D typescript @types/ws @types/node ts-node

You’ll need an Ethereum node with WebSocket support. Options:
- Alchemy: Free tier includes WebSocket, 300M compute units/month
- Infura: Similar free tier
- QuickNode: Better performance, paid
- Self-hosted: geth or erigon with the --ws flag
Step 1: Uniswap V2 Event Structure
Uniswap V2 emits a Swap event on every trade:
event Swap(
    address indexed sender,
    uint amount0In,
    uint amount1In,
    uint amount0Out,
    uint amount1Out,
    address indexed to
);

The topic for this event (keccak256 of the signature):
keccak256("Swap(address,uint256,uint256,uint256,uint256,address)")
= 0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822

Step 2: The Event Listener
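Before handing decoding over to ethers, it can help to see what a Swap log carries. The two indexed addresses (sender, to) travel in the log's topics, while the four uint amounts are ABI-encoded in the data field as consecutive 32-byte big-endian words. The sketch below decodes that data field by hand under this assumption. The helper name decodeSwapData and the sample payload are illustrative and are not part of the listener in this step.

```typescript
// Hypothetical helper: manually decode the non-indexed fields of a Uniswap V2 Swap log.
interface SwapAmounts {
  amount0In: bigint;
  amount1In: bigint;
  amount0Out: bigint;
  amount1Out: bigint;
}

function decodeSwapData(data: string): SwapAmounts {
  const hex = data.startsWith('0x') ? data.slice(2) : data;
  // Four uint256 values, 32 bytes (64 hex characters) each
  if (hex.length !== 4 * 64) {
    throw new Error(`Expected 128 bytes of Swap data, got ${hex.length / 2}`);
  }
  const word = (i: number): bigint => BigInt('0x' + hex.slice(i * 64, (i + 1) * 64));
  return {
    amount0In: word(0),
    amount1In: word(1),
    amount0Out: word(2),
    amount1Out: word(3),
  };
}

// Made-up sample: 1 WETH (1e18 wei) in as token0, 2000 USDT (2000e6 units) out as token1
const encodeWord = (v: bigint): string => v.toString(16).padStart(64, '0');
const sampleData = '0x' + [10n ** 18n, 0n, 0n, 2_000_000_000n].map(encodeWord).join('');
const sampleSwap = decodeSwapData(sampleData);
console.log(sampleSwap);
```

In the listener itself, iface.parseLog does this work and also validates the topic, so the manual version is only useful for understanding or debugging raw logs.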
import { ethers, WebSocketProvider } from 'ethers';
const UNISWAP_V2_SWAP_TOPIC = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822';
// Monitor specific pairs; add more as needed.
// Keys must be lowercase: processLog lowercases log.address before the lookup.
const MONITORED_PAIRS: Record<string, { name: string; token0Decimals: number; token1Decimals: number }> = {
  '0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc': {
    name: 'USDC/ETH',
    token0Decimals: 6,  // USDC
    token1Decimals: 18, // WETH
  },
  '0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852': {
    name: 'ETH/USDT',
    token0Decimals: 18, // WETH
    token1Decimals: 6,  // USDT
  },
};
const SWAP_EVENT_ABI = [ 'event Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)',];
const iface = new ethers.Interface(SWAP_EVENT_ABI);
export interface PriceUpdate {
  pair: string;
  pairAddress: string;
  price: number;       // token1 per token0 (e.g., USDT per WETH)
  volume: number;      // trade volume in token0
  timestamp: number;
  txHash: string;
  blockNumber: number;
}
export type PriceUpdateCallback = (update: PriceUpdate) => void;
export class DEXListener {
  private provider: WebSocketProvider;
  private callbacks: PriceUpdateCallback[] = [];
  private reconnectTimer?: NodeJS.Timeout;
  private healthCheckTimer?: NodeJS.Timeout;
  private stopped = false;

  constructor(private readonly wsUrl: string) {
    this.provider = this.createProvider();
  }

  private createProvider(): WebSocketProvider {
    const provider = new WebSocketProvider(this.wsUrl);

    provider.on('error', (err) => {
      console.error('Provider error:', err);
      this.scheduleReconnect();
    });

    // ethers v6 doesn't expose 'close' directly, so poll the connection instead.
    // Only one health check runs at a time; any previous one is cleared first.
    clearInterval(this.healthCheckTimer);
    this.healthCheckTimer = setInterval(async () => {
      try {
        await provider.getBlockNumber();
      } catch {
        this.scheduleReconnect();
      }
    }, 30_000);

    return provider;
  }

  private scheduleReconnect() {
    if (this.stopped || this.reconnectTimer) return;
    // Pause health checks against the dead connection until the new provider is up
    clearInterval(this.healthCheckTimer);
    console.log('Reconnecting in 5 seconds...');
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      // Release the old socket and its subscriptions; ignore errors from a dead connection
      void this.provider.destroy().catch(() => {});
      this.provider = this.createProvider();
      this.startListening().catch((err) => console.error('Failed to resubscribe:', err));
    }, 5_000);
  }

  onPriceUpdate(callback: PriceUpdateCallback) {
    this.callbacks.push(callback);
  }

  async startListening() {
    const pairAddresses = Object.keys(MONITORED_PAIRS);

    console.log(`Listening to ${pairAddresses.length} pairs...`);

    // Subscribe to Swap events from all monitored pairs
    await this.provider.on(
      {
        address: pairAddresses,
        topics: [UNISWAP_V2_SWAP_TOPIC],
      },
      async (log) => {
        await this.processLog(log);
      }
    );
  }

  private async processLog(log: ethers.Log) {
    const pairAddress = log.address.toLowerCase();
    const pairInfo = MONITORED_PAIRS[pairAddress];
    if (!pairInfo) return;

    try {
      const decoded = iface.parseLog({
        topics: log.topics as string[],
        data: log.data,
      });

      if (!decoded) return;

      const { amount0In, amount1In, amount0Out, amount1Out } = decoded.args;

      // Determine direction and calculate price
      let price: number;
      let volume: number;

      const t0Dec = 10 ** pairInfo.token0Decimals;
      const t1Dec = 10 ** pairInfo.token1Decimals;

      if (amount0In > 0n) {
        // Selling token0 for token1: price = token1Out / token0In
        const t0InFormatted = Number(amount0In) / t0Dec;
        const t1OutFormatted = Number(amount1Out) / t1Dec;
        price = t1OutFormatted / t0InFormatted;
        volume = t0InFormatted;
      } else {
        // Selling token1 for token0: price = token1In / token0Out
        const t0OutFormatted = Number(amount0Out) / t0Dec;
        const t1InFormatted = Number(amount1In) / t1Dec;
        price = t1InFormatted / t0OutFormatted;
        volume = t0OutFormatted;
      }

      // Skip degenerate swaps (a zero amount on either side) that would yield 0, Infinity, or NaN
      if (!Number.isFinite(price) || price <= 0) return;

      const update: PriceUpdate = {
        pair: pairInfo.name,
        pairAddress,
        price,
        volume,
        timestamp: Date.now(), // local receive time, not the block timestamp
        txHash: log.transactionHash,
        blockNumber: log.blockNumber,
      };

      this.callbacks.forEach(cb => cb(update));
    } catch (err) {
      console.error('Error processing log:', err);
    }
  }

  async stop() {
    // Prevent further reconnects, then cancel timers before closing the socket
    this.stopped = true;
    clearTimeout(this.reconnectTimer);
    clearInterval(this.healthCheckTimer);
    await this.provider.destroy();
  }
}

Step 3: Price Aggregator
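Every price the aggregator receives comes from the per-swap arithmetic in Step 2, so a worked example of that calculation may be useful before building candles on top of it. The sketch below restates the same logic as a pure function. The names quoteFromSwap, RawSwap, and Quote are hypothetical, and the sample amounts are made up for illustration.

```typescript
// Hypothetical pure-function version of the Step 2 price calculation.
interface RawSwap {
  amount0In: bigint;
  amount1In: bigint;
  amount0Out: bigint;
  amount1Out: bigint;
}

interface Quote {
  price: number;  // token1 per token0
  volume: number; // traded amount in token0 units
}

function quoteFromSwap(s: RawSwap, token0Decimals: number, token1Decimals: number): Quote | undefined {
  // Direction: token0 flowing in means token0 is being sold for token1
  const sellingToken0 = s.amount0In > 0n;
  const raw0 = sellingToken0 ? s.amount0In : s.amount0Out;
  const raw1 = sellingToken0 ? s.amount1Out : s.amount1In;
  // A zero on either side would give a price of 0 or a division by zero
  if (raw0 === 0n || raw1 === 0n) return undefined;

  // Scale raw integer amounts down to whole-token units
  const token0 = Number(raw0) / 10 ** token0Decimals;
  const token1 = Number(raw1) / 10 ** token1Decimals;
  return { price: token1 / token0, volume: token0 };
}

// ETH/USDT pair: token0 = WETH (18 decimals), token1 = USDT (6 decimals)
// Sell 1 WETH for 2000 USDT
const sellQuote = quoteFromSwap(
  { amount0In: 10n ** 18n, amount1In: 0n, amount0Out: 0n, amount1Out: 2_000_000_000n },
  18,
  6
);
// Buy 2 WETH with 4000 USDT
const buyQuote = quoteFromSwap(
  { amount0In: 0n, amount1In: 4_000_000_000n, amount0Out: 2n * 10n ** 18n, amount1Out: 0n },
  18,
  6
);
console.log(sellQuote, buyQuote);
```

Both sample swaps work out to about 2000 USDT per WETH, with volumes of 1 and 2 WETH. The price is the same in either direction because it is always expressed as token1 per token0.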
Raw swap events are noisy — we want OHLCV candles, not individual swaps:
import { PriceUpdate } from './listener';
interface Candle {
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
  timestamp: number; // candle start time
  trades: number;
}
export class PriceAggregator {
  private candles = new Map<string, Map<number, Candle>>(); // pair → timestamp → candle
  private latestPrice = new Map<string, number>();
  private readonly candleDurationMs = 60_000; // 1-minute candles

  process(update: PriceUpdate) {
    const { pair, price, volume, timestamp } = update;

    // Update latest price
    this.latestPrice.set(pair, price);

    // Update 1m candle
    const candleStart = Math.floor(timestamp / this.candleDurationMs) * this.candleDurationMs;

    if (!this.candles.has(pair)) {
      this.candles.set(pair, new Map());
    }

    const pairCandles = this.candles.get(pair)!;
    const existing = pairCandles.get(candleStart);

    if (existing) {
      existing.high = Math.max(existing.high, price);
      existing.low = Math.min(existing.low, price);
      existing.close = price;
      existing.volume += volume;
      existing.trades++;
    } else {
      pairCandles.set(candleStart, {
        open: price,
        high: price,
        low: price,
        close: price,
        volume,
        timestamp: candleStart,
        trades: 1,
      });
    }

    // Keep only the last 1440 candles (24 hours of 1m candles).
    // Pick the oldest key numerically: a bare sort() would compare the numbers as strings.
    if (pairCandles.size > 1440) {
      const oldestKey = Math.min(...pairCandles.keys());
      pairCandles.delete(oldestKey);
    }
  }

  getLatestPrice(pair: string): number | undefined {
    return this.latestPrice.get(pair);
  }

  getCandles(pair: string, limit = 60): Candle[] {
    const pairCandles = this.candles.get(pair);
    if (!pairCandles) return [];

    return [...pairCandles.values()]
      .sort((a, b) => a.timestamp - b.timestamp)
      .slice(-limit);
  }

  getAllPrices(): Record<string, number> {
    return Object.fromEntries(this.latestPrice);
  }
}

Step 4: WebSocket Server for Clients
import express from 'express';
import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';
import { DEXListener } from './listener';
import { PriceAggregator } from './aggregator';

const app = express();
const httpServer = createServer(app);
const wss = new WebSocketServer({ server: httpServer });

const listener = new DEXListener(process.env.WS_URL!);
const aggregator = new PriceAggregator();

// REST API for historical data.
// Pair names contain a slash, so clients must URL-encode them (e.g. /candles/ETH%2FUSDT).
app.get('/candles/:pair', (req, res) => {
  const candles = aggregator.getCandles(
    req.params.pair,
    parseInt(req.query.limit as string, 10) || 60
  );
  res.json(candles);
});

app.get('/prices', (req, res) => {
  res.json(aggregator.getAllPrices());
});

// WebSocket broadcast
const clients = new Set<WebSocket>();

wss.on('connection', (ws) => {
  clients.add(ws);
  console.log(`Client connected. Total: ${clients.size}`);

  // Send current prices immediately on connect
  ws.send(JSON.stringify({
    type: 'snapshot',
    prices: aggregator.getAllPrices(),
  }));

  ws.on('close', () => {
    clients.delete(ws);
    console.log(`Client disconnected. Total: ${clients.size}`);
  });
});

function broadcast(data: object) {
  const message = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
}

// Wire everything together
listener.onPriceUpdate((update) => {
  aggregator.process(update);

  // Broadcast live swap to all connected clients
  broadcast({
    type: 'swap',
    pair: update.pair,
    price: update.price,
    volume: update.volume,
    txHash: update.txHash,
    timestamp: update.timestamp,
  });
});

async function start() {
  await listener.startListening();

  httpServer.listen(3000, () => {
    console.log('Server running on http://localhost:3000');
    console.log('WebSocket: ws://localhost:3000');
  });
}

start().catch(console.error);

Step 5: Client Connection (Browser)
// Frontend: minimal price ticker.
// updatePriceDisplay and flashIndicator are page-specific UI helpers, not shown here.
const ws = new WebSocket('ws://localhost:3000');

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.type === 'snapshot') {
    // Initial state
    Object.entries(msg.prices).forEach(([pair, price]) => {
      updatePriceDisplay(pair, price);
    });
  }

  if (msg.type === 'swap') {
    updatePriceDisplay(msg.pair, msg.price);
    flashIndicator(msg.pair, msg.price);
  }
};

ws.onclose = () => {
  // Crude auto-reconnect: reload the page after 3 seconds
  setTimeout(() => window.location.reload(), 3000);
};

Production Considerations
Rate limiting: A single busy pair like ETH/USDC can have 10-20 swaps per minute on mainnet. Your WebSocket broadcast handles this trivially, but be careful about per-client message rates if clients run on mobile connections.
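One way to cap per-client message rates is to coalesce updates: keep only the newest price per pair and send a batch at a fixed interval instead of forwarding every swap. The sketch below assumes a hypothetical UpdateCoalescer class and a caller-supplied send function. Neither is part of the server in Step 4.

```typescript
// Hypothetical coalescer: keeps the newest update per pair and emits one batch per flush.
type PairPrice = { pair: string; price: number; timestamp: number };

class UpdateCoalescer {
  private pending = new Map<string, PairPrice>();
  private send: (batch: PairPrice[]) => void;

  constructor(send: (batch: PairPrice[]) => void) {
    this.send = send;
  }

  // Called for every swap; a later update for the same pair overwrites the earlier one.
  push(update: PairPrice): void {
    this.pending.set(update.pair, update);
  }

  // Called on a timer, e.g. setInterval(() => coalescer.flush(), 250).
  // Sends nothing when no updates arrived since the last flush.
  flush(): void {
    if (this.pending.size === 0) return;
    const batch = Array.from(this.pending.values());
    this.pending.clear();
    this.send(batch);
  }
}

// Example: three swaps arrive between flushes, two of them for the same pair
const sentBatches: PairPrice[][] = [];
const coalescer = new UpdateCoalescer((batch) => {
  sentBatches.push(batch);
});
coalescer.push({ pair: 'ETH/USDT', price: 2000, timestamp: 1 });
coalescer.push({ pair: 'USDC/ETH', price: 0.0005, timestamp: 2 });
coalescer.push({ pair: 'ETH/USDT', price: 2001, timestamp: 3 });
coalescer.flush(); // one batch with two entries
coalescer.flush(); // nothing pending, so nothing is sent
```

The trade-off is that clients no longer see every individual swap, only the latest price per interval. That is usually fine for a ticker but not for a trade feed.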
Node reconnection: WebSocket connections to Ethereum nodes drop periodically. The scheduleReconnect() logic above handles this, but you should also implement exponential backoff for repeated failures.
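A minimal sketch of exponential backoff with jitter is shown below. The function name backoffDelayMs and its default values (1 second base, 60 second cap, ±20% jitter) are assumptions chosen for illustration, not values from the listener above.

```typescript
// Hypothetical helper: delay before reconnect attempt number `attempt` (0-based).
function backoffDelayMs(
  attempt: number,
  baseMs = 1_000,
  maxMs = 60_000,
  jitter = 0.2,
  random: () => number = Math.random
): number {
  // Double the delay on each attempt, capped at maxMs
  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);
  // Scale by a random factor in [1 - jitter, 1 + jitter) so that many
  // listeners do not retry in lockstep. The cap applies before jitter.
  const factor = 1 + jitter * (2 * random() - 1);
  return Math.round(exponential * factor);
}

// With the randomness pinned to the midpoint, the bare schedule is visible
const noJitter = () => 0.5;
const schedule = [0, 1, 2, 3, 10].map((attempt) => backoffDelayMs(attempt, 1_000, 60_000, 0.2, noJitter));
console.log(schedule);
```

To use this in the listener, scheduleReconnect() would keep a counter of consecutive failures (a hypothetical field), pass it to backoffDelayMs in place of the fixed 5_000, and reset it to zero once a subscription succeeds.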
Memory: Keeping 1440 candles × N pairs in memory is fine for small N. For production with hundreds of pairs, use Redis for candle storage.
Block reorgs: Occasionally, blocks are reorganized — transactions that appeared confirmed get reverted. For a price monitor, this is usually acceptable (the price update will correct itself in the next block). For financial applications, track log.removed === true events and reverse the update.
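Reversing an update could look roughly like the sketch below: remember each applied swap under its transaction hash and log index, drop it when the node re-delivers the log with removed set to true, and derive the latest price from whatever remains. The class ReorgAwarePrices and the SwapRecord shape are hypothetical. In ethers v6 the corresponding fields on a log should be transactionHash, index, and removed, but verify this against the version you run.

```typescript
// Hypothetical record of one applied swap
interface SwapRecord {
  txHash: string;
  logIndex: number;
  price: number;
  removed: boolean; // true when the node reports the log was reorged out
}

class ReorgAwarePrices {
  // A Map iterates in insertion order, which here doubles as arrival order
  private applied = new Map<string, SwapRecord>();

  handle(swap: SwapRecord): void {
    const key = `${swap.txHash}:${swap.logIndex}`;
    if (swap.removed) {
      // The block containing this swap was reorganized away: undo it
      this.applied.delete(key);
    } else {
      this.applied.set(key, swap);
    }
  }

  // Latest price among swaps that are still on the canonical chain
  latestPrice(): number | undefined {
    const records = Array.from(this.applied.values());
    return records.length > 0 ? records[records.length - 1].price : undefined;
  }
}

// Example: the second swap is later reported as removed
const prices = new ReorgAwarePrices();
prices.handle({ txHash: '0xaaa', logIndex: 0, price: 2000, removed: false });
prices.handle({ txHash: '0xbbb', logIndex: 3, price: 2010, removed: false });
const beforeReorg = prices.latestPrice();
prices.handle({ txHash: '0xbbb', logIndex: 3, price: 2010, removed: true });
const afterReorg = prices.latestPrice();
console.log(beforeReorg, afterReorg);
```

A real implementation would also prune records older than some confirmation depth so the map does not grow without bound, and would apply the same reversal to candle volume and trade counts.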
Multiple nodes: For reliability in production, connect to 2-3 nodes simultaneously and deduplicate events by txHash + logIndex.
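Deduplication by txHash + logIndex can be sketched with a bounded set of recently seen keys. The class name LogDeduplicator and the default capacity of 10,000 are assumptions for illustration.

```typescript
// Hypothetical deduplicator for logs arriving from several nodes
class LogDeduplicator {
  private seen = new Set<string>();
  private capacity: number;

  constructor(capacity = 10_000) {
    this.capacity = capacity;
  }

  // Returns true the first time a (txHash, logIndex) pair is seen, false for repeats.
  isFirstSeen(txHash: string, logIndex: number): boolean {
    const key = `${txHash.toLowerCase()}:${logIndex}`;
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    if (this.seen.size > this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest key
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}

// Example: the same log delivered by two different nodes
const dedup = new LogDeduplicator();
const fromNodeA = dedup.isFirstSeen('0xABC', 7); // first delivery: process it
const fromNodeB = dedup.isFirstSeen('0xabc', 7); // duplicate: skip it
const otherLog = dedup.isFirstSeen('0xabc', 8);  // same transaction, different log
console.log(fromNodeA, fromNodeB, otherLog);
```

Each listener callback would call isFirstSeen before forwarding an update. If reorg handling is also in place, removed logs need their own key (for example by appending the removed flag), otherwise the reversal would be dropped as a duplicate.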
The full implementation handles ~50 pairs with sub-100ms latency from on-chain swap to client display. For a bot or arbitrage system, you’d add position-tracking logic on top of this same event stream — the foundation is identical.