
Clustering (Redis)

Scale your OCPP server with Redis Streams and Pub/Sub.

Clustering

ocpp-ws-io includes a Redis adapter that allows you to scale your WebSocket server across multiple nodes (e.g., in Kubernetes or behind a load balancer).

Architecture

The adapter uses a hybrid approach for maximum performance and reliability:

  1. Broadcast (Pub/Sub): For messages that must reach all nodes (e.g., broad system events).
  2. Unicast (Streams): For point-to-point routing (e.g., sending a command to a specific Charge Point). This uses Redis Streams so that if a node restarts with the same consumer group, it can resume processing pending messages. Note: messages may be lost if the stream is trimmed (via streamMaxLen) before consumption.
  3. Presence: Automatically tracks which node a Charge Point is connected to.
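
The unicast path above can be sketched with in-memory stand-ins for Redis (the `presence`/`streams` maps and function names here are illustrative, not the adapter's actual internals):

```typescript
// Illustrative sketch of the presence + unicast flow (names are hypothetical).
type NodeId = string;

// Presence registry: which node currently holds each Charge Point connection.
const presence = new Map<string, NodeId>();
// Per-node message queues: a stand-in for XADD to each node's Redis Stream.
const streams = new Map<NodeId, string[]>();

function registerConnection(identity: string, nodeId: NodeId): void {
  presence.set(identity, nodeId);
}

function unicast(identity: string, payload: string): NodeId | undefined {
  const nodeId = presence.get(identity);
  if (!nodeId) return undefined; // unknown Charge Point: nothing to route
  const stream = streams.get(nodeId) ?? [];
  stream.push(payload); // XADD equivalent
  streams.set(nodeId, stream);
  return nodeId;
}

registerConnection("CP001", "node-2");
unicast("CP001", '[2,"1","Reset",{"type":"Hard"}]');
```

In the real adapter the presence lookup and the stream write go to Redis; the shape of the decision (resolve owner node, then append to that node's stream) is the same.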

Installation

npm install ioredis

Configuration

You should provide three Redis connections for optimal performance:

  1. Publisher: Sends commands and events.
  2. Subscriber: Listens for broadcasts.
  3. Blocking: Dedicated connection for XREAD (blocking stream reads).

Usage with ioredis

import { OCPPServer } from "ocpp-ws-io";
import { RedisAdapter } from "ocpp-ws-io/adapters/redis";
import Redis from "ioredis";

const server = new OCPPServer({ protocols: ["ocpp1.6", "ocpp2.0.1"] });

// Create isolated connections
const pub = new Redis(process.env.REDIS_URL);
const sub = new Redis(process.env.REDIS_URL);
const blocking = new Redis(process.env.REDIS_URL);

server.setAdapter(
  new RedisAdapter({
    pubClient: pub,
    subClient: sub,
    blockingClient: blocking, // Recommended for Streams
    prefix: "ocpp-cluster:", // Optional
    streamMaxLen: 1000, // Keep last 1000 messages per node
  }),
);

await server.listen(3000);

Features

Unicast Routing (safeSendToClient)

When you use server.safeSendToClient(), the adapter:

  1. Checks the Presence Registry to find which node holds the connection.
  2. Publishes the message to that specific node's Redis Stream.
  3. The target node consumes the stream and sends the WebSocket message.

// Works from ANY node in the cluster
await server.safeSendToClient("CP001", "ocpp1.6", "Reset", { type: "Hard" });

Reliability & Rehydration

Unlike standard Pub/Sub, Redis Streams persist messages. If a node crashes and restarts with the same ID, it will resume processing its stream, ensuring no pending commands are lost.

Furthermore, if the Redis connection itself experiences a temporary blackout, the RedisAdapter automatically initiates an eager rehydration pipeline upon reconnection. It instantly re-registers all currently connected WebSockets into the global presence registry, preventing out-of-sync routing states without waiting for the next station heartbeat.
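
A rehydration pass of this kind can be sketched as follows; the `rehydratePresence` helper and the `PresenceStore` shape are assumptions for illustration, with a plain `Map` standing in for Redis SET-with-TTL calls:

```typescript
// Hypothetical rehydration: after a Redis reconnect, re-register every
// locally-held WebSocket in the presence registry so routing stays in sync.
type PresenceStore = Map<string, { nodeId: string; ttlSeconds: number }>;

function rehydratePresence(
  localSockets: Iterable<string>, // identities of Charge Points on this node
  nodeId: string,
  presence: PresenceStore,
  ttlSeconds = 300, // mirrors presenceTtlSeconds
): number {
  let count = 0;
  for (const identity of localSockets) {
    presence.set(identity, { nodeId, ttlSeconds }); // SET ... EX in real Redis
    count++;
  }
  return count;
}

const presence: PresenceStore = new Map();
rehydratePresence(["CP001", "CP002"], "node-1", presence);
```

The point of doing this eagerly on reconnect, rather than waiting for heartbeats, is that other nodes can route unicast messages correctly as soon as Redis is back.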

Configuration Options

| Option | Type | Description |
| --- | --- | --- |
| pubClient | Redis | Used for publishing and setting keys. |
| subClient | Redis | Used for Pub/Sub subscriptions. |
| blockingClient | Redis | (Optional) Dedicated for XREAD. |
| prefix | string | Key prefix (default: ocpp-ws-io:). |
| streamMaxLen | number | Max stream length (default: 1000). |
| streamTtlSeconds | number | TTL for ephemeral stream keys (default: 300). |
| presenceTtlSeconds | number | Presence heartbeat TTL (default: 300). |
| poolSize | number | Connection pool size (default: 1). See below. |
| driverFactory | function | Factory for additional pool drivers. Required when poolSize > 1. |

Advanced Redis Functions

The RedisAdapter exposes native observability metrics and cluster-optimized data pipelines:

adapter.metrics()

Exposes the internal state of the RedisAdapter, including the Unicast Consumer Lag (unprocessed stream messages) and the count of active subscriptions. This is critical for scaling workers horizontally during traffic surges.

const telemetry = await adapter.metrics();
console.log(telemetry.redisConsumerLag.pendingMessages); // "142"

Pipeline Batching (publishBatch)

When dispatching multi-node Unicast events (like a fleet-wide ClearCache), ocpp-ws-io groups all messages per target node and dispatches them using a single Redis.pipeline(). This collapses N individual Redis round-trips into one pipeline call per node, reducing latency under bulk dispatch.

This is automatically utilized when you call server.broadcastBatch().
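
Conceptually, the batching step first groups messages by the node that owns each connection; a minimal sketch of that grouping (the `OutboundMessage` shape and function name are illustrative):

```typescript
interface OutboundMessage {
  identity: string; // Charge Point ID
  payload: string;
}

// Group messages by the node holding each connection, so each node's batch
// can then be written in a single Redis pipeline (one round-trip per node).
function groupByNode(
  messages: OutboundMessage[],
  presence: Map<string, string>, // identity -> nodeId
): Map<string, OutboundMessage[]> {
  const batches = new Map<string, OutboundMessage[]>();
  for (const msg of messages) {
    const nodeId = presence.get(msg.identity);
    if (!nodeId) continue; // skip unroutable messages
    const batch = batches.get(nodeId) ?? [];
    batch.push(msg);
    batches.set(nodeId, batch);
  }
  return batches;
}
```

Each resulting batch would then be flushed as one `pipeline()` of XADD commands to the target node's stream.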

Connection Pooling

At 30k+ connections, a single Redis TCP connection becomes a bottleneck due to head-of-line blocking. Connection pooling distributes write operations across multiple connections using round-robin.

import Redis from "ioredis";
import { createDriver, RedisAdapter } from "ocpp-ws-io/adapters/redis";

const adapter = new RedisAdapter({
  pubClient: new Redis(process.env.REDIS_URL),
  subClient: new Redis(process.env.REDIS_URL),
  poolSize: 4, // 4 write connections
  driverFactory: () =>
    createDriver(
      new Redis(process.env.REDIS_URL),
      new Redis(process.env.REDIS_URL),
    ),
});

How it works:

  • Writes (xadd, publish, set) rotate across the pool via round-robin
  • Pub/Sub subscriptions always use the primary driver (index 0) since they're stateful
  • poolSize: 1 (default) preserves existing single-connection behavior
  • disconnect() closes all pool members gracefully
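
The rotation described above can be sketched with a minimal pool class (illustrative, not the adapter's actual code):

```typescript
// Minimal round-robin pool: writes rotate, subscriptions pin to index 0.
class DriverPool<T> {
  private cursor = 0;
  constructor(private readonly drivers: T[]) {
    if (drivers.length === 0) throw new Error("pool must not be empty");
  }

  // Used for xadd/publish/set-style writes.
  nextWriter(): T {
    const driver = this.drivers[this.cursor];
    this.cursor = (this.cursor + 1) % this.drivers.length;
    return driver;
  }

  // Pub/Sub state lives on one connection, so always use the primary.
  subscriber(): T {
    return this.drivers[0];
  }
}

const pool = new DriverPool(["conn-0", "conn-1", "conn-2", "conn-3"]);
```

With `poolSize: 1` the cursor never advances past index 0, which is why the default preserves single-connection behavior.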

Redis Cluster Mode

For deployments using Redis Cluster (multiple shards), use the built-in ClusterDriver:

import { ClusterDriver, RedisAdapter } from "ocpp-ws-io";

const clusterDriver = new ClusterDriver({
  nodes: [
    { host: "10.0.0.1", port: 6379 },
    { host: "10.0.0.2", port: 6379 },
    { host: "10.0.0.3", port: 6379 },
  ],
  // Optional: Docker/k8s NAT mapping
  natMap: {
    "172.17.0.2:6379": { host: "localhost", port: 6380 },
  },
});

// Use the cluster driver as a pool factory
const adapter = new RedisAdapter({
  pubClient: {} as any, // Not used when driverFactory is provided
  subClient: {} as any,
  driverFactory: () => clusterDriver,
});

The ClusterDriver requires ioredis as a peer dependency. It automatically handles MOVED/ASK redirections and gracefully falls back to individual GET calls when MGET spans multiple hash slots.
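
The MGET fallback can be illustrated as below. `slotOf`, `mget`, and `get` are injected stand-ins (in real Redis Cluster the slot is CRC16 of the key modulo 16384, and the calls are asynchronous; they are synchronous here for clarity):

```typescript
// If every key hashes to the same slot, a single MGET is legal in Redis
// Cluster; otherwise fall back to one GET per key.
function clusterGetMany(
  keys: string[],
  slotOf: (key: string) => number, // stand-in for CRC16(key) % 16384
  mget: (keys: string[]) => (string | null)[],
  get: (key: string) => string | null,
): (string | null)[] {
  if (keys.length === 0) return [];
  const firstSlot = slotOf(keys[0]);
  if (keys.every((k) => slotOf(k) === firstSlot)) {
    return mget(keys); // all keys share a hash slot: one round-trip
  }
  return keys.map((k) => get(k)); // cross-slot: individual GETs
}
```

The trade-off is the usual one: MGET saves round-trips when keys are co-located, while per-key GETs avoid the CROSSSLOT error when they are not.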

Custom Adapters (EventAdapterInterface)

If you don't use Redis, you can easily implement your own clustering mechanism (e.g., using RabbitMQ, NATS, or Kafka) by using the defineAdapter helper function or by creating a class that implements EventAdapterInterface.

Using defineAdapter is the recommended way to create highly-typed adapters without boilerplate classes:

import { defineAdapter } from "ocpp-ws-io";

const myRabbitMQAdapter = defineAdapter({
  // 1. Broadcast an outbound message/event from this node to others
  publish: async (channel, data) => {
    await rabbitmq.publish(channel, JSON.stringify(data));
  },

  // 2. Point-to-Point Messaging (Mass Delivery)
  publishBatch: async (messages) => {
    // Optional: optimize delivery if your broker supports batching
  },

  // 3. Listen for incoming messages from the broker
  subscribe: async (channel, handler) => {
    rabbitmq.subscribe(channel, (msg) => handler(JSON.parse(msg)));
  },

  // 4. Cleanup
  unsubscribe: async (channel) => {
    rabbitmq.unsubscribe(channel);
  },

  disconnect: async () => {
    await rabbitmq.close();
  },

  // Optional: Advanced Presence Tracking
  setPresence: async (identity, nodeId, ttl) => {
    /* ... */
  },
  getPresence: async (identity) => {
    /* ... */ return null;
  },
  removePresence: async (identity) => {
    /* ... */
  },
});

// Attach it to your server!
await server.setAdapter(myRabbitMQAdapter);
