Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support for subscription of streams/metering #43

Open
wants to merge 67 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
d46a5a3
fix: some Ember implementations has an empty string as identifier
olzzon Dec 18, 2024
b9f9cdd
wip: implementing stream subsciption
olzzon Dec 18, 2024
de3b2ef
wip: emit via normal subscription - meter data currently hardcoded
olzzon Dec 19, 2024
2b638fd
feat: StreamManager class for handling stream subscriptions
olzzon Dec 26, 2024
a396b0d
feat: use path to register streams and handle decoding in StreamManager
olzzon Dec 26, 2024
78c1660
fix: handle offset=0
olzzon Dec 26, 2024
03f609f
wip: test of streaming manager
olzzon Dec 26, 2024
60a0e8f
fix: streamEntry.value can be zero
olzzon Dec 26, 2024
63dea41
fix: StreamManager test use real values.
olzzon Dec 26, 2024
1284b28
feat: cleaup code from first stream implementation
olzzon Dec 26, 2024
5cbbac4
docs: update README.md with StreamManager
olzzon Dec 26, 2024
bb3ca20
fix: handle integer stream type
olzzon Dec 26, 2024
99541a0
feat: getInternalNodePath() to handle that there's no path on numbere…
olzzon Dec 28, 2024
3ac2fbe
doc: include getInternalNodePath() in README.md
olzzon Dec 28, 2024
22d61b9
fix: StreamManager should not be a singleton
olzzon Jan 6, 2025
c00bb10
feat: separate emberPackets and emberStreamPackets
olzzon Jan 6, 2025
e2d5072
feat: StreamManager is only used internally
olzzon Jan 7, 2025
616d8cc
feat: update README.md
olzzon Jan 7, 2025
bb60778
feat: update tests to client's internal StreamManager
olzzon Jan 7, 2025
c586ac9
chore: gitignore allow dist folder
olzzon Jan 7, 2025
684609a
fix: revert check if node is a parameter
olzzon Jan 15, 2025
7346f7d
fix: multiPacket can also be non stream packets
olzzon Jan 15, 2025
a532c3b
feat: add a MC2_mock.js in examples folder
olzzon Jan 15, 2025
e8052fc
dist: build latest
ianshade Jan 15, 2025
3b95449
feat: add debugging in S101Client
olzzon Jan 16, 2025
2a421d2
dist: build latest
olzzon Jan 16, 2025
2e8d917
fix: handle single stream packets
olzzon Jan 16, 2025
3e0aff9
fix: emit emberStreamTree if stream packet
olzzon Jan 16, 2025
70e1317
dist update to latest
olzzon Jan 16, 2025
2de55f5
feat: example of r3lay patchbay streaming
olzzon Jan 17, 2025
477e0f8
feat: Streamanager log if unregistered update
olzzon Jan 17, 2025
be86a3d
feat: optimise StreamManage with identifier lookup table
olzzon Jan 23, 2025
180bbce
feat: update mc2 mocks
olzzon Jan 23, 2025
3b7c2ae
dist: build latest
olzzon Jan 23, 2025
eb0f9e7
fix: only register a parameter once
olzzon Jan 23, 2025
cf0eb5b
dist: build latest
olzzon Jan 23, 2025
1c85dac
fix: ensure value on offset
olzzon Jan 23, 2025
5ad9b2b
DEBUG: add hardcoded version for debugging
olzzon Jan 23, 2025
affce67
dist: build latest
olzzon Jan 23, 2025
e6d9fc8
fix: remove unwanted extensive log in client
olzzon Jan 23, 2025
77249d7
dist: build latest
olzzon Jan 23, 2025
48d50bc
feat: ratelimit meter updates
olzzon Jan 24, 2025
9443ec7
dist: build latest
olzzon Jan 24, 2025
212a5b5
dist: build latest & bump debug version
olzzon Jan 24, 2025
560fd54
feat: Simplify stream handling to parse meter data without BER decoding
olzzon Jan 27, 2025
7fd760a
dist: build latest & bump debug version
olzzon Jan 27, 2025
e79cc5f
fix: read correct streamIdentifier in parseStreamPacket
olzzon Jan 27, 2025
091de84
fix: extract correct values from raw streamPackage
olzzon Jan 28, 2025
11f79de
dist: build latest & bump debug version
olzzon Jan 28, 2025
6bcda7c
reset to
olzzon Jan 28, 2025
01fe8b8
Revert "fix: read correct streamIdentifier in parseStreamPacket"
olzzon Jan 28, 2025
afb4903
Revert "feat: Simplify stream handling to parse meter data without BE…
olzzon Jan 28, 2025
c862446
dist: build revert
olzzon Jan 28, 2025
1d5ba59
feat: cap limit on streams in dataIn()
olzzon Jan 29, 2025
630e586
dist: build latest
olzzon Jan 29, 2025
5847f0a
feat: optimisation in dataIn() - iterate over frames instead of each …
olzzon Jan 29, 2025
ac17dc3
dist: build latest & bump debug version
olzzon Jan 29, 2025
2ddaeda
feat: remove ratelimit in streamManager (as one is added in dataIn()
olzzon Jan 30, 2025
74dca29
feat: treat data in chunks handling only escaped characters
olzzon Jan 30, 2025
03ae393
dist: bump debug version
olzzon Jan 30, 2025
e49c42f
fix: look for streaming data in the correct posistion
olzzon Jan 30, 2025
143d6ca
fix: add stream cap limit in new chunk based structure
olzzon Jan 31, 2025
1022745
fix: isEmberStreampacket() function to ensure different size of strea…
olzzon Jan 31, 2025
c30b120
feat: cleanup console.logs prior to PR
olzzon Feb 7, 2025
e9fe8ba
feat: revert gitignore to non dist branch
olzzon Feb 7, 2025
eaa5aa4
Remove dist folder from git tracking
olzzon Feb 7, 2025
cfb7c5f
doc: update metering example and README
olzzon Feb 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ yarn-error.log
!.yarn/plugins
!.yarn/releases
!.yarn/sdks
!.yarn/versions
!.yarn/versions
76 changes: 51 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ It has been tested with _Lawo Ruby_, _Lawo R3lay_, and _Lawo MxGUI_.
The current version is very losely based on the original library and Mr Gilles Dufour's rewrites. It is however rewritten almost completely from scratch and bears little to no resemblance to earlier libraries.

### Repository-specific Info for Developers
* [Developer Info](DEVELOPER.md)
* [Contribution Guidelines](CONTRIBUTING.md)

### General Sofie System Info
* [Documentation](https://nrkno.github.io/sofie-core/)
* [Releases](https://nrkno.github.io/sofie-core/releases)

- [Developer Info](DEVELOPER.md)
- [Contribution Guidelines](CONTRIBUTING.md)

### General Sofie System Info

- [Documentation](https://nrkno.github.io/sofie-core/)
- [Releases](https://nrkno.github.io/sofie-core/releases)

---

Expand All @@ -26,24 +25,36 @@ The current version is very losely based on the original library and Mr Gilles D
Get Full tree:

```javascript
const { EmberClient } = require('emberplus-connection');
const client = new EmberClient("10.9.8.7", 9000);
client.on("error", e => {
console.log(e);
});
const { EmberClient, StreamManager } = require('emberplus-connection')
const client = new EmberClient('10.9.8.7', 9000)
client.on('error', (e) => {
console.log(e)
})
await client.connect()

// If you want to listen to stream updates - you can do it like this:
client.on('streamUpdate', (internalNodePath, value) => {
console.log('Stream Update:', {
path: internalNodePath,
value: value,
})
// You can get the internal node path, the internal path can be different from the path you requested,
// depending on wheter you request a numbered node or via the description
// the client has a client.getInternalNodePath(node) that you can request and use as reference when subsribing to a node
})

// Get Root info
const req = await client.getDirectory(client.tree)
await req.response
// Get a Specific Node
const node = await client.getElementByPath("0.0.2")
console.log(node);
const node = await client.getElementByPath('0.0.2')
console.log(node)
// Get a node by its path identifiers
const node2 = await client.getElementByPath("path.to.node"))
console.log(node2);
const node2 = await client.getElementByPath('path.to.node')
console.log(node2)
// Get a node by its path descriptions
const node3 = await client.getElementByPath("descr1.descr2.descr3"))
console.log(node3);
const node3 = await client.getElementByPath('descr1.descr2.descr3')
console.log(node3)
// Expand entire tree under node 0
await client.expand(client.tree)
console.log(client.tree)
Expand All @@ -70,6 +81,10 @@ client
})
.then(() => client.getElementByPath('0.2'))
.then(async (node) => {
// You can get the internal node path, the internal path can be different from the requested,
// depending on wheter you request a numbered node or via the description
console.log('This is the internal node path :', client.getInternalNodePath(node))

// For non-streams a getDirectory will automatically subscribe for update
return (
await client.getDirectory(node, (update) => {
Expand All @@ -84,14 +99,22 @@ client
console.log(update)
})
)
client.on('streamUpdate', (internalNodePath, value) => {
console.log('Stream Update:', {
path: internalNodePath,
value: value,
})
})
```

### Setting New Value

```javascript
client = new EmberClient(LOCALHOST, PORT)
await client.connect()
await (await client.getDirectory()).response
await (
await client.getDirectory()
).response
const req = await client.setValue(await client.getElementByPath('0.0.1'), 'gdnet')
await req.response
console.log('result', req.response)
Expand All @@ -107,7 +130,9 @@ const { EmberClient, EmberLib } = require('node-emberplus')

const client = new EmberClient(HOST, PORT)
await client.connect()
await (await client.getDirectory()).response
await (
await client.getDirectory()
).response
const fn = await client.getElementByPath('path.to.function')
const req = await client.invoke(fn, 1, 2, 3)
console.log('result', await req.response)
Expand All @@ -126,7 +151,7 @@ const {
ParameterAccess,
MatrixImpl,
MatrixType,
MatrixAddressingMode
MatrixAddressingMode,
} = require('emberplus-connection')

const s = new EmberServer(9000) // start server on port 9000
Expand Down Expand Up @@ -187,14 +212,14 @@ const tree = {
undefined,
ParameterAccess.ReadWrite
)
)
),
}),

2: new NumberedTreeNodeImpl(2, new EmberNodeImpl('Functions', undefined, undefined, true), {
1: new NumberedTreeNodeImpl(
1,
new EmberFunctionImpl(undefined, undefined) //, [{ type: ParameterType.Boolean, name: 'Test' }])
)
),
}),

3: new NumberedTreeNodeImpl(3, new EmberNodeImpl('Matrices', undefined, undefined, true), {
Expand All @@ -211,13 +236,14 @@ const tree = {
5,
5
)
)
})
})
),
}),
}),
}

s.init(tree) // initiate the provider with the tree
```

---

_The NRK logo is a registered trademark of Norsk rikskringkasting AS. The license does not grant any right to use, in any way, any trademarks, service marks or logos of Norsk rikskringkasting AS._
74 changes: 74 additions & 0 deletions examples/lawo_mc2_fader_metering_example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Setting the environment variable DEBUG=emberplus-connection:*
// will show debug information from the emberplus-connection module

process.env.DEBUG = 'emberplus-connection:*'
// Note: it's also possible to only log parts of the module by using a subset of the debug name,
// 'emberplus-connection:S101Client' // for the S101Client class
// 'emberplus-connection:S101Codec' // for the S101Codec class
// 'emberplus-connection:StreamManager' // for the StreamManager class

const { EmberClient } = require('../dist/index')

//-------------------------------------------------------------------------
// Client
// log output from lawo_mc2_fader_metering_mock.js
// ------------------------------------------------------------------------

const client = new EmberClient('192.168.1.67', 9000)
let node1InternalNodePath = ''

client.on('disconnected', () => {
console.error('Client Lost Ember connection')
client.tree = []
})

// Handle successful connection
client.on('connected', () => {
console.log('Client Found Ember connection')
client.tree = []

client
.getDirectory(client.tree)
.then((req) => {
console.log(' Req:', req)
return req.response
})
.then(() => {
console.log(' Getting node...')

const path_1 = 'Channels.Inputs._1.Metering.Main Level'
return client.getElementByPath(path_1)
})
.then((node1) => {
if (!node1) {
throw new Error(' Could not find node 1')
}
console.log('Found node:', node1)
node1InternalNodePath = client.getInternalNodePath(node1)

// Subscribe to changes
client.subscribe(node1, (node1) => {
const value = node1.contents
console.log('Node 1 subscription :', value)
})
})
.catch((error) => {
console.error(' Error:', error)
})
})
client.on('streamUpdate', (internalNodePath, value) => {
if (internalNodePath !== node1InternalNodePath) {
return
}
console.log('Stream Update:', {
path: internalNodePath,
value: value,
})
})

console.log('-----------------------------------------------------------------------------')
console.log('log output from mc2_fader_metering_example.js')
console.log('Connecting to Client...')
client.connect().catch((error) => {
console.error('Client 2 Error when connecting:', error)
})
151 changes: 151 additions & 0 deletions src/Ember/Client/StreamManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { EventEmitter } from 'eventemitter3'
import { Parameter, ParameterType } from '../../model/Parameter'
import { EmberValue } from '../../types'
import { Collection } from '../../types/types'
import { StreamEntry } from '../../model'

import Debug from 'debug'

const debug = Debug('emberplus-connection:StreamManager')

export type StreamManagerEvents = {
streamUpdate: [path: string, value: EmberValue]
}

interface StreamInfo {
parameter: Parameter
path: string
streamIdentifier: number
offset: number
}

export class StreamManager extends EventEmitter<StreamManagerEvents> {
private registeredStreams: Map<string, StreamInfo> = new Map()
// Lookup by identifier for O(1) access
private streamsByIdentifier: Map<number, Set<string>> = new Map()

constructor() {
super()
}

public registerParameter(parameter: Parameter, path: string): void {
if (!parameter.streamIdentifier) {
debug('Warning: Attempted to register parameter without streamIdentifier')
return
}
// Check if already registered
if (this.registeredStreams.has(path)) {
debug('Stream already registered:', {
path,
identifier: parameter.streamIdentifier,
})
return
}

const streamInfo: StreamInfo = {
parameter,
path,
streamIdentifier: parameter.streamIdentifier,
offset: parameter.streamDescriptor?.offset || 0,
}

// Store both mappings
this.registeredStreams.set(path, streamInfo)

// Add to identifier lookup
if (!this.streamsByIdentifier.has(parameter.streamIdentifier)) {
this.streamsByIdentifier.set(parameter.streamIdentifier, new Set())
debug('Registered new stream identifier and adding set:', parameter.streamIdentifier)
}
this.streamsByIdentifier.get(parameter.streamIdentifier)?.add(path)

debug('Registered new stream:', {
path,
identifier: parameter.streamIdentifier,
totalRegistered: this.registeredStreams.size,
})
}

public unregisterParameter(path: string): void {
const streamInfo = this.registeredStreams.get(path)
if (streamInfo?.streamIdentifier) {
// Clean up both maps
this.registeredStreams.delete(path)
const paths = this.streamsByIdentifier.get(streamInfo.streamIdentifier)
if (paths) {
paths.delete(path)
if (paths.size === 0) {
this.streamsByIdentifier.delete(streamInfo.streamIdentifier)
}
}

debug('Unregistered stream:', {
path: path,
identifier: streamInfo.parameter.identifier,
})
}
}

public getStreamInfoByPath(path: string): StreamInfo | undefined {
return this.registeredStreams.get(path)
}

public hasStream(identifier: string): boolean {
return this.registeredStreams.has(identifier)
}

public updateStreamValues(streamEntries: Collection<StreamEntry>): void {
Object.values<StreamEntry>(streamEntries).forEach((streamEntry) => {
// O(1) lookup by identifier
const paths = this.streamsByIdentifier.get(streamEntry.identifier)

if (!paths) {
debug('Received update for unregistered stream:', streamEntry.identifier)
return
}

// Process each matching stream
paths.forEach((path) => {
const streamInfo = this.registeredStreams.get(path)
if (!streamInfo || !streamEntry.value) return

if (streamEntry.value.type === ParameterType.Integer) {
this.updateStreamValue(path, streamEntry.value.value)
} else if (streamEntry.value.type === ParameterType.Octets && Buffer.isBuffer(streamEntry.value.value)) {
const buffer = streamEntry.value.value
if (buffer.length >= streamInfo.offset + 4) {
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.length)
const decodedValue = view.getFloat32(streamInfo.offset, true)
this.updateStreamValue(path, decodedValue)
}
}
})
})
}

public updateStreamValue(path: string, value: EmberValue): void {
if (path) {
const streamInfo = this.registeredStreams.get(path)
if (streamInfo) {
streamInfo.parameter.value = value
this.emit('streamUpdate', path, value)
}
}
}

public getAllRegisteredPaths(): string[] {
return Array.from(this.registeredStreams.keys())
}

// Debug helper
public printStreamState(): void {
debug('\nCurrent Stream State:')
debug('Registered Streams:')
this.registeredStreams.forEach((info, path) => {
debug(` Path: ${path}`)
debug(` Identifier: ${info.parameter.identifier}`)
debug(` StreamId: ${info.parameter.streamIdentifier}`)
debug(` Current Value: ${info.parameter.value}`)
})
}
}
Loading