Building a Real-Time DEX Price Monitor with Node.js and WebSockets

Posted on: March 17, 2025 at 10:00 AM

Real-time price data from DEXs is the foundation of arbitrage bots, portfolio trackers, and DeFi dashboards. But most tutorials show you how to poll a REST API, which adds latency and quickly runs into rate limits. A better approach: subscribe to blockchain events directly over a WebSocket connection.

This post builds a complete DEX price monitor from scratch: Ethereum node WebSocket connection → Uniswap V2 event decoding → price calculation → client broadcast. All in TypeScript.

Architecture Overview

Ethereum Node (WebSocket)
  │ eth_subscribe('logs', filter)
  ▼
Event Listener (Node.js/TypeScript)
  │ Decode Swap event → calculate price
  ▼
Price Aggregator (per-pair state)
  │ Broadcast via WebSocket
  ▼
Connected Clients (browser, bots, dashboards)

No polling. No REST API rate limits. Events flow in real-time as transactions are mined.
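
For reference, the subscription at the top of the diagram is a single JSON-RPC message sent over the socket. Here is a minimal sketch of that payload, assuming the standard `eth_subscribe` logs filter. `buildLogsSubscription` is a hypothetical helper name used only for illustration; ethers builds an equivalent request for you in Step 2.

```typescript
// Shape of the JSON-RPC request a client sends to subscribe to logs.
interface LogsSubscriptionRequest {
  jsonrpc: '2.0';
  id: number;
  method: 'eth_subscribe';
  params: ['logs', { address: string[]; topics: string[] }];
}

// Hypothetical helper: builds the subscription request for a set of contracts.
function buildLogsSubscription(
  id: number,
  addresses: string[],
  topic0: string,
): LogsSubscriptionRequest {
  return {
    jsonrpc: '2.0',
    id,
    method: 'eth_subscribe',
    // topics[0] is the event signature hash; address restricts the emitting contracts
    params: ['logs', { address: addresses, topics: [topic0] }],
  };
}

const request = buildLogsSubscription(
  1,
  ['0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc'],
  '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822',
);
console.log(JSON.stringify(request));
```

The node replies with a subscription id and then pushes one notification per matching log.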

Prerequisites

npm init -y
npm install ethers ws express
npm install -D typescript @types/ws @types/node ts-node

You’ll need an Ethereum node with WebSocket support: either a node you run yourself or a hosted provider that exposes a wss:// endpoint.

Step 1: Uniswap V2 Event Structure

Uniswap V2 emits a Swap event on every trade:

event Swap(
address indexed sender,
uint amount0In,
uint amount1In,
uint amount0Out,
uint amount1Out,
address indexed to
);

The topic for this event (keccak256 of the signature):

keccak256("Swap(address,uint256,uint256,uint256,uint256,address)")
= 0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822
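
Only `sender` and `to` are indexed, so they arrive in `topics[1]` and `topics[2]`. The four amounts are packed into the log's `data` field as consecutive 32-byte words. To show that layout, here is a minimal sketch that reads them by hand. `decodeSwapData` is a hypothetical helper and the sample values are made up; the listener in Step 2 uses ethers' `Interface` for this instead.

```typescript
interface SwapAmounts {
  amount0In: bigint;
  amount1In: bigint;
  amount0Out: bigint;
  amount1Out: bigint;
}

// Hypothetical helper: splits the log's data field into four uint256 words.
function decodeSwapData(data: string): SwapAmounts {
  const hex = data.startsWith('0x') ? data.slice(2) : data;
  if (hex.length !== 4 * 64) {
    throw new Error(`Expected 128 bytes of data, got ${hex.length / 2}`);
  }
  // Each uint256 occupies 64 hex characters (32 bytes), big-endian
  const word = (i: number): bigint => BigInt('0x' + hex.slice(i * 64, (i + 1) * 64));
  return {
    amount0In: word(0),
    amount1In: word(1),
    amount0Out: word(2),
    amount1Out: word(3),
  };
}

// Sample: 1 WETH in (token0, 18 decimals), 2000 USDT out (token1, 6 decimals)
const toWord = (n: bigint): string => n.toString(16).padStart(64, '0');
const sampleData = '0x' + [10n ** 18n, 0n, 0n, 2000n * 10n ** 6n].map(toWord).join('');
console.log(decodeSwapData(sampleData));
```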

Step 2: The Event Listener

src/listener.ts
import { ethers, WebSocketProvider } from 'ethers';
const UNISWAP_V2_SWAP_TOPIC =
'0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822';
// Monitor specific pairs — add more as needed
const MONITORED_PAIRS: Record<string, { name: string; token0Decimals: number; token1Decimals: number }> = {
'0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc': {
name: 'USDC/ETH',
token0Decimals: 6, // USDC
token1Decimals: 18, // WETH
},
'0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852': {
name: 'ETH/USDT',
token0Decimals: 18, // WETH
token1Decimals: 6, // USDT
},
};
const SWAP_EVENT_ABI = [
'event Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)',
];
const iface = new ethers.Interface(SWAP_EVENT_ABI);
export interface PriceUpdate {
pair: string;
pairAddress: string;
price: number; // token1 per token0 (e.g., USDT per WETH)
volume: number; // trade volume in token0
timestamp: number;
txHash: string;
blockNumber: number;
}
export type PriceUpdateCallback = (update: PriceUpdate) => void;
export class DEXListener {
private provider: WebSocketProvider;
private callbacks: PriceUpdateCallback[] = [];
private reconnectTimer?: NodeJS.Timeout;
constructor(private readonly wsUrl: string) {
this.provider = this.createProvider();
}
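private createProvider(): WebSocketProvider {
const provider = new WebSocketProvider(this.wsUrl);
provider.on('error', (err) => {
console.error('Provider error:', err);
this.scheduleReconnect();
});
// ethers v6 doesn't expose 'close' directly, so poll the connection as a health check
const checkConnection = setInterval(async () => {
// Stop checking once this provider has been replaced by a reconnect
if (this.provider !== provider) {
clearInterval(checkConnection);
return;
}
try {
await provider.getBlockNumber();
} catch {
clearInterval(checkConnection);
this.scheduleReconnect();
}
}, 30_000);
return provider;
}
private scheduleReconnect() {
if (this.reconnectTimer) return;
console.log('Reconnecting in 5 seconds...');
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = undefined;
// Tear down the old provider so its subscription can't deliver duplicate events
const oldProvider = this.provider;
this.provider = this.createProvider();
Promise.resolve(oldProvider.destroy()).catch(() => {});
this.startListening().catch(console.error);
}, 5_000);
}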
onPriceUpdate(callback: PriceUpdateCallback) {
this.callbacks.push(callback);
}
async startListening() {
const pairAddresses = Object.keys(MONITORED_PAIRS);
console.log(`Listening to ${pairAddresses.length} pairs...`);
// Subscribe to Swap events from all monitored pairs
this.provider.on(
{
address: pairAddresses,
topics: [UNISWAP_V2_SWAP_TOPIC],
},
async (log) => {
await this.processLog(log);
}
);
}
private async processLog(log: ethers.Log) {
const pairAddress = log.address.toLowerCase();
const pairInfo = MONITORED_PAIRS[pairAddress];
if (!pairInfo) return;
try {
const decoded = iface.parseLog({
topics: log.topics as string[],
data: log.data,
});
if (!decoded) return;
const { amount0In, amount1In, amount0Out, amount1Out } = decoded.args;
// Determine direction and calculate price
let price: number;
let volume: number;
const t0Dec = 10 ** pairInfo.token0Decimals;
const t1Dec = 10 ** pairInfo.token1Decimals;
if (amount0In > 0n) {
// Selling token0 for token1: price = token1Out / token0In
const t0InFormatted = Number(amount0In) / t0Dec;
const t1OutFormatted = Number(amount1Out) / t1Dec;
price = t1OutFormatted / t0InFormatted;
volume = t0InFormatted;
} else {
// Selling token1 for token0: price = token1In / token0Out
const t0OutFormatted = Number(amount0Out) / t0Dec;
const t1InFormatted = Number(amount1In) / t1Dec;
price = t1InFormatted / t0OutFormatted;
volume = t0OutFormatted;
}
const update: PriceUpdate = {
pair: pairInfo.name,
pairAddress,
price,
volume,
timestamp: Date.now(),
txHash: log.transactionHash,
blockNumber: log.blockNumber,
};
this.callbacks.forEach(cb => cb(update));
} catch (err) {
console.error('Error processing log:', err);
}
}
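async stop() {
// Cancel any pending reconnect so the provider isn't recreated after shutdown
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
await this.provider.destroy();
}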
}
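
To make the price arithmetic concrete, here is the same calculation pulled out into a standalone function with a worked example. Treat it as a sketch: `swapPrice` is a hypothetical name, the guard against a zero token0 amount is an addition for illustration, and the trade amounts are made up.

```typescript
interface SwapPrice {
  price: number;  // token1 per token0
  volume: number; // in token0 units
}

// Hypothetical standalone version of the calculation inside processLog().
function swapPrice(
  amounts: { amount0In: bigint; amount1In: bigint; amount0Out: bigint; amount1Out: bigint },
  token0Decimals: number,
  token1Decimals: number,
): SwapPrice | null {
  const sellingToken0 = amounts.amount0In > 0n;
  const raw0 = sellingToken0 ? amounts.amount0In : amounts.amount0Out;
  const raw1 = sellingToken0 ? amounts.amount1Out : amounts.amount1In;
  if (raw0 === 0n) return null; // avoid dividing by zero on unusual swaps

  // Number() loses precision above 2^53, which is acceptable for display prices
  const t0 = Number(raw0) / 10 ** token0Decimals;
  const t1 = Number(raw1) / 10 ** token1Decimals;
  return { price: t1 / t0, volume: t0 };
}

// WETH/USDT pair: someone sells 0.5 WETH and receives 1000 USDT
const result = swapPrice(
  { amount0In: 5n * 10n ** 17n, amount1In: 0n, amount0Out: 0n, amount1Out: 1000n * 10n ** 6n },
  18,
  6,
);
console.log(result); // { price: 2000, volume: 0.5 }
```

Note that this is the execution price of one trade, including fee and slippage, not the pool's spot price.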

Step 3: Price Aggregator

Raw swap events are noisy — we want OHLCV candles, not individual swaps:

src/aggregator.ts
import { PriceUpdate } from './listener';
interface Candle {
open: number;
high: number;
low: number;
close: number;
volume: number;
timestamp: number; // candle start time
trades: number;
}
export class PriceAggregator {
private candles = new Map<string, Map<number, Candle>>(); // pair → timestamp → candle
private latestPrice = new Map<string, number>();
private readonly candleDurationMs = 60_000; // 1-minute candles
process(update: PriceUpdate) {
const { pair, price, volume, timestamp } = update;
// Update latest price
this.latestPrice.set(pair, price);
// Update 1m candle
const candleStart = Math.floor(timestamp / this.candleDurationMs) * this.candleDurationMs;
if (!this.candles.has(pair)) {
this.candles.set(pair, new Map());
}
const pairCandles = this.candles.get(pair)!;
const existing = pairCandles.get(candleStart);
if (existing) {
existing.high = Math.max(existing.high, price);
existing.low = Math.min(existing.low, price);
existing.close = price;
existing.volume += volume;
existing.trades++;
} else {
pairCandles.set(candleStart, {
open: price,
high: price,
low: price,
close: price,
volume,
timestamp: candleStart,
trades: 1,
});
}
// Keep only the last 1440 candles (24 hours of 1m candles).
// Use Math.min rather than sort(): the default sort compares keys as strings.
if (pairCandles.size > 1440) {
const oldestKey = Math.min(...pairCandles.keys());
pairCandles.delete(oldestKey);
}
}
getLatestPrice(pair: string): number | undefined {
return this.latestPrice.get(pair);
}
getCandles(pair: string, limit = 60): Candle[] {
const pairCandles = this.candles.get(pair);
if (!pairCandles) return [];
return [...pairCandles.values()]
.sort((a, b) => a.timestamp - b.timestamp)
.slice(-limit);
}
getAllPrices(): Record<string, number> {
return Object.fromEntries(this.latestPrice);
}
}
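
The bucketing rule is the core of the aggregator: every timestamp is floored to the start of its minute, and all swaps sharing that key fold into one candle. Here is a small self-contained sketch of just that step, with made-up prices and timestamps. `bucketStart` and `foldCandles` are hypothetical names, not part of the class above.

```typescript
interface MiniCandle {
  open: number;
  high: number;
  low: number;
  close: number;
  trades: number;
}

const CANDLE_MS = 60_000;

// Floor a millisecond timestamp to the start of its candle.
function bucketStart(timestampMs: number, durationMs = CANDLE_MS): number {
  return Math.floor(timestampMs / durationMs) * durationMs;
}

// Fold time-ordered [timestamp, price] ticks into candles keyed by bucket start.
function foldCandles(ticks: Array<[number, number]>): Map<number, MiniCandle> {
  const candles = new Map<number, MiniCandle>();
  for (const [ts, price] of ticks) {
    const key = bucketStart(ts);
    const c = candles.get(key);
    if (c) {
      c.high = Math.max(c.high, price);
      c.low = Math.min(c.low, price);
      c.close = price;
      c.trades++;
    } else {
      candles.set(key, { open: price, high: price, low: price, close: price, trades: 1 });
    }
  }
  return candles;
}

// Two ticks fall in the first minute, one in the second
const candles = foldCandles([
  [5_000, 2000],
  [59_999, 2010],
  [60_000, 1995],
]);
console.log(candles.get(0));      // { open: 2000, high: 2010, low: 2000, close: 2010, trades: 2 }
console.log(candles.get(60_000)); // { open: 1995, high: 1995, low: 1995, close: 1995, trades: 1 }
```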

Step 4: WebSocket Server for Clients

src/server.ts
import express from 'express';
import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';
import { DEXListener } from './listener';
import { PriceAggregator } from './aggregator';
const app = express();
const httpServer = createServer(app);
const wss = new WebSocketServer({ server: httpServer });
const listener = new DEXListener(process.env.WS_URL!);
const aggregator = new PriceAggregator();
// REST API for historical data
// Pair names contain '/', so clients must URL-encode them (e.g. /candles/ETH%2FUSDT)
app.get('/candles/:pair', (req, res) => {
const candles = aggregator.getCandles(
req.params.pair,
parseInt(req.query.limit as string, 10) || 60
);
res.json(candles);
});
app.get('/prices', (req, res) => {
res.json(aggregator.getAllPrices());
});
// WebSocket broadcast
const clients = new Set<WebSocket>();
wss.on('connection', (ws) => {
clients.add(ws);
console.log(`Client connected. Total: ${clients.size}`);
// Send current prices immediately on connect
ws.send(JSON.stringify({
type: 'snapshot',
prices: aggregator.getAllPrices(),
}));
ws.on('close', () => {
clients.delete(ws);
console.log(`Client disconnected. Total: ${clients.size}`);
});
});
function broadcast(data: object) {
const message = JSON.stringify(data);
for (const client of clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
}
// Wire everything together
listener.onPriceUpdate((update) => {
aggregator.process(update);
// Broadcast live swap to all connected clients
broadcast({
type: 'swap',
pair: update.pair,
price: update.price,
volume: update.volume,
txHash: update.txHash,
timestamp: update.timestamp,
});
});
async function start() {
await listener.startListening();
httpServer.listen(3000, () => {
console.log('Server running on http://localhost:3000');
console.log('WebSocket: ws://localhost:3000');
});
}
start().catch(console.error);

Step 5: Client Connection (Browser)

// Frontend — minimal price ticker
const ws = new WebSocket('ws://localhost:3000');
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'snapshot') {
// Initial state
Object.entries(msg.prices).forEach(([pair, price]) => {
updatePriceDisplay(pair, price);
});
}
if (msg.type === 'swap') {
updatePriceDisplay(msg.pair, msg.price);
flashIndicator(msg.pair, msg.price);
}
};
ws.onclose = () => {
setTimeout(() => window.location.reload(), 3000); // Auto-reconnect
};

Production Considerations

Rate limiting: A single busy pair like ETH/USDC can have 10-20 swaps per minute on mainnet. Your WebSocket broadcast handles this trivially, but be careful about per-client message rates if clients run on mobile connections.
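
One way to cap per-client message rates is to coalesce updates: keep only the latest price per pair and send the batch on a timer. Here is a minimal sketch, assuming clients only need the most recent price between flushes. `PriceCoalescer` and the `batch` message type are hypothetical, not part of the server above.

```typescript
type Send = (message: string) => void;

// Hypothetical helper: buffers the latest price per pair and sends one batch per flush.
class PriceCoalescer {
  private pending = new Map<string, number>();

  constructor(private readonly send: Send) {}

  // Later updates for the same pair overwrite earlier ones
  push(pair: string, price: number): void {
    this.pending.set(pair, price);
  }

  // Sends at most one message; returns how many pairs it contained
  flush(): number {
    if (this.pending.size === 0) return 0;
    const count = this.pending.size;
    this.send(JSON.stringify({ type: 'batch', prices: Object.fromEntries(this.pending) }));
    this.pending.clear();
    return count;
  }
}

// Usage: three swaps arrive between flushes, but only one message goes out
const sent: string[] = [];
const coalescer = new PriceCoalescer((m) => sent.push(m));
coalescer.push('ETH/USDT', 2000);
coalescer.push('ETH/USDT', 2001);
coalescer.push('USDC/ETH', 0.0005);
coalescer.flush();
console.log(sent.length); // 1
```

In the server you would keep one coalescer per client and call `flush()` from a `setInterval`, for example every 250 ms.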

Node reconnection: WebSocket connections to Ethereum nodes drop periodically. The scheduleReconnect() logic above handles this, but you should also implement exponential backoff for repeated failures.
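
A sketch of the backoff calculation, assuming a 1-second base delay and a 60-second cap (both values are arbitrary choices, and the function names are hypothetical):

```typescript
// Delay before reconnect attempt N (0-based), doubling each time up to a cap.
function backoffDelayMs(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  const exp = Math.min(attempt, 30); // keep 2 ** exp within a safe range
  return Math.min(maxMs, baseMs * 2 ** exp);
}

// Full jitter: pick a random delay in [0, backoff) so many clients don't reconnect in lockstep.
function jitteredDelayMs(attempt: number, random: () => number = Math.random): number {
  return Math.floor(random() * backoffDelayMs(attempt));
}

console.log([0, 1, 2, 3, 10].map((a) => backoffDelayMs(a))); // [ 1000, 2000, 4000, 8000, 60000 ]
```

To use it, keep an attempt counter in the listener, pass it to `backoffDelayMs()` in place of the fixed 5-second delay, and reset it to zero once a health check succeeds.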

Memory: Keeping 1440 candles × N pairs in memory is fine for small N. For production with hundreds of pairs, use Redis for candle storage.

Block reorgs: Occasionally the chain reorganizes, and transactions that appeared confirmed are dropped from the canonical chain (they may be included again in a later block, or not at all). For a price monitor, this is usually acceptable, since the next swaps will correct the displayed price. For financial applications, watch for logs delivered with log.removed === true and reverse the corresponding update.
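
Here is a sketch of what reversing an update could look like, assuming each log is identified by its transaction hash and log index and that only per-pair volume needs to be rolled back. `VolumeLedger` and its record shape are hypothetical. Rolling back OHLC values would additionally require keeping the individual trades for each candle.

```typescript
interface SwapRecord {
  txHash: string;
  logIndex: number;
  pair: string;
  volume: number;
  removed: boolean; // true when the node reports the log was dropped by a reorg
}

// Hypothetical ledger: applies swaps once and undoes them when they are removed.
class VolumeLedger {
  private applied = new Map<string, SwapRecord>();
  private volume = new Map<string, number>();

  handle(rec: SwapRecord): void {
    const key = `${rec.txHash}:${rec.logIndex}`;
    if (rec.removed) {
      const prev = this.applied.get(key);
      if (!prev) return; // never applied, nothing to undo
      this.applied.delete(key);
      this.volume.set(prev.pair, (this.volume.get(prev.pair) ?? 0) - prev.volume);
      return;
    }
    if (this.applied.has(key)) return; // already counted
    this.applied.set(key, rec);
    this.volume.set(rec.pair, (this.volume.get(rec.pair) ?? 0) + rec.volume);
  }

  total(pair: string): number {
    return this.volume.get(pair) ?? 0;
  }
}

const ledger = new VolumeLedger();
ledger.handle({ txHash: '0xaa', logIndex: 0, pair: 'ETH/USDT', volume: 2, removed: false });
ledger.handle({ txHash: '0xbb', logIndex: 3, pair: 'ETH/USDT', volume: 1, removed: false });
ledger.handle({ txHash: '0xbb', logIndex: 3, pair: 'ETH/USDT', volume: 1, removed: true });
console.log(ledger.total('ETH/USDT')); // 2
```

In a long-running process you would also prune `applied` entries once their blocks are deep enough that a reorg is no longer a concern.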

Multiple nodes: For reliability in production, connect to 2-3 nodes simultaneously and deduplicate events by txHash + logIndex.
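
A minimal sketch of that deduplication step, assuming a bounded in-memory set is enough because duplicates from different nodes arrive within seconds of each other. `EventDeduper` is a hypothetical helper, and the 10,000-entry cap is an arbitrary choice.

```typescript
// Hypothetical helper: remembers recently seen event keys, evicting the oldest beyond a cap.
class EventDeduper {
  private seen = new Set<string>();

  constructor(private readonly maxEntries = 10_000) {}

  // Returns true the first time a (txHash, logIndex) pair is seen, false for duplicates
  isNew(txHash: string, logIndex: number): boolean {
    const key = `${txHash}:${logIndex}`;
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    if (this.seen.size > this.maxEntries) {
      // Sets iterate in insertion order, so the first value is the oldest
      const oldest = this.seen.values().next().value as string;
      this.seen.delete(oldest);
    }
    return true;
  }
}

const deduper = new EventDeduper();
console.log(deduper.isNew('0xabc', 0)); // true  (from node A)
console.log(deduper.isNew('0xabc', 0)); // false (same event from node B)
console.log(deduper.isNew('0xabc', 1)); // true  (different log in the same tx)
```

Each listener would call `isNew()` before invoking its price callbacks, so only the first copy of an event reaches the aggregator.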

The full implementation handles ~50 pairs with sub-100ms latency from on-chain swap to client display. For a bot or arbitrage system, you’d add position-tracking logic on top of this same event stream — the foundation is identical.