
Plugins

Extend your OCPP server with built-in plugins for observability, security, event streaming, and protocol transformation.

ocpp-ws-io ships 19 built-in plugins organized into a 4-level power hierarchy. All plugins are imported from ocpp-ws-io/plugins.

import { metricsPlugin, heartbeatPlugin, circuitBreakerPlugin } from "ocpp-ws-io/plugins";

server.plugin(
  metricsPlugin(),
  heartbeatPlugin(),
  circuitBreakerPlugin(),
);

Plugin Power Levels

Plugins execute in registration order, but they are designed to be layered by power level. Register them from highest to lowest:

┌─────────────────────────────────────────────────────────┐
│  Level 4 — Middleware           (mutates payloads)       │
│  pii-redactor · schema-versioning                        │
├─────────────────────────────────────────────────────────┤
│  Level 3 — Interceptor          (can drop messages)      │
│  message-dedup · replay-buffer                           │
├─────────────────────────────────────────────────────────┤
│  Level 2 — Lifecycle Controller  (acts on clients)       │
│  connection-guard · anomaly · circuit-breaker            │
├─────────────────────────────────────────────────────────┤
│  Level 1 — Passive Hook         (observes, never blocks) │
│  kafka · amqp · mqtt · redis-pubsub · webhook · metrics  │
│  otel · session-log · heartbeat · rate-limit-notifier    │
└─────────────────────────────────────────────────────────┘

server.plugin(
  // L4: Middleware — transform payloads before anything else sees them
  piiRedactorPlugin({ fields: ["idToken", "email"] }),
  schemaVersioningPlugin({ sourceVersion: "ocpp1.6", targetVersion: "ocpp2.0.1", rules }),

  // L3: Interceptor — deduplicate or replay before handlers process
  messageDedupPlugin({ redis }),
  replayBufferPlugin({ redis }),

  // L2: Lifecycle — guard and monitor connections
  connectionGuardPlugin({ maxConnections: 50_000 }),
  anomalyPlugin({ reconnectThreshold: 10 }),
  circuitBreakerPlugin({ failureThreshold: 5 }),

  // L1: Passive — observe and report
  metricsPlugin(),
  otelPlugin({ serviceName: "my-csms" }),
  heartbeatPlugin(),
  sessionLogPlugin(),
  webhookPlugin({ url: "https://api.example.com/events" }),
  kafkaPlugin({ producer }),
  rateLimitNotifierPlugin({ sink: "https://alerts.example.com" }),
);

Why does order matter? Level 4 plugins transform the payload before Level 3 plugins check for duplicates. Level 2 plugins can disconnect a client before Level 1 plugins log the message. Registering in the wrong order can cause unexpected behavior (e.g., dedup checking a PII-redacted payload won't match the original message ID).
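
To make the ordering argument concrete, here is a minimal sketch of a registration-order hook pipeline. It is not the library's dispatcher: `SketchPlugin`, `runBeforeReceive`, and the two plugin names are hypothetical, and the sketch relies only on what the text states, namely that hooks run in registration order and that returning `false` drops the message.

```typescript
// Hypothetical model of a registration-order hook pipeline (not ocpp-ws-io internals).
interface SketchPlugin {
  name: string;
  // Returning false drops the message; returning nothing lets it continue.
  onBeforeReceive?: (raw: string) => boolean | void;
}

// Runs each plugin's hook in registration order and stops at the first drop.
function runBeforeReceive(
  plugins: SketchPlugin[],
  raw: string,
): { delivered: boolean; seenBy: string[] } {
  const seenBy: string[] = [];
  for (const plugin of plugins) {
    if (!plugin.onBeforeReceive) continue;
    seenBy.push(plugin.name);
    if (plugin.onBeforeReceive(raw) === false) {
      return { delivered: false, seenBy };
    }
  }
  return { delivered: true, seenBy };
}

const dropper: SketchPlugin = { name: "dropper", onBeforeReceive: () => false };
const logger: SketchPlugin = { name: "logger", onBeforeReceive: () => undefined };

// Dropper registered first: the logger never sees the message.
const dropFirst = runBeforeReceive([dropper, logger], '[2,"1","Heartbeat",{}]');
// Logger registered first: it observes the message before the dropper rejects it.
const logFirst = runBeforeReceive([logger, dropper], '[2,"1","Heartbeat",{}]');
```

Swapping the two registrations changes which plugins observe the message, which is the effect the power levels are meant to keep predictable.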


Plugin API

Every plugin implements the OCPPPlugin interface. The system provides 22 lifecycle hooks divided into several categories:

1. Connection & Server Lifecycle

| Hook | Signature | Description |
| --- | --- | --- |
| onInit | (server) | Server starts listening and plugin is mounted. |
| onConnection | (client) | New charging station connects and is fully initialized. |
| onDisconnect | (client, code, reason) | Client disconnects normally. |
| onEviction | (evicted, new) | Client is forcibly disconnected because another client connected with the same identity. |
| onClosing | () | Pre-shutdown hook before connections are drained. |
| onClose | () | Server shutting down. |
| onReconfigure | (newOpts, oldOpts) | Server configuration is hot-reloaded. |

2. Message Interception & Observability

| Hook | Signature | Description |
| --- | --- | --- |
| onBeforeReceive | (client, rawData) | Intercept raw incoming buffer before parsing. Return false to drop. |
| onBeforeSend | (client, message) | Intercept outgoing tuple before serialization. Return false to block. |
| onMessage | (client, payload) | Unified observability event for both sent (OUT) and received (IN) message tuples. |

3. Error Handling

| Hook | Signature | Description |
| --- | --- | --- |
| onBadMessage | (client, raw, err) | Malformed or non-JSON message received. |
| onValidationFailure | (client, msg, err) | Message failed JSON Schema validation (strict mode). |
| onHandlerError | (client, method, err) | User-provided CALL handler threw an exception. |
| onError | (client, err) | Core WebSocket transport error. |

4. Security & Rate Limiting

| Hook | Signature | Description |
| --- | --- | --- |
| onSecurityEvent | (evt) | Audit-ready security events (e.g. rate limit hit, auth failed, anomalies). |
| onAuthFailed | (handshake, code, msg) | Connection rejected during the handshake/authorization phase. |
| onRateLimitExceeded | (client, raw) | Message dropped because internal rate limit thresholds were exceeded. |
| onTLSUpdate | (tlsOpts) | TLS certificates are hot-reloaded by the server. |

5. Telemetry & Infrastructure

| Hook | Signature | Description |
| --- | --- | --- |
| onTelemetry | (stats, adapterStats?) | Periodic server stats push (opt-in via telemetry.pushIntervalMs). |
| getCustomMetrics | () | Return Prometheus strings to merge into the /metrics endpoint. |
| onBackpressure | (client, buffered) | Socket buffer is filling up (client isn't reading fast enough). |
| onPongTimeout | (client) | Dead peer detected (no response to ping). |

Level 4: Middleware Plugins

piiRedactorPlugin(options?)

Recursively redacts Personally Identifiable Information from OCPP message payloads before they reach handlers or downstream plugins. Installs as client middleware.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| fields | string[] | ["idToken", "idTag", ...] | Field names to redact. |
| mask | string | "[REDACTED]" | Replacement string. |
| deep | boolean | true | Recursively traverse nested objects. |

import { piiRedactorPlugin } from "ocpp-ws-io/plugins";

server.plugin(piiRedactorPlugin({
  fields: ["idToken", "idTag", "email", "phoneNumber", "groupIdToken"],
  mask: "***",
}));

PII redaction is irreversible — the original values are not preserved. If you need to forward the original payload to a secure store, do so before this plugin runs (register your forwarding plugin at Level 1 first, or use a separate onMessage hook).
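
As an illustration of what recursive redaction involves, the sketch below walks a JSON payload and masks matching field names. The `redact` function and the `Json` type are hypothetical and written for this example; they mirror the documented `fields`, `mask`, and `deep` options but are not the plugin's implementation.

```typescript
// Hypothetical recursive redactor mirroring the documented options (fields, mask, deep).
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function redact(value: Json, fields: string[], mask = "[REDACTED]", deep = true): Json {
  if (Array.isArray(value)) {
    return deep ? value.map((item) => redact(item, fields, mask, deep)) : value;
  }
  if (value !== null && typeof value === "object") {
    const out: { [key: string]: Json } = {};
    for (const [key, child] of Object.entries(value)) {
      if (fields.includes(key)) {
        out[key] = mask; // replace the whole value, even if it is an object
      } else {
        out[key] = deep ? redact(child, fields, mask, deep) : child;
      }
    }
    return out;
  }
  return value; // primitives pass through unchanged
}

const redacted = redact(
  { idTag: "ABC123", meterStart: 0, customer: { email: "a@example.com", tier: "gold" } },
  ["idTag", "email"],
  "***",
);
```

The function returns a new object rather than mutating its input, so a caller that still holds the original payload can forward it elsewhere before discarding it.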


schemaVersioningPlugin(options)

Transforms OCPP message payloads between protocol versions using user-provided rules. Enables a CSMS to support mixed-version charging station fleets (e.g., 1.6 and 2.0.1) with unified application handlers.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| sourceVersion | string | required | Client-side OCPP version (e.g., "ocpp1.6"). |
| targetVersion | string | required | Handler-side OCPP version (e.g., "ocpp2.0.1"). |
| rules | TransformRule[] | required | Array of per-method transformation rules. |
| unmatchedBehavior | "passthrough" \| "reject" | "passthrough" | What to do with methods without rules. |
| applyWhen | string | | Only apply when client protocol matches this value. |

import { schemaVersioningPlugin } from "ocpp-ws-io/plugins";

server.plugin(schemaVersioningPlugin({
  sourceVersion: "ocpp1.6",
  targetVersion: "ocpp2.0.1",
  applyWhen: "ocpp1.6", // Only transform 1.6 clients
  rules: [
    {
      method: "BootNotification",
      transform: (payload, direction) => {
        if (direction === "up") {
          // 1.6 → 2.0.1: Wrap flat fields into chargingStation object
          return {
            chargingStation: {
              model: payload.chargePointModel,
              vendorName: payload.chargePointVendor,
              serialNumber: payload.chargePointSerialNumber,
              firmwareVersion: payload.firmwareVersion,
            },
            reason: "PowerUp",
          };
        }
        // 2.0.1 → 1.6: Flatten chargingStation back
        const cs = (payload.chargingStation ?? {}) as Record<string, unknown>;
        return {
          chargePointModel: cs.model,
          chargePointVendor: cs.vendorName,
          chargePointSerialNumber: cs.serialNumber,
          firmwareVersion: cs.firmwareVersion,
        };
      },
    },
    {
      method: "StatusNotification",
      transform: (payload, direction) => {
        if (direction === "up") {
          return {
            timestamp: new Date().toISOString(),
            connectorId: payload.connectorId,
            connectorStatus: payload.status === "Available" ? "Available" : "Occupied",
            evseId: payload.connectorId,
          };
        }
        return payload;
      },
    },
  ],
}));

How it works:

Station (1.6) ──→ [Transform UP] ──→ Application Handler (2.0.1)
Station (1.6) ←── [Transform DOWN] ←── Application Handler (2.0.1)
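
The flow above amounts to looking up a rule by method name and applying it in the given direction. The following sketch shows that dispatch under stated assumptions: `applyRules` and the local `TransformRule` type are hypothetical, and throwing on `"reject"` is a guess at how rejection might surface, not confirmed plugin behavior.

```typescript
// Hypothetical rule dispatcher; names mirror the documented options, not the plugin's code.
type Direction = "up" | "down";
type Payload = Record<string, unknown>;

interface TransformRule {
  method: string;
  transform: (payload: Payload, direction: Direction) => Payload;
}

function applyRules(
  rules: TransformRule[],
  method: string,
  payload: Payload,
  direction: Direction,
  unmatchedBehavior: "passthrough" | "reject" = "passthrough",
): Payload {
  const rule = rules.find((r) => r.method === method);
  if (rule) return rule.transform(payload, direction);
  if (unmatchedBehavior === "reject") {
    // Assumption: rejection is modelled here as a thrown error.
    throw new Error(`No transform rule for method "${method}"`);
  }
  return payload; // passthrough: forward the payload untouched
}

const sketchRules: TransformRule[] = [
  {
    method: "BootNotification",
    transform: (payload, direction) =>
      direction === "up"
        ? {
            chargingStation: { model: payload.chargePointModel, vendorName: payload.chargePointVendor },
            reason: "PowerUp",
          }
        : payload,
  },
];

const upgraded = applyRules(
  sketchRules,
  "BootNotification",
  { chargePointModel: "M1", chargePointVendor: "Acme" },
  "up",
);
const untouched = applyRules(sketchRules, "Heartbeat", {}, "up");
```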

Level 3: Interceptor Plugins

messageDedupPlugin(options)

Redis-backed message deduplication that prevents duplicate OCPP CALL messages from being processed. Hooks into onBeforeReceive and returns false to silently drop duplicate message IDs.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| redis | RedisLike | required | Redis client instance (ioredis or node-redis v4). |
| redisStyle | "positional" \| "options" | "positional" | Redis SET command argument style. |
| ttlMs | number | 300000 | TTL for dedup keys (5 minutes default). |
| keyPrefix | string | "ocpp:dedup" | Redis key prefix. |

redisStyle explained: Redis clients differ in how they accept SET arguments.

  • "positional" (ioredis): redis.set(key, value, "PX", ttl, "NX") — arguments passed as positional parameters.
  • "options" (node-redis v4): redis.set(key, value, { PX: ttl, NX: true }) — arguments passed as an options object.

The plugin defaults to "positional" (ioredis). If you use @redis/client (node-redis v4), set redisStyle: "options".

import { messageDedupPlugin } from "ocpp-ws-io/plugins";
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

// With ioredis (default)
server.plugin(messageDedupPlugin({ redis }));

// With node-redis v4
import { createClient } from "redis";
const nodeRedis = createClient({ url: process.env.REDIS_URL });
await nodeRedis.connect();

server.plugin(messageDedupPlugin({
  redis: nodeRedis,
  redisStyle: "options",
  ttlMs: 60_000,
}));

Fail-open design: If Redis is unreachable, the plugin logs an error and allows the message through rather than crashing the server.
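
A rough sketch of how a dedup check can combine the two `SET` argument styles with fail-open error handling is shown below. The `isDuplicate` helper, the local `RedisLike` interface, and the in-memory fakes are hypothetical stand-ins written for this example; only the `SET ... PX ... NX` call shapes come from the text above.

```typescript
// Hypothetical dedup check; RedisLike here is a local stand-in, not the library's type.
interface RedisLike {
  set(key: string, value: string, ...args: unknown[]): Promise<string | null>;
}

// Resolves true when the message ID was seen before (SET ... NX replied null).
async function isDuplicate(
  redis: RedisLike,
  style: "positional" | "options",
  key: string,
  ttlMs: number,
): Promise<boolean> {
  try {
    const reply =
      style === "positional"
        ? await redis.set(key, "1", "PX", ttlMs, "NX") // ioredis style
        : await redis.set(key, "1", { PX: ttlMs, NX: true }); // node-redis v4 style
    return reply === null; // null means the key already existed
  } catch (err) {
    console.error("dedup: Redis unavailable, allowing message through", err);
    return false; // fail open
  }
}

// In-memory fake that honours NX in either argument style (TTL is ignored).
function fakeRedis(): RedisLike {
  const keys = new Set<string>();
  return {
    async set(key) {
      if (keys.has(key)) return null;
      keys.add(key);
      return "OK";
    },
  };
}

// Fake that simulates an unreachable Redis server.
const brokenRedis: RedisLike = {
  async set() {
    throw new Error("ECONNREFUSED");
  },
};
```

With the fake client, the first call for a key reports "not a duplicate" and the second reports "duplicate"; with `brokenRedis`, every call reports "not a duplicate", so messages keep flowing at the cost of possible duplicates.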


replayBufferPlugin(options)

Persistent offline command queue backed by Redis. When the server needs to send a command to a station that's disconnected, the message is queued in Redis. When the station reconnects, buffered commands are automatically flushed.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| redis | RedisLike | required | Redis client instance. |
| keyPrefix | string | "ocpp:replay" | Redis list key prefix. |
| flushConcurrency | number | 5 | Max concurrent flush calls per client. |
| flushDelayMs | number | 50 | Delay between individual flush calls (throttle). |

import { replayBufferPlugin } from "ocpp-ws-io/plugins";

server.plugin(replayBufferPlugin({
  redis,
  flushConcurrency: 10,
  flushDelayMs: 100,
}));

// Queue a command for a disconnected station
await redis.rpush(
  "ocpp:replay:CP-001",
  JSON.stringify([2, "queued-1", "UnlockConnector", { connectorId: 1 }]),
);

// When CP-001 reconnects, the plugin automatically calls:
//   client.call("UnlockConnector", { connectorId: 1 })
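
To illustrate the flush step, here is a simplified sketch that replays queued OCPP-J CALL tuples one at a time with a pause between calls. `flushQueue`, `CallableClient`, and `QueuedCall` are hypothetical names, and the sketch deliberately leaves out `flushConcurrency` and the Redis list handling, so treat it as a model of the idea rather than the plugin's scheduler.

```typescript
// Hypothetical sequential flush; the real plugin also applies flushConcurrency.
type QueuedCall = [2, string, string, Record<string, unknown>];

interface CallableClient {
  call(method: string, params: Record<string, unknown>): Promise<unknown>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Replays queued CALL tuples in FIFO order, pausing between individual calls.
async function flushQueue(
  client: CallableClient,
  queued: string[],
  flushDelayMs = 50,
): Promise<number> {
  let sent = 0;
  for (const raw of queued) {
    // Tuple layout: [messageTypeId, messageId, method, params]
    const [, , method, params] = JSON.parse(raw) as QueuedCall;
    await client.call(method, params);
    sent += 1;
    if (sent < queued.length) await sleep(flushDelayMs);
  }
  return sent;
}
```

The queued message ID is discarded here because `client.call` is assumed to assign a fresh one when the command is actually sent.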

Level 2: Lifecycle Controller Plugins

connectionGuardPlugin(options)

Enforces a hard limit on concurrent connections. Clients that connect once the limit is reached are force-closed with code 4029.

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| maxConnections | number | yes | Maximum allowed concurrent connections. |

import { connectionGuardPlugin } from "ocpp-ws-io/plugins";

server.plugin(connectionGuardPlugin({ maxConnections: 50_000 }));

anomalyPlugin(options?)

Detects anomalous connection patterns (rapid reconnections, fuzzing attempts). Emits securityEvent when thresholds are breached.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| reconnectThreshold | number | 5 | Max reconnects within window before alert. |
| windowMs | number | 60000 | Sliding window duration in ms. |

import { anomalyPlugin } from "ocpp-ws-io/plugins";

server.plugin(anomalyPlugin({ reconnectThreshold: 10 }));

server.on("securityEvent", (evt) => {
  if (evt.type === "ANOMALY_RAPID_RECONNECT") {
    console.warn(`Reconnect storm: ${evt.identity}`);
  }
});
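
Rapid-reconnect detection of this kind is usually built on a per-identity sliding window. The sketch below shows one way to do it; `ReconnectTracker` is a hypothetical class, and treating "more than `reconnectThreshold` connections inside `windowMs`" as the alert condition is an assumption about where exactly the boundary sits.

```typescript
// Hypothetical sliding-window counter; option names mirror the table above.
class ReconnectTracker {
  private readonly seen = new Map<string, number[]>();
  private readonly reconnectThreshold: number;
  private readonly windowMs: number;

  constructor(reconnectThreshold = 5, windowMs = 60_000) {
    this.reconnectThreshold = reconnectThreshold;
    this.windowMs = windowMs;
  }

  // Records a connection; returns true when the identity exceeds the threshold within the window.
  record(identity: string, now: number = Date.now()): boolean {
    const recent = (this.seen.get(identity) ?? []).filter((t) => now - t < this.windowMs);
    recent.push(now);
    this.seen.set(identity, recent);
    return recent.length > this.reconnectThreshold;
  }
}
```

Passing `now` explicitly keeps the tracker deterministic in tests; in production code the default `Date.now()` would be used.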

circuitBreakerPlugin(options?)

Per-client circuit breaker for flapping or unreliable charging stations. Prevents a misbehaving client from degrading system performance by fast-failing outgoing calls after repeated errors.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| failureThreshold | number | 5 | Consecutive failures before circuit opens. |
| resetTimeoutMs | number | 30000 | Duration circuit stays OPEN before probe (ms). |
| maxConcurrent | number | 20 | Max concurrent outgoing calls per client. |
| onStateChange | function | | Callback: (identity, from, to) => void. |

State Machine:

CLOSED ─(failures ≥ threshold)─→ OPEN
OPEN ─(resetTimeout expires)──→ HALF_OPEN
HALF_OPEN ─(success)───────────→ CLOSED
HALF_OPEN ─(failure)───────────→ OPEN

import { circuitBreakerPlugin } from "ocpp-ws-io/plugins";

server.plugin(circuitBreakerPlugin({
  failureThreshold: 3,
  resetTimeoutMs: 15_000,
  maxConcurrent: 10,
  onStateChange: (identity, from, to) => {
    console.log(`Circuit ${identity}: ${from} → ${to}`);
    if (to === "OPEN") {
      // Alert ops team — this station is flapping
      alerting.fire({ station: identity, event: "circuit_open" });
    }
  },
}));
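
The state machine described in this section can be modelled in a few lines. The sketch below is a generic circuit breaker written for illustration; `CircuitSketch` and its method names are hypothetical, it omits `maxConcurrent`, and it is not the plugin's implementation.

```typescript
// Hypothetical model of the CLOSED / OPEN / HALF_OPEN transitions.
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

class CircuitSketch {
  state: CircuitState = "CLOSED";
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(failureThreshold = 5, resetTimeoutMs = 30_000) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
  }

  // Returns whether a call may proceed; moves OPEN to HALF_OPEN once the timeout has elapsed.
  canCall(now: number): boolean {
    if (this.state === "OPEN" && now - this.openedAt >= this.resetTimeoutMs) {
      this.state = "HALF_OPEN";
    }
    return this.state !== "OPEN";
  }

  // A success resets the consecutive-failure count and closes the circuit.
  onSuccess(): void {
    this.failures = 0;
    this.state = "CLOSED";
  }

  // A failed probe in HALF_OPEN, or reaching the threshold, opens the circuit.
  onFailure(now: number): void {
    this.failures += 1;
    if (this.state === "HALF_OPEN" || this.failures >= this.failureThreshold) {
      this.state = "OPEN";
      this.openedAt = now;
    }
  }
}
```

A caller would check `canCall(Date.now())` before each outgoing call and report the outcome through `onSuccess` or `onFailure`.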

Level 1: Passive Hook Plugins

heartbeatPlugin()

Auto-responds to OCPP Heartbeat calls with { currentTime }. Drop-in replacement for manual Heartbeat handlers.

import { heartbeatPlugin } from "ocpp-ws-io/plugins";

server.plugin(heartbeatPlugin());

metricsPlugin(options?)

Tracks real-time server metrics: active/peak connections, average duration, uptime, validation failures, and security events. Exports Prometheus-format metrics via getCustomMetrics().

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| intervalMs | number | 30000 | Snapshot emit interval. 0 to disable. |
| onSnapshot | function | | Callback fired each interval with metrics. |

import { metricsPlugin } from "ocpp-ws-io/plugins";

const metrics = metricsPlugin({
  intervalMs: 10_000,
  onSnapshot: (snap) => console.log(`Active: ${snap.activeConnections}`),
});
server.plugin(metrics);

// On-demand access
const snap = metrics.getMetrics();
// { activeConnections, peakConnections, totalConnections, totalValidationFailures, totalSecurityEvents, ... }

// Prometheus export
const lines = await metrics.getCustomMetrics();
// ["ocpp_connections_active 42", "ocpp_connections_peak 128", ...]

sessionLogPlugin(options?)

Logs connect/disconnect events with identity, IP, protocol, and connection duration.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| logger | { info, warn } | console | Custom logger instance. |
| logLevel | "minimal" \| "standard" \| "verbose" | "standard" | Verbosity level. |

import { sessionLogPlugin } from "ocpp-ws-io/plugins";

server.plugin(sessionLogPlugin());
// => [session] connected { identity: "CP-101", ip: "192.168.1.1", protocol: "ocpp1.6" }
// => [session] disconnected { identity: "CP-101", durationSec: 3600, code: 1000 }

otelPlugin(options?)

OpenTelemetry integration — creates spans for connection lifecycle events, records validation failures, rate limit events, backpressure, and server telemetry. Requires @opentelemetry/api as an optional peer dependency. If not installed, the plugin silently disables itself.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| serviceName | string | "ocpp-server" | Tracer service name. |
| tracer | TracerLike | | Custom tracer instance (overrides auto-detection). |

import { otelPlugin } from "ocpp-ws-io/plugins";

server.plugin(otelPlugin({ serviceName: "my-csms" }));

Instrumented hooks: onConnection, onDisconnect, onError, onBadMessage, onValidationFailure, onRateLimitExceeded, onBackpressure, onPongTimeout, onSecurityEvent, onAuthFailed, onEviction, onTelemetry, onClosing, onClose.


webhookPlugin(options)

Sends HTTP POST webhooks on server lifecycle events with HMAC-SHA256 signing and retry support.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | required | Webhook endpoint URL. |
| events | string[] | All | Filter: "init", "connect", "disconnect", "close". |
| headers | Record | | Custom HTTP headers. |
| secret | string | | HMAC-SHA256 secret (sent as X-Signature). |
| timeout | number | 5000 | Fetch timeout in ms. |
| retries | number | 1 | Retry count on failure. |

import { webhookPlugin } from "ocpp-ws-io/plugins";

server.plugin(webhookPlugin({
  url: "https://api.example.com/ocpp-events",
  secret: process.env.WEBHOOK_SECRET,
  events: ["connect", "disconnect"],
  headers: { Authorization: "Bearer token123" },
}));
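
On the receiving side, the signature should be checked before the event is trusted. The sketch below shows one way to do that with Node's built-in `crypto` module. It assumes the `X-Signature` header carries a hex-encoded HMAC-SHA256 of the raw request body; that encoding is an assumption, not something this page specifies, so check it against the plugin before relying on it. `signBody` and `verifySignature` are hypothetical helper names.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumption: the signature is a hex-encoded HMAC-SHA256 of the raw request body.
function signBody(rawBody: string, secret: string): string {
  return createHmac("sha256", secret).update(rawBody).digest("hex");
}

// Hypothetical receiver-side check for the X-Signature header value.
function verifySignature(rawBody: string, secret: string, signatureHeader: string): boolean {
  const expected = Buffer.from(signBody(rawBody, secret), "hex");
  const received = Buffer.from(signatureHeader, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Verification has to run against the exact bytes received, before any JSON parsing or re-serialization, since a change in whitespace or key order would alter the digest.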

kafkaPlugin(options)

High-throughput Kafka event streaming sink. Publishes OCPP lifecycle events to Kafka topics using a user-provided producer (e.g., kafkajs).

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| producer | KafkaProducerLike | required | Kafka producer instance. |
| topic | string | "ocpp.events" | Base topic name. |
| events | KafkaEvent[] | All | Filter which events to publish. |
| topicRouting | boolean | false | Route events to topic.{event} subtopics. |

import { kafkaPlugin } from "ocpp-ws-io/plugins";
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["kafka:9092"] });
const producer = kafka.producer();
await producer.connect();

server.plugin(kafkaPlugin({
  producer,
  topic: "ocpp.events",
  topicRouting: true, // → ocpp.events.connect, ocpp.events.message, etc.
  events: ["connect", "disconnect", "message", "security"],
}));
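
The effect of `topicRouting` on the destination topic can be summarized in one function. `resolveTopic` is a hypothetical helper written for this example; it only restates the naming rule from the options table and the comment in the example above.

```typescript
// Hypothetical helper mirroring the documented topic / topicRouting options.
function resolveTopic(event: string, topic = "ocpp.events", topicRouting = false): string {
  // With routing enabled each event type gets its own subtopic: topic.{event}
  return topicRouting ? `${topic}.${event}` : topic;
}

const sharedTopic = resolveTopic("connect");
const routedTopic = resolveTopic("connect", "ocpp.events", true);
```

Per-event subtopics let consumers subscribe to only the events they care about, while a single shared topic keeps one stream for all events.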

rateLimitNotifierPlugin(options)

Fires alerts to an external sink (webhook URL or custom object) when clients exceed rate limits. Implements per-identity cooldown and threshold-based alerting to prevent alert storms.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| sink | string \| AlertSink | required | Webhook URL or custom sink with send(payload). |
| cooldownMs | number | 60000 | Min interval between alerts per identity. |
| threshold | number | 1 | Events before alert fires. |
| windowMs | number | 300000 | Sliding window for counting events. |
| headers | Record | | Custom HTTP headers for webhook sink. |

import { rateLimitNotifierPlugin } from "ocpp-ws-io/plugins";

// Simple webhook
server.plugin(rateLimitNotifierPlugin({
  sink: "https://slack.example.com/webhook",
  cooldownMs: 120_000,
  threshold: 3,
}));

// Custom sink (e.g., Kafka, PagerDuty, etc.)
server.plugin(rateLimitNotifierPlugin({
  sink: {
    send: async (alert) => {
      await pagerduty.trigger({
        summary: `Rate limit: ${alert.identity} (${alert.count} events)`,
        severity: "warning",
      });
    },
  },
  threshold: 5,
}));
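
The interaction between `threshold`, `windowMs`, and `cooldownMs` is easier to see in code. The sketch below is a hypothetical `AlertGate` class written for this example, not the plugin's implementation. It assumes an alert fires once the event count inside the window reaches `threshold`, and that further alerts for the same identity are suppressed until `cooldownMs` has passed.

```typescript
// Hypothetical threshold + cooldown gate; option names mirror the table above.
class AlertGate {
  private readonly events = new Map<string, number[]>();
  private readonly lastAlert = new Map<string, number>();
  private readonly threshold: number;
  private readonly windowMs: number;
  private readonly cooldownMs: number;

  constructor(threshold = 1, windowMs = 300_000, cooldownMs = 60_000) {
    this.threshold = threshold;
    this.windowMs = windowMs;
    this.cooldownMs = cooldownMs;
  }

  // Records one rate-limit event; returns true when an alert should be sent now.
  shouldAlert(identity: string, now: number): boolean {
    const recent = (this.events.get(identity) ?? []).filter((t) => now - t < this.windowMs);
    recent.push(now);
    this.events.set(identity, recent);
    if (recent.length < this.threshold) return false; // not enough events yet
    const last = this.lastAlert.get(identity);
    if (last !== undefined && now - last < this.cooldownMs) return false; // still cooling down
    this.lastAlert.set(identity, now);
    return true;
  }
}
```

Events keep being counted during the cooldown, so a station that is still misbehaving when the cooldown ends triggers a new alert on its next event.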

Creating Custom Plugins

Use createPlugin for type-safe custom plugins:

import { createPlugin } from "ocpp-ws-io";

const myPlugin = createPlugin({
  name: "my-advanced-plugin",

  onInit(server) {
    console.log(`Plugin loaded on server listening at ${server.options.port}`);
  },

  onSecurityEvent(evt) {
    if (evt.type === "RATE_LIMIT_EXCEEDED") {
      console.warn(`[Security] Rate limit hit by ${evt.identity}`);
    }
  },

  onBeforeReceive(client, rawData) {
    // Arbitrary size limits or binary magic number checks
    if (rawData.length > 1024 * 1024) {
      console.error(`Payload too large from ${client.identity}`);
      return false; // Drops the message
    }
  },

  onMessage(client, payload) {
    // Unified observability: both outgoing and incoming messages
    const { message, direction, ctx } = payload;
    console.log(`[${direction}] ${ctx.action || ctx.type} at ${ctx.timestamp.toISOString()}`);
  },

  onTelemetry(stats) {
    // Push stats to your external monitoring system
    console.log(`Active Connections: ${stats.activeConnections}`);
  }
});

server.plugin(myPlugin);
