diff --git a/.gitignore b/.gitignore index 2b0331f..8169a87 100755 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ yarn-error.log !.yarn/plugins !.yarn/releases !.yarn/sdks -!.yarn/versions +!.yarn/versions \ No newline at end of file diff --git a/README.md b/README.md index 4268154..fc52d96 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,14 @@ It has been tested with _Lawo Ruby_, _Lawo R3lay_, and _Lawo MxGUI_. The current version is very losely based on the original library and Mr Gilles Dufour's rewrites. It is however rewritten almost completely from scratch and bears little to no resemblance to earlier libraries. ### Repository-specific Info for Developers -* [Developer Info](DEVELOPER.md) -* [Contribution Guidelines](CONTRIBUTING.md) - -### General Sofie System Info -* [Documentation](https://nrkno.github.io/sofie-core/) -* [Releases](https://nrkno.github.io/sofie-core/releases) +- [Developer Info](DEVELOPER.md) +- [Contribution Guidelines](CONTRIBUTING.md) +### General Sofie System Info +- [Documentation](https://nrkno.github.io/sofie-core/) +- [Releases](https://nrkno.github.io/sofie-core/releases) --- @@ -26,24 +25,36 @@ The current version is very losely based on the original library and Mr Gilles D Get Full tree: ```javascript -const { EmberClient } = require('emberplus-connection'); -const client = new EmberClient("10.9.8.7", 9000); -client.on("error", e => { - console.log(e); -}); +const { EmberClient, StreamManager } = require('emberplus-connection') +const client = new EmberClient('10.9.8.7', 9000) +client.on('error', (e) => { + console.log(e) +}) await client.connect() + +// If you want to listen to stream updates - you can do it like this: +client.on('streamUpdate', (internalNodePath, value) => { + console.log('Stream Update:', { + path: internalNodePath, + value: value, + }) + // You can get the internal node path, the internal path can be different from the path you requested, + // depending on wheter you request a numbered node or via the description + // the client has a client.getInternalNodePath(node) that you can request and use as reference when subsribing to a node +}) + // Get Root info const req = await client.getDirectory(client.tree) await req.response // Get a Specific Node -const node = await client.getElementByPath("0.0.2") -console.log(node); +const node = await client.getElementByPath('0.0.2') +console.log(node) // Get a node by its path identifiers -const node2 = await client.getElementByPath("path.to.node")) -console.log(node2); +const node2 = await client.getElementByPath('path.to.node') +console.log(node2) // Get a node by its path descriptions -const node3 = await client.getElementByPath("descr1.descr2.descr3")) -console.log(node3); +const node3 = await client.getElementByPath('descr1.descr2.descr3') +console.log(node3) // Expand entire tree under node 0 await client.expand(client.tree) console.log(client.tree) @@ -70,6 +81,10 @@ client }) .then(() => client.getElementByPath('0.2')) .then(async (node) => { + // You can get the internal node path, the internal path can be different from the requested, + // depending on wheter you request a numbered node or via the description + console.log('This is the internal node path :', client.getInternalNodePath(node)) + // For non-streams a getDirectory will automatically subscribe for update return ( await client.getDirectory(node, (update) => { @@ -84,6 +99,12 @@ client console.log(update) }) ) +client.on('streamUpdate', (internalNodePath, value) => { + console.log('Stream Update:', { + path: internalNodePath, + value: value, + }) +}) ``` ### Setting New Value @@ -91,7 +112,9 @@ client ```javascript client = new EmberClient(LOCALHOST, PORT) await client.connect() -await (await client.getDirectory()).response +await ( + await client.getDirectory() +).response const req = await client.setValue(await client.getElementByPath('0.0.1'), 'gdnet') await req.response console.log('result', req.response) @@ -107,7 +130,9 @@ const { EmberClient, EmberLib } = require('node-emberplus') const client = new EmberClient(HOST, PORT) await client.connect() -await (await client.getDirectory()).response +await ( + await client.getDirectory() +).response const fn = await client.getElementByPath('path.to.function') const req = await client.invoke(fn, 1, 2, 3) console.log('result', await req.response) @@ -126,7 +151,7 @@ const { ParameterAccess, MatrixImpl, MatrixType, - MatrixAddressingMode + MatrixAddressingMode, } = require('emberplus-connection') const s = new EmberServer(9000) // start server on port 9000 @@ -187,14 +212,14 @@ const tree = { undefined, ParameterAccess.ReadWrite ) - ) + ), }), 2: new NumberedTreeNodeImpl(2, new EmberNodeImpl('Functions', undefined, undefined, true), { 1: new NumberedTreeNodeImpl( 1, new EmberFunctionImpl(undefined, undefined) //, [{ type: ParameterType.Boolean, name: 'Test' }]) - ) + ), }), 3: new NumberedTreeNodeImpl(3, new EmberNodeImpl('Matrices', undefined, undefined, true), { @@ -211,13 +236,14 @@ const tree = { 5, 5 ) - ) - }) - }) + ), + }), + }), } s.init(tree) // initiate the provider with the tree ``` + --- _The NRK logo is a registered trademark of Norsk rikskringkasting AS. The license does not grant any right to use, in any way, any trademarks, service marks or logos of Norsk rikskringkasting AS._ diff --git a/examples/lawo_mc2_fader_metering_example.js b/examples/lawo_mc2_fader_metering_example.js new file mode 100644 index 0000000..1ee00dd --- /dev/null +++ b/examples/lawo_mc2_fader_metering_example.js @@ -0,0 +1,74 @@ +// Setting the environment variable DEBUG=emberplus-connection:* +// will show debug information from the emberplus-connection module + +process.env.DEBUG = 'emberplus-connection:*' +// Note: it's also possible to only log parts of the module by using a subset of the debug name, +// 'emberplus-connection:S101Client' // for the S101Client class +// 'emberplus-connection:S101Codec' // for the S101Codec class +// 'emberplus-connection:StreamManager' // for the StreamManager class + +const { EmberClient } = require('../dist/index') + +//------------------------------------------------------------------------- +// Client +// log output from lawo_mc2_fader_metering_mock.js +// ------------------------------------------------------------------------ + +const client = new EmberClient('192.168.1.67', 9000) +let node1InternalNodePath = '' + +client.on('disconnected', () => { + console.error('Client Lost Ember connection') + client.tree = [] +}) + +// Handle successful connection +client.on('connected', () => { + console.log('Client Found Ember connection') + client.tree = [] + + client + .getDirectory(client.tree) + .then((req) => { + console.log(' Req:', req) + return req.response + }) + .then(() => { + console.log(' Getting node...') + + const path_1 = 'Channels.Inputs._1.Metering.Main Level' + return client.getElementByPath(path_1) + }) + .then((node1) => { + if (!node1) { + throw new Error(' Could not find node 1') + } + console.log('Found node:', node1) + node1InternalNodePath = client.getInternalNodePath(node1) + + // Subscribe to changes + client.subscribe(node1, (node1) => { + const value = node1.contents + console.log('Node 1 subscription :', value) + }) + }) + .catch((error) => { + console.error(' Error:', error) + }) +}) +client.on('streamUpdate', (internalNodePath, value) => { + if (internalNodePath !== node1InternalNodePath) { + return + } + console.log('Stream Update:', { + path: internalNodePath, + value: value, + }) +}) + +console.log('-----------------------------------------------------------------------------') +console.log('log output from mc2_fader_metering_example.js') +console.log('Connecting to Client...') +client.connect().catch((error) => { + console.error('Client 2 Error when connecting:', error) +}) diff --git a/src/Ember/Client/StreamManager.ts b/src/Ember/Client/StreamManager.ts new file mode 100644 index 0000000..c6604de --- /dev/null +++ b/src/Ember/Client/StreamManager.ts @@ -0,0 +1,151 @@ +import { EventEmitter } from 'eventemitter3' +import { Parameter, ParameterType } from '../../model/Parameter' +import { EmberValue } from '../../types' +import { Collection } from '../../types/types' +import { StreamEntry } from '../../model' + +import Debug from 'debug' + +const debug = Debug('emberplus-connection:StreamManager') + +export type StreamManagerEvents = { + streamUpdate: [path: string, value: EmberValue] +} + +interface StreamInfo { + parameter: Parameter + path: string + streamIdentifier: number + offset: number +} + +export class StreamManager extends EventEmitter { + private registeredStreams: Map = new Map() + // Lookup by identifier for O(1) access + private streamsByIdentifier: Map> = new Map() + + constructor() { + super() + } + + public registerParameter(parameter: Parameter, path: string): void { + if (!parameter.streamIdentifier) { + debug('Warning: Attempted to register parameter without streamIdentifier') + return + } + // Check if already registered + if (this.registeredStreams.has(path)) { + debug('Stream already registered:', { + path, + identifier: parameter.streamIdentifier, + }) + return + } + + const streamInfo: StreamInfo = { + parameter, + path, + streamIdentifier: parameter.streamIdentifier, + offset: parameter.streamDescriptor?.offset || 0, + } + + // Store both mappings + this.registeredStreams.set(path, streamInfo) + + // Add to identifier lookup + if (!this.streamsByIdentifier.has(parameter.streamIdentifier)) { + this.streamsByIdentifier.set(parameter.streamIdentifier, new Set()) + debug('Registered new stream identifier and adding set:', parameter.streamIdentifier) + } + this.streamsByIdentifier.get(parameter.streamIdentifier)?.add(path) + + debug('Registered new stream:', { + path, + identifier: parameter.streamIdentifier, + totalRegistered: this.registeredStreams.size, + }) + } + + public unregisterParameter(path: string): void { + const streamInfo = this.registeredStreams.get(path) + if (streamInfo?.streamIdentifier) { + // Clean up both maps + this.registeredStreams.delete(path) + const paths = this.streamsByIdentifier.get(streamInfo.streamIdentifier) + if (paths) { + paths.delete(path) + if (paths.size === 0) { + this.streamsByIdentifier.delete(streamInfo.streamIdentifier) + } + } + + debug('Unregistered stream:', { + path: path, + identifier: streamInfo.parameter.identifier, + }) + } + } + + public getStreamInfoByPath(path: string): StreamInfo | undefined { + return this.registeredStreams.get(path) + } + + public hasStream(identifier: string): boolean { + return this.registeredStreams.has(identifier) + } + + public updateStreamValues(streamEntries: Collection): void { + Object.values(streamEntries).forEach((streamEntry) => { + // O(1) lookup by identifier + const paths = this.streamsByIdentifier.get(streamEntry.identifier) + + if (!paths) { + debug('Received update for unregistered stream:', streamEntry.identifier) + return + } + + // Process each matching stream + paths.forEach((path) => { + const streamInfo = this.registeredStreams.get(path) + if (!streamInfo || !streamEntry.value) return + + if (streamEntry.value.type === ParameterType.Integer) { + this.updateStreamValue(path, streamEntry.value.value) + } else if (streamEntry.value.type === ParameterType.Octets && Buffer.isBuffer(streamEntry.value.value)) { + const buffer = streamEntry.value.value + if (buffer.length >= streamInfo.offset + 4) { + const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.length) + const decodedValue = view.getFloat32(streamInfo.offset, true) + this.updateStreamValue(path, decodedValue) + } + } + }) + }) + } + + public updateStreamValue(path: string, value: EmberValue): void { + if (path) { + const streamInfo = this.registeredStreams.get(path) + if (streamInfo) { + streamInfo.parameter.value = value + this.emit('streamUpdate', path, value) + } + } + } + + public getAllRegisteredPaths(): string[] { + return Array.from(this.registeredStreams.keys()) + } + + // Debug helper + public printStreamState(): void { + debug('\nCurrent Stream State:') + debug('Registered Streams:') + this.registeredStreams.forEach((info, path) => { + debug(` Path: ${path}`) + debug(` Identifier: ${info.parameter.identifier}`) + debug(` StreamId: ${info.parameter.streamIdentifier}`) + debug(` Current Value: ${info.parameter.value}`) + }) + } +} diff --git a/src/Ember/Client/__tests__/index.spec.ts b/src/Ember/Client/__tests__/index.spec.ts index acc15f2..8ac637f 100644 --- a/src/Ember/Client/__tests__/index.spec.ts +++ b/src/Ember/Client/__tests__/index.spec.ts @@ -6,11 +6,14 @@ import { ParameterImpl, ParameterType, QualifiedElementImpl, + StreamFormat, } from '../../../model' -import { Collection, Root, RootElement } from '../../../types/types' +import { Collection, EmberTypedValue, Root, RootElement } from '../../../types/types' import { EmberClient } from '../' import S101ClientMock from '../../../__mocks__/S101Client' import { DecodeResult } from '../../../encodings/ber/decoder/DecodeResult' +import { StreamDescriptionImpl } from '../../../model/StreamDescription' +import { StreamEntry, StreamEntryImpl } from '../../../model/StreamEntry' // import { EmberTreeNode, RootElement } from '../../../types/types' // import { ElementType, EmberElement } from '../../../model/EmberElement' // import { Parameter, ParameterType } from '../../../model/Parameter' @@ -107,6 +110,40 @@ describe('client', () => { } } + function createStreamParameter(opts: { + identifier: string + streamId: number + value?: number + offset?: number + format?: StreamFormat + }) { + return new ParameterImpl( + ParameterType.Real, + opts.identifier, + undefined, // description + opts.value ?? 0.0, + undefined, // maximum + undefined, // minimum + undefined, // access + undefined, // format + undefined, // enumeration + undefined, // factor + undefined, // isOnline + undefined, // formula + undefined, // step + undefined, // defaultValue + opts.streamId, + undefined, // enumMap + new StreamDescriptionImpl(opts.format ?? StreamFormat.Float32LE, opts.offset ?? 0) + ) + } + + function createStreamEntryResponse(entries: Array<{ identifier: number; value: EmberTypedValue }>) { + return { + value: entries.map((entry) => new StreamEntryImpl(entry.identifier, entry.value)), + } + } + it('getDirectory resolves', async () => { await runWithConnection(async (client, socket) => { // Do initial load @@ -382,4 +419,227 @@ describe('client', () => { expect(res).toBeTruthy() }) }) + + describe('StreamManager Integration', () => { + it('registers stream parameter when subscribing', async () => { + await runWithConnection(async (client, socket) => { + const streamParam = createStreamParameter({ + identifier: 'test-stream', + streamId: 1, + value: 0.5, + offset: 0, + }) + + const paramNode = new NumberedTreeNodeImpl(1, streamParam) + + // Subscribe to parameter + const subscribeReq = await client.subscribe(paramNode) + subscribeReq.response?.catch(() => null) + + expect(onSocketWrite).toHaveBeenCalledTimes(1) + + // Mock successful subscription + socket.mockData(createQualifiedNodeResponse('1', streamParam, undefined)) + + // Wait for registration to complete + await new Promise(setImmediate) + + // Get StreamManager instance and check registration + //@ts-expect-error - private method + const streamManager = client._streamManager + const streamInfo = streamManager.getStreamInfoByPath('1') + + expect(streamInfo).toBeDefined() + expect(streamInfo?.parameter.streamIdentifier).toBe(1) + expect(streamInfo?.parameter.value).toBe(0.5) + }) + }) + + it('deregisters stream parameter when unsubscribing', async () => { + await runWithConnection(async (client, socket) => { + const streamParam = createStreamParameter({ + identifier: 'test-stream', + streamId: 1, + }) + + const paramNode = new NumberedTreeNodeImpl(1, streamParam) + + // First subscribe + const subscribeReq = await client.subscribe(paramNode) + subscribeReq.response?.catch(() => null) + + socket.mockData(createQualifiedNodeResponse('1', streamParam, undefined)) + + await new Promise(setImmediate) + + // Then unsubscribe + const unsubscribeReq = await client.unsubscribe(paramNode) + unsubscribeReq.response?.catch(() => null) + + socket.mockData(createQualifiedNodeResponse('1', streamParam, undefined)) + + // Mock receiving stream data + const streamData = createStreamEntryResponse([ + { + identifier: 1, + value: { type: ParameterType.Octets, value: 42.5 }, + }, + ]) + socket.mockData(streamData) + + await new Promise(setImmediate) + + // Check parameter was deregistered + //@ts-expect-error - private method + const streamManager = client._streamManager + const streamInfo = streamManager.getStreamInfoByPath('1') + + expect(streamInfo).toBeUndefined() + }) + }) + + it('processes stream data with specific offsets', async () => { + await runWithConnection(async (client, socket) => { + // Create test parameters with specific offsets + const streamParam1 = createStreamParameter({ + identifier: 'test-stream1', + streamId: 1, + offset: 64, + format: StreamFormat.Float32LE, + }) + + const streamParam2 = createStreamParameter({ + identifier: 'test-stream2', + streamId: 1, + offset: 68, + format: StreamFormat.Float32LE, + }) + + const path1 = '1.3.17.3' + const path2 = '1.3.18.3' + + // Create qualified element wrappers for the parameters + const param1Element = new QualifiedElementImpl(path1, streamParam1) + const param2Element = new QualifiedElementImpl(path2, streamParam2) + + // Subscribe to parameters using qualified elements + const subscribe1 = await client.subscribe(param1Element) + const subscribe2 = await client.subscribe(param2Element) + + subscribe1.response?.catch(() => null) + subscribe2.response?.catch(() => null) + + // Mock successful subscriptions with qualified paths + socket.mockData({ + value: { + 1: param1Element, + }, + }) + socket.mockData({ + value: { + 1: param2Element, + }, + }) + + await new Promise(setImmediate) + + // Create the buffer with repeating values except last 8 bytes + const buffer = Buffer.from([ + 0x00, + 0x00, + 0x48, + 0xc3, // -200.0 repeated multiple times + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x00, + 0x00, + 0x48, + 0xc3, + 0x74, + 0xb7, + 0x1e, + 0xc2, // -39.67915344238281 at offset 64 + 0xb6, + 0xe1, + 0xbe, + 0xc1, // -23.860210418701172 at offset 68 + ]) + + // Get StreamManager instance and verify values + //@ts-expect-error - private method + const streamManager = client._streamManager + + const decoded: Collection = [ + { + identifier: 1, + value: { + type: ParameterType.Octets, + value: buffer, + }, + }, + ] + + streamManager.updateStreamValues(decoded) + const stream1 = streamManager.getStreamInfoByPath(path1) + const stream2 = streamManager.getStreamInfoByPath(path2) + + expect(stream1?.parameter.value).toBeCloseTo(-39.67915344238281) + expect(stream2?.parameter.value).toBeCloseTo(-23.860210418701172) + }) + }) + }) }) diff --git a/src/Ember/Client/index.ts b/src/Ember/Client/index.ts index 460475f..7f947e1 100644 --- a/src/Ember/Client/index.ts +++ b/src/Ember/Client/index.ts @@ -30,10 +30,12 @@ import { EmberNode } from '../../model/EmberNode' import { EventEmitter } from 'eventemitter3' import { S101Client } from '../Socket' import { getPath, assertQualifiedEmberNode, insertCommand, updateProps, isEmptyNode } from '../Lib/util' -import { berEncode } from '../..' +import { berEncode } from '../../encodings/ber' import { NumberedTreeNodeImpl } from '../../model/Tree' import { EmberFunction } from '../../model/EmberFunction' import { DecodeResult } from '../../encodings/ber/decoder/DecodeResult' +import { StreamEntry } from '../../model/StreamEntry' +import { StreamManager } from './StreamManager' export type RequestPromise = Promise> export interface RequestPromiseArguments { @@ -86,6 +88,7 @@ export type EmberClientEvents = { connected: [] disconnected: [] + streamUpdate: [path: string, value: EmberValue] } export class EmberClient extends EventEmitter { @@ -93,6 +96,7 @@ export class EmberClient extends EventEmitter { port: number tree: Collection> = [] + private _streamManager: StreamManager private _requests = new Map() private _lastInvocation = 0 private _client: S101Client @@ -111,6 +115,12 @@ export class EmberClient extends EventEmitter { this._timeout = timeout this._resendTimeout = resendTimeout this._resends = enableResends + this._streamManager = new StreamManager() + + // Forward stream events from StreamManager + this._streamManager.on('streamUpdate', (path, value) => { + this.emit('streamUpdate', path, value) + }) // resend timer runs at greatest common divisor of timeouts and resends const findGcd = (a: number, b: number) => { @@ -125,7 +135,15 @@ export class EmberClient extends EventEmitter { this._timer = setInterval(() => this._resendTimer(), findGcd(this._timeout, this._resendTimeout)) this._client = new S101Client(this.host, this.port) - this._client.on('emberTree', (tree: DecodeResult) => this._handleIncoming(tree)) + this._client.on('emberTree', (tree: DecodeResult) => { + // Regular ember tree + this._handleIncoming(tree) + }) + this._client.on('emberStreamTree', (tree: DecodeResult) => { + // Ember Tree with Stream + const entries = tree.value as Collection + this._streamManager.updateStreamValues(entries) + }) this._client.on('error', (e) => this.emit('error', e)) this._client.on('connected', () => this.emit('connected')) @@ -232,6 +250,14 @@ export class EmberClient extends EventEmitter { return this._sendRequest(new NumberedTreeNodeImpl(0, command), ExpectResponse.Any) } + // Check if this is a Parameter with streamIdentifier + if (node.contents.type === ElementType.Parameter) { + const parameter = node.contents + if (parameter.streamIdentifier !== undefined) { + this._streamManager.registerParameter(parameter, getPath(node)) + } + } + if (cb) this._subscriptions.push({ path: getPath(node), @@ -248,12 +274,22 @@ export class EmberClient extends EventEmitter { const command: Unsubscribe = new UnsubscribeImpl() const path = Array.isArray(node) ? '' : getPath(node) + + // Clean up subscriptions for (const i in this._subscriptions) { if (this._subscriptions[i].path === path) { this._subscriptions.splice(Number(i), 1) } } + // Deregister from StreamManager if this was a Parameter with streamIdentifier + if (!Array.isArray(node) && node.contents.type === ElementType.Parameter) { + const parameter = node.contents + if (parameter.streamIdentifier !== undefined) { + this._streamManager.unregisterParameter(path) + } + } + if (Array.isArray(node)) { return this._sendRequest(new NumberedTreeNodeImpl(0, command), ExpectResponse.Any) } @@ -293,7 +329,8 @@ export class EmberClient extends EventEmitter { const qualifiedParam = assertQualifiedEmberNode(node) as QualifiedElement // TODO - validate value - // TODO - should other properties be scrapped? + // TODO - should other properties be scrapped + qualifiedParam.contents.value = value return this._sendRequest>( @@ -418,6 +455,31 @@ export class EmberClient extends EventEmitter { return tree } + // This function handles the fact that the path in the Ember+ tree is not always the same as the path in requested from the provider + getInternalNodePath(node: TreeElement): string | undefined { + if ('path' in node && typeof node.path === 'string') { + // QualifiedElement case + return node.path + } else if ('number' in node) { + // NumberedTreeNode case + const numbers: number[] = [] + let current: NumberedTreeNode | undefined = node as NumberedTreeNode + + while (current) { + numbers.unshift(current.number) + if (current.parent && 'number' in current.parent) { + current = current.parent as NumberedTreeNode + } else { + current = undefined + } + } + + return numbers.join('.') + } + + return undefined + } + private async _matrixMutation( matrix: QualifiedElement | NumberedTreeNode, target: number, @@ -496,6 +558,7 @@ export class EmberClient extends EventEmitter { private _handleIncoming(incoming: DecodeResult) { const node = incoming.value + // update tree: const changes = this._applyRootToTree(node) diff --git a/src/Ember/Lib/util.ts b/src/Ember/Lib/util.ts index ba571e6..3c2e7eb 100644 --- a/src/Ember/Lib/util.ts +++ b/src/Ember/Lib/util.ts @@ -91,17 +91,15 @@ export function isEmptyNode(node: TreeElement): boolean { return false } - if ( - node.contents.description ?? - node.contents.identifier ?? - node.contents.isOnline ?? - node.contents.isRoot ?? - node.contents.schemaIdentifiers ?? - node.contents.templateReference - ) { - return false - } - - // node is a node, node has no children, node has no properties set => node must be empty - return true + // Check if any of these properties have a value, including empty strings as a node with an empty description is not empty) + const notEmpty = [ + node.contents.description, + node.contents.identifier, + node.contents.isOnline, + node.contents.isRoot, + node.contents.schemaIdentifiers, + node.contents.templateReference, + ].some((value) => value !== undefined && value !== null) + + return !notEmpty } diff --git a/src/Ember/Socket/S101Client.ts b/src/Ember/Socket/S101Client.ts index b5247bb..306e78d 100644 --- a/src/Ember/Socket/S101Client.ts +++ b/src/Ember/Socket/S101Client.ts @@ -3,6 +3,9 @@ import S101Socket from './S101Socket' import { ConnectionStatus } from '../Client' import { normalizeError } from '../Lib/util' +import Debug from 'debug' +const debug = Debug('emberplus-connection:S101Client') + const DEFAULT_PORT = 9000 const RECONNECT_ATTEMPTS = 60 const AUTO_RECONNECT_DELAY = 5000 @@ -59,6 +62,12 @@ export default class S101Client extends S101Socket { this.socket.on('close', (hadError) => this._onClose(hadError)) this.socket.on('connect', () => this._onConnect()) this.socket.on('data', (data) => { + debug('Data from Ember connection received:', { + address: this.socket?.remoteAddress, + port: this.socket?.remotePort, + dataLength: data.length, + data: data.toString('hex'), + }) try { this.codec.dataIn(data) } catch (e) { diff --git a/src/Ember/Socket/S101Socket.ts b/src/Ember/Socket/S101Socket.ts index f86efa2..d7db5ad 100644 --- a/src/Ember/Socket/S101Socket.ts +++ b/src/Ember/Socket/S101Socket.ts @@ -12,8 +12,8 @@ export type Request = any export type S101SocketEvents = { error: [Error] - emberPacket: [packet: Buffer] emberTree: [root: DecodeResult] + emberStreamTree: [root: DecodeResult] connecting: [] connected: [] disconnected: [] @@ -44,7 +44,6 @@ export default class S101Socket extends EventEmitter { }) this.codec.on('emberPacket', (packet) => { - this.emit('emberPacket', packet) try { const root = berDecode(packet) if (root != null) { @@ -54,6 +53,16 @@ export default class S101Socket extends EventEmitter { this.emit('error', normalizeError(e)) } }) + this.codec.on('emberStreamPacket', (packet) => { + try { + const root = berDecode(packet) + if (root != null) { + this.emit('emberStreamTree', root) + } + } catch (e) { + this.emit('error', normalizeError(e)) + } + }) this._initSocket() } diff --git a/src/S101/S101Codec.ts b/src/S101/S101Codec.ts index 616dddb..9636499 100644 --- a/src/S101/S101Codec.ts +++ b/src/S101/S101Codec.ts @@ -2,6 +2,8 @@ import { EventEmitter } from 'eventemitter3' import { SmartBuffer } from 'smart-buffer' import Debug from 'debug' import { format } from 'util' +import { berDecode } from '../encodings/ber' + const debug = Debug('emberplus-connection:S101Codec') const S101_BOF = 0xfe @@ -54,34 +56,103 @@ const CRC_TABLE = [ export type S101CodecEvents = { emberPacket: [packet: Buffer] + emberStreamPacket: [packet: Buffer] keepaliveReq: [] keepaliveResp: [] } +// This is enough for typical size of Ember data, but buffer is Dynamic to allow for larger data if needed +const BUFFER_FRAME_SIZE = 64 * 1024 + export default class S101Codec extends EventEmitter { - inbuf = new SmartBuffer() - emberbuf = new SmartBuffer() + inbuf = new SmartBuffer({ size: BUFFER_FRAME_SIZE }) + private frameBuffer?: Buffer escaped = false + private multiPacketBuffer?: SmartBuffer + private isMultiPacket = false + dataIn(buf: Buffer): void { - for (let i = 0; i < buf.length; i++) { - const b = buf.readUInt8(i) - if (this.escaped) { - this.inbuf.writeUInt8(b ^ S101_XOR) - this.escaped = false - } else if (b === S101_CE) { - this.escaped = true - } else if (b === S101_BOF) { - this.inbuf.clear() - this.escaped = false - } else if (b === S101_EOF) { - this.inbuf.moveTo(0) - this.handleFrame(this.inbuf) - this.inbuf.clear() + // console.log('TimeStamp: ', Date.now(), 'dataIn', buf.toString('hex')) + + // If we have leftover data from a previous incomplete frame, prepend it + if (this.frameBuffer) { + buf = Buffer.concat([this.frameBuffer, buf]) + this.frameBuffer = undefined + } + + let frameOffset = 0 + while (frameOffset < buf.length) { + const frameStart = buf.indexOf(S101_BOF, frameOffset) + if (frameStart === -1) break + + const frameEnd = buf.indexOf(S101_EOF, frameStart + 1) + if (frameEnd === -1 || frameEnd - frameStart < 4) { + //console.log('Parsing frameEnd to next chunk') + this.frameBuffer = buf.subarray(frameStart) + break + } + + this.inbuf.clear() + let chunkOffset = frameStart + 1 + // Reset escaped state at frame start + this.escaped = false + + while (chunkOffset < frameEnd) { + const b = buf[chunkOffset] + + if (this.escaped) { + this.inbuf.writeUInt8(b ^ S101_XOR) + this.escaped = false + chunkOffset++ + } else if (b === S101_CE) { + this.escaped = true + chunkOffset++ + } else { + // Find next escape or end + let nextSpecial = chunkOffset + 1 + while (nextSpecial < frameEnd) { + const nb = buf[nextSpecial] + if (nb === S101_CE || nb === S101_EOF) break + nextSpecial++ + } + + // Write the chunk up to the next special character + this.inbuf.writeBuffer(buf.subarray(chunkOffset, nextSpecial)) + chunkOffset = nextSpecial + } + } + + this.escaped = false // Reset escaped state at frame end + this.inbuf.moveTo(0) + + // console.log('Buffer 00-16', this.inbuf.toString('hex').substring(0, 40)) + this.handleFrame(this.inbuf) + frameOffset = frameEnd + 1 + } + } + + private isEmberStreamPacket(buffer: Buffer): boolean { + // Fast skip if not minimum stream structure + if (buffer.length < 3) return false + + // Check for stream structure + if (buffer[0] === 0x60) { + // Check the length byte + const lengthByte = buffer[1] + + if (lengthByte < 0x80) { + // Simple length, next byte should be 0x66 + return buffer[2] === 0x66 } else { - this.inbuf.writeUInt8(b) + // Complex length encoding + const numLengthBytes = lengthByte & 0x7f + if (buffer.length >= 2 + numLengthBytes) { + return buffer[2 + numLengthBytes] === 0x66 + } } } + return false } handleFrame(frame: SmartBuffer): void { @@ -105,7 +176,9 @@ export default class S101Codec extends EventEmitter { debug('received keepalive response') this.emit('keepaliveResp') } else if (command === CMD_EMBER) { - this.handleEmberFrame(frame) + const remainingData = frame.readBuffer() + const emberFrame = SmartBuffer.fromBuffer(remainingData) + this.handleEmberFrame(emberFrame) } else { throw new Error(format('dropping frame of length %d with unknown command %d', frame.length, command)) } @@ -122,15 +195,16 @@ export default class S101Codec extends EventEmitter { } if (dtd !== DTD_GLOW) { - throw new Error('Dropping frame with non-Glow DTD') + // Don't throw, just warn and continue processing + debug('Warning: Received frame with DTD %d, expected %d', dtd, DTD_GLOW) } if (appBytes < 2) { debug('Warning: Frame missing Glow DTD version') frame.skip(appBytes) } else { - frame.readUInt8() // glowMinor - frame.readUInt8() // glowMajor + frame.skip(1) // Skip minor version + frame.skip(1) // Skip major version appBytes -= 2 if (appBytes > 0) { frame.skip(appBytes) @@ -139,26 +213,67 @@ export default class S101Codec extends EventEmitter { } let payload = frame.readBuffer() - payload = payload.slice(0, payload.length - 2) - if (flags & FLAG_FIRST_MULTI_PACKET) { - debug('multi ember packet start') - this.emberbuf.clear() + payload = payload.slice(0, payload.length - 2) // Remove CRC + + if ((flags & FLAG_SINGLE_PACKET) === FLAG_SINGLE_PACKET) { + if ((flags & FLAG_EMPTY_PACKET) === 0) { + // Check if this is a metering packet + if (this.isEmberStreamPacket(payload)) { + this.handleEmberStreamPacket(payload) + } else { + this.handleEmberPacket(payload) + } + } + } else { + // Multi-packet handling + if ((flags & FLAG_FIRST_MULTI_PACKET) === FLAG_FIRST_MULTI_PACKET) { + debug('multi ember packet start') + this.multiPacketBuffer = new SmartBuffer() + this.isMultiPacket = true + this.multiPacketBuffer.writeBuffer(payload) + } else if (this.isMultiPacket && this.multiPacketBuffer) { + this.multiPacketBuffer.writeBuffer(payload) + + if ((flags & FLAG_LAST_MULTI_PACKET) === FLAG_LAST_MULTI_PACKET) { + debug('multi ember packet end') + const completeData = this.multiPacketBuffer.toBuffer() + // Check if this is a stream packet, can also be a normal packet + if (completeData[0] === 0x60 && completeData[2] === 0x66) { + this.handleEmberStreamPacket(completeData) + } else { + this.handleEmberPacket(completeData) + } + this.resetMultiPacketBuffer() + } + } } - if ((flags & FLAG_EMPTY_PACKET) === 0) { - // not empty, save the payload - this.emberbuf.writeBuffer(payload) + } + private handleEmberPacket(data: Buffer): void { + try { + const decoded = berDecode(data) + if (data[0] === 0x60) { + // Root tag check + if (decoded.value) { + this.emit('emberPacket', data) + } + } + } catch (error) { + console.error('Error decoding packet:', error) } - if (flags & FLAG_LAST_MULTI_PACKET) { - debug('multi ember packet end') - this.emberbuf.moveTo(0) - this.handleEmberPacket(this.emberbuf) - this.emberbuf.clear() + } + + private handleEmberStreamPacket(data: Buffer): void { + try { + this.emit('emberStreamPacket', data) + } catch (error) { + console.error('Error decoding stream packet:', error) + this.resetMultiPacketBuffer() } } - handleEmberPacket(packet: SmartBuffer): void { - debug('ember packet') - this.emit('emberPacket', packet.toBuffer()) + resetMultiPacketBuffer(): void { + this.multiPacketBuffer = undefined + this.isMultiPacket = false } encodeBER(data: Buffer): Buffer[] { diff --git a/src/encodings/ber/decoder/StreamEntry.ts b/src/encodings/ber/decoder/StreamEntry.ts index 84ea65f..d8a4ce8 100644 --- a/src/encodings/ber/decoder/StreamEntry.ts +++ b/src/encodings/ber/decoder/StreamEntry.ts @@ -54,6 +54,7 @@ function decodeStreamEntry(reader: Ber.Reader, options: DecodeOptions = defaultD break case Ber.CONTEXT(1): value = reader.readValue() + // return the full stream for later processing break case 0: break // indefinite length