Skip to content

Commit

Permalink
feat: automatic dispose of probe on unsubscribe (#40)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: removes terminateProbe function and closes subscription via rxjs unsubscribe function. closes #39
  • Loading branch information
patrickmichalina authored Nov 10, 2019
1 parent 4fffeac commit c6a0f3f
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 37 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ $ camera-probe

## Programmatic Usage
```js
import { onvifDevices$, terminateProbe } from 'camera-probe'
import { onvifDevices$ } from 'camera-probe'
import { takeUntil } from 'rxjs/operators'

onvifDevices$().subscribe(console.log)
const subscription = onvifDevices$().subscribe(console.log)

// be sure to close the socket connection when complete with your query
// This is a tad awkward until a better solution to stopping the inner observables is achieved.
terminateProbe()
// by unsubscribing from the observable.
subscription.unsubscribe()

// or using an rxjs operator like take
onvifDevices$().pipe(takeUntil(someObservaleFires)).subscribe(console.log)

// results
[ { name: 'Amcrest',
Expand Down
22 changes: 12 additions & 10 deletions src/core/probe.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createSocket, RemoteInfo } from 'dgram'
import { Strings, Numbers, IProbeConfig, DEFAULT_PROBE_CONFIG } from './interfaces'
import { Observable, Observer, fromEvent, timer } from 'rxjs'
import { Observable, Observer, fromEvent, timer, Subject } from 'rxjs'
import { shareReplay, map, distinctUntilChanged, mapTo, takeUntil, scan } from 'rxjs/operators'

type IMessage = readonly [Buffer, RemoteInfo]
Expand Down Expand Up @@ -41,19 +41,19 @@ export const flattenBuffersWithInfo =

export const probe =
(config?: Partial<IProbeConfig>) =>
(messages: Strings) =>
(until: Observable<any>): Observable<Strings> =>
(messages: Strings): Observable<Strings> =>
Observable.create((obs: Observer<Strings>) => {
const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
const socket = createSocket({ type: 'udp4' })
const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))
const internalLimit = new Subject()

socket.on('err', err => obs.error(err))
socket.on('close', () => obs.complete())

timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
takeUntil(until))
takeUntil(internalLimit))
.subscribe(bfrPorts => {
bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address))
})
Expand All @@ -65,10 +65,12 @@ export const probe =
distinctUntilObjectChanged,
toArrayOfValues,
flattenDocumentStrings,
takeUntil(until)
).subscribe(msg => obs.next(msg), undefined, () => {
setTimeout(() => {
socket.close()
}, 1000)
})
takeUntil(internalLimit)
).subscribe(msg => obs.next(msg))

return function unsubscribe() {
internalLimit.next()
internalLimit.complete()
socket.close()
}
})
9 changes: 2 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@

import { map, shareReplay } from 'rxjs/operators'
import { map, share } from 'rxjs/operators'
import { onvifProbe } from './onvif/onvif-probe'
import { Subject } from 'rxjs'

export * from './onvif/device'

const end = new Subject()
const end$ = end.asObservable()

export const terminateProbe = () => end.next()
export const onvifProbe$ = () => onvifProbe()(end$).pipe(shareReplay(1))
export const onvifProbe$ = () => onvifProbe().pipe(share())
export const onvifDevices$ = () => onvifProbe$().pipe(map(a => a.map(b => b.device)))
export const onvifResponses$ = () => onvifProbe$().pipe(map(a => a.map(b => b.raw)))

Expand Down
12 changes: 5 additions & 7 deletions src/onvif/onvif-probe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createSocket } from 'dgram'
import { onvifProbe } from './onvif-probe'
import { Subject } from 'rxjs'
import { take } from 'rxjs/operators'

const ipcam = '<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope" xmlns:SOAP-ENC="http://www.w3.org/2003/05/soap-encoding" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsdd="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:vfdis="http://www.onvif.org/ver10/network/wsdl/RemoteDiscoveryBinding" xmlns:vfdis2="http://www.onvif.org/ver10/network/wsdl/DiscoveryLookupBinding" xmlns:tdn="http://www.onvif.org/ver10/network/wsdl"><SOAP-ENV:Header><wsa:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:MessageID><wsa:RelatesTo>uuid:NetworkVideoTransmitter</wsa:RelatesTo><wsa:ReplyTo SOAP-ENV:mustUnderstand="true"><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo><wsa:To SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:To><wsa:Action SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</wsa:Action></SOAP-ENV:Header><SOAP-ENV:Body><wsdd:ProbeMatches><wsdd:ProbeMatch><wsa:EndpointReference><wsa:Address>urn:uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:Address><wsa:ReferenceProperties></wsa:ReferenceProperties><wsa:ReferenceParameters></wsa:ReferenceParameters><wsa:PortType>ttl</wsa:PortType></wsa:EndpointReference><wsdd:Types>tdn:4655721b-4e0e-4296-ba0b-3180423b5b0c</wsdd:Types><wsdd:Scopes>onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/Model/631GA onvif://www.onvif.org/Name/IPCAM onvif://www.onvif.org/location/country/china</wsdd:Scopes><wsdd:XAddrs>http://192.168.1.1:80/onvif/device_service</wsdd:XAddrs><wsdd:MetadataVersion>1</wsdd:MetadataVersion></wsdd:ProbeMatch></wsdd:ProbeMatches></SOAP-ENV:Body></SOAP-ENV:Envelope>'
const amcrest = '<?xml version="1.0" encoding="utf-8" standalone="yes" ?><s:Envelope xmlns:sc="http://www.w3.org/2003/05/soap-encoding" xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:dn="http://www.onvif.org/ver10/network/wsdl" xmlns:tds="http://www.onvif.org/ver10/device/wsdl" xmlns:d="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"><s:Header><a:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:MessageID><a:To>urn:schemas-xmlsoap-org:ws:2005:04:discovery</a:To><a:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</a:Action><a:RelatesTo>uuid:Device</a:RelatesTo></s:Header><s:Body><d:ProbeMatches><d:ProbeMatch><a:EndpointReference><a:Address>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:Address></a:EndpointReference><d:Types>dn:NetworkVideoTransmitter tds:Device</d:Types><d:Scopes>onvif://www.onvif.org/location/country/china onvif://www.onvif.org/name/Amcrest onvif://www.onvif.org/hardware/IP2M-841B onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/type/Network_Video_Transmitter onvif://www.onvif.org/extension/unique_identifier</d:Scopes><d:XAddrs>http://192.168.1.235/onvif/device_service</d:XAddrs><d:MetadataVersion>1</d:MetadataVersion></d:ProbeMatch></d:ProbeMatches></s:Body></s:Envelope>'
Expand All @@ -26,10 +26,10 @@ const config = (port: number) => {
describe('onvif-probe', () => {
it('should handle IPCAM - 631GA', done => {
const port = 41241
const end = new Subject()

initTestServer(port)(ipcam)
onvifProbe(config(port))(end)
onvifProbe(config(port))
.pipe(take(1))
.subscribe(res => {
expect(res[0].device).toEqual({
name: 'IPCAM',
Expand All @@ -48,17 +48,16 @@ describe('onvif-probe', () => {
profiles: ['Streaming'],
xaddrs: ['http://192.168.1.1:80/onvif/device_service']
})
end.next()
done()
})
})

it('should handle AMCREST - IP2M-841B', done => {
const port = 41242
const end = new Subject()

initTestServer(port)(amcrest)
onvifProbe(config(port))(end)
onvifProbe(config(port))
.pipe(take(1))
.subscribe(res => {
expect(res[0].device).toEqual({
name: 'Amcrest',
Expand All @@ -78,7 +77,6 @@ describe('onvif-probe', () => {
profiles: ['Streaming'],
xaddrs: ['http://192.168.1.235/onvif/device_service']
})
end.next()
done()
})
})
Expand Down
4 changes: 2 additions & 2 deletions src/onvif/onvif-probe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ export type IOnvifProbeResponseModels = readonly IOnvifProbeResponseModel[]
export type IOnvifProbeResponse = Observable<IOnvifProbeResponseModels>

export const onvifProbe =
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IOnvifProbeResponse =>
wsProbe(config)(until)
(config?: Partial<IWsProbeConfig>): IOnvifProbeResponse =>
wsProbe(config)
.pipe(map(res => res.map(a => {
return {
...a,
Expand Down
4 changes: 1 addition & 3 deletions src/upnp/upnp-probe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { createSocket } from 'dgram'
import { upnpProbe } from './upnp-probe'
import { Subject } from 'rxjs'

const initTestServer = (port: number) => {
const server = createSocket('udp4')
Expand All @@ -13,9 +12,8 @@ const initTestServer = (port: number) => {
describe.skip('upnp probe', () => {
it.skip('ddddd', done => {
const port = 1900
const end = new Subject()

upnpProbe({ PORTS: [port] })(end)
upnpProbe({ PORTS: [port] })
.subscribe(res => {
console.log(res)
})
Expand Down
2 changes: 1 addition & 1 deletion src/ws-discovery/ws-probe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('ws probe', () => {
const end$ = end.asObservable()

initTestServer(port)
wsProbe(config(port))(end$)
wsProbe(config(port))
.subscribe(res => {
const res1 = res[0]
expect(res.length).toEqual(1)
Expand Down
5 changes: 2 additions & 3 deletions src/ws-discovery/ws-probe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import { map } from 'rxjs/operators'
import { probe } from '../core/probe'
import { IWsProbeConfig, IWsResponse } from './ws-probe.interfaces'
import { DEFAULT_WS_PROBE_CONFIG } from './config'
import { Observable } from 'rxjs'

const mapDeviceStrToPayload = (str: string) => generateWsDiscoveryProbePayload(str)(generateGuid())
const mapDevicesToPayloads = (devices: readonly string[]) => devices.map(mapDeviceStrToPayload)

export const wsProbe =
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IWsResponse => {
(config?: Partial<IWsProbeConfig>): IWsResponse => {
const cfg = { ...DEFAULT_WS_PROBE_CONFIG, ...config } as IWsProbeConfig
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))(until)
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))
.pipe(map(b => {
return b.map(raw => {
return {
Expand Down

0 comments on commit c6a0f3f

Please sign in to comment.