Skip to content

Commit

Permalink
feat: introduce terminateProbe command (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickmichalina authored Oct 6, 2019
1 parent 26dc084 commit 6f9b7c7
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 34 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ $ camera-probe

## Programmatic Usage
```js
import { onvifDevices$ } from 'camera-probe'
import { onvifDevices$, terminateProbe } from 'camera-probe'

onvifDevices$.subscribe(console.log)

// be sure to close the socket connection when complete with your query
// This is a tad awkward until a better solution to stopping the inner observables is achieved.
terminateProbe()

// results
[ { name: 'Amcrest',
hardware: 'IP2M-8200',
Expand Down
55 changes: 32 additions & 23 deletions src/core/probe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createSocket, RemoteInfo } from 'dgram'
import { Strings, Numbers, IProbeConfig, DEFAULT_PROBE_CONFIG } from './interfaces'
import { Observable, Observer, fromEvent, timer } from 'rxjs'
import { first, shareReplay, map, distinctUntilChanged, mapTo, takeWhile, takeUntil, scan } from 'rxjs/operators'
import { shareReplay, map, distinctUntilChanged, mapTo, takeUntil, scan } from 'rxjs/operators'

type IMessage = readonly [Buffer, RemoteInfo]
type TimestampMessages = readonly TimestampedMessage[]
Expand All @@ -18,7 +18,7 @@ const distinctUntilObjectChanged = <T>(source: Observable<T>) => source.pipe(dis
const keys1 = Object.keys(a)
const keys2 = Object.keys(b)

return keys1.length === keys2.length &&
return keys1.length === keys2.length &&
keys1.reduce((acc: boolean, curr) => acc === false ? false : keys2.includes(curr) as boolean, true)
}))

Expand All @@ -41,25 +41,34 @@ export const flattenBuffersWithInfo =

export const probe =
(config?: Partial<IProbeConfig>) =>
(messages: Strings): Observable<Strings> =>
Observable.create((obs: Observer<Strings>) => {
const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
const socket = createSocket({ type: 'udp4' })
const socketClosed$ = fromEvent<void>(socket, 'close').pipe(first(), shareReplay(1))
const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))
(messages: Strings) =>
(until: Observable<any>): Observable<Strings> =>
Observable.create((obs: Observer<Strings>) => {
const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
const socket = createSocket({ type: 'udp4' })
const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))

socket.on('err', err => obs.error(err))
socket.on('close', () => obs.complete())

timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
takeUntil(until))
.subscribe(bfrPorts => {
bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address))
})

timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
takeWhile(() => !obs.closed),
takeUntil(socketClosed$))
.subscribe(bfrPorts => bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address)))

socketMessages$.pipe(
timestamp,
accumulateFreshMessages(cfg.PROBE_RESPONSE_FALLOUT_MS),
mapStrToDictionary(cfg.RESULT_DEDUPE_FN),
distinctUntilObjectChanged,
toArrayOfValues,
flattenDocumentStrings
).subscribe(msg => obs.next(msg))
})
socketMessages$.pipe(
timestamp,
accumulateFreshMessages(cfg.PROBE_RESPONSE_FALLOUT_MS),
mapStrToDictionary(cfg.RESULT_DEDUPE_FN),
distinctUntilObjectChanged,
toArrayOfValues,
flattenDocumentStrings,
takeUntil(until)
).subscribe(msg => obs.next(msg), undefined, () => {
setTimeout(() => {
socket.close()
}, 1000)
})
})
9 changes: 7 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@

import { map, shareReplay } from 'rxjs/operators'
import { onvifProbe } from './onvif/onvif-probe'
import { Subject } from 'rxjs'

export * from './onvif/device'

export const onvifProbe$ = onvifProbe().pipe(shareReplay(1))
const end = new Subject()
const end$ = end.asObservable()

export const onvifProbe$ = onvifProbe()(end$).pipe(shareReplay(1))
export const onvifDevices$ = onvifProbe$.pipe(map(a => a.map(b => b.device)))
export const onvifResponses$ = onvifProbe$.pipe(map(a => a.map(b => b.raw)))

export const terminateProbe = () => end.next()

export const cli = () => {
return onvifDevices$
.subscribe(res => {
Expand All @@ -25,7 +31,6 @@ export const cli = () => {
}))
})
}

// interface IReponse {
// devices: [
// {
Expand Down
9 changes: 7 additions & 2 deletions src/onvif/onvif-probe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createSocket } from 'dgram'
import { onvifProbe } from './onvif-probe'
import { Subject } from 'rxjs'

const ipcam = '<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope" xmlns:SOAP-ENC="http://www.w3.org/2003/05/soap-encoding" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsdd="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:vfdis="http://www.onvif.org/ver10/network/wsdl/RemoteDiscoveryBinding" xmlns:vfdis2="http://www.onvif.org/ver10/network/wsdl/DiscoveryLookupBinding" xmlns:tdn="http://www.onvif.org/ver10/network/wsdl"><SOAP-ENV:Header><wsa:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:MessageID><wsa:RelatesTo>uuid:NetworkVideoTransmitter</wsa:RelatesTo><wsa:ReplyTo SOAP-ENV:mustUnderstand="true"><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo><wsa:To SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:To><wsa:Action SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</wsa:Action></SOAP-ENV:Header><SOAP-ENV:Body><wsdd:ProbeMatches><wsdd:ProbeMatch><wsa:EndpointReference><wsa:Address>urn:uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:Address><wsa:ReferenceProperties></wsa:ReferenceProperties><wsa:ReferenceParameters></wsa:ReferenceParameters><wsa:PortType>ttl</wsa:PortType></wsa:EndpointReference><wsdd:Types>tdn:4655721b-4e0e-4296-ba0b-3180423b5b0c</wsdd:Types><wsdd:Scopes>onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/Model/631GA onvif://www.onvif.org/Name/IPCAM onvif://www.onvif.org/location/country/china</wsdd:Scopes><wsdd:XAddrs>http://192.168.1.1:80/onvif/device_service</wsdd:XAddrs><wsdd:MetadataVersion>1</wsdd:MetadataVersion></wsdd:ProbeMatch></wsdd:ProbeMatches></SOAP-ENV:Body></SOAP-ENV:Envelope>'
const amcrest = '<?xml version="1.0" encoding="utf-8" standalone="yes" ?><s:Envelope xmlns:sc="http://www.w3.org/2003/05/soap-encoding" xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:dn="http://www.onvif.org/ver10/network/wsdl" xmlns:tds="http://www.onvif.org/ver10/device/wsdl" xmlns:d="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"><s:Header><a:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:MessageID><a:To>urn:schemas-xmlsoap-org:ws:2005:04:discovery</a:To><a:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</a:Action><a:RelatesTo>uuid:Device</a:RelatesTo></s:Header><s:Body><d:ProbeMatches><d:ProbeMatch><a:EndpointReference><a:Address>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:Address></a:EndpointReference><d:Types>dn:NetworkVideoTransmitter tds:Device</d:Types><d:Scopes>onvif://www.onvif.org/location/country/china onvif://www.onvif.org/name/Amcrest onvif://www.onvif.org/hardware/IP2M-841B onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/type/Network_Video_Transmitter onvif://www.onvif.org/extension/unique_identifier</d:Scopes><d:XAddrs>http://192.168.1.235/onvif/device_service</d:XAddrs><d:MetadataVersion>1</d:MetadataVersion></d:ProbeMatch></d:ProbeMatches></s:Body></s:Envelope>'
Expand All @@ -25,9 +26,10 @@ const config = (port: number) => {
describe('onvif-probe', () => {
it('should handle IPCAM - 631GA', done => {
const port = 41241
const end = new Subject()

initTestServer(port)(ipcam)
onvifProbe(config(port))
onvifProbe(config(port))(end)
.subscribe(res => {
expect(res[0].device).toEqual({
name: 'IPCAM',
Expand All @@ -46,15 +48,17 @@ describe('onvif-probe', () => {
profiles: ['Streaming'],
xaddrs: ['http://192.168.1.1:80/onvif/device_service']
})
end.next()
done()
})
})

it('should handle AMCREST - IP2M-841B', done => {
const port = 41242
const end = new Subject()

initTestServer(port)(amcrest)
onvifProbe(config(port))
onvifProbe(config(port))(end)
.subscribe(res => {
expect(res[0].device).toEqual({
name: 'Amcrest',
Expand All @@ -74,6 +78,7 @@ describe('onvif-probe', () => {
profiles: ['Streaming'],
xaddrs: ['http://192.168.1.235/onvif/device_service']
})
end.next()
done()
})
})
Expand Down
4 changes: 2 additions & 2 deletions src/onvif/onvif-probe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ export type IOnvifProbeResponseModels = readonly IOnvifProbeResponseModel[]
export type IOnvifProbeResponse = Observable<IOnvifProbeResponseModels>

export const onvifProbe =
(config?: Partial<IWsProbeConfig>): IOnvifProbeResponse =>
wsProbe(config)
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IOnvifProbeResponse =>
wsProbe(config)(until)
.pipe(map(res => res.map(a => {
return {
...a,
Expand Down
4 changes: 3 additions & 1 deletion src/upnp/upnp-probe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createSocket } from 'dgram'
import { upnpProbe } from './upnp-probe'
import { Subject } from 'rxjs'

const initTestServer = (port: number) => {
const server = createSocket('udp4')
Expand All @@ -12,8 +13,9 @@ const initTestServer = (port: number) => {
describe.skip('upnp probe', () => {
it.skip('ddddd', done => {
const port = 1900
const end = new Subject()

upnpProbe({ PORTS: [port] })
upnpProbe({ PORTS: [port] })(end)
.subscribe(res => {
console.log(res)
})
Expand Down
6 changes: 5 additions & 1 deletion src/ws-discovery/ws-probe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createSocket } from 'dgram'
import { wsProbe } from './ws-probe'
import { Subject } from 'rxjs'

const wsXmlResponse = '<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope" xmlns:SOAP-ENC="http://www.w3.org/2003/05/soap-encoding" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsdd="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:vfdis="http://www.onvif.org/ver10/network/wsdl/RemoteDiscoveryBinding" xmlns:vfdis2="http://www.onvif.org/ver10/network/wsdl/DiscoveryLookupBinding" xmlns:tdn="http://www.onvif.org/ver10/network/wsdl"><SOAP-ENV:Header><wsa:MessageID>uuid:2709d68a-7dc1-61c2-a205-X3018101811662</wsa:MessageID><wsa:RelatesTo>uuid:NetworkVideoTransmitter</wsa:RelatesTo><wsa:ReplyTo SOAP-ENV:mustUnderstand="true"><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo><wsa:To SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:To><wsa:Action SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</wsa:Action></SOAP-ENV:Header><SOAP-ENV:Body><wsdd:ProbeMatches><wsdd:ProbeMatch><wsa:EndpointReference><wsa:Address>urn:uuid:2419d68a-2dd2-21b2-a205-X2018101811779</wsa:Address><wsa:ReferenceProperties></wsa:ReferenceProperties><wsa:ReferenceParameters></wsa:ReferenceParameters><wsa:PortType>ttl</wsa:PortType></wsa:EndpointReference><wsdd:Types>tdn:4655721b-4e0e-4296-ba0b-3180423b5b0c</wsdd:Types><wsdd:Scopes>onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/Model/631GA onvif://www.onvif.org/Name/IPCAM onvif://www.onvif.org/location/country/china</wsdd:Scopes><wsdd:XAddrs>http://192.168.1.1:80/onvif/device_service</wsdd:XAddrs><wsdd:MetadataVersion>1</wsdd:MetadataVersion></wsdd:ProbeMatch></wsdd:ProbeMatches></SOAP-ENV:Body></SOAP-ENV:Envelope>'

Expand All @@ -24,14 +25,17 @@ const config = (port: number) => {
describe('ws probe', () => {
it('should probe basic, distinct', done => {
const port = 41251
const end = new Subject()
const end$ = end.asObservable()

initTestServer(port)
wsProbe(config(port))
wsProbe(config(port))(end$)
.subscribe(res => {
const res1 = res[0]
expect(res.length).toEqual(1)
expect(res1).toBeDefined()
expect(res1.doc).toBeDefined()
end.next()
done()
})
})
Expand Down
5 changes: 3 additions & 2 deletions src/ws-discovery/ws-probe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import { map } from 'rxjs/operators'
import { probe } from '../core/probe'
import { IWsProbeConfig, IWsResponse } from './ws-probe.interfaces'
import { DEFAULT_WS_PROBE_CONFIG } from './config'
import { Observable } from 'rxjs'

const mapDeviceStrToPayload = (str: string) => generateWsDiscoveryProbePayload(str)(generateGuid())
const mapDevicesToPayloads = (devices: readonly string[]) => devices.map(mapDeviceStrToPayload)

export const wsProbe =
(config?: Partial<IWsProbeConfig>): IWsResponse => {
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IWsResponse => {
const cfg = { ...DEFAULT_WS_PROBE_CONFIG, ...config } as IWsProbeConfig
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))(until)
.pipe(map(b => {
return b.map(raw => {
return {
Expand Down

0 comments on commit 6f9b7c7

Please sign in to comment.