import type { WSContext } from 'hono/ws' import dayjs from 'dayjs' import { eq } from 'drizzle-orm' import { isSupportedOCPP } from '@/constants.js' import { useDrizzle } from '@/lib/db.js' import { chargePoint } from '@/db/schema.js' import { OCPP_MESSAGE_TYPE, type OcppCall, type OcppCallErrorMessage, type OcppCallResultMessage, type OcppErrorCode, type OcppMessage, type OcppConnectionContext, type CommandChannelStatus, type AuthorizeRequest, type AuthorizeResponse, type BootNotificationRequest, type BootNotificationResponse, type HeartbeatRequest, type HeartbeatResponse, type MeterValuesRequest, type MeterValuesResponse, type StartTransactionRequest, type StartTransactionResponse, type StatusNotificationRequest, type StatusNotificationResponse, type StopTransactionRequest, type StopTransactionResponse, } from './types.ts' /** * Global registry of active OCPP WebSocket connections. * Key = chargePointIdentifier, Value = connection entry */ export type OcppConnectionEntry = { ws: WSContext sessionId: string openedAt: Date lastMessageAt: Date } type PendingCall = | { chargePointIdentifier: string action: string resolve: (payload: unknown) => void reject: (error: Error) => void timeout: ReturnType } export const ocppConnections = new Map() const pendingCalls = new Map() import { handleAuthorize } from './actions/authorize.ts' import { handleBootNotification } from './actions/boot-notification.ts' import { handleHeartbeat } from './actions/heartbeat.ts' import { handleMeterValues } from './actions/meter-values.ts' import { handleStartTransaction } from './actions/start-transaction.ts' import { handleStatusNotification } from './actions/status-notification.ts' import { handleStopTransaction } from './actions/stop-transaction.ts' // Typed dispatch map — only registered actions are accepted type ActionHandlerMap = { Authorize: ( payload: AuthorizeRequest, ctx: OcppConnectionContext, ) => Promise BootNotification: ( payload: BootNotificationRequest, ctx: OcppConnectionContext, ) => Promise Heartbeat: ( payload: HeartbeatRequest, ctx: OcppConnectionContext, ) => Promise MeterValues: ( payload: MeterValuesRequest, ctx: OcppConnectionContext, ) => Promise StartTransaction: ( payload: StartTransactionRequest, ctx: OcppConnectionContext, ) => Promise StatusNotification: ( payload: StatusNotificationRequest, ctx: OcppConnectionContext, ) => Promise StopTransaction: ( payload: StopTransactionRequest, ctx: OcppConnectionContext, ) => Promise } const actionHandlers: ActionHandlerMap = { Authorize: handleAuthorize, BootNotification: handleBootNotification, Heartbeat: handleHeartbeat, MeterValues: handleMeterValues, StartTransaction: handleStartTransaction, StatusNotification: handleStatusNotification, StopTransaction: handleStopTransaction, } function sendCallResult(ws: WSContext, uniqueId: string, payload: unknown): void { ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALLRESULT, uniqueId, payload])) } function sendCallError( ws: WSContext, uniqueId: string, errorCode: OcppErrorCode, errorDescription: string, ): void { ws.send( JSON.stringify([OCPP_MESSAGE_TYPE.CALLERROR, uniqueId, errorCode, errorDescription, {}]), ) } async function updateTransportState( chargePointIdentifier: string, values: Partial, ): Promise { const db = useDrizzle() await db .update(chargePoint) .set({ ...values, updatedAt: dayjs().toDate(), }) .where(eq(chargePoint.chargePointIdentifier, chargePointIdentifier)) } function getCommandChannelStatus(chargePointIdentifier: string): CommandChannelStatus { return ocppConnections.has(chargePointIdentifier) ? 'online' : 'unavailable' } export async function sendOcppCall, TResult = unknown>( chargePointIdentifier: string, action: string, payload: TPayload, timeoutOrOptions: number | { timeoutMs?: number; uniqueId?: string } = 15000, ): Promise { const entry = ocppConnections.get(chargePointIdentifier) if (!entry) { await updateTransportState(chargePointIdentifier, { transportStatus: 'unavailable' }) throw new Error('TransportUnavailable') } const timeoutMs = typeof timeoutOrOptions === 'number' ? timeoutOrOptions : (timeoutOrOptions.timeoutMs ?? 15000) const uniqueId = typeof timeoutOrOptions === 'number' ? crypto.randomUUID() : (timeoutOrOptions.uniqueId ?? crypto.randomUUID()) const resultPromise = new Promise((resolve, reject) => { const timeout = setTimeout(async () => { pendingCalls.delete(uniqueId) await updateTransportState(chargePointIdentifier, { transportStatus: getCommandChannelStatus(chargePointIdentifier), lastCommandStatus: 'Timeout', lastCommandAt: dayjs().toDate(), }) reject(new Error('CommandTimeout')) }, timeoutMs) pendingCalls.set(uniqueId, { chargePointIdentifier, action, resolve: (response) => resolve(response as TResult), reject, timeout, }) }) try { entry.ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALL, uniqueId, action, payload])) } catch (error) { const pending = pendingCalls.get(uniqueId) if (pending) { clearTimeout(pending.timeout) pendingCalls.delete(uniqueId) } await updateTransportState(chargePointIdentifier, { transportStatus: 'unavailable', lastCommandStatus: 'Error', lastCommandAt: dayjs().toDate(), }) throw error instanceof Error ? error : new Error('CommandSendFailed') } return resultPromise } /** * Factory that produces a hono-ws event handler object for a single * OCPP WebSocket connection. * * Usage in route: * upgradeWebSocket((c) => createOcppHandler(c.req.param('chargePointId'), remoteAddr)) */ export function createOcppHandler(chargePointIdentifier: string, remoteAddr?: string) { const ctx: OcppConnectionContext = { chargePointIdentifier, isRegistered: false, } const sessionId = crypto.randomUUID() return { async onOpen(_evt: Event, ws: WSContext) { const subProtocol = ws.protocol ?? 'unknown' if (!isSupportedOCPP(subProtocol)) { ws.close(1002, 'Unsupported subprotocol') return } ocppConnections.set(chargePointIdentifier, { ws, sessionId, openedAt: new Date(), lastMessageAt: new Date(), }) await updateTransportState(chargePointIdentifier, { transportStatus: 'online', connectionSessionId: sessionId, lastWsConnectedAt: dayjs().toDate(), }) console.log( `[OCPP] ${chargePointIdentifier} connected` + (remoteAddr ? ` from ${remoteAddr}` : ''), ) }, async onMessage(evt: MessageEvent, ws: WSContext) { let uniqueId = '(unknown)' try { const current = ocppConnections.get(chargePointIdentifier) if (current) { current.lastMessageAt = new Date() } const raw = evt.data if (typeof raw !== 'string') return let message: OcppMessage try { message = JSON.parse(raw) as OcppMessage } catch { sendCallError(ws, uniqueId, 'FormationViolation', 'Invalid JSON') return } if (!Array.isArray(message) || message.length < 3) { sendCallError(ws, uniqueId, 'FormationViolation', 'Message must be a JSON array') return } const [messageType, msgUniqueId] = message uniqueId = String(msgUniqueId) if (messageType === OCPP_MESSAGE_TYPE.CALLRESULT) { const [, responseUniqueId, payload] = message as OcppCallResultMessage const pending = pendingCalls.get(responseUniqueId) if (!pending) return clearTimeout(pending.timeout) pendingCalls.delete(responseUniqueId) await updateTransportState(pending.chargePointIdentifier, { transportStatus: getCommandChannelStatus(pending.chargePointIdentifier), lastCommandStatus: 'Accepted', lastCommandAt: dayjs().toDate(), }) pending.resolve(payload) return } if (messageType === OCPP_MESSAGE_TYPE.CALLERROR) { const [, responseUniqueId, errorCode, errorDescription] = message as OcppCallErrorMessage const pending = pendingCalls.get(responseUniqueId) if (!pending) return clearTimeout(pending.timeout) pendingCalls.delete(responseUniqueId) await updateTransportState(pending.chargePointIdentifier, { transportStatus: getCommandChannelStatus(pending.chargePointIdentifier), lastCommandStatus: errorCode === 'InternalError' ? 'Error' : 'Rejected', lastCommandAt: dayjs().toDate(), }) pending.reject(new Error(`${errorCode}:${errorDescription}`)) return } if (messageType !== OCPP_MESSAGE_TYPE.CALL) return const [, , action, payload] = message as OcppCall // Enforce BootNotification before any other action if (!ctx.isRegistered && action !== 'BootNotification') { sendCallError( ws, uniqueId, 'SecurityError', 'Charge point must send BootNotification first', ) return } const handler = actionHandlers[action as keyof ActionHandlerMap] if (!handler) { sendCallError(ws, uniqueId, 'NotImplemented', `Action '${action}' is not implemented`) return } const response = await ( handler as (payload: unknown, ctx: OcppConnectionContext) => Promise )(payload, ctx) sendCallResult(ws, uniqueId, response) } catch (err) { console.error(`[OCPP] Error handling message from ${chargePointIdentifier} (uniqueId=${uniqueId}):`, err) sendCallError(ws, uniqueId, 'InternalError', 'Internal server error') } }, async onClose(evt: CloseEvent, _ws: WSContext) { const current = ocppConnections.get(chargePointIdentifier) if (current?.sessionId === sessionId) { ocppConnections.delete(chargePointIdentifier) await updateTransportState(chargePointIdentifier, { transportStatus: 'offline', lastWsDisconnectedAt: dayjs().toDate(), }) } console.log(`[OCPP] ${chargePointIdentifier} disconnected (code=${evt.code})`) }, } }