import type { WSContext } from 'hono/ws'
import dayjs from 'dayjs'
import { eq } from 'drizzle-orm'
import { isSupportedOCPP } from '@/constants.js'
import { useDrizzle } from '@/lib/db.js'
import { chargePoint } from '@/db/schema.js'
import {
  OCPP_MESSAGE_TYPE,
  type OcppCall,
  type OcppCallErrorMessage,
  type OcppCallResultMessage,
  type OcppErrorCode,
  type OcppMessage,
  type OcppConnectionContext,
  type CommandChannelStatus,
  type AuthorizeRequest,
  type AuthorizeResponse,
  type BootNotificationRequest,
  type BootNotificationResponse,
  type HeartbeatRequest,
  type HeartbeatResponse,
  type MeterValuesRequest,
  type MeterValuesResponse,
  type StartTransactionRequest,
  type StartTransactionResponse,
  type StatusNotificationRequest,
  type StatusNotificationResponse,
  type StopTransactionRequest,
  type StopTransactionResponse,
} from './types.ts'
import { handleAuthorize } from './actions/authorize.ts'
import { handleBootNotification } from './actions/boot-notification.ts'
import { handleHeartbeat } from './actions/heartbeat.ts'
import { handleMeterValues } from './actions/meter-values.ts'
import { handleStartTransaction } from './actions/start-transaction.ts'
import { handleStatusNotification } from './actions/status-notification.ts'
import { handleStopTransaction } from './actions/stop-transaction.ts'

/**
 * One live OCPP WebSocket connection as stored in {@link ocppConnections}.
 */
export type OcppConnectionEntry = {
  ws: WSContext
  sessionId: string
  openedAt: Date
  lastMessageAt: Date
}

/**
 * Bookkeeping for a CALL sent by `sendOcppCall` that has not yet been
 * answered, timed out, or failed to send.
 */
type PendingCall = {
  chargePointIdentifier: string
  action: string
  resolve: (payload: unknown) => void
  reject: (error: Error) => void
  timeout: ReturnType<typeof setTimeout>
}

/**
 * Column values accepted by `updateTransportState`.
 *
 * NOTE(review): the generic argument was missing from the source; the Drizzle
 * insert model of `chargePoint` is assumed here — confirm against the schema.
 */
type TransportStateValues = Partial<typeof chargePoint.$inferInsert>

/**
 * Global registry of active OCPP WebSocket connections.
 * Key = chargePointIdentifier, Value = connection entry
 */
export const ocppConnections = new Map<string, OcppConnectionEntry>()

/** Outstanding server-initiated CALLs, keyed by their OCPP uniqueId. */
const pendingCalls = new Map<string, PendingCall>()

// Typed dispatch map — only registered actions are accepted
type ActionHandlerMap = {
  Authorize: (payload: AuthorizeRequest, ctx: OcppConnectionContext) => Promise<AuthorizeResponse>
  BootNotification: (
    payload: BootNotificationRequest,
    ctx: OcppConnectionContext,
  ) => Promise<BootNotificationResponse>
  Heartbeat: (payload: HeartbeatRequest, ctx: OcppConnectionContext) => Promise<HeartbeatResponse>
  MeterValues: (
    payload: MeterValuesRequest,
    ctx: OcppConnectionContext,
  ) => Promise<MeterValuesResponse>
  StartTransaction: (
    payload: StartTransactionRequest,
    ctx: OcppConnectionContext,
  ) => Promise<StartTransactionResponse>
  StatusNotification: (
    payload: StatusNotificationRequest,
    ctx: OcppConnectionContext,
  ) => Promise<StatusNotificationResponse>
  StopTransaction: (
    payload: StopTransactionRequest,
    ctx: OcppConnectionContext,
  ) => Promise<StopTransactionResponse>
}

const actionHandlers: ActionHandlerMap = {
  Authorize: handleAuthorize,
  BootNotification: handleBootNotification,
  Heartbeat: handleHeartbeat,
  MeterValues: handleMeterValues,
  StartTransaction: handleStartTransaction,
  StatusNotification: handleStatusNotification,
  StopTransaction: handleStopTransaction,
}

/**
 * Abbreviates a value for log output: values of up to 12 characters are
 * returned unchanged, longer ones become `<first 8>...<last 4>`.
 */
function shortId(value: string): string {
  return value.length <= 12 ? value : `${value.slice(0, 8)}...${value.slice(-4)}`
}

/**
 * Builds the compact object that is logged next to a CALL.
 *
 * Known actions log a fixed selection of fields (id tags are shortened via
 * `shortId`); any other action logs at most its first 20 payload keys.
 *
 * @param action - OCPP action name of the CALL.
 * @param payload - Raw CALL payload; it is not validated here.
 * @returns A plain object intended for logging only.
 */
function summarizeCallPayload(action: string, payload: unknown): Record<string, unknown> {
  // A missing or non-object payload must not make logging throw: the summary
  // is computed before the action handler runs, so a throw here would turn an
  // otherwise processable frame into an InternalError.
  const p: Record<string, unknown> =
    payload !== null && typeof payload === 'object' ? (payload as Record<string, unknown>) : {}

  switch (action) {
    case 'BootNotification':
      return {
        chargePointVendor: p.chargePointVendor,
        chargePointModel: p.chargePointModel,
        firmwareVersion: p.firmwareVersion ?? null,
      }
    case 'Authorize':
      return {
        idTag: typeof p.idTag === 'string' ? shortId(p.idTag) : undefined,
      }
    case 'Heartbeat':
      return {}
    case 'StatusNotification':
      return {
        connectorId: p.connectorId,
        status: p.status,
        errorCode: p.errorCode,
        timestamp: p.timestamp ?? null,
      }
    case 'StartTransaction':
      return {
        connectorId: p.connectorId,
        idTag: typeof p.idTag === 'string' ? shortId(p.idTag) : undefined,
        meterStart: p.meterStart,
        timestamp: p.timestamp,
      }
    case 'StopTransaction':
      return {
        transactionId: p.transactionId,
        meterStop: p.meterStop,
        reason: p.reason ?? null,
        timestamp: p.timestamp,
      }
    case 'MeterValues':
      return {
        connectorId: p.connectorId,
        transactionId: p.transactionId ?? null,
        meterValueCount: Array.isArray(p.meterValue) ? p.meterValue.length : 0,
      }
    default:
      return {
        keys: Object.keys(p).slice(0, 20),
      }
  }
}

/** Sends a CALLRESULT frame `[type, uniqueId, payload]` on `ws`. */
function sendCallResult(ws: WSContext, uniqueId: string, payload: unknown): void {
  ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALLRESULT, uniqueId, payload]))
}

/**
 * Sends a CALLERROR frame `[type, uniqueId, errorCode, errorDescription, {}]`
 * on `ws`. The error-details element is always an empty object.
 */
function sendCallError(
  ws: WSContext,
  uniqueId: string,
  errorCode: OcppErrorCode,
  errorDescription: string,
): void {
  ws.send(
    JSON.stringify([OCPP_MESSAGE_TYPE.CALLERROR, uniqueId, errorCode, errorDescription, {}]),
  )
}

/**
 * Writes `values` to the `chargePoint` row(s) whose `chargePointIdentifier`
 * matches, and stamps `updatedAt` with the current time.
 */
async function updateTransportState(
  chargePointIdentifier: string,
  values: TransportStateValues,
): Promise<void> {
  const db = useDrizzle()
  await db
    .update(chargePoint)
    .set({
      ...values,
      updatedAt: dayjs().toDate(),
    })
    .where(eq(chargePoint.chargePointIdentifier, chargePointIdentifier))
}

/**
 * Looks up the stored registration status of a charge point.
 *
 * @returns The `registrationStatus` column of the first matching row, or
 *   `null` when no row matches or the column is nullish.
 */
async function getRegistrationStatus(chargePointIdentifier: string) {
  const db = useDrizzle()
  const [cp] = await db
    .select({ registrationStatus: chargePoint.registrationStatus })
    .from(chargePoint)
    .where(eq(chargePoint.chargePointIdentifier, chargePointIdentifier))
    .limit(1)
  return cp?.registrationStatus ?? null
}

/**
 * Reports `'online'` when the charge point has an entry in
 * {@link ocppConnections}, `'unavailable'` otherwise.
 */
function getCommandChannelStatus(chargePointIdentifier: string): CommandChannelStatus {
  return ocppConnections.has(chargePointIdentifier) ? 'online' : 'unavailable'
}

/**
 * Persists the outcome of a server-initiated command on a best-effort basis.
 *
 * This never rejects: a failed database write is logged and swallowed so that
 * the caller waiting on the command is always settled afterwards.
 */
async function recordCommandOutcome(
  chargePointIdentifier: string,
  lastCommandStatus: TransportStateValues['lastCommandStatus'],
): Promise<void> {
  try {
    await updateTransportState(chargePointIdentifier, {
      transportStatus: getCommandChannelStatus(chargePointIdentifier),
      lastCommandStatus,
      lastCommandAt: dayjs().toDate(),
    })
  } catch (error) {
    console.error(
      `[OCPP][TX][STATE_ERROR] cp=${chargePointIdentifier} lastCommandStatus=${String(lastCommandStatus)}`,
      error,
    )
  }
}

/**
 * Sends a CALL to a connected charge point and waits for its answer.
 *
 * NOTE(review): the first type parameter's declaration was truncated in the
 * source; an unconstrained `TPayload` defaulting to `Record<string, unknown>`
 * is assumed because it accepts every call site a constrained form would.
 *
 * @param chargePointIdentifier - Key into {@link ocppConnections}.
 * @param action - OCPP action name placed in the CALL frame.
 * @param payload - CALL payload; serialised with `JSON.stringify`.
 * @param timeoutOrOptions - Either the timeout in milliseconds, or an options
 *   object with an optional `timeoutMs` (default 15000) and an optional
 *   caller-chosen `uniqueId` (default: a random UUID).
 * @returns The CALLRESULT payload, cast to `TResult` without validation.
 * @throws Error `'TransportUnavailable'` when the charge point has no active
 *   connection.
 * @throws The error raised by `ws.send` (or `'CommandSendFailed'` when a
 *   non-Error value was thrown).
 * @throws Error `'CommandTimeout'` when no answer arrives within the timeout.
 * @throws Error `'<errorCode>:<errorDescription>'` when the charge point
 *   answers with a CALLERROR.
 */
export async function sendOcppCall<TPayload = Record<string, unknown>, TResult = unknown>(
  chargePointIdentifier: string,
  action: string,
  payload: TPayload,
  timeoutOrOptions: number | { timeoutMs?: number; uniqueId?: string } = 15000,
): Promise<TResult> {
  const entry = ocppConnections.get(chargePointIdentifier)
  if (!entry) {
    await updateTransportState(chargePointIdentifier, { transportStatus: 'unavailable' })
    throw new Error('TransportUnavailable')
  }

  const timeoutMs =
    typeof timeoutOrOptions === 'number' ? timeoutOrOptions : (timeoutOrOptions.timeoutMs ?? 15000)
  const uniqueId =
    typeof timeoutOrOptions === 'number'
      ? crypto.randomUUID()
      : (timeoutOrOptions.uniqueId ?? crypto.randomUUID())

  console.info(
    `[OCPP][TX][CALL] cp=${chargePointIdentifier} action=${action} uniqueId=${uniqueId} timeoutMs=${timeoutMs}`,
    summarizeCallPayload(action, payload),
  )

  const resultPromise = new Promise<TResult>((resolve, reject) => {
    const timeout = setTimeout(() => {
      pendingCalls.delete(uniqueId)
      console.warn(
        `[OCPP][TX][TIMEOUT] cp=${chargePointIdentifier} action=${action} uniqueId=${uniqueId} timeoutMs=${timeoutMs}`,
      )
      // recordCommandOutcome never rejects, so the caller is always released
      // even when the status write fails.
      void recordCommandOutcome(chargePointIdentifier, 'Timeout').then(() => {
        reject(new Error('CommandTimeout'))
      })
    }, timeoutMs)

    pendingCalls.set(uniqueId, {
      chargePointIdentifier,
      action,
      resolve: (response) => {
        console.info(
          `[OCPP][TX][CALLRESULT] cp=${chargePointIdentifier} action=${action} uniqueId=${uniqueId}`,
        )
        resolve(response as TResult)
      },
      reject,
      timeout,
    })
  })

  try {
    entry.ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALL, uniqueId, action, payload]))
  } catch (error) {
    // The frame never left: drop the pending entry and its timer so the
    // abandoned promise cannot fire a late timeout.
    const pending = pendingCalls.get(uniqueId)
    if (pending) {
      clearTimeout(pending.timeout)
      pendingCalls.delete(uniqueId)
    }
    await updateTransportState(chargePointIdentifier, {
      transportStatus: 'unavailable',
      lastCommandStatus: 'Error',
      lastCommandAt: dayjs().toDate(),
    })
    console.error(
      `[OCPP][TX][SEND_ERROR] cp=${chargePointIdentifier} action=${action} uniqueId=${uniqueId}`,
      error,
    )
    throw error instanceof Error ? error : new Error('CommandSendFailed')
  }

  return resultPromise
}

/**
 * Factory that produces a hono-ws event handler object for a single
 * OCPP WebSocket connection.
 *
 * Usage in route:
 *   upgradeWebSocket((c) => createOcppHandler(c.req.param('chargePointId'), remoteAddr))
 *
 * @param chargePointIdentifier - Identifier the connection is registered under.
 * @param remoteAddr - Optional peer address; only used in the open log line.
 * @returns An object with `onOpen`, `onMessage` and `onClose` handlers that
 *   share one connection context and one generated session id.
 */
export function createOcppHandler(chargePointIdentifier: string, remoteAddr?: string) {
  const ctx: OcppConnectionContext = {
    chargePointIdentifier,
    isRegistered: false,
  }
  const sessionId = crypto.randomUUID()

  /**
   * Removes and returns the pending call answered by `responseUniqueId`,
   * cancelling its timeout. Returns `undefined` when nothing is pending under
   * that id, or when the call was addressed to a different charge point (in
   * which case the pending call is left untouched).
   */
  const takePendingCall = (responseUniqueId: string): PendingCall | undefined => {
    const pending = pendingCalls.get(responseUniqueId)
    if (!pending) return undefined
    if (pending.chargePointIdentifier !== chargePointIdentifier) {
      // Only the charge point that received the CALL may answer it.
      console.warn(
        `[OCPP][RX][UNEXPECTED_RESPONSE] cp=${chargePointIdentifier} session=${sessionId} uniqueId=${responseUniqueId} expectedCp=${pending.chargePointIdentifier}`,
      )
      return undefined
    }
    clearTimeout(pending.timeout)
    pendingCalls.delete(responseUniqueId)
    return pending
  }

  return {
    /**
     * Rejects unsupported subprotocols, replaces any previous connection of
     * the same charge point, registers this one and marks it online.
     */
    async onOpen(_evt: Event, ws: WSContext) {
      const subProtocol = ws.protocol ?? 'unknown'
      if (!isSupportedOCPP(subProtocol)) {
        console.warn(
          `[OCPP][WS][OPEN_REJECT] cp=${chargePointIdentifier} session=${sessionId} subProtocol=${subProtocol} reason=unsupported_subprotocol`,
        )
        ws.close(1002, 'Unsupported subprotocol')
        return
      }

      const registrationStatus = await getRegistrationStatus(chargePointIdentifier)
      ctx.isRegistered = registrationStatus === 'Accepted'

      const previous = ocppConnections.get(chargePointIdentifier)
      if (previous && previous.sessionId !== sessionId) {
        try {
          previous.ws.close(1012, 'Replaced by newer connection')
        } catch {
          // Ignore close race when the old socket is already gone.
        }
      }

      ocppConnections.set(chargePointIdentifier, {
        ws,
        sessionId,
        openedAt: new Date(),
        lastMessageAt: new Date(),
      })
      await updateTransportState(chargePointIdentifier, {
        transportStatus: 'online',
        connectionSessionId: sessionId,
        lastWsConnectedAt: dayjs().toDate(),
      })

      console.info(
        `[OCPP][WS][OPEN] cp=${chargePointIdentifier} session=${sessionId} subProtocol=${subProtocol} registered=${ctx.isRegistered}` +
          (remoteAddr ? ` remote=${remoteAddr}` : ''),
      )
      if (previous && previous.sessionId !== sessionId) {
        console.info(
          `[OCPP][WS][REPLACE] cp=${chargePointIdentifier} oldSession=${previous.sessionId} newSession=${sessionId}`,
        )
      }
    },

    /**
     * Handles one inbound frame: settles pending calls on CALLRESULT /
     * CALLERROR, dispatches CALLs to the registered action handler, and
     * answers malformed or failing CALLs with a CALLERROR.
     */
    async onMessage(evt: MessageEvent, ws: WSContext) {
      let uniqueId = '(unknown)'
      try {
        // Frames from a socket that is no longer the registered session are
        // dropped and the socket is closed.
        const current = ocppConnections.get(chargePointIdentifier)
        if (!current || current.sessionId !== sessionId) {
          console.warn(
            `[OCPP][WS][STALE] cp=${chargePointIdentifier} session=${sessionId} activeSession=${current?.sessionId ?? 'none'}`,
          )
          try {
            ws.close(1008, 'Stale connection')
          } catch {
            // Ignore close errors on stale sockets.
          }
          return
        }
        current.lastMessageAt = new Date()

        const raw = evt.data
        if (typeof raw !== 'string') return

        let message: OcppMessage
        try {
          message = JSON.parse(raw) as OcppMessage
        } catch {
          sendCallError(ws, uniqueId, 'FormationViolation', 'Invalid JSON')
          return
        }

        if (!Array.isArray(message) || message.length < 3) {
          sendCallError(ws, uniqueId, 'FormationViolation', 'Message must be a JSON array')
          return
        }

        const [messageType, msgUniqueId] = message
        uniqueId = String(msgUniqueId)

        if (messageType === OCPP_MESSAGE_TYPE.CALL) {
          const [, , action, payload] = message as OcppCall
          console.info(
            `[OCPP][RX][CALL] cp=${chargePointIdentifier} session=${sessionId} action=${action} uniqueId=${uniqueId}`,
            summarizeCallPayload(action, payload),
          )
        } else if (messageType === OCPP_MESSAGE_TYPE.CALLRESULT) {
          console.info(
            `[OCPP][RX][CALLRESULT] cp=${chargePointIdentifier} session=${sessionId} uniqueId=${uniqueId}`,
          )
        } else if (messageType === OCPP_MESSAGE_TYPE.CALLERROR) {
          const [, , errorCode, errorDescription] = message as OcppCallErrorMessage
          console.warn(
            `[OCPP][RX][CALLERROR] cp=${chargePointIdentifier} session=${sessionId} uniqueId=${uniqueId} code=${errorCode} desc=${errorDescription}`,
          )
        }

        if (messageType === OCPP_MESSAGE_TYPE.CALLRESULT) {
          const [, responseUniqueId, payload] = message as OcppCallResultMessage
          const pending = takePendingCall(responseUniqueId)
          if (!pending) return
          // The timeout is already cancelled, so the caller must be settled
          // no matter whether the status write succeeds.
          await recordCommandOutcome(pending.chargePointIdentifier, 'Accepted')
          pending.resolve(payload)
          return
        }

        if (messageType === OCPP_MESSAGE_TYPE.CALLERROR) {
          const [, responseUniqueId, errorCode, errorDescription] =
            message as OcppCallErrorMessage
          const pending = takePendingCall(responseUniqueId)
          if (!pending) return
          await recordCommandOutcome(
            pending.chargePointIdentifier,
            errorCode === 'InternalError' ? 'Error' : 'Rejected',
          )
          console.warn(
            `[OCPP][TX][REJECTED] cp=${pending.chargePointIdentifier} action=${pending.action} uniqueId=${responseUniqueId} code=${errorCode} desc=${errorDescription}`,
          )
          pending.reject(new Error(`${errorCode}:${errorDescription}`))
          return
        }

        if (messageType !== OCPP_MESSAGE_TYPE.CALL) return

        const [, , action, payload] = message as OcppCall

        // Enforce BootNotification before any other action
        if (!ctx.isRegistered && action !== 'BootNotification') {
          console.warn(
            `[OCPP][RX][REJECT] cp=${chargePointIdentifier} session=${sessionId} action=${action} uniqueId=${uniqueId} reason=boot_notification_required`,
          )
          sendCallError(
            ws,
            uniqueId,
            'SecurityError',
            'Charge point must send BootNotification first',
          )
          return
        }

        const handler = actionHandlers[action as keyof ActionHandlerMap]
        if (!handler) {
          console.warn(
            `[OCPP][RX][NOT_IMPLEMENTED] cp=${chargePointIdentifier} session=${sessionId} action=${action} uniqueId=${uniqueId}`,
          )
          sendCallError(ws, uniqueId, 'NotImplemented', `Action '${action}' is not implemented`)
          return
        }

        // The payload is passed through unvalidated; each handler receives
        // whatever the charge point sent.
        const response = await (
          handler as (payload: unknown, ctx: OcppConnectionContext) => Promise<unknown>
        )(payload, ctx)
        console.info(
          `[OCPP][TX][CALLRESULT] cp=${chargePointIdentifier} session=${sessionId} action=${action} uniqueId=${uniqueId}`,
        )
        sendCallResult(ws, uniqueId, response)
      } catch (err) {
        console.error(
          `[OCPP][RX][ERROR] cp=${chargePointIdentifier} session=${sessionId} uniqueId=${uniqueId}`,
          err,
        )
        sendCallError(ws, uniqueId, 'InternalError', 'Internal server error')
      }
    },

    /**
     * Unregisters the connection and marks the charge point offline, but only
     * when this session is still the registered one (a replaced or rejected
     * socket must not evict its successor).
     */
    async onClose(evt: CloseEvent, _ws: WSContext) {
      const current = ocppConnections.get(chargePointIdentifier)
      if (current?.sessionId === sessionId) {
        ocppConnections.delete(chargePointIdentifier)
        await updateTransportState(chargePointIdentifier, {
          transportStatus: 'offline',
          lastWsDisconnectedAt: dayjs().toDate(),
        })
      }
      console.info(
        `[OCPP][WS][CLOSE] cp=${chargePointIdentifier} session=${sessionId} code=${evt.code} activeSession=${current?.sessionId ?? 'none'}`,
      )
    },
  }
}