diff --git a/packages/server-admin-ui/src/views/Webapps/Embedded.js b/packages/server-admin-ui/src/views/Webapps/Embedded.js index b61348fbc..518019de0 100644 --- a/packages/server-admin-ui/src/views/Webapps/Embedded.js +++ b/packages/server-admin-ui/src/views/Webapps/Embedded.js @@ -51,7 +51,7 @@ class Embedded extends Component { return r }), openWebsocket: (params) => { - const knownParams = ['subscribe', 'sendCachedValues'] + const knownParams = ['subscribe', 'sendCachedValues', 'events'] const queryParam = knownParams .map((p, i) => [i, params[p]]) .filter((x) => x[1] !== undefined) diff --git a/packages/streams/canboatjs.js b/packages/streams/canboatjs.js index 8e18f3955..209c28446 100644 --- a/packages/streams/canboatjs.js +++ b/packages/streams/canboatjs.js @@ -29,10 +29,12 @@ function CanboatJs(options) { this.fromPgn.on('warning', (pgn, warning) => { debug(`[warning] ${pgn.pgn} ${warning}`) + options.app.emit(`canboatjs:warning`, warning) }) this.fromPgn.on('error', (pgn, err) => { console.error(pgn.input, err.message) + options.app.emit(`canboatjs:error`, err) }) this.app = options.app @@ -48,12 +50,16 @@ CanboatJs.prototype._transform = function (chunk, encoding, done) { pgnData.timestamp = new Date(Number(chunk.timestamp)).toISOString() this.push(pgnData) this.app.emit(this.analyzerOutEvent, pgnData) + } else { + this.app.emit('canboatjs:unparsed:object', chunk) } } else { const pgnData = this.fromPgn.parse(chunk) if (pgnData) { this.push(pgnData) this.app.emit(this.analyzerOutEvent, pgnData) + } else { + this.app.emit('canboatjs:unparsed:data', chunk) } } done() diff --git a/src/events.ts b/src/events.ts new file mode 100644 index 000000000..9d6ac6730 --- /dev/null +++ b/src/events.ts @@ -0,0 +1,266 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { EventEmitter } from 'node:events' +import { createDebug } from './debug' +import { Debugger } from 'debug' +import { Brand } from '@signalk/server-api' + +export function startEvents( + app: any, + spark: any, + onEvent: (data: any) => void, + eventsFromQuery = '' +) { + const events = eventsFromQuery.split(',') + events.forEach((event) => { + app.on(event, (data: any) => onEvent({ event, data })) + spark.onDisconnects.push(() => app.removeListener(event, onEvent)) + }) +} + +export function startServerEvents(app: any, spark: any, onServerEvent: any) { + app.on('serverevent', onServerEvent) + spark.onDisconnects.push(() => { + app.removeListener('serverevent', onServerEvent) + }) + try { + spark.write({ + type: 'VESSEL_INFO', + data: { + name: app.config.vesselName, + mmsi: app.config.vesselMMSI, + uuid: app.config.vesselUUID + } + }) + } catch (e: any) { + if (e.code !== 'ENOENT') { + console.error(e) + } + } + Object.keys(app.lastServerEvents).forEach((propName) => { + spark.write(app.lastServerEvents[propName]) + }) + spark.write({ + type: 'DEBUG_SETTINGS', + data: app.logging.getDebugSettings() + }) + if (app.securityStrategy.canAuthorizeWS()) { + spark.write({ + type: 'RECEIVE_LOGIN_STATUS', + data: app.securityStrategy.getLoginStatus(spark.request) + }) + } + spark.write({ + type: 'SOURCEPRIORITIES', + data: app.config.settings.sourcePriorities || {} + }) +} + +type Handler = (...args: any[]) => void +interface EventMap { + [k: string]: Handler | Handler[] | undefined +} + +function safeApply( + handler: (this: T, ..._args: A) => void, + context: T, + args: A +): void { + try { + Reflect.apply(handler, context, args) + } catch (err) { + // Throw error after timeout so as not to interrupt the stack + setTimeout(() => { + throw err + }) + } +} + +function arrayClone(arr: T[]): T[] { + const n = arr.length + const copy = new Array(n) + for (let i = 0; i < n; i += 1) { + copy[i] = arr[i] + } + return copy +} + +export type EventName = Brand +export type EmitterId = Brand +export type ListenerId = Brand +export type EventsActorId = EmitterId & ListenerId + +export interface WrappedEmitter { + getEmittedCount: () => number + getEventRoutingData: () => { + events: { + event: string + emitters: any + listeners: any + }[] + } + + emit: (this: any, eventName: string, ...args: any[]) => boolean + addListener: ( + eventName: EventName, + listener: (...args: any[]) => void + ) => EventEmitter + + bindMethodsById: (eventsId: EventsActorId) => { + emit: (this: any, eventName: string, ...args: any[]) => boolean + addListener: ( + eventName: EventName, + listener: (...args: any[]) => void + ) => void + on: (eventName: EventName, listener: (...args: any[]) => void) => void + } +} + +export interface WithWrappedEmitter { + wrappedEmitter: WrappedEmitter +} + +export function wrapEmitter(targetEmitter: EventEmitter): WrappedEmitter { + const targetAddListener = targetEmitter.addListener.bind(targetEmitter) + + const eventDebugs: { [key: string]: Debugger } = {} + const eventsData: { + [eventName: EventName]: { + emitters: { + [emitterId: EmitterId]: number + } + listeners: { + [listenerId: ListenerId]: boolean + } + } + } = {} + + let emittedCount = 0 + + function safeEmit(this: any, eventName: string, ...args: any[]): boolean { + if (eventName !== 'serverlog') { + let eventDebug = eventDebugs[eventName] + if (!eventDebug) { + eventDebugs[eventName] = eventDebug = createDebug( + `signalk-server:events:${eventName}` + ) + } + if (eventDebug.enabled) { + //there is ever only one rest argument, outputting args results in a 1 element array + eventDebug(args[0]) + } + } + + // from https://github.com/MetaMask/safe-event-emitter/blob/main/index.t + let doError = eventName === 'error' + + const events: EventMap = (targetEmitter as any)._events + if (events !== undefined) { + doError = doError && events.error === undefined + } else if (!doError) { + return false + } + + // If there is no 'error' event listener then throw. + if (doError) { + let er + if (args.length > 0) { + ;[er] = args + } + if (er instanceof Error) { + // Note: The comments on the `throw` lines are intentional, they show + // up in Node's output if this results in an unhandled exception. + throw er // Unhandled 'error' event + } + // At least give some kind of context to the user + const err = new Error(`Unhandled error.${er ? ` (${er.message})` : ''}`) + + ;(err as any).context = er + throw err // Unhandled 'error' event + } + + const handler = events[eventName] + + if (handler === undefined) { + return false + } + + emittedCount++ + if (typeof handler === 'function') { + safeApply(handler, this, args) + } else { + const len = handler.length + const listeners = arrayClone(handler) + for (let i = 0; i < len; i += 1) { + safeApply(listeners[i], this, args) + } + } + + return true + } + + function emitWithEmitterId( + emitterId: EmitterId, + eventName: EventName, + ...args: any[] + ) { + const emittersForEvent = ( + eventsData[eventName] ?? + (eventsData[eventName] = { emitters: {}, listeners: {} }) + ).emitters + if (!emittersForEvent[emitterId]) { + emittersForEvent[emitterId] = 0 + } + emittersForEvent[emitterId]++ + safeEmit(`${emitterId}:${eventName}`, ...args) + return safeEmit(eventName, ...args) + } + + const addListenerWithId = function ( + listenerId: ListenerId, + eventName: EventName, + listener: (...args: any[]) => void + ) { + const listenersForEvent = ( + eventsData[eventName] ?? + (eventsData[eventName] = { emitters: {}, listeners: {} }) + ).listeners + if (!listenersForEvent[listenerId]) { + listenersForEvent[listenerId] = true + } + return targetAddListener(eventName, listener) + } + + return { + getEmittedCount: () => emittedCount, + getEventRoutingData: () => ({ + events: Object.entries(eventsData).map(([event, data]) => ({ + event, + ...data + })) + }), + + emit: function (this: any, eventName: string, ...args: any[]): boolean { + return emitWithEmitterId( + 'NO_EMITTER_ID' as EmitterId, + eventName as EventName, + ...args + ) + }, + addListener: (eventName: EventName, listener: (...args: any[]) => void) => + addListenerWithId('NO_LISTENER_ID' as ListenerId, eventName, listener), + + bindMethodsById: (actorId: EventsActorId) => { + const addListener = ( + eventName: EventName, + listener: (...args: any[]) => void + ) => addListenerWithId(actorId, eventName, listener) + return { + emit: function (this: any, eventName: string, ...args: any[]): boolean { + return emitWithEmitterId(actorId, eventName as EventName, ...args) + }, + addListener, + on: addListener + } + } + } +} diff --git a/src/index.ts b/src/index.ts index b7f3f575e..843e66544 100644 --- a/src/index.ts +++ b/src/index.ts @@ -32,7 +32,6 @@ import { Update } from '@signalk/server-api' import { FullSignalK, getSourceId } from '@signalk/signalk-schema' -import { Debugger } from 'debug' import express, { IRouter, Request, Response } from 'express' import http from 'http' import https from 'https' @@ -60,6 +59,8 @@ import SubscriptionManager from './subscriptionmanager' import { PluginId, PluginManager } from './interfaces/plugins' import { OpenApiDescription, OpenApiRecord } from './api/swagger' import { WithProviderStatistics } from './deltastats' +import { pipedProviders } from './pipedproviders' +import { EventsActorId, WithWrappedEmitter, wrapEmitter } from './events' const debug = createDebug('signalk-server') const { StreamBundle } = require('./streambundle') @@ -76,7 +77,8 @@ class Server { PluginManager & WithSecurityStrategy & IRouter & - WithProviderStatistics + WithProviderStatistics & + WithWrappedEmitter constructor(opts: ServerOptions) { const FILEUPLOADSIZELIMIT = process.env.FILEUPLOADSIZELIMIT || '10mb' const bodyParser = require('body-parser') @@ -316,23 +318,10 @@ class Server { const self = this const app = this.app - const eventDebugs: { [key: string]: Debugger } = {} - const expressAppEmit = app.emit.bind(app) - app.emit = (eventName: string, ...args: any[]) => { - if (eventName !== 'serverlog') { - let eventDebug = eventDebugs[eventName] - if (!eventDebug) { - eventDebugs[eventName] = eventDebug = createDebug( - `signalk-server:events:${eventName}` - ) - } - if (eventDebug.enabled) { - eventDebug(args) - } - } - expressAppEmit(eventName, ...args) - return true - } + app.wrappedEmitter = wrapEmitter(app) + app.emit = app.wrappedEmitter.emit + app.on = app.wrappedEmitter.addListener as any + app.addListener = app.wrappedEmitter.addListener as any this.app.intervals = [] @@ -432,7 +421,7 @@ class Server { await startApis(app) startInterfaces(app) startMdns(app) - app.providers = require('./pipedproviders')(app).start() + app.providers = pipedProviders(app as any).start() const primaryPort = getPrimaryPort(app) debug(`primary port:${primaryPort}`) @@ -599,7 +588,7 @@ function startMdns(app: ServerApp & WithConfig) { } } -function startInterfaces(app: ServerApp & WithConfig) { +function startInterfaces(app: ServerApp & WithConfig & WithWrappedEmitter) { debug('Interfaces config:' + JSON.stringify(app.config.settings.interfaces)) const availableInterfaces = require('./interfaces') _.forIn(availableInterfaces, (theInterface: any, name: string) => { @@ -609,14 +598,34 @@ function startInterfaces(app: ServerApp & WithConfig) { (app.config.settings.interfaces || {})[name] ) { debug(`Loading interface '${name}'`) - app.interfaces[name] = theInterface(app) - if (app.interfaces[name] && _.isFunction(app.interfaces[name].start)) { + const boundEventMethods = app.wrappedEmitter.bindMethodsById( + `interface:${name}` as EventsActorId + ) + + const appCopy = { + ...app, + ...boundEventMethods + } + const handler = { + set(obj: any, prop: any, value: any) { + ;(app as any)[prop] = value + return true + }, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + get(target: any, prop: string | symbol, receiver: any) { + return (app as any)[prop] + } + } + const _interface = (appCopy.interfaces[name] = theInterface( + new Proxy(appCopy, handler) + )) + if (_interface && _.isFunction(_interface.start)) { if ( - _.isUndefined(app.interfaces[name].forceInactive) || - !app.interfaces[name].forceInactive + _.isUndefined(_interface.forceInactive) || + !_interface.forceInactive ) { debug(`Starting interface '${name}'`) - app.interfaces[name].data = app.interfaces[name].start() + _interface.data = _interface.start() } else { debug(`Not starting interface '${name}' by forceInactive`) } diff --git a/src/interfaces/plugins.ts b/src/interfaces/plugins.ts index 5465f7db9..006f63931 100644 --- a/src/interfaces/plugins.ts +++ b/src/interfaces/plugins.ts @@ -17,12 +17,12 @@ */ import { Brand, + PointDestination, PropertyValues, PropertyValuesCallback, ResourceProvider, - ServerAPI, - PointDestination, - RouteDestination + RouteDestination, + ServerAPI } from '@signalk/server-api' // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore @@ -31,19 +31,20 @@ import express, { Request, Response } from 'express' import fs from 'fs' import _ from 'lodash' import path from 'path' -import { ResourcesApi } from '../api/resources' import { CourseApi } from '../api/course' +import { ResourcesApi } from '../api/resources' import { SERVERROUTESPREFIX } from '../constants' import { createDebug } from '../debug' import { listAllSerialPorts } from '../serialports' const debug = createDebug('signalk-server:interfaces:plugins') -import { modulesWithKeyword } from '../modules' import { OpenApiDescription, OpenApiRecord } from '../api/swagger' import { CONNECTION_WRITE_EVENT_NAME, ConnectionWriteEvent } from '../deltastats' +import { EventsActorId } from '../events' +import { modulesWithKeyword } from '../modules' const put = require('../put') const _putPath = put.putPath @@ -568,6 +569,11 @@ module.exports = (theApp: any) => { } appCopy.handleMessage = handleMessageWrapper(app, plugin.id) + const boundEventMethods = (app as any).wrappedEmitter.bindMethodsById( + `plugin:${plugin.id}` as EventsActorId + ) + _.assign(appCopy, boundEventMethods) + appCopy.savePluginOptions = (configuration, cb) => { savePluginOptions( plugin.id, diff --git a/src/interfaces/ws.js b/src/interfaces/ws.js index f89e3e4a7..a1a0c5b02 100644 --- a/src/interfaces/ws.js +++ b/src/interfaces/ws.js @@ -26,6 +26,7 @@ const { const { putPath } = require('../put') import { createDebug } from '../debug' import { JsonWebTokenError, TokenExpiredError } from 'jsonwebtoken' +import { startEvents, startServerEvents } from '../events' const debug = createDebug('signalk-server:interfaces:ws') const debugConnection = createDebug('signalk-server:interfaces:ws:connections') const Primus = require('primus') @@ -678,7 +679,20 @@ function handleRealtimeConnection(app, spark, onChange) { if (spark.query.serverevents === 'all') { spark.hasServerEvents = true - startServerEvents(app, spark) + startServerEvents( + app, + spark, + wrapWithverifyWS(app.securityStrategy, spark, spark.write.bind(spark)) + ) + } + + if (spark.query.events) { + startEvents( + app, + spark, + wrapWithverifyWS(app.securityStrategy, spark, spark.write.bind(spark)), + spark.query.events + ) } } @@ -698,49 +712,6 @@ function sendLatestDeltas(app, deltaCache, selfContext, spark) { }) } -function startServerEvents(app, spark) { - const onServerEvent = wrapWithverifyWS( - app.securityStrategy, - spark, - spark.write.bind(spark) - ) - app.on('serverevent', onServerEvent) - spark.onDisconnects.push(() => { - app.removeListener('serverevent', onServerEvent) - }) - try { - spark.write({ - type: 'VESSEL_INFO', - data: { - name: app.config.vesselName, - mmsi: app.config.vesselMMSI, - uuid: app.config.vesselUUID - } - }) - } catch (e) { - if (e.code !== 'ENOENT') { - console.error(e) - } - } - Object.keys(app.lastServerEvents).forEach((propName) => { - spark.write(app.lastServerEvents[propName]) - }) - spark.write({ - type: 'DEBUG_SETTINGS', - data: app.logging.getDebugSettings() - }) - if (app.securityStrategy.canAuthorizeWS()) { - spark.write({ - type: 'RECEIVE_LOGIN_STATUS', - data: app.securityStrategy.getLoginStatus(spark.request) - }) - } - spark.write({ - type: 'SOURCEPRIORITIES', - data: app.config.settings.sourcePriorities || {} - }) -} - function startServerLog(app, spark) { const onServerLogEvent = wrapWithverifyWS( app.securityStrategy, diff --git a/src/pipedproviders.ts b/src/pipedproviders.ts index 4ccf22ea6..e39cac208 100644 --- a/src/pipedproviders.ts +++ b/src/pipedproviders.ts @@ -15,10 +15,11 @@ */ import { PropertyValues, PropertyValuesCallback } from '@signalk/server-api' -import { createDebug } from './debug' import _ from 'lodash' import { Duplex, Writable } from 'stream' import { SignalKMessageHub, WithConfig } from './app' +import { createDebug } from './debug' +import { EventsActorId, WithWrappedEmitter } from './events' class DevNull extends Writable { constructor() { @@ -59,9 +60,10 @@ interface PipedProviderConfig { class PipedProvider {} -module.exports = function ( +export function pipedProviders( app: SignalKMessageHub & - WithConfig & { + WithConfig & + WithWrappedEmitter & { propertyValues: PropertyValues setProviderError: (providerId: string, msg: string) => void } @@ -78,11 +80,15 @@ module.exports = function ( }) // eslint-disable-next-line @typescript-eslint/no-explicit-any const onPropertyValues = (name: string, cb: (value: any) => void) => - app.propertyValues.onPropertyValues(name, cb) + propertyValues.onPropertyValues(name, cb) + const boundEventMethods = app.wrappedEmitter.bindMethodsById( + `connection:${providerConfig.id}` as EventsActorId + ) const appFacade = { emitPropertyValue, onPropertyValues, ...sanitizedApp, + ...boundEventMethods, toJSON: () => 'appFacade' } diff --git a/src/security.ts b/src/security.ts index e63742647..0154c24e2 100644 --- a/src/security.ts +++ b/src/security.ts @@ -191,6 +191,8 @@ export interface SecurityStrategy { source: any, path: string ) => boolean + + addAdminMiddleware: (path: string) => void } export class InvalidTokenError extends Error { diff --git a/src/serverroutes.ts b/src/serverroutes.ts index c7689cf85..88f45104a 100644 --- a/src/serverroutes.ts +++ b/src/serverroutes.ts @@ -50,6 +50,7 @@ import { } from './security' import { listAllSerialPorts } from './serialports' import { StreamBundle } from './types' +import { WithWrappedEmitter } from './events' const readdir = util.promisify(fs.readdir) const debug = createDebug('signalk-server:serverroutes') // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -73,7 +74,8 @@ interface App WithSecurityStrategy, ConfigApp, IRouter, - PluginManager { + PluginManager, + WithWrappedEmitter { webapps: Package[] logging: { rememberDebug: (r: boolean) => void @@ -835,6 +837,17 @@ module.exports = function ( } ) + app.securityStrategy.addAdminMiddleware( + `${SERVERROUTESPREFIX}/eventsRoutingData` + ) + app.get( + `${SERVERROUTESPREFIX}/eventsRoutingData`, + (req: Request, res: Response) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + res.json(app.wrappedEmitter.getEventRoutingData()) + } + ) + app.get( `${SERVERROUTESPREFIX}/serialports`, (req: Request, res: Response, next: NextFunction) => {