Building Real-Time APIs with WebSockets and Server-Sent Events

Oct 26, 2025
Tags: realtime, websockets, sse, http3

Real-time APIs power collaborative apps, dashboards, and notifications. This guide compares WebSockets and SSE, shows implementation patterns, and covers scaling, security, and observability for production systems.

Executive Summary

Real-time APIs enable instant communication between clients and servers, powering modern applications like chat systems, live dashboards, collaborative editing, and real-time notifications. This comprehensive guide covers the essential patterns, technologies, and best practices for building scalable, secure, and maintainable real-time APIs.

Key technologies covered:

  • WebSockets: Bidirectional, low-latency communication
  • Server-Sent Events (SSE): Unidirectional server-to-client streaming
  • HTTP/3: HTTP over QUIC, with faster connection setup and no TCP head-of-line blocking; SSE and streamed responses work over it unchanged
  • WebRTC: Peer-to-peer communication for ultra-low latency

Technology Comparison

When to Choose What

Technology | Typical use cases                     | Indicative latency | Complexity | Browser support
WebSockets | Chat, gaming, collaborative editing   | 10-50 ms           | High       | Excellent
SSE        | Live feeds, notifications, dashboards | 50-100 ms          | Low        | Excellent
HTTP/3     | General web APIs with real-time needs | 100-200 ms         | Medium     | Good
WebRTC     | Video/audio, P2P gaming               | 5-20 ms            | Very high  | Good

Latency figures are rough, network-dependent orders of magnitude, not benchmarks.

Decision Matrix

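type Technology = 'websockets' | 'sse' | 'webrtc' | 'http3';

interface RealTimeRequirements {
  bidirectional: boolean;
  latency: number; // target latency in milliseconds
  complexity: 'low' | 'medium' | 'high'; // implementation complexity the team can absorb
}

interface TechnologyChoice {
  technology: Technology;
  reasoning: string;
}

class RealTimeTechnologySelector {
  selectTechnology(requirements: RealTimeRequirements): TechnologyChoice {
    const { bidirectional, latency, complexity } = requirements;
    
    // Check the strictest requirement first: any latency < 20 also satisfies the
    // WebSocket and SSE conditions below, so this branch was unreachable after them
    if (latency < 20 && complexity === 'high') {
      return { technology: 'webrtc', reasoning: 'Ultra-low latency P2P communication' };
    }
    
    if (bidirectional && latency < 50) {
      return { technology: 'websockets', reasoning: 'Low latency bidirectional communication' };
    }
    
    if (!bidirectional && latency < 100) {
      return { technology: 'sse', reasoning: 'Simple unidirectional streaming' };
    }
    
    return { technology: 'http3', reasoning: 'General purpose with good real-time characteristics' };
  }
}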

WebSocket Implementation

Basic WebSocket Server

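import { WebSocketServer, WebSocket } from 'ws';
import { createServer, IncomingMessage } from 'http';
import { randomUUID } from 'crypto';

class WebSocketManager {
  private wss: WebSocketServer;
  private connections: Map<string, WebSocket> = new Map();
  private rooms: Map<string, Set<string>> = new Map();
  
  constructor(port: number) {
    const server = createServer();
    this.wss = new WebSocketServer({ server });
    
    this.wss.on('connection', (ws: WebSocket, request: IncomingMessage) => {
      this.handleConnection(ws, request);
    });
    
    server.listen(port);
  }
  
  private handleConnection(ws: WebSocket, request: IncomingMessage): void {
    // Authenticate before registering the socket, so rejected clients never enter the map
    const token = this.extractToken(request);
    if (!this.authenticateToken(token)) {
      ws.close(1008, 'Authentication failed'); // 1008 = policy violation
      return;
    }
    
    const connectionId = this.generateConnectionId();
    this.connections.set(connectionId, ws);
    
    // Setup event handlers
    ws.on('message', (data: Buffer) => {
      this.handleMessage(connectionId, data);
    });
    
    ws.on('close', () => {
      this.handleDisconnection(connectionId);
    });
    
    ws.on('error', (error) => {
      this.handleError(connectionId, error);
    });
    
    // Send welcome message
    this.sendMessage(connectionId, {
      type: 'connection_established',
      connectionId,
      timestamp: Date.now()
    });
  }
  
  private async handleMessage(connectionId: string, data: Buffer): Promise<void> {
    let message: any;
    try {
      message = JSON.parse(data.toString());
    } catch {
      this.sendError(connectionId, 'Failed to parse message');
      return;
    }
    
    // Validate message structure
    if (!this.validateMessage(message)) {
      this.sendError(connectionId, 'Invalid message format');
      return;
    }
    
    try {
      // Route message based on type
      switch (message.type) {
        case 'join_room':
          await this.handleJoinRoom(connectionId, message.roomId);
          break;
        case 'leave_room':
          await this.handleLeaveRoom(connectionId, message.roomId);
          break;
        case 'broadcast':
          await this.handleBroadcast(connectionId, message);
          break;
        case 'private_message':
          await this.handlePrivateMessage(connectionId, message);
          break;
        default:
          this.sendError(connectionId, 'Unknown message type');
      }
    } catch (error) {
      // A failing handler is a server-side problem, not a malformed message
      this.handleError(connectionId, error as Error);
      this.sendError(connectionId, 'Failed to process message');
    }
  }
  
  private async handleJoinRoom(connectionId: string, roomId: string): Promise<void> {
    // Check permissions
    if (!await this.canJoinRoom(connectionId, roomId)) {
      this.sendError(connectionId, 'Insufficient permissions to join room');
      return;
    }
    
    // Add to room
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId)!.add(connectionId);
    
    // Notify room members
    this.broadcastToRoom(roomId, {
      type: 'user_joined',
      connectionId,
      roomId,
      timestamp: Date.now()
    }, connectionId);
    
    // Send room state
    this.sendMessage(connectionId, {
      type: 'room_joined',
      roomId,
      memberCount: this.rooms.get(roomId)!.size,
      timestamp: Date.now()
    });
  }
  
  private async handleBroadcast(connectionId: string, message: any): Promise<void> {
    const roomId = message.roomId;
    
    if (!this.rooms.has(roomId) || !this.rooms.get(roomId)!.has(connectionId)) {
      this.sendError(connectionId, 'Not a member of this room');
      return;
    }
    
    // Rate limiting
    if (!await this.checkRateLimit(connectionId)) {
      this.sendError(connectionId, 'Rate limit exceeded');
      return;
    }
    
    // Broadcast to room
    this.broadcastToRoom(roomId, {
      type: 'message',
      from: connectionId,
      content: message.content,
      timestamp: Date.now()
    });
  }
  
  private broadcastToRoom(roomId: string, message: any, excludeConnectionId?: string): void {
    const room = this.rooms.get(roomId);
    if (!room) return;
    
    const messageStr = JSON.stringify(message);
    
    for (const connectionId of room) {
      if (connectionId !== excludeConnectionId) {
        const ws = this.connections.get(connectionId);
        if (ws && ws.readyState === WebSocket.OPEN) {
          ws.send(messageStr);
        }
      }
    }
  }
  
  private sendMessage(connectionId: string, message: any): void {
    const ws = this.connections.get(connectionId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  }
  
  private sendError(connectionId: string, error: string): void {
    this.sendMessage(connectionId, {
      type: 'error',
      error,
      timestamp: Date.now()
    });
  }
  
  private generateConnectionId(): string {
    return randomUUID();
  }
  
  // Assumption: the client passes its token as ?token=... because the browser WebSocket API
  // cannot set custom request headers. URLs can end up in proxy logs, so prefer short-lived tokens.
  private extractToken(request: IncomingMessage): string | null {
    const url = new URL(request.url ?? '/', `http://${request.headers.host ?? 'localhost'}`);
    return url.searchParams.get('token');
  }
  
  // Placeholder: verify signature and expiry with your auth provider.
  // Denies every connection until real verification is wired in.
  private authenticateToken(token: string | null): boolean {
    return false;
  }
  
  // Placeholder authorization hook: currently lets any authenticated connection join any room
  private async canJoinRoom(connectionId: string, roomId: string): Promise<boolean> {
    return true;
  }
  
  // Placeholder: plug in a per-connection limiter such as the RateLimiter class below
  private async checkRateLimit(connectionId: string): Promise<boolean> {
    return true;
  }
  
  private validateMessage(message: any): boolean {
    return typeof message === 'object' && message !== null && typeof message.type === 'string';
  }
  
  private async handleLeaveRoom(connectionId: string, roomId: string): Promise<void> {
    const room = this.rooms.get(roomId);
    if (!room || !room.delete(connectionId)) return;
    
    if (room.size === 0) {
      this.rooms.delete(roomId); // nobody left to notify
      return;
    }
    
    this.broadcastToRoom(roomId, {
      type: 'user_left',
      connectionId,
      roomId,
      timestamp: Date.now()
    });
  }
  
  // Assumption: private messages carry the recipient's connection id in a `to` field
  private async handlePrivateMessage(connectionId: string, message: any): Promise<void> {
    if (!this.connections.has(message.to)) {
      this.sendError(connectionId, 'Recipient not connected');
      return;
    }
    
    this.sendMessage(message.to, {
      type: 'private_message',
      from: connectionId,
      content: message.content,
      timestamp: Date.now()
    });
  }
  
  private handleDisconnection(connectionId: string): void {
    this.connections.delete(connectionId);
    
    // Copy the keys first, because handleLeaveRoom may delete empty rooms
    for (const roomId of Array.from(this.rooms.keys())) {
      void this.handleLeaveRoom(connectionId, roomId);
    }
  }
  
  private handleError(connectionId: string, error: Error): void {
    console.error(`WebSocket error on connection ${connectionId}:`, error);
  }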
}
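
The manager learns that a client has gone only when its socket emits 'close'. Half-open connections (a laptop that went to sleep, a dropped NAT mapping) can stay registered for a long time without that event. A common remedy is a heartbeat: ping every socket on an interval and terminate any socket that has not answered since the previous sweep. With the ws library the ping is ws.ping(), the answer arrives as a 'pong' event, and ws.terminate() drops the socket. The bookkeeping itself is library-agnostic, so this sketch keeps it in a small class with an injectable clock; HeartbeatTracker and its methods are hypothetical names, not part of ws.

```typescript
// Tracks when each connection last proved it was alive (e.g. answered a ping)
class HeartbeatTracker {
  private lastSeen = new Map<string, number>();

  constructor(
    private readonly timeoutMs: number,
    private readonly now: () => number = Date.now // injectable clock (ms) for testing
  ) {}

  // Call on connect and whenever a pong (or any other message) arrives
  markAlive(connectionId: string): void {
    this.lastSeen.set(connectionId, this.now());
  }

  // Call on a clean close so the connection is no longer tracked
  remove(connectionId: string): void {
    this.lastSeen.delete(connectionId);
  }

  // Returns connections that stayed silent longer than timeoutMs and stops tracking them;
  // the caller is expected to terminate those sockets
  sweep(): string[] {
    const cutoff = this.now() - this.timeoutMs;
    const stale: string[] = [];
    for (const [id, seen] of this.lastSeen) {
      if (seen < cutoff) {
        stale.push(id);
        this.lastSeen.delete(id);
      }
    }
    return stale;
  }
}
```

In WebSocketManager, markAlive would run in the connection and 'pong' handlers, remove in the 'close' handler, and a setInterval would ping every open socket and terminate whatever sweep() returns. The timeout should be comfortably longer than the ping interval (for example two intervals) so a single delayed pong does not drop a healthy client.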

WebSocket Client Implementation

class WebSocketClient {
  private ws: WebSocket | null = null;
  private connectionId: string | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;
  
  constructor(private url: string) {}
  
  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
          console.log('WebSocket connected');
          this.reconnectAttempts = 0;
          resolve();
        };
        
        this.ws.onmessage = (event) => {
          this.handleMessage(event.data);
        };
        
        this.ws.onclose = (event) => {
          console.log('WebSocket closed:', event.code, event.reason);
          this.handleDisconnection();
        };
        
        this.ws.onerror = (error) => {
          console.error('WebSocket error:', error);
          reject(error);
        };
        
      } catch (error) {
        reject(error);
      }
    });
  }
  
  private handleMessage(data: string): void {
    try {
      const message = JSON.parse(data);
      
      switch (message.type) {
        case 'connection_established':
          this.connectionId = message.connectionId;
          this.emit('connected', message);
          break;
        case 'message':
          this.emit('message', message);
          break;
        case 'error':
          this.emit('error', message.error);
          break;
        case 'user_joined':
          this.emit('userJoined', message);
          break;
        case 'user_left':
          this.emit('userLeft', message);
          break;
        default:
          this.emit('unknownMessage', message);
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  }
  
  private handleDisconnection(): void {
    this.emit('disconnected');
    
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      setTimeout(() => {
        this.reconnectAttempts++;
        this.connect().catch(console.error);
      }, this.reconnectDelay * Math.pow(2, this.reconnectAttempts));
    }
  }
  
  joinRoom(roomId: string): void {
    this.send({
      type: 'join_room',
      roomId
    });
  }
  
  leaveRoom(roomId: string): void {
    this.send({
      type: 'leave_room',
      roomId
    });
  }
  
  sendMessage(roomId: string, content: string): void {
    this.send({
      type: 'broadcast',
      roomId,
      content
    });
  }
  
  private send(message: any): void {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      console.warn('WebSocket not connected');
    }
  }
  
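  // Minimal listener registry; a library emitter or EventTarget would work just as well
  private listeners = new Map<string, Set<(data?: any) => void>>();
  
  on(event: string, handler: (data?: any) => void): void {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, new Set());
    }
    this.listeners.get(event)!.add(handler);
  }
  
  private emit(event: string, data?: any): void {
    this.listeners.get(event)?.forEach((handler) => handler(data));
  }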
  
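  disconnect(): void {
    if (this.ws) {
      // Detach the close handler first so an intentional disconnect does not trigger reconnection
      this.ws.onclose = null;
      this.ws.close(1000, 'Client disconnect');
      this.ws = null;
      this.emit('disconnected');
    }
  }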
}
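
The client retries with a purely exponential delay (1 s, 2 s, 4 s, and so on). When a server restarts, all of its clients disconnect at the same moment and therefore retry on the same schedule, producing synchronized reconnection spikes. Adding randomness spreads them out. This sketch uses the "full jitter" variant, a uniformly random delay between zero and the capped exponential value; computeReconnectDelay, the 30 s cap, and the parameter names are assumptions of this example.

```typescript
// Exponential backoff with full jitter: random delay in [0, min(capMs, baseMs * 2^attempt))
function computeReconnectDelay(
  attempt: number, // 0 for the first retry
  baseMs = 1000,
  capMs = 30000,
  random: () => number = Math.random // must return a value in [0, 1); injectable for testing
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

Inside handleDisconnection, the setTimeout delay would become computeReconnectDelay(this.reconnectAttempts, this.reconnectDelay). The cap keeps late retries from drifting into minutes while the jitter keeps clients from retrying in lockstep.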

Server-Sent Events (SSE) Implementation

SSE Server

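import { createServer, IncomingMessage, ServerResponse } from 'http';
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

class SSEServer extends EventEmitter {
  private clients: Map<string, SSEConnection> = new Map();
  private subscriptions: Map<string, Set<string>> = new Map();
  
  constructor(port: number) {
    super();
    this.setupServer(port);
  }
  
  private setupServer(port: number): void {
    const server = createServer((req, res) => {
      this.handleRequest(req, res);
    });
    
    server.listen(port);
  }
  
  private handleRequest(req: IncomingMessage, res: ServerResponse): void {
    const url = new URL(req.url ?? '/', `http://${req.headers.host ?? 'localhost'}`);
    
    if (req.method === 'GET' && url.pathname === '/events') {
      this.handleSSERequest(req, res);
    } else if (req.method === 'POST' && url.pathname === '/events/subscribe') {
      this.handleSubscriptionRequest(req, res, 'subscribe');
    } else if (req.method === 'POST' && url.pathname === '/events/unsubscribe') {
      this.handleSubscriptionRequest(req, res, 'unsubscribe');
    } else {
      res.writeHead(404);
      res.end('Not found');
    }
  }
  
  private handleSSERequest(req: IncomingMessage, res: ServerResponse): void {
    // Set SSE headers
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'Access-Control-Allow-Origin': '*', // restrict to your front-end origin in production
      'Access-Control-Allow-Headers': 'Cache-Control'
    });
    
    const clientId = this.generateClientId();
    const connection = new SSEConnection(clientId, res);
    
    this.clients.set(clientId, connection);
    
    // Send initial connection event; the client needs its clientId to manage subscriptions
    this.sendEvent(clientId, {
      type: 'connected',
      clientId,
      timestamp: Date.now()
    });
    
    // Detect disconnects on the response: its 'close' event fires when the
    // connection goes away while the stream is still open
    res.on('close', () => {
      this.handleClientDisconnect(clientId);
    });
    
    // Handle client errors
    req.on('error', (error: Error) => {
      console.error('SSE client error:', error);
      this.handleClientDisconnect(clientId);
    });
  }
  
  // Handles POST /events/subscribe and /events/unsubscribe with a JSON body { clientId, topic }.
  // The clientId acts as a bearer credential here, so it must be unguessable (randomUUID), and
  // production code should also authenticate these requests. Assumes same-origin requests;
  // cross-origin JSON POSTs would additionally need an OPTIONS preflight handler.
  private handleSubscriptionRequest(
    req: IncomingMessage,
    res: ServerResponse,
    action: 'subscribe' | 'unsubscribe'
  ): void {
    let body = '';
    req.setEncoding('utf8');
    req.on('data', (chunk: string) => {
      body += chunk;
      if (body.length > 10000) req.destroy(); // reject oversized bodies
    });
    req.on('end', () => {
      let payload: any;
      try {
        payload = JSON.parse(body);
      } catch {
        res.writeHead(400);
        res.end('Invalid JSON');
        return;
      }
      
      const { clientId, topic } = payload ?? {};
      if (!this.clients.has(clientId) || typeof topic !== 'string') {
        res.writeHead(400);
        res.end('Unknown client or invalid topic');
        return;
      }
      
      if (action === 'subscribe') {
        this.subscribe(clientId, topic);
      } else {
        this.unsubscribe(clientId, topic);
      }
      res.writeHead(204);
      res.end();
    });
  }
  
  subscribe(clientId: string, topic: string): void {
    if (!this.subscriptions.has(topic)) {
      this.subscriptions.set(topic, new Set());
    }
    this.subscriptions.get(topic)!.add(clientId);
    
    this.sendEvent(clientId, {
      type: 'subscribed',
      topic,
      timestamp: Date.now()
    });
  }
  
  unsubscribe(clientId: string, topic: string): void {
    const subscribers = this.subscriptions.get(topic);
    if (subscribers) {
      subscribers.delete(clientId);
      
      this.sendEvent(clientId, {
        type: 'unsubscribed',
        topic,
        timestamp: Date.now()
      });
    }
  }
  
  publish(topic: string, data: any): void {
    const subscribers = this.subscriptions.get(topic);
    if (!subscribers) return;
    
    const event = {
      type: 'data',
      topic,
      data,
      timestamp: Date.now()
    };
    
    for (const clientId of subscribers) {
      this.sendEvent(clientId, event);
    }
  }
  
  private sendEvent(clientId: string, event: any): void {
    const connection = this.clients.get(clientId);
    if (!connection) return;
    
    try {
      connection.send(event);
    } catch (error) {
      console.error('Failed to send SSE event:', error);
      this.handleClientDisconnect(clientId);
    }
  }
  
  private handleClientDisconnect(clientId: string): void {
    // 'close' and 'error' can both fire for one client; clean up only once
    if (!this.clients.delete(clientId)) return;
    
    // Remove from all subscriptions
    for (const subscribers of this.subscriptions.values()) {
      subscribers.delete(clientId);
    }
    
    console.log(`SSE client ${clientId} disconnected`);
  }
  
  private generateClientId(): string {
    return randomUUID();
  }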
}

class SSEConnection {
  constructor(
    private clientId: string,
    private response: any
  ) {}
  
  send(event: any): void {
    const eventData = `data: ${JSON.stringify(event)}\n\n`;
    this.response.write(eventData);
  }
  
  close(): void {
    this.response.end();
  }
}
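
SSEConnection.send writes a single data: field per event. The text/event-stream format also defines event: (a named event type), id: (which the browser stores and sends back in a Last-Event-ID header when it reconnects), and retry: (the reconnection delay in milliseconds), and it requires a multi-line payload to be split into one data: line per line. Lines that begin with ":" are comments that EventSource ignores, which makes them a convenient heartbeat for keeping proxies from closing idle streams. A minimal serializer follows; formatSSE, formatSSEHeartbeat, and the SSEMessage shape are hypothetical names. The event and id values are rejected if they contain line breaks, since a newline there would let the value inject extra fields.

```typescript
interface SSEMessage {
  data: string;   // payload; may contain newlines
  event?: string; // named event; without it the browser dispatches a plain 'message' event
  id?: string;    // echoed back by the browser as Last-Event-ID after a reconnect
  retry?: number; // reconnection delay hint in milliseconds
}

function assertSingleLine(field: string, value: string): void {
  if (/[\r\n]/.test(value)) throw new Error(`SSE ${field} must not contain line breaks`);
}

// Serializes one event in the text/event-stream format
function formatSSE(message: SSEMessage): string {
  let out = '';
  if (message.event !== undefined) {
    assertSingleLine('event', message.event);
    out += `event: ${message.event}\n`;
  }
  if (message.id !== undefined) {
    assertSingleLine('id', message.id);
    out += `id: ${message.id}\n`;
  }
  if (message.retry !== undefined) out += `retry: ${Math.floor(message.retry)}\n`;
  // Every payload line gets its own "data:" prefix; the browser rejoins them with "\n"
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    out += `data: ${line}\n`;
  }
  return out + '\n'; // a blank line terminates the event
}

// A comment line: ignored by EventSource, but it keeps intermediaries from timing out
function formatSSEHeartbeat(): string {
  return ': heartbeat\n\n';
}
```

Events sent with an event: name are delivered to addEventListener(name, ...) on the client, not to onmessage, so a client that only sets onmessage (like the one in the next section) sees unnamed events only. A heartbeat is typically written on a timer shorter than the proxy's idle timeout, for example every 15 to 30 seconds.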

SSE Client Implementation

class SSEClient {
  private eventSource: EventSource | null = null;
  private subscriptions: Set<string> = new Set();
  
  constructor(private url: string) {}
  
  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        this.eventSource = new EventSource(this.url);
        
        this.eventSource.onopen = () => {
          console.log('SSE connected');
          resolve();
        };
        
        this.eventSource.onmessage = (event) => {
          this.handleMessage(event);
        };
        
        this.eventSource.onerror = (error) => {
          console.error('SSE error:', error);
          reject(error);
        };
        
      } catch (error) {
        reject(error);
      }
    });
  }
  
  // Assigned by the server in the 'connected' event; required for subscription requests
  private clientId: string | null = null;
  
  private handleMessage(event: MessageEvent): void {
    try {
      const data = JSON.parse(event.data);
      
      switch (data.type) {
        case 'connected':
          // EventSource reconnects on its own and the server issues a new clientId
          // each time, so re-send every active subscription
          this.clientId = data.clientId;
          this.subscriptions.forEach((topic) => this.sendSubscriptionRequest('subscribe', topic));
          this.emit('connected', data);
          break;
        case 'data':
          this.emit('data', data);
          break;
        case 'subscribed':
          this.emit('subscribed', data);
          break;
        case 'unsubscribed':
          this.emit('unsubscribed', data);
          break;
        default:
          this.emit('unknownMessage', data);
      }
    } catch (error) {
      console.error('Failed to parse SSE message:', error);
    }
  }
  
  subscribe(topic: string): void {
    this.subscriptions.add(topic);
    this.sendSubscriptionRequest('subscribe', topic);
  }
  
  unsubscribe(topic: string): void {
    this.subscriptions.delete(topic);
    this.sendSubscriptionRequest('unsubscribe', topic);
  }
  
  // EventSource is receive-only, so subscription changes travel over separate HTTP requests
  private sendSubscriptionRequest(action: 'subscribe' | 'unsubscribe', topic: string): void {
    // Before 'connected' arrives there is no clientId; stored topics are sent then
    if (!this.clientId) return;
    
    fetch(`${this.url}/${action}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ clientId: this.clientId, topic })
    }).catch((error) => console.error(`SSE ${action} request failed:`, error));
  }
  
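  // Minimal listener registry; a library emitter or EventTarget would work just as well
  private listeners = new Map<string, Set<(data?: any) => void>>();
  
  on(event: string, handler: (data?: any) => void): void {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, new Set());
    }
    this.listeners.get(event)!.add(handler);
  }
  
  private emit(event: string, data?: any): void {
    this.listeners.get(event)?.forEach((handler) => handler(data));
  }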
  
  disconnect(): void {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}
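
EventSource reconnects on its own and, if the stream carried id: fields, sends the last id it saw in a Last-Event-ID request header. A server can use that to replay what the client missed during the gap. A sketch of a bounded replay buffer, assuming numeric ids assigned by a single server process; ReplayBuffer, append, and since are hypothetical names, and shift() keeps the example short where a production version would use a ring buffer.

```typescript
interface BufferedEvent {
  id: number;
  topic: string;
  data: string;
}

// Keeps the most recent events so reconnecting clients can catch up (capacity >= 1)
class ReplayBuffer {
  private events: BufferedEvent[] = [];
  private nextId = 1;

  constructor(private readonly capacity: number) {}

  // Assigns a monotonically increasing id; send it to clients in the "id:" field
  append(topic: string, data: string): BufferedEvent {
    const event = { id: this.nextId++, topic, data };
    this.events.push(event);
    if (this.events.length > this.capacity) this.events.shift(); // evict the oldest
    return event;
  }

  // Events after lastEventId for the given topics, or null when some of the missed
  // events were already evicted (the client then needs a full state snapshot)
  since(lastEventId: number, topics: Set<string>): BufferedEvent[] | null {
    const oldest = this.events[0];
    if (oldest && lastEventId < oldest.id - 1) return null;
    return this.events.filter((e) => e.id > lastEventId && topics.has(e.topic));
  }
}
```

In the /events handler the id would be read from req.headers['last-event-id'] (Node.js lowercases header names). With several servers behind a load balancer, ids must come from a shared sequence (for example a Redis stream) for replay to work when a client reconnects to a different node.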

Scaling Real-Time APIs

Horizontal Scaling with Redis

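import Redis from 'ioredis';
import { WebSocket } from 'ws';
import { randomUUID } from 'crypto';

class ScalableWebSocketManager {
  // A Redis connection in subscriber mode is limited to subscription commands (plus PING),
  // so commands and publishing go through `redis` and pub/sub traffic through `subscriber`
  private redis: Redis;
  private subscriber: Redis;
  private localConnections: Map<string, WebSocket> = new Map();
  private serverId: string;
  
  constructor() {
    const redisUrl = process.env.REDIS_URL ?? 'redis://localhost:6379';
    this.redis = new Redis(redisUrl);
    this.subscriber = new Redis(redisUrl);
    this.serverId = randomUUID(); // unique per process; used to skip our own publications
    
    this.setupRedisSubscriptions();
  }
  
  private setupRedisSubscriptions(): void {
    // Subscribe to cross-server messages on the dedicated subscriber connection
    this.subscriber
      .subscribe('websocket:broadcast', 'websocket:private')
      .catch((error) => console.error('Redis subscribe failed:', error));
    
    this.subscriber.on('message', (channel, message) => {
      this.handleRedisMessage(channel, message);
    });
  }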
  
  private async handleRedisMessage(channel: string, message: string): Promise<void> {
    try {
      const data = JSON.parse(message);
      
      switch (channel) {
        case 'websocket:broadcast':
          await this.handleBroadcastMessage(data);
          break;
        case 'websocket:private':
          await this.handlePrivateMessage(data);
          break;
      }
    } catch (error) {
      console.error('Failed to handle Redis message:', error);
    }
  }
  
  private async handleBroadcastMessage(data: any): Promise<void> {
    const { roomId, message, excludeServerId, excludeConnectionId } = data;
    
    // Skip our own publications: the publishing server already delivered locally
    if (excludeServerId === this.serverId) return;
    
    // Broadcast to local connections in the room
    const roomConnections = await this.getRoomConnections(roomId);
    
    for (const connectionId of roomConnections) {
      if (connectionId === excludeConnectionId) continue;
      const ws = this.localConnections.get(connectionId);
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    }
  }
  
  private async handlePrivateMessage(data: any): Promise<void> {
    const { connectionId, message } = data;
    
    // Check if connection is local
    const ws = this.localConnections.get(connectionId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  }
  
  async broadcastToRoom(roomId: string, message: any, excludeConnectionId?: string): Promise<void> {
    // Get all connections in the room across all servers
    const allConnections = await this.getAllRoomConnections(roomId);
    
    // Separate local and remote connections
    const localConnections = allConnections.filter(id => 
      this.localConnections.has(id) && id !== excludeConnectionId
    );
    const remoteConnections = allConnections.filter(id => 
      !this.localConnections.has(id)
    );
    
    // Send to local connections
    for (const connectionId of localConnections) {
      const ws = this.localConnections.get(connectionId);
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    }
    
    // Publish to Redis for remote servers, passing the exclusion along so they honor it too
    if (remoteConnections.length > 0) {
      await this.redis.publish('websocket:broadcast', JSON.stringify({
        roomId,
        message,
        excludeServerId: this.serverId,
        excludeConnectionId
      }));
    }
  }
  
  async sendPrivateMessage(connectionId: string, message: any): Promise<void> {
    // Check if connection is local
    if (this.localConnections.has(connectionId)) {
      const ws = this.localConnections.get(connectionId);
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    } else {
      // Publish to Redis for remote server
      await this.redis.publish('websocket:private', JSON.stringify({
        connectionId,
        message
      }));
    }
  }
  
  private async getRoomConnections(roomId: string): Promise<string[]> {
    const key = `room:${roomId}:connections`;
    return await this.redis.smembers(key);
  }
  
  private async getAllRoomConnections(roomId: string): Promise<string[]> {
    const key = `room:${roomId}:connections`;
    return await this.redis.smembers(key);
  }
  
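  async addConnectionToRoom(connectionId: string, roomId: string): Promise<void> {
    const key = `room:${roomId}:connections`;
    await this.redis.sadd(key, connectionId);
    
    // Safety net for servers that crash without cleaning up. The TTL restarts on every join,
    // so a room whose members stay connected longer than an hour without new joins loses its
    // membership set; long-lived rooms need a periodic refresh or explicit cleanup instead.
    await this.redis.expire(key, 3600); // 1 hour
  }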
  
  async removeConnectionFromRoom(connectionId: string, roomId: string): Promise<void> {
    const key = `room:${roomId}:connections`;
    await this.redis.srem(key, connectionId);
  }
}
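
The interplay between local delivery, the excludeServerId check, and pub/sub is easy to get wrong: a missing check delivers duplicates, an extra one drops messages. The sketch below replaces Redis with an in-memory bus so the fan-out logic can be exercised in a unit test. InMemoryBus and FanOutNode are hypothetical names, and rooms are reduced to a single room for brevity.

```typescript
type BusListener = (payload: string) => void;

// Stand-in for a Redis channel: every subscriber receives every published payload
class InMemoryBus {
  private listeners: BusListener[] = [];

  subscribe(listener: BusListener): void {
    this.listeners.push(listener);
  }

  publish(payload: string): void {
    for (const listener of this.listeners) listener(payload);
  }
}

// Mirrors the broadcast path: deliver locally, publish once, ignore our own publications
class FanOutNode {
  readonly delivered = new Map<string, string[]>(); // connectionId -> received messages

  constructor(private serverId: string, private bus: InMemoryBus, localConnections: string[]) {
    for (const id of localConnections) this.delivered.set(id, []);
    bus.subscribe((payload) => {
      const { message, originServerId } = JSON.parse(payload);
      if (originServerId === this.serverId) return; // already delivered locally
      this.deliverLocal(message);
    });
  }

  broadcast(message: string): void {
    this.deliverLocal(message);
    this.bus.publish(JSON.stringify({ message, originServerId: this.serverId }));
  }

  private deliverLocal(message: string): void {
    for (const inbox of this.delivered.values()) inbox.push(message);
  }
}
```

Removing the originServerId check makes the publishing node deliver its own broadcast twice, which a test like the one for this sketch catches immediately.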

Load Balancing Configuration

# Nginx configuration for WebSocket load balancing (http context)

# Send "Connection: upgrade" only when the client actually asked for an upgrade
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

upstream websocket_backend {
    least_conn;
    server ws1.example.com:8080;
    server ws2.example.com:8080;
    server ws3.example.com:8080;
}

server {
    listen 80;
    server_name api.example.com;
    
    location /ws {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # WebSocket specific settings
        proxy_read_timeout 86400;
        proxy_send_timeout 86400;
        proxy_connect_timeout 60;
    }
    
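    location /events {
        proxy_pass http://websocket_backend;
        # HTTP/1.1 without "Connection: close" keeps the upstream stream open
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # SSE specific settings
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400;
        
        # With least_conn, a follow-up request (such as a subscribe POST) can reach a different
        # backend than the one holding the stream: keep per-client state in shared storage,
        # or pin clients to one backend (e.g. ip_hash instead of least_conn)
    }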
}

Security Implementation

Authentication and Authorization

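import jwt, { JwtPayload, TokenExpiredError } from 'jsonwebtoken';

interface AuthResult {
  success: boolean;
  userId?: string;
  permissions?: string[];
  metadata?: JwtPayload;
  error?: string;
}

class RealTimeSecurityManager {
  private jwtSecret: string;
  private rateLimiters: Map<string, RateLimiter> = new Map();
  
  constructor(jwtSecret: string) {
    this.jwtSecret = jwtSecret;
  }
  
  async authenticateConnection(token: string): Promise<AuthResult> {
    try {
      // jwt.verify checks the signature and the exp claim (throwing TokenExpiredError),
      // so no separate expiry check is needed. Pinning the algorithm rejects tokens
      // signed any other way (this assumes HS256 with a shared secret).
      const payload = jwt.verify(token, this.jwtSecret, { algorithms: ['HS256'] }) as JwtPayload;
      
      // Check user permissions
      const permissions = await this.getUserPermissions(payload.userId);
      
      return {
        success: true,
        userId: payload.userId,
        permissions,
        metadata: payload
      };
      
    } catch (error) {
      if (error instanceof TokenExpiredError) {
        return { success: false, error: 'Token expired' };
      }
      return { success: false, error: 'Invalid token' };
    }
  }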
  
  async authorizeRoomAccess(userId: string, roomId: string): Promise<boolean> {
    // Check if user has permission to join the room
    const userPermissions = await this.getUserPermissions(userId);
    const roomPermissions = await this.getRoomPermissions(roomId);
    
    return this.checkPermissionIntersection(userPermissions, roomPermissions);
  }
  
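  async checkRateLimit(connectionId: string, action: string): Promise<boolean> {
    const key = `${connectionId}:${action}`;
    
    if (!this.rateLimiters.has(key)) {
      this.rateLimiters.set(key, new RateLimiter({
        windowMs: 60000, // 1 minute
        max: 100 // 100 requests per minute
      }));
    }
    
    const limiter = this.rateLimiters.get(key)!;
    return limiter.tryConsume();
  }
  
  // Call from the disconnect handler; otherwise one limiter per connection/action is kept forever
  clearRateLimits(connectionId: string): void {
    for (const key of this.rateLimiters.keys()) {
      if (key.startsWith(`${connectionId}:`)) {
        this.rateLimiters.delete(key);
      }
    }
  }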
  
  private async getUserPermissions(userId: string): Promise<string[]> {
    // Implement user permission lookup
    return ['read', 'write']; // Example permissions
  }
  
  private async getRoomPermissions(roomId: string): Promise<string[]> {
    // Implement room permission lookup
    return ['read', 'write']; // Example permissions
  }
  
  private checkPermissionIntersection(userPerms: string[], roomPerms: string[]): boolean {
    return userPerms.some(perm => roomPerms.includes(perm));
  }
}

class RateLimiter {
  private requests: number[] = [];
  
  constructor(private config: { windowMs: number; max: number }) {}
  
  tryConsume(): boolean {
    const now = Date.now();
    const windowStart = now - this.config.windowMs;
    
    // Remove old requests
    this.requests = this.requests.filter(time => time > windowStart);
    
    // Check if under limit
    if (this.requests.length < this.config.max) {
      this.requests.push(now);
      return true;
    }
    
    return false;
  }
}
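
RateLimiter is a sliding-log limiter: it stores one timestamp per accepted request (up to max per key) and filters that array on every call. A token bucket is a common lower-memory alternative: two numbers per key, an average rate enforced by the refill, and bursts up to the bucket's capacity. The semantics differ slightly: a bucket with capacity 100 refilling at 100/60 tokens per second admits a burst of 100 and then about 1.7 messages per second. TokenBucket is a hypothetical name, and the sketch assumes a single process.

```typescript
// Token bucket: holds up to `capacity` tokens and refills continuously at `refillPerSecond`
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    private readonly now: () => number = Date.now // injectable clock (ms) for testing
  ) {
    this.tokens = capacity; // start full, allowing an initial burst
    this.lastRefill = now();
  }

  tryConsume(cost = 1): boolean {
    // Refill in proportion to the time elapsed since the previous call, up to capacity
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = current;

    if (this.tokens >= cost) {
      this.tokens -= cost;
      return true;
    }
    return false;
  }
}
```

Swapping it in would mean constructing new TokenBucket(100, 100 / 60) where checkRateLimit currently creates a RateLimiter.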

Input Validation and Sanitization

interface ValidationResult {
  valid: boolean;
  errors: string[];
  sanitizedMessage: any | null;
}

class MessageValidator {
  private maxMessageLength = 1000;
  private allowedMessageTypes = ['text', 'image', 'file', 'system'];
  
  validateMessage(message: any): ValidationResult {
    const errors: string[] = [];
    
    if (typeof message !== 'object' || message === null) {
      return { valid: false, errors: ['Message must be an object'], sanitizedMessage: null };
    }
    
    // Check message structure
    if (!message.type || !this.allowedMessageTypes.includes(message.type)) {
      errors.push('Invalid message type');
    }
    
    // The remaining checks need a string; stop here instead of reading .length of undefined
    if (typeof message.content !== 'string' || message.content.length === 0) {
      errors.push('Message content is required and must be a string');
      return { valid: false, errors, sanitizedMessage: null };
    }
    
    if (message.content.length > this.maxMessageLength) {
      errors.push(`Message too long (max ${this.maxMessageLength} characters)`);
    }
    
    // Check for malicious content
    if (this.containsMaliciousContent(message.content)) {
      errors.push('Message contains potentially malicious content');
    }
    
    // Check for PII
    if (this.containsPII(message.content)) {
      errors.push('Message contains personally identifiable information');
    }
    
    return {
      valid: errors.length === 0,
      errors,
      sanitizedMessage: errors.length === 0 ? this.sanitizeMessage(message) : null
    };
  }
  
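  private containsMaliciousContent(content: string): boolean {
    // Heuristic screen for common attack patterns; escaping on output remains the real defense.
    // No /g flags: test() on a global regex is stateful (lastIndex), and one match is enough.
    // /<script\b/i also catches script blocks spanning several lines, which `.*?` misses.
    const maliciousPatterns = [
      /<script\b/i,
      /javascript:/i,
      /on\w+\s*=/i,
      /<iframe\b/i
    ];
    
    return maliciousPatterns.some(pattern => pattern.test(content));
  }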
  
  private containsPII(content: string): boolean {
    // Check for common PII patterns
    const piiPatterns = [
      /\b\d{3}-\d{2}-\d{4}\b/, // SSN
      /\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b/, // Credit card
      /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/ // Email (the old [A-Z|a-z] also matched "|")
    ];
    
    return piiPatterns.some(pattern => pattern.test(content));
  }
  
  private sanitizeMessage(message: any): any {
    return {
      ...message,
      content: this.sanitizeContent(message.content)
    };
  }
  
  private sanitizeContent(content: string): string {
    // Remove HTML tags
    let sanitized = content.replace(/<[^>]*>/g, '');
    
    // Escape special characters
    sanitized = sanitized
      .replace(/&/g, '&amp;')
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;')
      .replace(/"/g, '&quot;')
      .replace(/'/g, '&#x27;');
    
    return sanitized;
  }
}
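
sanitizeContent first deletes anything matching <[^>]*> and then escapes. The deletion step is lossy: in "1 < 2 > 0" it removes "< 2 >" as if it were a tag. When messages are rendered as HTML text, escaping the five significant characters in a single pass is enough on its own and keeps the user's text intact. A minimal sketch with a hypothetical escapeHtml helper; it covers HTML element content and quoted attribute values only, since URLs and inline scripts need different encoding, and it belongs at the point where content is rendered.

```typescript
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#x27;',
};

// One pass over the string, so an "&" produced by a replacement is never escaped again
function escapeHtml(text: string): string {
  return text.replace(/[&<>"']/g, (ch) => HTML_ESCAPES[ch]);
}
```

Because the browser decodes the entities back when it displays the text, the reader sees exactly what the sender typed, while markup in the message is shown rather than executed.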

Observability and Monitoring

Metrics Collection

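// Metric, GaugeMetric, CounterMetric, HistogramMetric and RateMetric are placeholders
// for the types of whatever metrics library you use (e.g. a Prometheus client)
class RealTimeMetricsCollector {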
  private metrics: Map<string, Metric> = new Map();
  private alertManager: AlertManager;
  
  constructor() {
    this.alertManager = new AlertManager();
    this.setupMetrics();
  }
  
  private setupMetrics(): void {
    // Connection metrics
    this.metrics.set('connections.active', new GaugeMetric());
    this.metrics.set('connections.total', new CounterMetric());
    this.metrics.set('connections.failed', new CounterMetric());
    
    // Message metrics
    this.metrics.set('messages.sent', new CounterMetric());
    this.metrics.set('messages.received', new CounterMetric());
    this.metrics.set('messages.failed', new CounterMetric());
    
    // Latency: a single histogram; p50, p95 and p99 are read from the same distribution
    this.metrics.set('latency', new HistogramMetric());
    
    // Error metrics
    this.metrics.set('errors.rate', new RateMetric());
    this.metrics.set('errors.by_type', new CounterMetric());
  }
  
  recordConnection(connectionId: string, success: boolean): void {
    this.metrics.get('connections.total')?.increment();
    
    if (success) {
      this.metrics.get('connections.active')?.increment();
    } else {
      this.metrics.get('connections.failed')?.increment();
    }
  }
  
  // Call on every close; without it connections.active only ever grows
  recordDisconnection(connectionId: string): void {
    this.metrics.get('connections.active')?.decrement();
  }
  
  recordMessage(messageType: 'sent' | 'received' | 'failed', latency?: number): void {
    this.metrics.get(`messages.${messageType}`)?.increment();
    
    if (latency !== undefined) {
      this.metrics.get('latency')?.observe(latency);
    }
  }
  
  recordError(errorType: string): void {
    this.metrics.get('errors.by_type')?.increment({ type: errorType });
    this.metrics.get('errors.rate')?.increment();
  }
  
  async checkAlerts(): Promise<void> {
    const activeConnections = this.metrics.get('connections.active')?.getValue() || 0;
    // Assumes getRate() returns failed operations as a fraction (0-1) of the current window
    const errorRate = this.metrics.get('errors.rate')?.getRate() || 0;
    // Assumes the histogram type can report a percentile
    const p95Latency = this.metrics.get('latency')?.getPercentile(95) || 0;
    
    // Check connection count alert
    if (activeConnections > 10000) {
      await this.alertManager.sendAlert({
        type: 'high_connection_count',
        severity: 'warning',
        message: `High connection count: ${activeConnections}`,
        timestamp: Date.now()
      });
    }
    
    // Check error rate alert
    if (errorRate > 0.05) { // 5%
      await this.alertManager.sendAlert({
        type: 'high_error_rate',
        severity: 'critical',
        message: `High error rate: ${(errorRate * 100).toFixed(2)}%`,
        timestamp: Date.now()
      });
    }
    
    // Check latency alert
    if (p95Latency > 1000) { // 1 second
      await this.alertManager.sendAlert({
        type: 'high_latency',
        severity: 'warning',
        message: `High p95 latency: ${p95Latency}ms`,
        timestamp: Date.now()
      });
    }
  }
}
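
p50, p95 and p99 are three queries over one latency distribution rather than three separate metrics, and the mean of a latency histogram is not its 95th percentile. The sketch below computes nearest-rank percentiles from raw samples to make the definition concrete; percentile is a hypothetical helper, and production metric libraries approximate this with buckets or streaming summaries instead of sorting every sample.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  if (p <= 0 || p > 100) throw new Error('p must be in (0, 100]');
  const sorted = [...samples].sort((a, b) => a - b); // numeric sort, input left untouched
  // p * n is an exact integer product, so the division does not introduce rounding drift
  const rank = Math.ceil((p * sorted.length) / 100); // 1-based rank
  return sorted[rank - 1];
}
```

For the samples [12, 15, 20, 250] ms the p50 is 15 ms while the mean is about 74 ms, which is why latency alerts usually target percentiles rather than averages.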

class AlertManager {
  async sendAlert(alert: Alert): Promise<void> {
    // Send to monitoring system
    console.log('Alert:', alert);
    
    // Notify every channel; a failure in one (e.g. Slack) should not prevent the others
    await Promise.allSettled([this.sendToSlack(alert), this.sendToEmail(alert)]);
  }
  
  private async sendToSlack(alert: Alert): Promise<void> {
    // Implement Slack notification
  }
  
  private async sendToEmail(alert: Alert): Promise<void> {
    // Implement email notification
  }
}

Distributed Tracing

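// Tracer, ProbabilisticSampler and Span are OpenTracing-style placeholders (setTag/finish);
// constructor signatures differ between tracing libraries
class RealTimeTracer {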
  private tracer: Tracer;
  
  constructor() {
    this.tracer = new Tracer({
      serviceName: 'realtime-api',
      sampler: new ProbabilisticSampler(0.1) // 10% sampling
    });
  }
  
  startConnectionTrace(connectionId: string): Span {
    const span = this.tracer.startSpan('websocket_connection', {
      tags: {
        'connection.id': connectionId,
        'span.kind': 'server'
      }
    });
    
    return span;
  }
  
  startMessageTrace(connectionId: string, messageType: string): Span {
    const span = this.tracer.startSpan('message_processing', {
      tags: {
        'connection.id': connectionId,
        'message.type': messageType,
        'span.kind': 'server'
      }
    });
    
    return span;
  }
  
  startRoomTrace(roomId: string, action: string): Span {
    const span = this.tracer.startSpan('room_operation', {
      tags: {
        'room.id': roomId,
        'room.action': action,
        'span.kind': 'server'
      }
    });
    
    return span;
  }
  
  addSpanTags(span: Span, tags: Record<string, any>): void {
    Object.entries(tags).forEach(([key, value]) => {
      span.setTag(key, value);
    });
  }
  
  finishSpan(span: Span, success: boolean = true): void {
    // OpenTracing convention: failed spans carry the standard boolean "error" tag
    span.setTag('error', !success);
    span.finish();
  }
}
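
HTTP requests carry trace context in headers, but individual WebSocket messages have no headers, so a message_processing span cannot be linked to the sender's trace unless the context travels inside the message. One option is to embed a W3C Trace Context traceparent value (version-traceid-parentid-flags, e.g. 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01) in the message envelope. This sketch parses and formats version 00 of that value only; the envelope field name and the helper names are assumptions, and a tracing SDK's propagator would normally do this work.

```typescript
interface TraceContext {
  traceId: string;  // 32 lowercase hex characters
  spanId: string;   // 16 lowercase hex characters (the "parent-id" field)
  sampled: boolean; // bit 0x01 of trace-flags
}

const TRACEPARENT_V00 = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Returns null for malformed values, which the receiver should treat as "no parent"
function parseTraceparent(value: string): TraceContext | null {
  const match = TRACEPARENT_V00.exec(value.trim());
  if (!match) return null;
  const [, traceId, spanId, flags] = match;
  // All-zero ids are invalid in the W3C Trace Context format
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

function formatTraceparent(ctx: TraceContext): string {
  return `00-${ctx.traceId}-${ctx.spanId}-${ctx.sampled ? '01' : '00'}`;
}
```

A sender would attach formatTraceparent(...) as a traceparent field of the JSON message, and the server's message handler would parse it and start the message_processing span as a child of that context.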

Performance Optimization

Connection Pooling and Management

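// Note: an inbound WebSocket or SSE socket belongs to one client and cannot be handed to
// another, so this pooling pattern fits admission limits and per-connection resources
// (upstream clients, buffers) rather than reuse of client sockets themselves
class ConnectionPoolManager {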
  private pools: Map<string, ConnectionPool> = new Map();
  private maxConnectionsPerPool = 1000;
  
  constructor() {
    this.setupDefaultPools();
  }
  
  private setupDefaultPools(): void {
    // WebSocket pool
    this.pools.set('websocket', new ConnectionPool({
      maxConnections: this.maxConnectionsPerPool,
      idleTimeout: 300000, // 5 minutes
      connectionTimeout: 30000 // 30 seconds
    }));
    
    // SSE pool
    this.pools.set('sse', new ConnectionPool({
      maxConnections: this.maxConnectionsPerPool,
      idleTimeout: 600000, // 10 minutes
      connectionTimeout: 30000
    }));
  }
  
  async getConnection(type: string, connectionId: string): Promise<Connection> {
    const pool = this.pools.get(type);
    if (!pool) {
      throw new Error(`Unknown connection type: ${type}`);
    }
    
    return await pool.acquire(connectionId);
  }
  
  async releaseConnection(type: string, connectionId: string): Promise<void> {
    const pool = this.pools.get(type);
    if (pool) {
      await pool.release(connectionId);
    }
  }
  
  async getPoolStats(): Promise<PoolStats> {
    const stats: PoolStats = {};
    
    for (const [type, pool] of this.pools) {
      stats[type] = {
        active: pool.getActiveCount(),
        idle: pool.getIdleCount(),
        total: pool.getTotalCount(),
        max: pool.getMaxCount()
      };
    }
    
    return stats;
  }
}

class ConnectionPool {
  private active: Map<string, Connection> = new Map();
  private idle: Connection[] = [];
  // Pending idle-expiry timers, so reusing a connection can cancel its timer
  private idleTimers: Map<Connection, ReturnType<typeof setTimeout>> = new Map();
  private maxConnections: number;
  private idleTimeout: number;
  private connectionTimeout: number;
  
  constructor(config: PoolConfig) {
    this.maxConnections = config.maxConnections;
    this.idleTimeout = config.idleTimeout;
    this.connectionTimeout = config.connectionTimeout;
  }
  
  async acquire(connectionId: string): Promise<Connection> {
    // Try to reuse an idle connection first
    if (this.idle.length > 0) {
      return this.takeIdle(connectionId);
    }
    
    // Create a new connection if under the limit
    if (this.active.size < this.maxConnections) {
      const connection = await this.createConnection();
      this.active.set(connectionId, connection);
      return connection;
    }
    
    // Otherwise wait for one to be released
    return await this.waitForConnection(connectionId);
  }
  
  async release(connectionId: string): Promise<void> {
    const connection = this.active.get(connectionId);
    if (!connection) return;
    
    this.active.delete(connectionId);
    
    // Add to idle pool
    this.idle.push(connection);
    
    // Close the connection if it stays idle for too long
    const timer = setTimeout(() => {
      this.cleanupIdleConnection(connection);
    }, this.idleTimeout);
    this.idleTimers.set(connection, timer);
  }
  
  // Move an idle connection to the active set and cancel its expiry timer.
  // Without this, a stale timer could close a connection that was reused
  // and later released again.
  private takeIdle(connectionId: string): Connection {
    const connection = this.idle.pop()!;
    const timer = this.idleTimers.get(connection);
    if (timer) {
      clearTimeout(timer);
      this.idleTimers.delete(connection);
    }
    this.active.set(connectionId, connection);
    return connection;
  }
  
  private async createConnection(): Promise<Connection> {
    // Implement connection creation logic
    return new Connection();
  }
  
  private async waitForConnection(connectionId: string): Promise<Connection> {
    return new Promise((resolve, reject) => {
      // Give up after connectionTimeout, and stop polling so the interval
      // cannot grab a connection after the caller has been rejected
      const timeout = setTimeout(() => {
        clearInterval(checkInterval);
        reject(new Error('Connection timeout'));
      }, this.connectionTimeout);
      
      // Check for an available connection every 100 ms
      const checkInterval = setInterval(() => {
        if (this.idle.length > 0) {
          clearTimeout(timeout);
          clearInterval(checkInterval);
          resolve(this.takeIdle(connectionId));
        }
      }, 100);
    });
  }
  
  private cleanupIdleConnection(connection: Connection): void {
    this.idleTimers.delete(connection);
    const index = this.idle.indexOf(connection);
    if (index > -1) {
      this.idle.splice(index, 1);
      connection.close();
    }
  }
  
  getActiveCount(): number {
    return this.active.size;
  }
  
  getIdleCount(): number {
    return this.idle.length;
  }
  
  getTotalCount(): number {
    return this.active.size + this.idle.length;
  }
  
  getMaxCount(): number {
    return this.maxConnections;
  }
}
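The polling loop in `waitForConnection` checks for an idle connection every 100 ms. An alternative design keeps a queue of waiting callers and hands each released item directly to the oldest one, which removes both the polling delay and the interval timers. The sketch below is a hypothetical, generic `WaitQueuePool<T>`, not a drop-in replacement for `ConnectionPool`; it omits idle expiry to stay short.

```typescript
// A pending acquire() call waiting for a released item
type Waiter<T> = {
  resolve: (item: T) => void;
  timer?: ReturnType<typeof setTimeout>;
};

// Hypothetical generic pool: released items go straight to the oldest
// waiter (FIFO), so no polling loop is needed
class WaitQueuePool<T> {
  private idle: T[] = [];
  private waiters: Waiter<T>[] = [];
  private created = 0;

  constructor(
    private readonly create: () => T,
    private readonly maxItems: number,
    private readonly waitTimeoutMs: number
  ) {}

  acquire(): Promise<T> {
    // Reuse an idle item if one exists
    if (this.idle.length > 0) {
      return Promise.resolve(this.idle.pop() as T);
    }
    // Otherwise create one while under the limit
    if (this.created < this.maxItems) {
      this.created++;
      return Promise.resolve(this.create());
    }
    // Pool exhausted: queue the caller and reject if it waits too long
    return new Promise<T>((resolve, reject) => {
      const waiter: Waiter<T> = { resolve };
      waiter.timer = setTimeout(() => {
        const index = this.waiters.indexOf(waiter);
        if (index > -1) this.waiters.splice(index, 1);
        reject(new Error('Pool acquire timeout'));
      }, this.waitTimeoutMs);
      this.waiters.push(waiter);
    });
  }

  release(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) {
      // Hand the item directly to the longest-waiting caller
      clearTimeout(waiter.timer);
      waiter.resolve(item);
    } else {
      this.idle.push(item);
    }
  }

  get waitingCount(): number {
    return this.waiters.length;
  }

  get idleCount(): number {
    return this.idle.length;
  }
}
```

Because a release resolves a waiter immediately, the waiter never observes an empty idle list after being woken, which is a race the polling version has to guard against.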

Message Batching and Compression

class MessageBatcher {
  private batches: Map<string, MessageBatch> = new Map();
  private batchSize = 100;
  private batchTimeout = 1000; // 1 second
  
  constructor() {
    this.startBatchProcessor();
  }
  
  addMessage(roomId: string, message: any): void {
    if (!this.batches.has(roomId)) {
      this.batches.set(roomId, new MessageBatch(roomId));
    }
    
    const batch = this.batches.get(roomId)!;
    batch.addMessage(message);
    
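    // Send batch if full; log failures instead of leaving the promise unhandled
    if (batch.getMessageCount() >= this.batchSize) {
      this.sendBatch(roomId).catch(err => console.error(`Batch send failed for room ${roomId}:`, err));
    }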
  }
  
  private startBatchProcessor(): void {
    setInterval(() => {
      this.processBatches();
    }, this.batchTimeout);
  }
  
  private processBatches(): void {
    for (const [roomId, batch] of this.batches) {
      if (batch.getMessageCount() > 0) {
        this.sendBatch(roomId).catch(err => console.error(`Batch send failed for room ${roomId}:`, err));
      }
    }
  }
  
  private async sendBatch(roomId: string): Promise<void> {
    const batch = this.batches.get(roomId);
    if (!batch) return;
    
    // Snapshot and clear before any await: messages added while this batch
    // is being compressed go into the next batch instead of being dropped
    const messages = batch.getMessages();
    if (messages.length === 0) return;
    batch.clear();
    
    // Compress batch
    const compressedBatch = await this.compressBatch(messages);
    
    // Send to room
    await this.broadcastToRoom(roomId, {
      type: 'batch',
      messages: compressedBatch,
      count: messages.length,
      timestamp: Date.now()
    });
  }
  
  private async compressBatch(messages: any[]): Promise<any> {
    // Keep only the fields clients need
    const batchData = {
      messages: messages.map(msg => ({
        id: msg.id,
        type: msg.type,
        content: msg.content,
        timestamp: msg.timestamp
      }))
    };
    
    // Compress using gzip. Base64 encoding adds about 33% to the compressed
    // size; binary frames or WebSocket permessage-deflate avoid that cost.
    const json = JSON.stringify(batchData);
    const compressed = await this.gzip(json);
    
    return {
      compressed: true,
      data: compressed.toString('base64'),
      originalSize: Buffer.byteLength(json), // bytes, comparable to compressedSize
      compressedSize: compressed.length
    };
  }
  
  private async gzip(data: string): Promise<Buffer> {
    const zlib = require('zlib');
    return new Promise((resolve, reject) => {
      zlib.gzip(data, (err: Error | null, result: Buffer) => {
        if (err) reject(err);
        else resolve(result);
      });
    });
  }
  
  private async broadcastToRoom(roomId: string, message: any): Promise<void> {
    // Implement room broadcasting logic
    console.log(`Broadcasting to room ${roomId}:`, message);
  }
}

class MessageBatch {
  private messages: any[] = [];
  
  constructor(private roomId: string) {}
  
  addMessage(message: any): void {
    this.messages.push(message);
  }
  
  getMessages(): any[] {
    return [...this.messages];
  }
  
  getMessageCount(): number {
    return this.messages.length;
  }
  
  clear(): void {
    this.messages = [];
  }
}
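Clients have to reverse the batcher's encoding. The Node.js sketch below mirrors the JSON, gzip, base64 pipeline with the synchronous `zlib` helpers and decodes it again; the `BatchedMessage` field types are assumptions, since the batcher handles messages as `any`. In a browser, the built-in `DecompressionStream('gzip')` could take the place of `gunzipSync`.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Shape returned by MessageBatcher.compressBatch
interface CompressedBatch {
  compressed: boolean;
  data: string; // base64-encoded gzip bytes
  originalSize: number;
  compressedSize: number;
}

// Assumed per-message fields, matching what compressBatch keeps
interface BatchedMessage {
  id: string;
  type: string;
  content: unknown;
  timestamp: number;
}

// Sender side, mirroring the batcher: JSON -> gzip -> base64
function encodeBatch(messages: BatchedMessage[]): CompressedBatch {
  const json = JSON.stringify({ messages });
  const gz = gzipSync(json);
  return {
    compressed: true,
    data: gz.toString('base64'),
    originalSize: Buffer.byteLength(json),
    compressedSize: gz.length
  };
}

// Receiver side: base64 -> gunzip -> JSON
function decodeBatch(batch: CompressedBatch): BatchedMessage[] {
  if (!batch.compressed) {
    throw new Error('Expected a compressed batch');
  }
  const json = gunzipSync(Buffer.from(batch.data, 'base64')).toString('utf8');
  const parsed = JSON.parse(json) as { messages: BatchedMessage[] };
  return parsed.messages;
}
```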

Testing Strategies

Integration Testing

class RealTimeAPITester {
  private testClients: WebSocketClient[] = [];
  private testResults: TestResult[] = [];
  
  async runIntegrationTests(): Promise<TestSuiteResult> {
    // testRoomManagement, testErrorHandling and testRateLimiting follow the
    // same pattern as the tests below and are omitted here for brevity
    const tests = [
      this.testConnectionEstablishment,
      this.testMessageDelivery,
      this.testConcurrentConnections
    ];
    
    for (const test of tests) {
      try {
        const result = await test.call(this);
        this.testResults.push(result);
      } catch (error) {
        this.testResults.push({
          name: test.name,
          passed: false,
          error: (error as Error).message,
          duration: 0
        });
      }
      }
    }
    
    return this.generateTestSuiteResult();
  }
  
  private async testConnectionEstablishment(): Promise<TestResult> {
    const startTime = Date.now();
    
    try {
      const client = new WebSocketClient('ws://localhost:8080');
      await client.connect();
      
      const duration = Date.now() - startTime;
      
      return {
        name: 'testConnectionEstablishment',
        passed: true,
        duration,
        metrics: {
          connectionTime: duration,
          success: true
        }
      };
    } catch (error) {
      return {
        name: 'testConnectionEstablishment',
        passed: false,
        error: (error as Error).message,
        duration: Date.now() - startTime
      };
    }
  }
  
  private async testMessageDelivery(): Promise<TestResult> {
    const startTime = Date.now();
    const client = new WebSocketClient('ws://localhost:8080');
    
    try {
      await client.connect();
      
      // Join test room
      client.joinRoom('test-room');
      
      // Register the listener before sending so the echo cannot be missed,
      // and fail instead of hanging if nothing arrives within 5 seconds
      const messagePromise = new Promise<any>((resolve, reject) => {
        const timer = setTimeout(() => reject(new Error('No message received within 5s')), 5000);
        client.on('message', (data) => {
          if (data.type === 'message') {
            clearTimeout(timer);
            resolve(data);
          }
        });
      });
      
      // Measure delivery latency from the moment the message is sent
      const sentAt = Date.now();
      client.sendMessage('test-room', 'Hello, World!');
      
      const receivedMessage = await messagePromise;
      const messageLatency = Date.now() - sentAt;
      
      return {
        name: 'testMessageDelivery',
        passed: receivedMessage.content === 'Hello, World!',
        duration: Date.now() - startTime,
        metrics: {
          messageLatency,
          messageReceived: true
        }
      };
    } catch (error) {
      return {
        name: 'testMessageDelivery',
        passed: false,
        error: (error as Error).message,
        duration: Date.now() - startTime
      };
    } finally {
      client.disconnect();
    }
  }
  
  private async testConcurrentConnections(): Promise<TestResult> {
    const startTime = Date.now();
    const connectionCount = 100;
    const connections: WebSocketClient[] = [];
    
    try {
      // Open all connections in parallel, then join every client to the
      // room; clients that never join would not receive the broadcast
      const clients = Array.from({ length: connectionCount }, () => new WebSocketClient('ws://localhost:8080'));
      connections.push(...clients);
      await Promise.all(clients.map(client => client.connect()));
      clients.forEach(client => client.joinRoom('test-room'));
      
      // Record how long each client takes to receive the broadcast
      let sentAt = 0;
      const latencies: number[] = [];
      const messagePromises = clients.map(client =>
        new Promise<void>((resolve) => {
          client.on('message', (data) => {
            if (data.type === 'message') {
              latencies.push(Date.now() - sentAt);
              resolve();
            }
          });
        })
      );
      
      // Send message from first client
      sentAt = Date.now();
      clients[0].sendMessage('test-room', 'Broadcast test');
      
      // Wait for all messages
      await Promise.all(messagePromises);
      
      return {
        name: 'testConcurrentConnections',
        passed: true,
        duration: Date.now() - startTime,
        metrics: {
          connectionCount,
          // Mean per-client delivery latency, measured from the send
          averageLatency: latencies.reduce((sum, l) => sum + l, 0) / latencies.length,
          successRate: 1.0
        }
      };
    } catch (error) {
      return {
        name: 'testConcurrentConnections',
        passed: false,
        error: (error as Error).message,
        duration: Date.now() - startTime
      };
    } finally {
      // Cleanup on both the success and failure paths
      connections.forEach(client => client.disconnect());
    }
  }
  
  private generateTestSuiteResult(): TestSuiteResult {
    const passed = this.testResults.filter(r => r.passed).length;
    const total = this.testResults.length;
    const averageDuration = total > 0
      ? this.testResults.reduce((sum, r) => sum + r.duration, 0) / total
      : 0;
    
    return {
      totalTests: total,
      passedTests: passed,
      failedTests: total - passed,
      successRate: total > 0 ? passed / total : 0,
      averageDuration,
      results: this.testResults
    };
  }
}
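An average latency hides the slow tail that users actually notice, which is why the FAQ below recommends latency percentiles. A minimal sketch of the nearest-rank method, assuming per-message latencies in milliseconds have been collected into an array (for example from send and receive timestamps in the tests above); the function names are illustrative.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// all samples are less than or equal to it
function percentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) throw new Error('No samples');
  if (!(p > 0 && p <= 100)) throw new RangeError('p must be in (0, 100]');
  const sorted = [...samples].sort((a, b) => a - b);
  // Multiply before dividing so integer inputs stay exact
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[rank - 1];
}

interface LatencySummary {
  count: number;
  p50: number;
  p95: number;
  p99: number;
  max: number;
}

function summarizeLatencies(samples: readonly number[]): LatencySummary {
  return {
    count: samples.length,
    p50: percentile(samples, 50),
    p95: percentile(samples, 95),
    p99: percentile(samples, 99),
    max: percentile(samples, 100)
  };
}
```

For latencies of 1 through 100 ms this reports p50 = 50, p95 = 95, and p99 = 99; a test can then assert on p99 rather than on the mean.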

Deployment and Operations

Docker Configuration

# Dockerfile for real-time API
FROM node:22-alpine

WORKDIR /app

# Copy package files
COPY package*.json ./

# Install production dependencies only (--omit=dev replaces the deprecated --only=production)
RUN npm ci --omit=dev

# Create non-root user
RUN addgroup -g 1001 -S nodejs && adduser -S -u 1001 -G nodejs app

# Copy application code owned by the non-root user (a .dockerignore should exclude node_modules)
COPY --chown=app:nodejs . .
USER app

# Expose port
EXPOSE 8080

# Health check (the Alpine image ships BusyBox wget, not curl)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD wget -q -O /dev/null http://localhost:8080/health || exit 1

# Start application
CMD ["npm", "start"]

Kubernetes Deployment

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: realtime-api
  labels:
    app: realtime-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: realtime-api
  template:
    metadata:
      labels:
        app: realtime-api
    spec:
      containers:
      - name: realtime-api
        image: realtime-api:latest
        ports:
        - containerPort: 8080
        env:
        - name: NODE_ENV
          value: "production"
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: redis-secret
              key: url
        - name: JWT_SECRET
          valueFrom:
            secretKeyRef:
              name: jwt-secret
              key: secret
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: realtime-api-service
spec:
  selector:
    app: realtime-api
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: realtime-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: realtime-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
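The Deployment's probes expect `/health` and `/ready` endpoints, and long-lived WebSocket and SSE connections make shutdown during a rolling update worth handling explicitly. A minimal sketch using Node's `http` types, assuming the probes are served by the same process; `closeClients` and `graceMs` are hypothetical parameters, and `graceMs` should stay below the pod's `terminationGracePeriodSeconds` (30 seconds by default).

```typescript
import type { IncomingMessage, ServerResponse, Server } from 'node:http';

// Tracks whether this instance should keep receiving new clients
class ProbeState {
  private draining = false;

  beginDrain(): void {
    this.draining = true;
  }

  // Liveness: the process is running and able to answer HTTP
  healthStatus(): number {
    return 200;
  }

  // Readiness: report unavailable once draining starts
  readyStatus(): number {
    return this.draining ? 503 : 200;
  }
}

// Serves the /health and /ready paths used by the probes in the Deployment
function probeHandler(state: ProbeState) {
  return (req: IncomingMessage, res: ServerResponse): void => {
    let status = 404;
    if (req.url === '/health') status = state.healthStatus();
    else if (req.url === '/ready') status = state.readyStatus();
    res.writeHead(status, { 'Content-Type': 'text/plain' });
    res.end(String(status));
  };
}

// On SIGTERM: fail readiness, ask clients to reconnect elsewhere (for example
// with WebSocket close code 1001, "going away"), stop accepting connections,
// and force an exit if draining takes longer than graceMs
function drainOnSigterm(state: ProbeState, server: Server, closeClients: () => void, graceMs: number): void {
  process.once('SIGTERM', () => {
    state.beginDrain();
    closeClients();
    server.close(() => process.exit(0));
    setTimeout(() => process.exit(1), graceMs).unref();
  });
}
```

One way to wire it up is `const server = createServer(probeHandler(state))` from `node:http` (or routing these two paths from an existing server), then calling `drainOnSigterm` once at startup. Clients that follow the FAQ's backoff advice will reconnect to another replica.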

Best Practices Summary

Development Best Practices

  1. Choose the Right Technology: Select WebSockets for bidirectional communication and SSE for server-to-client streaming
  2. Implement Proper Error Handling: Handle connection failures, message parsing errors, and network issues gracefully
  3. Manage Connection Resources: Cap concurrent connections and reclaim idle ones so a traffic spike cannot exhaust memory or file descriptors
  4. Implement Rate Limiting: Prevent abuse with per-connection rate limits and throttling
  5. Add Structured Logging: Log connection lifecycle events (connect, disconnect, errors) with connection and room IDs for debugging and monitoring
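Per-connection rate limiting is commonly built on a token bucket: each connection may burst up to a fixed number of messages, and its allowance refills at a steady rate. A minimal sketch with hypothetical class names and an injectable clock so it can be tested deterministically:

```typescript
// Token bucket: holds up to `capacity` tokens, refilled continuously at
// `refillPerSecond`; each message costs one token by default
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    private readonly now: () => number = Date.now
  ) {
    this.tokens = capacity;
    this.lastRefill = now();
  }

  tryConsume(cost = 1): boolean {
    this.refill();
    if (this.tokens >= cost) {
      this.tokens -= cost;
      return true;
    }
    return false;
  }

  private refill(): void {
    const t = this.now();
    const elapsedSeconds = (t - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = t;
  }
}

// One bucket per connection
class ConnectionRateLimiter {
  private buckets = new Map<string, TokenBucket>();

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    private readonly now: () => number = Date.now
  ) {}

  // Returns false when the connection should be throttled
  allow(connectionId: string): boolean {
    let bucket = this.buckets.get(connectionId);
    if (!bucket) {
      bucket = new TokenBucket(this.capacity, this.refillPerSecond, this.now);
      this.buckets.set(connectionId, bucket);
    }
    return bucket.tryConsume();
  }

  // Call on disconnect so buckets do not accumulate
  forget(connectionId: string): void {
    this.buckets.delete(connectionId);
  }
}
```

A connection that keeps exceeding its limit can then be sent an error message or closed, depending on policy.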

Security Best Practices

  1. Authenticate Connections: Verify user identity before allowing connections
  2. Authorize Actions: Check permissions for room access and message sending
  3. Validate Input: Sanitize and validate all incoming messages
  4. Implement Rate Limiting: Prevent spam and abuse
  5. Use HTTPS/WSS: Encrypt all communications
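Validating input starts with treating every frame as untrusted: cap its size, parse it defensively, and accept only known message shapes. The sketch below assumes a hypothetical protocol with `join`, `leave`, and `message` types; the size limits and the room-ID pattern are illustrative values, not recommendations from this guide.

```typescript
// Hypothetical client protocol
type ClientMessage =
  | { type: 'join'; roomId: string }
  | { type: 'leave'; roomId: string }
  | { type: 'message'; roomId: string; content: string };

const MAX_FRAME_BYTES = 16 * 1024;
const MAX_CONTENT_CHARS = 4000;
const ROOM_ID_PATTERN = /^[A-Za-z0-9_-]{1,64}$/;

// Returns a typed message, or null if the frame should be rejected
function parseClientMessage(raw: string): ClientMessage | null {
  // Reject oversized frames before parsing them
  if (Buffer.byteLength(raw, 'utf8') > MAX_FRAME_BYTES) return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== 'object' || data === null || Array.isArray(data)) return null;

  const { type, roomId, content } = data as Record<string, unknown>;
  if (typeof roomId !== 'string' || !ROOM_ID_PATTERN.test(roomId)) return null;

  // Build fresh objects so unexpected extra fields are dropped
  switch (type) {
    case 'join':
      return { type: 'join', roomId };
    case 'leave':
      return { type: 'leave', roomId };
    case 'message':
      if (typeof content !== 'string' || content.length === 0 || content.length > MAX_CONTENT_CHARS) return null;
      return { type: 'message', roomId, content };
    default:
      return null;
  }
}
```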

Performance Best Practices

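  1. Optimize Message Size: Compress large payloads and batch high-frequency small messages
  2. Implement Caching: Cache frequently read data, such as room membership and permissions, instead of fetching it for every message
  3. Use Connection Pooling: Manage connections efficiently
  4. Monitor Performance: Track latency, throughput, and error rates
  5. Scale Horizontally: Run multiple instances behind a load balancer and relay messages between them with a pub/sub layer such as Redis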
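Caching trades freshness for speed: an entry served from cache may be stale until it expires. A minimal time-to-live cache sketch (hypothetical names, injectable clock), for instance in front of a permission lookup, shows the basic mechanism; it does not bound memory, which a production cache would also need.

```typescript
class TtlCache<K, V> {
  private entries = new Map<K, { value: V; expiresAt: number }>();

  constructor(
    private readonly ttlMs: number,
    private readonly now: () => number = Date.now
  ) {}

  get(key: K): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    // Expired entries are removed lazily on read
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: K, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  // Read-through helper: load and cache on a miss
  async getOrLoad(key: K, load: (key: K) => Promise<V>): Promise<V> {
    const cached = this.get(key);
    if (cached !== undefined) return cached;
    const value = await load(key);
    this.set(key, value);
    return value;
  }

  // Drop an entry early, e.g. when a user's permissions change
  invalidate(key: K): void {
    this.entries.delete(key);
  }
}
```

The TTL bounds how long a revoked permission can keep working, so shorter TTLs (or explicit `invalidate` calls) suit authorization data better than display data.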

Operational Best Practices

  1. Monitor Key Signals: Track connection counts, message throughput, latency percentiles, and error rates, and alert on critical thresholds
  2. Plan for Failures: Implement fallback mechanisms and circuit breakers
  3. Test Thoroughly: Use integration tests and load testing
  4. Document Everything: Maintain detailed documentation and runbooks
  5. Automate Deployment: Use CI/CD pipelines for reliable deployments
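A circuit breaker stops calling a failing dependency (for example a pub/sub broker or an auth service) after repeated errors, fails fast while open, and lets calls through again as trials once a cool-down has passed. A minimal sketch with hypothetical names and an injectable clock; note that in the half-open state it lets every caller through rather than exactly one.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold: number,
    private readonly resetTimeoutMs: number,
    private readonly now: () => number = Date.now
  ) {}

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      // Fail fast until the cool-down period has elapsed
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error('Circuit open');
      }
      this.state = 'half-open';
    }
    const isTrial = this.state === 'half-open';
    try {
      const result = await fn();
      // Any success closes the circuit and resets the failure count
      this.state = 'closed';
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures++;
      // A failed trial, or too many consecutive failures, (re)opens the circuit
      if (isTrial || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw err;
    }
  }

  getState(): BreakerState {
    return this.state;
  }
}
```

While the breaker is open, callers can fall back, for example by queuing outgoing messages locally, instead of piling more requests onto the failing service.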

Conclusion

Building production-ready real-time APIs requires careful consideration of multiple factors including technology choice, scalability, security, and observability. By following the patterns and best practices outlined in this guide, you can create robust, scalable, and maintainable real-time systems.

The key to success is starting with a solid foundation, implementing comprehensive monitoring from the beginning, and continuously iterating based on real-world performance data. Whether you're building a simple chat application or a complex real-time collaboration platform, the principles and patterns in this guide apply at both scales.

Remember that real-time systems are inherently complex due to their distributed nature and the need for low latency. Invest in proper testing, monitoring, and operational procedures to ensure your system can handle production workloads reliably.

FAQ

Q: When should I use WebSockets vs Server-Sent Events? A: Use WebSockets for bidirectional communication like chat applications or collaborative editing. Use SSE for unidirectional server-to-client streaming like live feeds, notifications, or dashboards.

Q: How do I handle WebSocket connection failures and reconnection? A: Reconnect with exponential backoff plus random jitter so that clients do not all retry at the same moment after an outage, restore connection state (such as room memberships) after reconnecting, and use heartbeat/ping-pong messages to detect dead connections early.
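The reconnection advice can be sketched as follows. The delay uses the "full jitter" variant of exponential backoff, where each wait is drawn uniformly between zero and a ceiling that doubles per attempt; the base and cap values and the function names are assumptions. The `connect` callback would wrap opening a new WebSocket and re-joining rooms.

```typescript
// Full-jitter backoff: a random wait between 0 and an exponentially
// growing ceiling, capped at capMs (attempt is 0-based)
function backoffDelay(
  attempt: number,
  baseMs = 500,
  capMs = 30_000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise(resolve => setTimeout(resolve, ms));

// Retries connect() until it succeeds; resolves with the number of failed
// attempts before success, or rejects after maxAttempts failures
async function reconnectWithBackoff(
  connect: () => Promise<void>,
  maxAttempts = 10,
  sleep: (ms: number) => Promise<void> = defaultSleep
): Promise<number> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await connect();
      return attempt;
    } catch {
      // No point waiting after the final attempt
      if (attempt < maxAttempts - 1) {
        await sleep(backoffDelay(attempt));
      }
    }
  }
  throw new Error(`Gave up after ${maxAttempts} attempts`);
}
```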

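Q: What's the best way to scale WebSocket connections across multiple servers? A: Use Redis pub/sub (or another message broker) so that a message published on one server reaches clients connected to the others. An established WebSocket stays on the server that accepted it, so sticky sessions at the load balancer matter mainly for transports that make several HTTP requests per session, such as long-polling fallbacks.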

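Q: How do I implement proper authentication for real-time connections? A: Authenticate with short-lived JWTs validated during the connection handshake, then enforce connection-level authorization for room access and message sending. Browsers cannot set custom headers on WebSocket or EventSource connections, so send the token in a cookie or an initial authentication message; tokens in query strings can end up in server and proxy logs.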
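Where tokens are verified in the server itself rather than by a library, the essential checks are a pinned algorithm, a constant-time signature comparison, and an expiry check. The sketch below verifies HS256 tokens with Node's `crypto` module; the function names are hypothetical, it ignores `nbf`, `aud`, and `iss`, and a maintained JWT library is the safer choice in production.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

type Claims = Record<string, unknown>;

// Decode one base64url JWT segment into a JSON object, or null
function decodeSegment(segment: string): Claims | null {
  try {
    const value: unknown = JSON.parse(Buffer.from(segment, 'base64url').toString('utf8'));
    return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Claims) : null;
  } catch {
    return null;
  }
}

// Verifies an HS256-signed JWT and returns its claims, or null if invalid.
// An exp claim is required so that only expiring tokens are accepted.
function verifyHs256(token: string, secret: string, nowSeconds = Math.floor(Date.now() / 1000)): Claims | null {
  const parts = token.split('.');
  if (parts.length !== 3) return null;
  const [headerB64, payloadB64, signatureB64] = parts;

  // Pin the algorithm instead of trusting the token's own "alg" header
  const header = decodeSegment(headerB64);
  if (!header || header.alg !== 'HS256') return null;

  // Compare signatures in constant time
  const expected = createHmac('sha256', secret).update(`${headerB64}.${payloadB64}`).digest();
  const actual = Buffer.from(signatureB64, 'base64url');
  if (actual.length !== expected.length || !timingSafeEqual(actual, expected)) return null;

  // exp is a NumericDate in seconds; reject tokens at or after expiry
  const claims = decodeSegment(payloadB64);
  const exp = claims?.exp;
  if (!claims || typeof exp !== 'number' || exp <= nowSeconds) return null;
  return claims;
}
```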

Q: What metrics should I monitor for real-time APIs? A: Monitor connection counts, message throughput, latency percentiles, error rates, and resource utilization. Set up alerts for critical thresholds.
