Compare commits

2 Commits

5 changed files with 48 additions and 107 deletions

View File

@@ -85,12 +85,8 @@ app.get(
'/ocpp/:chargePointId', '/ocpp/:chargePointId',
upgradeWebSocket((c) => { upgradeWebSocket((c) => {
const chargePointId = c.req.param('chargePointId') const chargePointId = c.req.param('chargePointId')
if (!chargePointId) {
throw new Error('Missing chargePointId route param')
}
const connInfo = getConnInfo(c) const connInfo = getConnInfo(c)
const requestedProtocol = c.req.header('sec-websocket-protocol') return createOcppHandler(chargePointId, connInfo.remote.address)
return createOcppHandler(chargePointId, connInfo.remote.address, requestedProtocol)
}), }),
) )

View File

@@ -9,8 +9,6 @@ export const useDrizzle = () => {
if (!pgPoolInstance || !drizzleInstance) { if (!pgPoolInstance || !drizzleInstance) {
pgPoolInstance = new Pool({ pgPoolInstance = new Pool({
connectionString: process.env.DATABASE_CONNECTION_STRING, connectionString: process.env.DATABASE_CONNECTION_STRING,
connectionTimeoutMillis: 3000,
idleTimeoutMillis: 10000,
}) })
drizzleInstance = drizzle({ client: pgPoolInstance, schema }) drizzleInstance = drizzle({ client: pgPoolInstance, schema })
} }

View File

@@ -8,29 +8,14 @@ import type {
} from '../types.ts' } from '../types.ts'
const DEFAULT_HEARTBEAT_INTERVAL = 60 const DEFAULT_HEARTBEAT_INTERVAL = 60
const BOOT_NOTIFICATION_DB_TIMEOUT_MS = 3000
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, label: string) {
return Promise.race<T>([
promise,
new Promise<T>((_, reject) => {
setTimeout(() => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), timeoutMs)
}),
])
}
export async function handleBootNotification( export async function handleBootNotification(
payload: BootNotificationRequest, payload: BootNotificationRequest,
ctx: OcppConnectionContext, ctx: OcppConnectionContext,
): Promise<BootNotificationResponse> { ): Promise<BootNotificationResponse> {
const db = useDrizzle() const db = useDrizzle()
const currentTime = dayjs().toISOString()
console.log(`[OCPP] BootNotification ${ctx.chargePointIdentifier} received`) const [cp] = await db
try {
const [cp] = await withTimeout(
db
.insert(chargePoint) .insert(chargePoint)
.values({ .values({
id: crypto.randomUUID(), id: crypto.randomUUID(),
@@ -65,10 +50,7 @@ export async function handleBootNotification(
updatedAt: dayjs().toDate(), updatedAt: dayjs().toDate(),
}, },
}) })
.returning(), .returning()
BOOT_NOTIFICATION_DB_TIMEOUT_MS,
`BootNotification persistence for ${ctx.chargePointIdentifier}`,
)
const status = cp.registrationStatus const status = cp.registrationStatus
ctx.isRegistered = status === 'Accepted' ctx.isRegistered = status === 'Accepted'
@@ -76,18 +58,8 @@ export async function handleBootNotification(
console.log(`[OCPP] BootNotification ${ctx.chargePointIdentifier} status=${status}`) console.log(`[OCPP] BootNotification ${ctx.chargePointIdentifier} status=${status}`)
return { return {
currentTime, currentTime: dayjs().toISOString(),
interval: DEFAULT_HEARTBEAT_INTERVAL, interval: DEFAULT_HEARTBEAT_INTERVAL,
status, status,
} }
} catch (error) {
ctx.isRegistered = false
console.error(`[OCPP] BootNotification ${ctx.chargePointIdentifier} persistence failed:`, error)
return {
currentTime,
interval: DEFAULT_HEARTBEAT_INTERVAL,
status: 'Pending',
}
}
} }

View File

@@ -92,26 +92,6 @@ function sendCallError(
) )
} }
function pickOcppSubprotocol(
negotiatedProtocol?: string | null,
requestedHeader?: string | null,
) {
if (negotiatedProtocol && isSupportedOCPP(negotiatedProtocol)) {
return negotiatedProtocol
}
if (!requestedHeader) {
return null
}
const requestedProtocols = requestedHeader
.split(',')
.map((value) => value.trim())
.filter(Boolean)
return requestedProtocols.find((protocol) => isSupportedOCPP(protocol)) ?? null
}
/** /**
* Factory that produces a hono-ws event handler object for a single * Factory that produces a hono-ws event handler object for a single
* OCPP WebSocket connection. * OCPP WebSocket connection.
@@ -119,11 +99,7 @@ function pickOcppSubprotocol(
* Usage in route: * Usage in route:
* upgradeWebSocket((c) => createOcppHandler(c.req.param('chargePointId'), remoteAddr)) * upgradeWebSocket((c) => createOcppHandler(c.req.param('chargePointId'), remoteAddr))
*/ */
export function createOcppHandler( export function createOcppHandler(chargePointIdentifier: string, remoteAddr?: string) {
chargePointIdentifier: string,
remoteAddr?: string,
requestedProtocolHeader?: string,
) {
const ctx: OcppConnectionContext = { const ctx: OcppConnectionContext = {
chargePointIdentifier, chargePointIdentifier,
isRegistered: false, isRegistered: false,
@@ -131,14 +107,14 @@ export function createOcppHandler(
return { return {
onOpen(_evt: Event, ws: WSContext) { onOpen(_evt: Event, ws: WSContext) {
const subProtocol = pickOcppSubprotocol(ws.protocol, requestedProtocolHeader) const subProtocol = ws.protocol ?? 'unknown'
if (!subProtocol) { if (!isSupportedOCPP(subProtocol)) {
ws.close(1002, 'Unsupported subprotocol') ws.close(1002, 'Unsupported subprotocol')
return return
} }
ocppConnections.set(chargePointIdentifier, ws) ocppConnections.set(chargePointIdentifier, ws)
console.log( console.log(
`[OCPP] ${chargePointIdentifier} connected (${subProtocol})` + `[OCPP] ${chargePointIdentifier} connected` +
(remoteAddr ? ` from ${remoteAddr}` : ''), (remoteAddr ? ` from ${remoteAddr}` : ''),
) )
}, },

View File

@@ -1,2 +1 @@
#define MG_ARCH MG_ARCH_ESP32 #define MG_ARCH MG_ARCH_ESP32
#define MG_TLS MG_TLS_MBED // Use ESP-IDF built-in mbedTLS for WSS support