Plugins
Extend your OCPP server with built-in plugins for observability, security, event streaming, and protocol transformation.
ocpp-ws-io ships 19 built-in plugins organized into a 4-level power hierarchy. All plugins are imported from ocpp-ws-io/plugins.
import { metricsPlugin, heartbeatPlugin, circuitBreakerPlugin } from "ocpp-ws-io/plugins";
server.plugin(
  metricsPlugin(),
  heartbeatPlugin(),
  circuitBreakerPlugin(),
);

Plugin Power Levels
Plugins execute in registration order, but they are designed to be layered by power level. Register them from highest to lowest:
┌─────────────────────────────────────────────────────────┐
│ Level 4 — Middleware (mutates payloads) │
│ pii-redactor · schema-versioning │
├─────────────────────────────────────────────────────────┤
│ Level 3 — Interceptor (can drop messages) │
│ message-dedup · replay-buffer │
├─────────────────────────────────────────────────────────┤
│ Level 2 — Lifecycle Controller (acts on clients) │
│ connection-guard · anomaly · circuit-breaker │
├─────────────────────────────────────────────────────────┤
│ Level 1 — Passive Hook (observes, never blocks) │
│ kafka · amqp · mqtt · redis-pubsub · webhook · metrics │
│ otel · session-log · heartbeat · rate-limit-notifier │
└─────────────────────────────────────────────────────────┘

Recommended Registration Order
server.plugin(
  // L4 (Middleware): transform payloads before anything else sees them
  piiRedactorPlugin({ fields: ["idToken", "email"] }),
  schemaVersioningPlugin({ sourceVersion: "ocpp1.6", targetVersion: "ocpp2.0.1", rules }),
  // L3 (Interceptor): deduplicate or replay before handlers process
  messageDedupPlugin({ redis }),
  replayBufferPlugin({ redis }),
  // L2 (Lifecycle): guard and monitor connections
  connectionGuardPlugin({ maxConnections: 50_000 }),
  anomalyPlugin({ reconnectThreshold: 10 }),
  circuitBreakerPlugin({ failureThreshold: 5 }),
  // L1 (Passive): observe and report
  metricsPlugin(),
  otelPlugin({ serviceName: "my-csms" }),
  heartbeatPlugin(),
  sessionLogPlugin(),
  webhookPlugin({ url: "https://api.example.com/events" }),
  kafkaPlugin({ producer }),
  rateLimitNotifierPlugin({ sink: "https://alerts.example.com" }),
);

Why does order matter? Level 4 plugins transform the payload before Level 3 plugins check for duplicates. Level 2 plugins can disconnect a client before Level 1 plugins log the message. Registering in the wrong order can cause unexpected behavior (for example, a dedup check that runs against a PII-redacted payload will not match the original message ID).
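To make the ordering rule concrete, here is a minimal sketch of a dispatcher that calls one hook on each plugin in registration order. It is not the library's dispatcher: the SketchPlugin type and dispatchBeforeReceive function are hypothetical, and the sketch assumes that later plugins are skipped once a hook returns false.

```typescript
// Hypothetical, simplified plugin shape: only the hook needed for the demo.
interface SketchPlugin {
  name: string;
  onBeforeReceive?: (raw: string) => boolean;
}

// Calls each plugin's hook in registration order and stops at the first
// plugin that returns false. Returns the names of the plugins that ran.
function dispatchBeforeReceive(plugins: SketchPlugin[], raw: string): string[] {
  const ran: string[] = [];
  for (const plugin of plugins) {
    if (!plugin.onBeforeReceive) continue;
    ran.push(plugin.name);
    if (plugin.onBeforeReceive(raw) === false) break; // message dropped
  }
  return ran;
}

// An interceptor that drops anything it has already seen.
const seen = new Set<string>();
const dedup: SketchPlugin = {
  name: "dedup",
  onBeforeReceive: (raw) => {
    if (seen.has(raw)) return false;
    seen.add(raw);
    return true;
  },
};

// A passive observer that records every message it is shown.
const logged: string[] = [];
const logger: SketchPlugin = {
  name: "logger",
  onBeforeReceive: (raw) => {
    logged.push(raw);
    return true;
  },
};

const first = dispatchBeforeReceive([dedup, logger], "msg-1");
const second = dispatchBeforeReceive([dedup, logger], "msg-1");
const swapped = dispatchBeforeReceive([logger, dedup], "msg-1");
```

With dedup registered first, the duplicate in the second call never reaches the logger. With the order swapped, the logger records the duplicate before dedup drops it.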
Plugin API
Every plugin implements the OCPPPlugin interface. The system provides 22 lifecycle hooks divided into five categories:
1. Connection & Server Lifecycle
| Hook | Signature | Description |
|---|---|---|
| onInit | (server) | Server starts listening and plugin is mounted. |
| onConnection | (client) | New charging station connects and is fully initialized. |
| onDisconnect | (client, code, reason) | Client disconnects normally. |
| onEviction | (evicted, new) | Client is forcibly disconnected because another client connected with the same identity. |
| onClosing | () | Pre-shutdown hook before connections are drained. |
| onClose | () | Server shutting down. |
| onReconfigure | (newOpts, oldOpts) | Server configuration is hot-reloaded. |
2. Message Interception & Observability
| Hook | Signature | Description |
|---|---|---|
| onBeforeReceive | (client, rawData) | Intercept raw incoming buffer before parsing. Return false to drop. |
| onBeforeSend | (client, message) | Intercept outgoing tuple before serialization. Return false to block. |
| onMessage | (client, payload) | Unified observability event for both sent (OUT) and received (IN) message tuples. |
3. Error Handling
| Hook | Signature | Description |
|---|---|---|
| onBadMessage | (client, raw, err) | Malformed or non-JSON message received. |
| onValidationFailure | (client, msg, err) | Message failed JSON Schema validation (strict mode). |
| onHandlerError | (client, method, err) | User-provided CALL handler threw an exception. |
| onError | (client, err) | Core WebSocket transport error. |
4. Security & Rate Limiting
| Hook | Signature | Description |
|---|---|---|
| onSecurityEvent | (evt) | Audit-ready security events (e.g. rate limit hit, auth failed, anomalies). |
| onAuthFailed | (handshake, code, msg) | Connection rejected during the handshake/authorization phase. |
| onRateLimitExceeded | (client, raw) | Message dropped because internal rate limit thresholds were exceeded. |
| onTLSUpdate | (tlsOpts) | TLS certificates are hot-reloaded by the server. |
5. Telemetry & Infrastructure
| Hook | Signature | Description |
|---|---|---|
| onTelemetry | (stats, adapterStats?) | Periodic server stats push (opt-in via telemetry.pushIntervalMs). |
| getCustomMetrics | () | Return Prometheus strings to merge into the /metrics endpoint. |
| onBackpressure | (client, buffered) | Socket buffer is filling up (client isn't reading fast enough). |
| onPongTimeout | (client) | Dead peer detected (no response to ping). |
Level 4: Middleware Plugins
piiRedactorPlugin(options?)
Recursively redacts Personally Identifiable Information from OCPP message payloads before they reach handlers or downstream plugins. Installs as client middleware.
| Option | Type | Default | Description |
|---|---|---|---|
| fields | string[] | ["idToken", "idTag", ...] | Field names to redact. |
| mask | string | "[REDACTED]" | Replacement string. |
| deep | boolean | true | Recursively traverse nested objects. |
import { piiRedactorPlugin } from "ocpp-ws-io/plugins";
server.plugin(piiRedactorPlugin({
  fields: ["idToken", "idTag", "email", "phoneNumber", "groupIdToken"],
  mask: "***",
}));

PII redaction is irreversible: the original values are not preserved. If you need to forward the original payload to a secure store, do so before this plugin runs (register a Level 1 forwarding plugin ahead of it, or use a separate onMessage hook).
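The recursive masking that the fields, mask, and deep options describe can be sketched as follows. This is an illustration under stated assumptions, not the plugin's source: the Json type and the redact function are hypothetical names, and returning a redacted copy (rather than mutating in place) is a choice made for the sketch.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Returns a redacted copy of `value`; the input is left untouched.
// With deep = false only the top-level keys of an object are inspected.
function redact(value: Json, fields: Set<string>, mask = "[REDACTED]", deep = true): Json {
  if (Array.isArray(value)) {
    return deep ? value.map((item) => redact(item, fields, mask, deep)) : value;
  }
  if (value !== null && typeof value === "object") {
    const out: { [key: string]: Json } = {};
    for (const [key, child] of Object.entries(value)) {
      if (fields.has(key)) {
        out[key] = mask; // matching field: replace whatever was there
      } else {
        out[key] = deep ? redact(child, fields, mask, deep) : child;
      }
    }
    return out;
  }
  return value; // primitives pass through unchanged
}

const original = {
  idTag: "ABC123",
  meterStart: 0,
  customer: { email: "driver@example.com", name: "x" },
};
const redacted = redact(original, new Set(["idTag", "email"]), "***");
const shallow = redact(original, new Set(["idTag", "email"]), "***", false);
```

In this sketch, redacted masks both idTag and the nested email, while shallow masks only idTag because the nested object is not traversed.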
schemaVersioningPlugin(options)
Transforms OCPP message payloads between protocol versions using user-provided rules. Enables a CSMS to support mixed-version charging station fleets (e.g., 1.6 and 2.0.1) with unified application handlers.
| Option | Type | Default | Description |
|---|---|---|---|
| sourceVersion | string | required | Client-side OCPP version (e.g., "ocpp1.6"). |
| targetVersion | string | required | Handler-side OCPP version (e.g., "ocpp2.0.1"). |
| rules | TransformRule[] | required | Array of per-method transformation rules. |
| unmatchedBehavior | "passthrough" \| "reject" | "passthrough" | What to do with methods without rules. |
| applyWhen | string | (none) | Only apply when client protocol matches this value. |
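How these options interact can be sketched with a small rule-lookup function. The TransformRule shape below mirrors the usage shown in this section, but applyRules, VersioningOptions, and the choice to signal "reject" by throwing are assumptions made for illustration; the plugin may surface rejections differently.

```typescript
type Direction = "up" | "down";
type Payload = Record<string, unknown>;

interface TransformRule {
  method: string;
  transform: (payload: Payload, direction: Direction) => Payload;
}

interface VersioningOptions {
  rules: TransformRule[];
  unmatchedBehavior?: "passthrough" | "reject";
  applyWhen?: string;
}

function applyRules(
  opts: VersioningOptions,
  clientProtocol: string,
  method: string,
  payload: Payload,
  direction: Direction,
): Payload {
  // applyWhen set and not matching: leave this client's traffic alone.
  if (opts.applyWhen !== undefined && opts.applyWhen !== clientProtocol) return payload;
  const rule = opts.rules.find((r) => r.method === method);
  if (rule) return rule.transform(payload, direction);
  // No rule for this method: reject (assumed here to throw) or pass through.
  if (opts.unmatchedBehavior === "reject") {
    throw new Error(`No transform rule for ${method}`);
  }
  return payload;
}

const sketchOpts: VersioningOptions = {
  applyWhen: "ocpp1.6",
  rules: [{ method: "Heartbeat", transform: (p, d) => ({ ...p, seenDirection: d }) }],
};

const transformed = applyRules(sketchOpts, "ocpp1.6", "Heartbeat", {}, "up");
const skipped = applyRules(sketchOpts, "ocpp2.0.1", "Heartbeat", {}, "up");
const passedThrough = applyRules(sketchOpts, "ocpp1.6", "Authorize", { idTag: "A" }, "up");
```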
import { schemaVersioningPlugin } from "ocpp-ws-io/plugins";
server.plugin(schemaVersioningPlugin({
  sourceVersion: "ocpp1.6",
  targetVersion: "ocpp2.0.1",
  applyWhen: "ocpp1.6", // Only transform 1.6 clients
  rules: [
    {
      method: "BootNotification",
      transform: (payload, direction) => {
        if (direction === "up") {
          // 1.6 → 2.0.1: Wrap flat fields into chargingStation object
          return {
            chargingStation: {
              model: payload.chargePointModel,
              vendorName: payload.chargePointVendor,
              serialNumber: payload.chargePointSerialNumber,
              firmwareVersion: payload.firmwareVersion,
            },
            reason: "PowerUp",
          };
        }
        // 2.0.1 → 1.6: Flatten chargingStation back
        const cs = (payload.chargingStation ?? {}) as Record<string, unknown>;
        return {
          chargePointModel: cs.model,
          chargePointVendor: cs.vendorName,
          chargePointSerialNumber: cs.serialNumber,
          firmwareVersion: cs.firmwareVersion,
        };
      },
    },
    {
      method: "StatusNotification",
      transform: (payload, direction) => {
        if (direction === "up") {
          return {
            timestamp: new Date().toISOString(),
            connectorId: payload.connectorId,
            connectorStatus: payload.status === "Available" ? "Available" : "Occupied",
            evseId: payload.connectorId,
          };
        }
        return payload;
      },
    },
  ],
}));

How it works:
Station (1.6) ──→ [Transform UP] ──→ Application Handler (2.0.1)
Station (1.6) ←── [Transform DOWN] ←── Application Handler (2.0.1)

Level 3: Interceptor Plugins
messageDedupPlugin(options)
Redis-backed message deduplication that prevents duplicate OCPP CALL messages from being processed. Hooks into onBeforeReceive and returns false to silently drop duplicate message IDs.
| Option | Type | Default | Description |
|---|---|---|---|
| redis | RedisLike | required | Redis client instance (ioredis or node-redis v4). |
| redisStyle | "positional" \| "options" | "positional" | Redis SET command argument style. |
| ttlMs | number | 300000 | TTL for dedup keys (5 minutes default). |
| keyPrefix | string | "ocpp:dedup" | Redis key prefix. |
redisStyle explained: Redis clients differ in how they accept SET arguments.
- "positional" (ioredis): redis.set(key, value, "PX", ttl, "NX"), with arguments passed as positional parameters.
- "options" (node-redis v4): redis.set(key, value, { PX: ttl, NX: true }), with arguments passed as an options object.
The plugin defaults to "positional" (ioredis). If you use @redis/client (node-redis v4), set redisStyle: "options".
import { messageDedupPlugin } from "ocpp-ws-io/plugins";
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL);
// With ioredis (default)
server.plugin(messageDedupPlugin({ redis }));
// With node-redis v4
import { createClient } from "redis";
const nodeRedis = createClient({ url: process.env.REDIS_URL });
await nodeRedis.connect();
server.plugin(messageDedupPlugin({
  redis: nodeRedis,
  redisStyle: "options",
  ttlMs: 60_000,
}));

Fail-open design: If Redis is unreachable, the plugin logs an error and allows the message through rather than crashing the server.
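The dedup check itself comes down to a single SET with NX and PX, where a null reply means the key already existed. The sketch below shows both argument styles and the fail-open path. It is not the plugin's code: the RedisLike interface here is a hypothetical minimal surface, the in-memory fakes stand in for a real client, and the key format in the usage lines is invented for the example.

```typescript
// Hypothetical minimal client surface; real ioredis and node-redis clients expose far more.
interface RedisLike {
  set(key: string, value: string, ...args: unknown[]): Promise<string | null>;
}

// Resolves to true when the message ID has not been seen within the TTL.
async function isFirstDelivery(
  redis: RedisLike,
  style: "positional" | "options",
  key: string,
  ttlMs: number,
): Promise<boolean> {
  try {
    const reply =
      style === "positional"
        ? await redis.set(key, "1", "PX", ttlMs, "NX")
        : await redis.set(key, "1", { PX: ttlMs, NX: true });
    return reply === "OK"; // null means the key already existed
  } catch (err) {
    // Fail open: a Redis outage must not block message processing.
    console.error("dedup check failed, allowing message through", err);
    return true;
  }
}

// In-memory stand-in that mimics SET ... NX (TTL is ignored in this fake).
function fakeRedis(): RedisLike {
  const keys = new Set<string>();
  return {
    async set(key: string): Promise<string | null> {
      if (keys.has(key)) return null;
      keys.add(key);
      return "OK";
    },
  };
}

// Stand-in for an unreachable server.
const brokenRedis: RedisLike = {
  set: async () => {
    throw new Error("ECONNREFUSED");
  },
};

const store = fakeRedis();
const outcomes = (async () => [
  await isFirstDelivery(store, "positional", "ocpp:dedup:CP-001:42", 1000),
  await isFirstDelivery(store, "options", "ocpp:dedup:CP-001:42", 1000),
  await isFirstDelivery(brokenRedis, "positional", "ocpp:dedup:CP-001:43", 1000),
])();
```

The first call claims the key, the second finds it already set and reports a duplicate, and the third falls through to the fail-open branch.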
replayBufferPlugin(options)
Persistent offline command queue backed by Redis. When the server needs to send a command to a station that's disconnected, the message is queued in Redis. When the station reconnects, buffered commands are automatically flushed.
| Option | Type | Default | Description |
|---|---|---|---|
| redis | RedisLike | required | Redis client instance. |
| keyPrefix | string | "ocpp:replay" | Redis list key prefix. |
| flushConcurrency | number | 5 | Max concurrent flush calls per client. |
| flushDelayMs | number | 50 | Delay between individual flush calls (throttle). |
import { replayBufferPlugin } from "ocpp-ws-io/plugins";
server.plugin(replayBufferPlugin({
  redis,
  flushConcurrency: 10,
  flushDelayMs: 100,
}));
// Queue a command for a disconnected station
await redis.rpush(
  "ocpp:replay:CP-001",
  JSON.stringify([2, "queued-1", "UnlockConnector", { connectorId: 1 }]),
);
// When CP-001 reconnects, the plugin automatically calls:
// client.call("UnlockConnector", { connectorId: 1 })

Level 2: Lifecycle Controller Plugins
connectionGuardPlugin(options)
Enforces a hard limit on concurrent connections. Clients that connect once the limit is reached are force-closed with close code 4029.
| Option | Type | Required | Description |
|---|---|---|---|
| maxConnections | number | ✅ | Maximum allowed concurrent connections. |
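The guard's behavior can be approximated by counting live connections and closing any client that arrives once the limit is reached. The sketch below is an assumption-laden illustration: ClosableClient and createConnectionGuard are hypothetical names, and the close reason string is invented. Close codes in the 4000 to 4999 range are reserved by the WebSocket protocol for private application use, which is why 4029 is a valid choice.

```typescript
// Hypothetical minimal client surface for the sketch.
interface ClosableClient {
  identity: string;
  close(code: number, reason: string): void;
}

function createConnectionGuard(maxConnections: number) {
  const active = new Set<string>();
  return {
    // Returns true when the client was admitted, false when it was closed.
    onConnection(client: ClosableClient): boolean {
      if (active.size >= maxConnections) {
        client.close(4029, "Connection limit reached");
        return false;
      }
      active.add(client.identity);
      return true;
    },
    onDisconnect(client: ClosableClient): void {
      active.delete(client.identity);
    },
    activeCount(): number {
      return active.size;
    },
  };
}

// Records close codes so the demo can inspect them.
const closedWith: number[] = [];
const makeClient = (identity: string): ClosableClient => ({
  identity,
  close: (code) => {
    closedWith.push(code);
  },
});

const guard = createConnectionGuard(2);
const admittedA = guard.onConnection(makeClient("CP-A"));
const admittedB = guard.onConnection(makeClient("CP-B"));
const admittedC = guard.onConnection(makeClient("CP-C")); // over the limit
guard.onDisconnect(makeClient("CP-A"));
const admittedCRetry = guard.onConnection(makeClient("CP-C")); // slot freed
```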
import { connectionGuardPlugin } from "ocpp-ws-io/plugins";
server.plugin(connectionGuardPlugin({ maxConnections: 50_000 }));

anomalyPlugin(options?)
Detects anomalous connection patterns (rapid reconnections, fuzzing attempts). Emits securityEvent when thresholds are breached.
| Option | Type | Default | Description |
|---|---|---|---|
| reconnectThreshold | number | 5 | Max reconnects within window before alert. |
| windowMs | number | 60000 | Sliding window duration in ms. |
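A sliding-window counter is one way to implement the reconnectThreshold and windowMs pair. The sketch below is not the plugin's detector: createReconnectDetector is a hypothetical name, and it assumes the alert condition is "more connects than reconnectThreshold inside the window". The clock is passed in so the behavior is deterministic.

```typescript
function createReconnectDetector(reconnectThreshold = 5, windowMs = 60_000) {
  const history = new Map<string, number[]>();

  // Records a connect for `identity` and returns true when the number of
  // connects inside the window exceeds the threshold (assumed semantics).
  return function recordConnect(identity: string, now: number = Date.now()): boolean {
    const recent = (history.get(identity) ?? []).filter((t) => now - t < windowMs);
    recent.push(now);
    history.set(identity, recent);
    return recent.length > reconnectThreshold;
  };
}

const recordConnect = createReconnectDetector(3, 1_000);
const verdicts = [0, 100, 200, 300].map((t) => recordConnect("CP-001", t));
const afterQuietPeriod = recordConnect("CP-001", 5_000); // old entries have expired
const otherStation = recordConnect("CP-002", 300); // counted separately
```

Here the fourth connect inside one second trips the detector, and the counter resets once the earlier connects age out of the window.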
import { anomalyPlugin } from "ocpp-ws-io/plugins";
server.plugin(anomalyPlugin({ reconnectThreshold: 10 }));
server.on("securityEvent", (evt) => {
  if (evt.type === "ANOMALY_RAPID_RECONNECT") {
    console.warn(`Reconnect storm: ${evt.identity}`);
  }
});

circuitBreakerPlugin(options?)
Per-client circuit breaker for flapping or unreliable charging stations. Prevents a misbehaving client from degrading system performance by fast-failing outgoing calls after repeated errors.
| Option | Type | Default | Description |
|---|---|---|---|
| failureThreshold | number | 5 | Consecutive failures before circuit opens. |
| resetTimeoutMs | number | 30000 | Duration circuit stays OPEN before probe (ms). |
| maxConcurrent | number | 20 | Max concurrent outgoing calls per client. |
| onStateChange | function | (none) | Callback: (identity, from, to) => void. |
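A per-client breaker driven by failureThreshold and resetTimeoutMs can be sketched as a small closure over three variables. This is a simplified illustration, not the plugin's implementation: createCircuit is a hypothetical name, maxConcurrent and onStateChange are left out, and the clock is passed in explicitly so the transitions are easy to follow.

```typescript
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

function createCircuit(failureThreshold = 5, resetTimeoutMs = 30_000) {
  let state: CircuitState = "CLOSED";
  let failures = 0;
  let openedAt = 0;

  // Returns the state as of `now`, moving OPEN to HALF_OPEN after the timeout.
  function current(now: number): CircuitState {
    if (state === "OPEN" && now - openedAt >= resetTimeoutMs) state = "HALF_OPEN";
    return state;
  }

  return {
    current,
    // Calls are allowed in CLOSED and HALF_OPEN (the probe), never in OPEN.
    canCall: (now: number): boolean => current(now) !== "OPEN",
    recordSuccess(): void {
      failures = 0;
      state = "CLOSED";
    },
    recordFailure(now: number): void {
      failures += 1;
      // A failed probe reopens immediately; otherwise wait for the threshold.
      if (current(now) === "HALF_OPEN" || failures >= failureThreshold) {
        state = "OPEN";
        openedAt = now;
      }
    },
  };
}

const circuit = createCircuit(2, 1_000);
circuit.recordFailure(0);
circuit.recordFailure(10); // second consecutive failure opens the circuit
const blocked = !circuit.canCall(500);
const probing = circuit.current(1_010);
circuit.recordFailure(1_020); // probe failed, circuit reopens
const reopened = circuit.current(1_500);
const probingAgain = circuit.current(2_020);
circuit.recordSuccess(); // probe succeeded
const closedAgain = circuit.current(2_021);
```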
State Machine:
CLOSED ─(failures ≥ threshold)─→ OPEN
OPEN ─(resetTimeout expires)──→ HALF_OPEN
HALF_OPEN ─(success)───────────→ CLOSED
HALF_OPEN ─(failure)───────────→ OPEN

import { circuitBreakerPlugin } from "ocpp-ws-io/plugins";
server.plugin(circuitBreakerPlugin({
  failureThreshold: 3,
  resetTimeoutMs: 15_000,
  maxConcurrent: 10,
  onStateChange: (identity, from, to) => {
    console.log(`Circuit ${identity}: ${from} → ${to}`);
    if (to === "OPEN") {
      // Alert the ops team: this station is flapping.
      // `alerting` is a placeholder for your own alerting client.
      alerting.fire({ station: identity, event: "circuit_open" });
    }
  },
}));

Level 1: Passive Hook Plugins
heartbeatPlugin()
Auto-responds to OCPP Heartbeat calls with { currentTime }. Drop-in replacement for manual Heartbeat handlers.
import { heartbeatPlugin } from "ocpp-ws-io/plugins";
server.plugin(heartbeatPlugin());

metricsPlugin(options?)
Tracks real-time server metrics: active/peak connections, average duration, uptime, validation failures, and security events. Exports Prometheus-format metrics via getCustomMetrics().
| Option | Type | Default | Description |
|---|---|---|---|
| intervalMs | number | 30000 | Snapshot emit interval. 0 to disable. |
| onSnapshot | function | (none) | Callback fired each interval with metrics. |
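For a sense of what connection tracking with a Prometheus-style export involves, here is a minimal sketch. It is not the metricsPlugin source: createConnectionMetrics is a hypothetical name, only three counters are tracked, and the ocpp_connections_total metric name is an assumption modeled on the naming of the other two.

```typescript
function createConnectionMetrics() {
  let active = 0;
  let peak = 0;
  let total = 0;
  return {
    onConnection(): void {
      active += 1;
      total += 1;
      peak = Math.max(peak, active);
    },
    onDisconnect(): void {
      active = Math.max(0, active - 1); // never go negative on a stray event
    },
    // One "name value" line per metric, ready to append to a /metrics response.
    getCustomMetrics(): string[] {
      return [
        `ocpp_connections_active ${active}`,
        `ocpp_connections_peak ${peak}`,
        `ocpp_connections_total ${total}`,
      ];
    },
  };
}

const tracker = createConnectionMetrics();
tracker.onConnection();
tracker.onConnection();
tracker.onConnection();
tracker.onDisconnect();
const exported = tracker.getCustomMetrics();
```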
import { metricsPlugin } from "ocpp-ws-io/plugins";
const metrics = metricsPlugin({
  intervalMs: 10_000,
  onSnapshot: (snap) => console.log(`Active: ${snap.activeConnections}`),
});
server.plugin(metrics);
// On-demand access
const snap = metrics.getMetrics();
// { activeConnections, peakConnections, totalConnections, totalValidationFailures, totalSecurityEvents, ... }
// Prometheus export
const lines = await metrics.getCustomMetrics();
// ["ocpp_connections_active 42", "ocpp_connections_peak 128", ...]

sessionLogPlugin(options?)
Logs connect/disconnect events with identity, IP, protocol, and connection duration.
| Option | Type | Default | Description |
|---|---|---|---|
| logger | { info, warn } | console | Custom logger instance. |
| logLevel | "minimal" \| "standard" \| "verbose" | "standard" | Verbosity level. |
import { sessionLogPlugin } from "ocpp-ws-io/plugins";
server.plugin(sessionLogPlugin());
// => [session] connected { identity: "CP-101", ip: "192.168.1.1", protocol: "ocpp1.6" }
// => [session] disconnected { identity: "CP-101", durationSec: 3600, code: 1000 }

otelPlugin(options?)
OpenTelemetry integration that creates spans for connection lifecycle events and records validation failures, rate limit events, backpressure, and server telemetry. Requires @opentelemetry/api as an optional peer dependency. If it is not installed, the plugin silently disables itself.
| Option | Type | Default | Description |
|---|---|---|---|
| serviceName | string | "ocpp-server" | Tracer service name. |
| tracer | TracerLike | (none) | Custom tracer instance (overrides auto-detection). |
import { otelPlugin } from "ocpp-ws-io/plugins";
server.plugin(otelPlugin({ serviceName: "my-csms" }));

Instrumented hooks: onConnection, onDisconnect, onError, onBadMessage, onValidationFailure, onRateLimitExceeded, onBackpressure, onPongTimeout, onSecurityEvent, onAuthFailed, onEviction, onTelemetry, onClosing, onClose.
webhookPlugin(options)
Sends HTTP POST webhooks on server lifecycle events with HMAC-SHA256 signing and retry support.
| Option | Type | Default | Description |
|---|---|---|---|
| url | string | required | Webhook endpoint URL. |
| events | string[] | All | Filter: "init", "connect", "disconnect", "close". |
| headers | Record | (none) | Custom HTTP headers. |
| secret | string | (none) | HMAC-SHA256 secret (sent as X-Signature). |
| timeout | number | 5000 | Fetch timeout in ms. |
| retries | number | 1 | Retry count on failure. |
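On the receiving side, the X-Signature header should be checked before the webhook body is trusted. The sketch below shows one way to do that with Node's crypto module. The encoding is an assumption: it treats the header as the hex-encoded HMAC-SHA256 of the raw request body, which this page does not specify, so confirm the actual format before relying on it. The sign and verifySignature helpers are hypothetical names.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed format: hex digest of the raw body (not confirmed by the plugin docs).
function sign(body: string, secret: string): string {
  return createHmac("sha256", secret).update(body).digest("hex");
}

function verifySignature(body: string, secret: string, signatureHeader: string): boolean {
  const expected = Buffer.from(sign(body, secret), "hex");
  const received = Buffer.from(signatureHeader, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}

const body = JSON.stringify({ event: "connect", identity: "CP-001" });
const header = sign(body, "s3cret");

const accepted = verifySignature(body, "s3cret", header);
const tampered = verifySignature(body + " ", "s3cret", header);
const malformed = verifySignature(body, "s3cret", "not-hex");
```

Verifying against the raw body, rather than a re-serialized object, avoids false mismatches caused by key ordering or whitespace differences.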
import { webhookPlugin } from "ocpp-ws-io/plugins";
server.plugin(webhookPlugin({
  url: "https://api.example.com/ocpp-events",
  secret: process.env.WEBHOOK_SECRET,
  events: ["connect", "disconnect"],
  headers: { Authorization: "Bearer token123" },
}));

kafkaPlugin(options)
High-throughput Kafka event streaming sink. Publishes OCPP lifecycle events to Kafka topics using a user-provided producer (e.g., kafkajs).
| Option | Type | Default | Description |
|---|---|---|---|
| producer | KafkaProducerLike | required | Kafka producer instance. |
| topic | string | "ocpp.events" | Base topic name. |
| events | KafkaEvent[] | All | Filter which events to publish. |
| topicRouting | boolean | false | Route events to topic.{event} subtopics. |
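The interaction between topic, events, and topicRouting can be sketched as a small pure function. RoutingOptions and resolveTopic are hypothetical names used for illustration only; the plugin's internal routing may differ.

```typescript
interface RoutingOptions {
  topic?: string;
  events?: string[];
  topicRouting?: boolean;
}

// Returns the destination topic, or null when the event is filtered out.
function resolveTopic(event: string, opts: RoutingOptions = {}): string | null {
  const { topic = "ocpp.events", events, topicRouting = false } = opts;
  if (events && !events.includes(event)) return null; // not in the allow-list
  return topicRouting ? `${topic}.${event}` : topic;
}

const defaultTopic = resolveTopic("connect");
const routedTopic = resolveTopic("connect", { topicRouting: true });
const filteredOut = resolveTopic("message", { events: ["connect", "disconnect"] });
```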
import { kafkaPlugin } from "ocpp-ws-io/plugins";
import { Kafka } from "kafkajs";
const kafka = new Kafka({ brokers: ["kafka:9092"] });
const producer = kafka.producer();
await producer.connect();
server.plugin(kafkaPlugin({
  producer,
  topic: "ocpp.events",
  topicRouting: true, // → ocpp.events.connect, ocpp.events.message, etc.
  events: ["connect", "disconnect", "message", "security"],
}));

rateLimitNotifierPlugin(options)
Fires alerts to an external sink (webhook URL or custom object) when clients exceed rate limits. Implements per-identity cooldown and threshold-based alerting to prevent alert storms.
| Option | Type | Default | Description |
|---|---|---|---|
| sink | string \| AlertSink | required | Webhook URL or custom sink with send(payload). |
| cooldownMs | number | 60000 | Min interval between alerts per identity. |
| threshold | number | 1 | Events before alert fires. |
| windowMs | number | 300000 | Sliding window for counting events. |
| headers | Record | (none) | Custom HTTP headers for webhook sink. |
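How threshold, windowMs, and cooldownMs combine to prevent alert storms can be sketched as a per-identity gate. This is an illustration rather than the plugin's logic: createAlertGate is a hypothetical name, and it assumes an alert fires once the event count inside the window reaches the threshold and no alert was sent for that identity during the cooldown.

```typescript
function createAlertGate(threshold = 1, windowMs = 300_000, cooldownMs = 60_000) {
  const events = new Map<string, number[]>();
  const lastAlert = new Map<string, number>();

  // Records one rate-limit event and returns true when an alert should be sent.
  return function shouldAlert(identity: string, now: number = Date.now()): boolean {
    const recent = (events.get(identity) ?? []).filter((t) => now - t < windowMs);
    recent.push(now);
    events.set(identity, recent);

    if (recent.length < threshold) return false; // not enough events yet
    const last = lastAlert.get(identity);
    if (last !== undefined && now - last < cooldownMs) return false; // cooling down
    lastAlert.set(identity, now);
    return true;
  };
}

const shouldAlert = createAlertGate(3, 10_000, 5_000);
const decisions = [0, 100, 200, 300, 6_000].map((t) => shouldAlert("CP-001", t));
const quietStation = shouldAlert("CP-002", 6_000);
```

In this run the third event triggers the first alert, the fourth is suppressed by the cooldown, and the event at 6000 ms alerts again because the cooldown has elapsed while the window still holds enough events.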
import { rateLimitNotifierPlugin } from "ocpp-ws-io/plugins";
// Simple webhook
server.plugin(rateLimitNotifierPlugin({
  sink: "https://slack.example.com/webhook",
  cooldownMs: 120_000,
  threshold: 3,
}));
// Custom sink (e.g., Kafka, PagerDuty, etc.)
server.plugin(rateLimitNotifierPlugin({
  sink: {
    send: async (alert) => {
      // `pagerduty` is a placeholder for your own incident client.
      await pagerduty.trigger({
        summary: `Rate limit: ${alert.identity} (${alert.count} events)`,
        severity: "warning",
      });
    },
  },
  threshold: 5,
}));

Creating Custom Plugins
Use createPlugin for type-safe custom plugins:
import { createPlugin } from "ocpp-ws-io";
const myPlugin = createPlugin({
  name: "my-advanced-plugin",
  onInit(server) {
    console.log(`Plugin loaded on server listening at ${server.options.port}`);
  },
  onSecurityEvent(evt) {
    if (evt.type === "RATE_LIMIT_EXCEEDED") {
      console.warn(`[Security] Rate limit hit by ${evt.identity}`);
    }
  },
  onBeforeReceive(client, rawData) {
    // Arbitrary size limits or binary magic number checks
    if (rawData.length > 1024 * 1024) {
      console.error(`Payload too large from ${client.identity}`);
      return false; // Drops the message
    }
  },
  onMessage(client, payload) {
    // Unified observability: both outgoing and incoming messages
    const { message, direction, ctx } = payload;
    console.log(`[${direction}] ${ctx.action || ctx.type} at ${ctx.timestamp.toISOString()}`);
  },
  onTelemetry(stats) {
    // Push stats to your external monitoring system
    console.log(`Active Connections: ${stats.activeConnections}`);
  },
});
server.plugin(myPlugin);