Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(LSG): get rid of async in handlers; extract shared logic to base classes #1367

Open
wants to merge 3 commits into
base: release53
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions packages/live-status-gateway/src/collectionBase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { CorelibPubSubCollections, CorelibPubSubTypes } from '@sofie-automation/corelib/dist/pubsub'
import {
StudioId,
CoreConnection,
ProtectedString,
Collection as CoreCollection,
CollectionDocCheck,
} from '@sofie-automation/server-core-integration'
import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick'
import * as _ from 'underscore'
import { Logger } from 'winston'
import { CoreHandler } from './coreHandler'
import { arePropertiesShallowEqual } from './helpers/equality'
import { CollectionHandlers } from './liveStatusServer'
import { ObserverCallback } from './wsHandler'

export const DEFAULT_THROTTLE_PERIOD_MS = 20

export abstract class CollectionBase<T, TCollection extends keyof CorelibPubSubCollections> {
protected _name: string
protected _collectionName: TCollection
protected _logger: Logger
protected _coreHandler: CoreHandler
protected _studioId!: StudioId
protected _observers: Map<
ObserverCallback<T, keyof T>,
{ keysToPick: readonly (keyof T)[] | undefined; lastData: T | undefined }
> = new Map()
protected _collectionData: T | undefined

protected get _core(): CoreConnection<CorelibPubSubTypes, CorelibPubSubCollections> {
return this._coreHandler.core
}
protected throttledChanged: () => void

constructor(
collection: TCollection,
logger: Logger,
coreHandler: CoreHandler,
throttlePeriodMs = DEFAULT_THROTTLE_PERIOD_MS
) {
this._name = this.constructor.name
this._collectionName = collection
this._logger = logger
this._coreHandler = coreHandler

this.throttledChanged = throttleToNextTick(
throttlePeriodMs > 0
? _.throttle(() => this.changed(), throttlePeriodMs, { leading: true, trailing: true })
: () => this.changed()
)

this._logger.info(`Starting ${this._name} handler`)
}

init(_handlers: CollectionHandlers): void {
if (!this._coreHandler.studioId) throw new Error('StudioId is not defined')
this._studioId = this._coreHandler.studioId
}

close(): void {
this._logger.info(`Closing ${this._name} handler`)
}

subscribe<K extends keyof T>(callback: ObserverCallback<T, K>, keysToPick?: readonly K[]): void {
//this._logger.info(`${name}' added observer for '${this._name}'`)
if (this._collectionData) callback(this._collectionData)
this._observers.set(callback, { keysToPick, lastData: this.shallowClone(this._collectionData) })
}

/**
* Called after a batch of updates to documents in the collection
*/
protected changed(): void {
// override me
}

notify(data: T | undefined): void {
for (const [observer, o] of this._observers) {
if (
!o.lastData ||
!o.keysToPick ||
!data ||
!arePropertiesShallowEqual(o.lastData, data, undefined, o.keysToPick)
) {
observer(data)
o.lastData = this.shallowClone(data)
}
}
}

protected shallowClone(data: T | undefined): T | undefined {
if (data === undefined) return undefined
if (Array.isArray(data)) return [...data] as T
if (typeof data === 'object') return { ...data }
return data
}

protected logDocumentChange(documentId: string | ProtectedString<any>, changeType: string): void {
this._logger.silly(`${this._name} ${changeType} ${documentId}`)
}

protected logUpdateReceived(collectionName: string, updateCount: number | undefined): void
protected logUpdateReceived(collectionName: string, extraInfo?: string): void
protected logUpdateReceived(
collectionName: string,
extraInfoOrUpdateCount: string | number | undefined | null = null
): void {
let message = `${this._name} received ${collectionName} update`
if (typeof extraInfoOrUpdateCount === 'string') {
message += `, ${extraInfoOrUpdateCount}`
} else if (extraInfoOrUpdateCount !== null) {
message += `(${extraInfoOrUpdateCount})`
}
this._logger.debug(message)
}

protected logNotifyingUpdate(updateCount: number | undefined): void {
this._logger.debug(`${this._name} notifying update with ${updateCount} ${this._collectionName}`)
}

protected getCollectionOrFail(): CoreCollection<CollectionDocCheck<CorelibPubSubCollections[TCollection]>> {
const collection = this._core.getCollection<TCollection>(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
return collection
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,58 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { Collection } from '../wsHandler'
import { PublicationCollection } from '../publicationCollection'
import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { AdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'
import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
import { CollectionHandlers } from '../liveStatusServer'
import { PickKeys } from '@sofie-automation/shared-lib/dist/lib/types'

const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const
type Playlist = PickKeys<DBRundownPlaylist, typeof PLAYLIST_KEYS>

export class AdLibActionsHandler
extends CollectionBase<AdLibAction[], CorelibPubSub.adLibActions, CollectionName.AdLibActions>
implements Collection<AdLibAction[]>, CollectionObserver<SelectedPartInstances>
extends PublicationCollection<AdLibAction[], CorelibPubSub.adLibActions, CollectionName.AdLibActions>
implements Collection<AdLibAction[]>
{
public observerName: string
private _curRundownId: RundownId | undefined
private _curPartInstance: DBPartInstance | undefined
private _currentRundownId: RundownId | undefined

constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibActionsHandler.name, CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
this.observerName = this._name
super(CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
}

init(handlers: CollectionHandlers): void {
super.init(handlers)

handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS)
}

async changed(id: AdLibActionId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._curRundownId })
await this.notify(this._collectionData)
protected changed(): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This member is protected, while a similar member in globalAdLibActionsHandler isn't. Would it make sense to standarize them?

this.updateAndNotify()
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._curRundownId
this._curPartInstance = data ? data.current ?? data.next : undefined
this._curRundownId = this._curPartInstance ? this._curPartInstance.rundownId : undefined
private onPlaylistUpdate = (data: Playlist | undefined): void => {
this.logUpdateReceived('playlist')
const prevRundownId = this._currentRundownId

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._curRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._curRundownId && this._curPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._curRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}
const rundownPlaylist = data

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._curRundownId,
})
await this.notify(this._collectionData)
this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId

if (prevRundownId !== this._currentRundownId) {
this.stopSubscription()
if (this._currentRundownId) {
this.setupSubscription([this._currentRundownId])
}
// no need to trigger updateAndNotify() because the subscription will take care of this
}
}

// override notify to implement empty array handling
async notify(data: AdLibAction[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
private updateAndNotify(): void {
const col = this.getCollectionOrFail()
this._collectionData = col.find({ rundownId: this._currentRundownId })
this.notify(this._collectionData)
}
}
89 changes: 33 additions & 56 deletions packages/live-status-gateway/src/collections/adLibsHandler.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,57 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { Collection } from '../wsHandler'
import { PublicationCollection } from '../publicationCollection'
import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { PieceId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'
import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
import { CollectionHandlers } from '../liveStatusServer'
import { PickKeys } from '@sofie-automation/shared-lib/dist/lib/types'

const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const
type Playlist = PickKeys<DBRundownPlaylist, typeof PLAYLIST_KEYS>

export class AdLibsHandler
extends CollectionBase<AdLibPiece[], CorelibPubSub.adLibPieces, CollectionName.AdLibPieces>
implements Collection<AdLibPiece[]>, CollectionObserver<SelectedPartInstances>
extends PublicationCollection<AdLibPiece[], CorelibPubSub.adLibPieces, CollectionName.AdLibPieces>
implements Collection<AdLibPiece[]>
{
public observerName: string
// private _core: CoreConnection
private _currentRundownId: RundownId | undefined
private _currentPartInstance: DBPartInstance | undefined

constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibsHandler.name, CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
this.observerName = this._name
super(CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
}

init(handlers: CollectionHandlers): void {
super.init(handlers)

handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS)
}

async changed(id: PieceId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._currentRundownId })
await this.notify(this._collectionData)
protected changed(): void {
this.updateAndNotify()
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
private onPlaylistUpdate = (data: Playlist | undefined): void => {
this.logUpdateReceived('playlist')
const prevRundownId = this._currentRundownId
this._currentPartInstance = data ? data.current ?? data.next : undefined
this._currentRundownId = this._currentPartInstance?.rundownId
const rundownPlaylist = data

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._currentRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._currentRundownId && this._currentPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._currentRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}
this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._currentRundownId,
})
await this.notify(this._collectionData)
if (prevRundownId !== this._currentRundownId) {
this.stopSubscription()
if (this._currentRundownId) {
this.setupSubscription([this._currentRundownId])
}
// no need to trigger updateAndNotify() because the subscription will take care of this
}
}

// override notify to implement empty array handling
async notify(data: AdLibPiece[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
private updateAndNotify(): void {
const collection = this.getCollectionOrFail()
this._collectionData = collection.find({ rundownId: this._currentRundownId })
this.notify(this._collectionData)
}
}
Loading
Loading