Files
helios-evcs/apps/csms/src/ocpp/handler.ts

366 lines
12 KiB
TypeScript

import type { WSContext } from 'hono/ws'
import dayjs from 'dayjs'
import { eq } from 'drizzle-orm'
import { isSupportedOCPP } from '@/constants.js'
import { useDrizzle } from '@/lib/db.js'
import { chargePoint } from '@/db/schema.js'
import {
OCPP_MESSAGE_TYPE,
type OcppCall,
type OcppCallErrorMessage,
type OcppCallResultMessage,
type OcppErrorCode,
type OcppMessage,
type OcppConnectionContext,
type CommandChannelStatus,
type AuthorizeRequest,
type AuthorizeResponse,
type BootNotificationRequest,
type BootNotificationResponse,
type HeartbeatRequest,
type HeartbeatResponse,
type MeterValuesRequest,
type MeterValuesResponse,
type StartTransactionRequest,
type StartTransactionResponse,
type StatusNotificationRequest,
type StatusNotificationResponse,
type StopTransactionRequest,
type StopTransactionResponse,
} from './types.ts'
/**
* Global registry of active OCPP WebSocket connections.
* Key = chargePointIdentifier, Value = connection entry
*/
export type OcppConnectionEntry = {
ws: WSContext
sessionId: string
openedAt: Date
lastMessageAt: Date
}
type PendingCall =
| {
chargePointIdentifier: string
action: string
resolve: (payload: unknown) => void
reject: (error: Error) => void
timeout: ReturnType<typeof setTimeout>
}
export const ocppConnections = new Map<string, OcppConnectionEntry>()
const pendingCalls = new Map<string, PendingCall>()
import { handleAuthorize } from './actions/authorize.ts'
import { handleBootNotification } from './actions/boot-notification.ts'
import { handleHeartbeat } from './actions/heartbeat.ts'
import { handleMeterValues } from './actions/meter-values.ts'
import { handleStartTransaction } from './actions/start-transaction.ts'
import { handleStatusNotification } from './actions/status-notification.ts'
import { handleStopTransaction } from './actions/stop-transaction.ts'
// Typed dispatch map — only registered actions are accepted
type ActionHandlerMap = {
Authorize: (
payload: AuthorizeRequest,
ctx: OcppConnectionContext,
) => Promise<AuthorizeResponse>
BootNotification: (
payload: BootNotificationRequest,
ctx: OcppConnectionContext,
) => Promise<BootNotificationResponse>
Heartbeat: (
payload: HeartbeatRequest,
ctx: OcppConnectionContext,
) => Promise<HeartbeatResponse>
MeterValues: (
payload: MeterValuesRequest,
ctx: OcppConnectionContext,
) => Promise<MeterValuesResponse>
StartTransaction: (
payload: StartTransactionRequest,
ctx: OcppConnectionContext,
) => Promise<StartTransactionResponse>
StatusNotification: (
payload: StatusNotificationRequest,
ctx: OcppConnectionContext,
) => Promise<StatusNotificationResponse>
StopTransaction: (
payload: StopTransactionRequest,
ctx: OcppConnectionContext,
) => Promise<StopTransactionResponse>
}
const actionHandlers: ActionHandlerMap = {
Authorize: handleAuthorize,
BootNotification: handleBootNotification,
Heartbeat: handleHeartbeat,
MeterValues: handleMeterValues,
StartTransaction: handleStartTransaction,
StatusNotification: handleStatusNotification,
StopTransaction: handleStopTransaction,
}
function sendCallResult(ws: WSContext, uniqueId: string, payload: unknown): void {
ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALLRESULT, uniqueId, payload]))
}
function sendCallError(
ws: WSContext,
uniqueId: string,
errorCode: OcppErrorCode,
errorDescription: string,
): void {
ws.send(
JSON.stringify([OCPP_MESSAGE_TYPE.CALLERROR, uniqueId, errorCode, errorDescription, {}]),
)
}
async function updateTransportState(
chargePointIdentifier: string,
values: Partial<typeof chargePoint.$inferInsert>,
): Promise<void> {
const db = useDrizzle()
await db
.update(chargePoint)
.set({
...values,
updatedAt: dayjs().toDate(),
})
.where(eq(chargePoint.chargePointIdentifier, chargePointIdentifier))
}
async function getRegistrationStatus(chargePointIdentifier: string) {
const db = useDrizzle()
const [cp] = await db
.select({ registrationStatus: chargePoint.registrationStatus })
.from(chargePoint)
.where(eq(chargePoint.chargePointIdentifier, chargePointIdentifier))
.limit(1)
return cp?.registrationStatus ?? null
}
function getCommandChannelStatus(chargePointIdentifier: string): CommandChannelStatus {
return ocppConnections.has(chargePointIdentifier) ? 'online' : 'unavailable'
}
export async function sendOcppCall<TPayload extends Record<string, unknown>, TResult = unknown>(
chargePointIdentifier: string,
action: string,
payload: TPayload,
timeoutOrOptions: number | { timeoutMs?: number; uniqueId?: string } = 15000,
): Promise<TResult> {
const entry = ocppConnections.get(chargePointIdentifier)
if (!entry) {
await updateTransportState(chargePointIdentifier, { transportStatus: 'unavailable' })
throw new Error('TransportUnavailable')
}
const timeoutMs =
typeof timeoutOrOptions === 'number' ? timeoutOrOptions : (timeoutOrOptions.timeoutMs ?? 15000)
const uniqueId =
typeof timeoutOrOptions === 'number' ? crypto.randomUUID() : (timeoutOrOptions.uniqueId ?? crypto.randomUUID())
const resultPromise = new Promise<TResult>((resolve, reject) => {
const timeout = setTimeout(async () => {
pendingCalls.delete(uniqueId)
await updateTransportState(chargePointIdentifier, {
transportStatus: getCommandChannelStatus(chargePointIdentifier),
lastCommandStatus: 'Timeout',
lastCommandAt: dayjs().toDate(),
})
reject(new Error('CommandTimeout'))
}, timeoutMs)
pendingCalls.set(uniqueId, {
chargePointIdentifier,
action,
resolve: (response) => resolve(response as TResult),
reject,
timeout,
})
})
try {
entry.ws.send(JSON.stringify([OCPP_MESSAGE_TYPE.CALL, uniqueId, action, payload]))
} catch (error) {
const pending = pendingCalls.get(uniqueId)
if (pending) {
clearTimeout(pending.timeout)
pendingCalls.delete(uniqueId)
}
await updateTransportState(chargePointIdentifier, {
transportStatus: 'unavailable',
lastCommandStatus: 'Error',
lastCommandAt: dayjs().toDate(),
})
throw error instanceof Error ? error : new Error('CommandSendFailed')
}
return resultPromise
}
/**
* Factory that produces a hono-ws event handler object for a single
* OCPP WebSocket connection.
*
* Usage in route:
* upgradeWebSocket((c) => createOcppHandler(c.req.param('chargePointId'), remoteAddr))
*/
export function createOcppHandler(chargePointIdentifier: string, remoteAddr?: string) {
const ctx: OcppConnectionContext = {
chargePointIdentifier,
isRegistered: false,
}
const sessionId = crypto.randomUUID()
return {
async onOpen(_evt: Event, ws: WSContext) {
const subProtocol = ws.protocol ?? 'unknown'
if (!isSupportedOCPP(subProtocol)) {
ws.close(1002, 'Unsupported subprotocol')
return
}
const registrationStatus = await getRegistrationStatus(chargePointIdentifier)
ctx.isRegistered = registrationStatus === 'Accepted'
const previous = ocppConnections.get(chargePointIdentifier)
if (previous && previous.sessionId !== sessionId) {
try {
previous.ws.close(1012, 'Replaced by newer connection')
} catch {
// Ignore close race when the old socket is already gone.
}
}
ocppConnections.set(chargePointIdentifier, {
ws,
sessionId,
openedAt: new Date(),
lastMessageAt: new Date(),
})
await updateTransportState(chargePointIdentifier, {
transportStatus: 'online',
connectionSessionId: sessionId,
lastWsConnectedAt: dayjs().toDate(),
})
console.log(
`[OCPP] ${chargePointIdentifier} connected` +
(remoteAddr ? ` from ${remoteAddr}` : ''),
)
if (previous && previous.sessionId !== sessionId) {
console.log(`[OCPP] ${chargePointIdentifier} replaced previous connection`)
}
},
async onMessage(evt: MessageEvent, ws: WSContext) {
let uniqueId = '(unknown)'
try {
const current = ocppConnections.get(chargePointIdentifier)
if (!current || current.sessionId !== sessionId) {
try {
ws.close(1008, 'Stale connection')
} catch {
// Ignore close errors on stale sockets.
}
return
}
current.lastMessageAt = new Date()
const raw = evt.data
if (typeof raw !== 'string') return
let message: OcppMessage
try {
message = JSON.parse(raw) as OcppMessage
} catch {
sendCallError(ws, uniqueId, 'FormationViolation', 'Invalid JSON')
return
}
if (!Array.isArray(message) || message.length < 3) {
sendCallError(ws, uniqueId, 'FormationViolation', 'Message must be a JSON array')
return
}
const [messageType, msgUniqueId] = message
uniqueId = String(msgUniqueId)
if (messageType === OCPP_MESSAGE_TYPE.CALLRESULT) {
const [, responseUniqueId, payload] = message as OcppCallResultMessage
const pending = pendingCalls.get(responseUniqueId)
if (!pending) return
clearTimeout(pending.timeout)
pendingCalls.delete(responseUniqueId)
await updateTransportState(pending.chargePointIdentifier, {
transportStatus: getCommandChannelStatus(pending.chargePointIdentifier),
lastCommandStatus: 'Accepted',
lastCommandAt: dayjs().toDate(),
})
pending.resolve(payload)
return
}
if (messageType === OCPP_MESSAGE_TYPE.CALLERROR) {
const [, responseUniqueId, errorCode, errorDescription] = message as OcppCallErrorMessage
const pending = pendingCalls.get(responseUniqueId)
if (!pending) return
clearTimeout(pending.timeout)
pendingCalls.delete(responseUniqueId)
await updateTransportState(pending.chargePointIdentifier, {
transportStatus: getCommandChannelStatus(pending.chargePointIdentifier),
lastCommandStatus: errorCode === 'InternalError' ? 'Error' : 'Rejected',
lastCommandAt: dayjs().toDate(),
})
pending.reject(new Error(`${errorCode}:${errorDescription}`))
return
}
if (messageType !== OCPP_MESSAGE_TYPE.CALL) return
const [, , action, payload] = message as OcppCall
// Enforce BootNotification before any other action
if (!ctx.isRegistered && action !== 'BootNotification') {
sendCallError(
ws,
uniqueId,
'SecurityError',
'Charge point must send BootNotification first',
)
return
}
const handler = actionHandlers[action as keyof ActionHandlerMap]
if (!handler) {
sendCallError(ws, uniqueId, 'NotImplemented', `Action '${action}' is not implemented`)
return
}
const response = await (
handler as (payload: unknown, ctx: OcppConnectionContext) => Promise<unknown>
)(payload, ctx)
sendCallResult(ws, uniqueId, response)
} catch (err) {
console.error(`[OCPP] Error handling message from ${chargePointIdentifier} (uniqueId=${uniqueId}):`, err)
sendCallError(ws, uniqueId, 'InternalError', 'Internal server error')
}
},
async onClose(evt: CloseEvent, _ws: WSContext) {
const current = ocppConnections.get(chargePointIdentifier)
if (current?.sessionId === sessionId) {
ocppConnections.delete(chargePointIdentifier)
await updateTransportState(chargePointIdentifier, {
transportStatus: 'offline',
lastWsDisconnectedAt: dayjs().toDate(),
})
}
console.log(`[OCPP] ${chargePointIdentifier} disconnected (code=${evt.code})`)
},
}
}