From f559621d6b4fac76c397a0d7c751f59de7269419 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Wed, 8 Jan 2025 15:33:18 +0000 Subject: [PATCH 01/11] feat: rework mos statuses flow SOFIE-97 --- .github/workflows/node.yaml | 9 +- meteor/server/collections/collection.ts | 3 +- .../implementations/asyncCollection.ts | 7 +- meteor/server/publications/_publications.ts | 1 + .../ingestStatus/createIngestRundownStatus.ts | 134 +++++ .../publications/ingestStatus/publication.ts | 195 ++++++++ .../ingestStatus/reactiveContentCache.ts | 78 +++ .../ingestStatus/rundownContentObserver.ts | 150 ++++++ .../lib/__tests__/rundownsObserver.test.ts | 67 ++- .../publications/lib/rundownsObserver.ts | 38 +- .../partInstancesUI/publication.ts | 106 ++-- .../publications/partsUI/publication.ts | 98 ++-- .../rundown/publication.ts | 160 +++--- .../segmentPartNotesUI/publication.ts | 86 ++-- .../src/documents/part.ts | 6 + packages/corelib/src/dataModel/Rundown.ts | 3 - packages/corelib/src/worker/events.ts | 8 - .../job-worker/src/blueprints/context/lib.ts | 4 + packages/job-worker/src/events/handle.ts | 134 +---- .../model/implementation/IngestModelImpl.ts | 1 - .../src/playout/activePlaylistActions.ts | 2 - .../src/playout/model/PlayoutModel.ts | 7 - .../model/implementation/PlayoutModelImpl.ts | 26 - packages/job-worker/src/playout/snapshot.ts | 1 - packages/job-worker/src/playout/take.ts | 4 +- .../job-worker/src/workers/events/jobs.ts | 7 +- .../mos-gateway/src/$schemas/devices.json | 56 ++- .../mos-gateway/src/CoreMosDeviceHandler.ts | 44 +- packages/mos-gateway/src/connector.ts | 39 +- packages/mos-gateway/src/coreHandler.ts | 17 +- packages/mos-gateway/src/mosHandler.ts | 462 ++++++++++-------- .../src/mosStatus/__tests__/diff.spec.ts | 317 ++++++++++++ packages/mos-gateway/src/mosStatus/diff.ts | 161 ++++++ packages/mos-gateway/src/mosStatus/handler.ts | 163 ++++++ packages/mos-gateway/src/versions.ts | 2 +- .../src/generated/MosGatewayDevicesTypes.ts | 6 + .../shared-lib/src/ingest/rundownStatus.ts | 42 ++ .../shared-lib/src/pubsub/peripheralDevice.ts | 17 + 38 files changed, 1932 insertions(+), 729 deletions(-) create mode 100644 meteor/server/publications/ingestStatus/createIngestRundownStatus.ts create mode 100644 meteor/server/publications/ingestStatus/publication.ts create mode 100644 meteor/server/publications/ingestStatus/reactiveContentCache.ts create mode 100644 meteor/server/publications/ingestStatus/rundownContentObserver.ts create mode 100644 packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts create mode 100644 packages/mos-gateway/src/mosStatus/diff.ts create mode 100644 packages/mos-gateway/src/mosStatus/handler.ts create mode 100644 packages/shared-lib/src/ingest/rundownStatus.ts diff --git a/.github/workflows/node.yaml b/.github/workflows/node.yaml index 289f175634..d59a0e7ab3 100644 --- a/.github/workflows/node.yaml +++ b/.github/workflows/node.yaml @@ -500,16 +500,19 @@ jobs: - node-version: 22.x package-name: job-worker send-coverage: true - # No tests for the gateways yet + # No tests for some gateways yet # - node-version: 22.x # package-name: playout-gateway - # - node-version: 22.x - # package-name: mos-gateway + # send-coverage: true + - node-version: 22.x + package-name: mos-gateway + send-coverage: true - node-version: 22.x package-name: live-status-gateway send-coverage: true - node-version: 22.x package-name: webui + send-coverage: true # manual meteor-lib as it only needs a couple of versions - node-version: 22.x package-name: meteor-lib diff --git a/meteor/server/collections/collection.ts b/meteor/server/collections/collection.ts index e0617c77e7..393ab90353 100644 --- a/meteor/server/collections/collection.ts +++ b/meteor/server/collections/collection.ts @@ -246,7 +246,8 @@ export interface AsyncOnlyReadOnlyMongoCollection | DBInterface['_id'], callbacks: PromisifyCallbacks>, - options?: FindOptions + findOptions?: FindOptions, + callbackOptions?: { nonMutatingCallbacks?: boolean | undefined } ): Promise /** diff --git a/meteor/server/collections/implementations/asyncCollection.ts b/meteor/server/collections/implementations/asyncCollection.ts index 52bb47eca6..7a4349c26a 100644 --- a/meteor/server/collections/implementations/asyncCollection.ts +++ b/meteor/server/collections/implementations/asyncCollection.ts @@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection | DBInterface['_id'], callbacks: PromisifyCallbacks>, - options?: FindOptions + findOptions?: FindOptions, + callbackOptions?: { nonMutatingCallbacks?: boolean | undefined } ): Promise { const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`) if (span) { @@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection, + rundownId: RundownId +): IngestRundownStatus | null { + const rundown = cache.Rundowns.findOne(rundownId) + if (!rundown) return null + + const newDoc: IngestRundownStatus = { + _id: rundownId, + externalId: rundown.externalId, + + active: IngestRundownActiveStatus.INACTIVE, + + segments: [], + } + + const playlist = cache.Playlists.findOne({ + _id: rundown.playlistId, + activationId: { $exists: true }, + }) + + if (playlist) { + newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE + } + + // Find the most important part instance for each part + const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances) + + const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch() + for (const nrcsSegment of nrcsSegments) { + const nrcsParts = cache.NrcsIngestData.find({ + rundownId, + segmentId: nrcsSegment.segmentId, + type: NrcsIngestCacheType.PART, + }).fetch() + + newDoc.segments.push({ + externalId: nrcsSegment.data.externalId, + parts: _.compact( + nrcsParts.map((nrcsPart) => { + if (!nrcsPart.partId || !nrcsPart.segmentId) return null + + const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId }) + const partInstance = partInstanceMap.get(nrcsPart.partId) + + return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart) + }) + ), + }) + } + + return newDoc +} + +function findPartInstanceForEachPart( + playlist: Pick | undefined, + rundownId: RundownId, + partInstancesCache: ReadonlyDeep>> +) { + const partInstanceMap = new Map>() + if (!playlist) return partInstanceMap + + for (const partInstance of partInstancesCache.find({}).fetch()) { + if (partInstance.rundownId !== rundownId) continue + // Ignore the next partinstance + if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue + + // The current part instance is the most important + if (partInstance._id === playlist.currentPartInfo?.partInstanceId) { + partInstanceMap.set(partInstance.part._id, partInstance) + continue + } + + // Take the part with the highest takeCount + const existingEntry = partInstanceMap.get(partInstance.part._id) + if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) { + partInstanceMap.set(partInstance.part._id, partInstance) + } + } + + return partInstanceMap +} + +function createIngestPartStatus( + playlist: Pick | undefined, + partInstance: Pick | undefined, + part: Pick | undefined, + ingestPart: IngestPart +): IngestPartStatus { + // Determine the playback status from the PartInstance + let playbackStatus = IngestPartPlaybackStatus.UNKNOWN + if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) { + const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id + + if (isCurrentPartInstance) { + // If the current, it is playing + playbackStatus = IngestPartPlaybackStatus.PLAY + } else { + // If not the current, but has been played, it is stopped + playbackStatus = IngestPartPlaybackStatus.STOP + } + } + + // Determine the ready status from the PartInstance or Part + const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady + const itemsReady = partInstance ? partInstance.part.ingestNotifyItemsReady : part?.ingestNotifyItemsReady + + return { + externalId: ingestPart.externalId, + + isReady: isReady ?? null, + itemsReady: itemsReady ?? {}, + + playbackStatus, + } +} diff --git a/meteor/server/publications/ingestStatus/publication.ts b/meteor/server/publications/ingestStatus/publication.ts new file mode 100644 index 0000000000..59057b19fa --- /dev/null +++ b/meteor/server/publications/ingestStatus/publication.ts @@ -0,0 +1,195 @@ +import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { ReadonlyDeep } from 'type-fest' +import { + CustomPublishCollection, + meteorCustomPublish, + setUpCollectionOptimizedObserver, + SetupObserversResult, + TriggerUpdate, +} from '../../lib/customPublication' +import { logger } from '../../logging' +import { ContentCache, createReactiveContentCache } from './reactiveContentCache' +import { RundownsObserver } from '../lib/rundownsObserver' +import { RundownContentObserver } from './rundownContentObserver' +import { + PeripheralDevicePubSub, + PeripheralDevicePubSubCollectionsNames, +} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import { checkAccessAndGetPeripheralDevice } from '../../security/check' +import { check } from '../../lib/check' +import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' +import { protectString } from '@sofie-automation/corelib/dist/protectedString' +import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' +import { createIngestRundownStatus } from './createIngestRundownStatus' + +interface IngestRundownStatusArgs { + readonly deviceId: PeripheralDeviceId +} + +export interface IngestRundownStatusState { + contentCache: ReadonlyDeep +} + +interface IngestRundownStatusUpdateProps { + newCache: ContentCache + + invalidateRundownIds: RundownId[] + invalidatePlaylistIds: RundownPlaylistId[] +} + +async function setupIngestRundownStatusPublicationObservers( + args: ReadonlyDeep, + triggerUpdate: TriggerUpdate +): Promise { + const rundownsObserver = await RundownsObserver.createForPeripheralDevice(args.deviceId, async (rundownIds) => { + logger.silly(`Creating new RundownContentObserver`, rundownIds) + + // TODO - can this be done cheaper? + const cache = createReactiveContentCache(rundownIds) + + // Push update + triggerUpdate({ newCache: cache }) + + const contentObserver = await RundownContentObserver.create(rundownIds, cache) + + const innerQueries = [ + cache.Playlists.find({}).observeChanges( + { + added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }), + changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }), + removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }), + }, + { nonMutatingCallbacks: true } + ), + cache.Rundowns.find({}).observeChanges( + { + added: (docId) => { + triggerUpdate({ invalidateRundownIds: [protectString(docId)] }) + contentObserver.checkPlaylistIds() + }, + changed: (docId) => { + triggerUpdate({ invalidateRundownIds: [protectString(docId)] }) + contentObserver.checkPlaylistIds() + }, + removed: (docId) => { + triggerUpdate({ invalidateRundownIds: [protectString(docId)] }) + contentObserver.checkPlaylistIds() + }, + }, + { nonMutatingCallbacks: true } + ), + cache.Parts.find({}).observe({ + added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }), + removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + }), + cache.PartInstances.find({}).observe({ + added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }), + removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + }), + cache.NrcsIngestData.find({}).observe({ + added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }), + removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }), + }), + ] + + return () => { + contentObserver.dispose() + + for (const query of innerQueries) { + query.stop() + } + } + }) + + // Set up observers: + return [rundownsObserver] +} + +async function manipulateIngestRundownStatusPublicationData( + _args: IngestRundownStatusArgs, + state: Partial, + collection: CustomPublishCollection, + updateProps: Partial> | undefined +): Promise { + // Prepare data for publication: + + if (updateProps?.newCache !== undefined) { + state.contentCache = updateProps.newCache ?? undefined + } + + if (!state.contentCache) { + // Remove all the notes + collection.remove(null) + + return + } + + const updateAll = !updateProps || !!updateProps?.newCache + if (updateAll) { + // Remove all the notes + collection.remove(null) + + const knownRundownIds = new Set(state.contentCache.RundownIds) + + for (const rundownId of knownRundownIds) { + const newDoc = createIngestRundownStatus(state.contentCache, rundownId) + if (newDoc) collection.replace(newDoc) + } + } else { + const regenerateForRundownIds = new Set(updateProps.invalidateRundownIds) + + // Include anything where the playlist has changed + if (updateProps.invalidatePlaylistIds && updateProps.invalidatePlaylistIds.length > 0) { + const rundownsToUpdate = state.contentCache.Rundowns.find( + { + playlistId: { $in: updateProps.invalidatePlaylistIds }, + }, + { + projection: { + _id: 1, + }, + } + ).fetch() as Pick[] + + for (const rundown of rundownsToUpdate) { + regenerateForRundownIds.add(rundown._id) + } + } + + for (const rundownId of regenerateForRundownIds) { + const newDoc = createIngestRundownStatus(state.contentCache, rundownId) + if (newDoc) { + collection.replace(newDoc) + } else { + collection.remove(rundownId) + } + } + } +} + +meteorCustomPublish( + PeripheralDevicePubSub.ingestDeviceRundownStatus, + PeripheralDevicePubSubCollectionsNames.ingestRundownStatus, + async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) { + check(deviceId, String) + + await checkAccessAndGetPeripheralDevice(deviceId, token, this) + + await setUpCollectionOptimizedObserver< + IngestRundownStatus, + IngestRundownStatusArgs, + IngestRundownStatusState, + IngestRundownStatusUpdateProps + >( + `pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`, + { deviceId }, + setupIngestRundownStatusPublicationObservers, + manipulateIngestRundownStatusPublicationData, + pub, + 100 + ) + } +) diff --git a/meteor/server/publications/ingestStatus/reactiveContentCache.ts b/meteor/server/publications/ingestStatus/reactiveContentCache.ts new file mode 100644 index 0000000000..9e2a026cea --- /dev/null +++ b/meteor/server/publications/ingestStatus/reactiveContentCache.ts @@ -0,0 +1,78 @@ +import type { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' +import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection' +import { literal } from '@sofie-automation/corelib/dist/lib' +import type { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo' +import type { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances' +import type { NrcsIngestDataCacheObj } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache' +import type { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import type { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' +import type { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' + +export type PlaylistFields = '_id' | 'activationId' | 'rehearsal' | 'currentPartInfo' | 'nextPartInfo' +export const playlistFieldSpecifier = literal>>({ + _id: 1, + activationId: 1, + rehearsal: 1, + currentPartInfo: 1, + nextPartInfo: 1, +}) + +export type RundownFields = '_id' | 'externalId' | 'playlistId' +export const rundownFieldSpecifier = literal>>({ + _id: 1, + externalId: 1, + playlistId: 1, +}) + +export type PartFields = + | '_id' + | 'rundownId' + | 'segmentId' + | 'externalId' + | 'shouldNotifyCurrentPlayingPart' + | 'ingestNotifyPartReady' + | 'ingestNotifyItemsReady' +export const partFieldSpecifier = literal>>({ + _id: 1, + rundownId: 1, + segmentId: 1, + externalId: 1, + shouldNotifyCurrentPlayingPart: 1, + ingestNotifyPartReady: 1, + ingestNotifyItemsReady: 1, +}) + +export type PartInstanceFields = '_id' | 'rundownId' | 'segmentId' | 'part' | 'takeCount' +export const partInstanceFieldSpecifier = literal< + MongoFieldSpecifierOnesStrict> +>({ + _id: 1, + rundownId: 1, + segmentId: 1, + part: 1, // This could be more granular, but it should be pretty stable + takeCount: 1, +}) + +export interface ContentCache { + RundownIds: RundownId[] + + Playlists: ReactiveCacheCollection> + Rundowns: ReactiveCacheCollection> + NrcsIngestData: ReactiveCacheCollection + Parts: ReactiveCacheCollection> + PartInstances: ReactiveCacheCollection> +} + +export function createReactiveContentCache(rundownIds: RundownId[]): ContentCache { + const cache: ContentCache = { + RundownIds: rundownIds, + + Playlists: new ReactiveCacheCollection>('playlists'), + Rundowns: new ReactiveCacheCollection>('rundowns'), + NrcsIngestData: new ReactiveCacheCollection('nrcsIngestData'), // TODO - is this needed? + Parts: new ReactiveCacheCollection>('parts'), + PartInstances: new ReactiveCacheCollection>('partInstances'), + } + + return cache +} diff --git a/meteor/server/publications/ingestStatus/rundownContentObserver.ts b/meteor/server/publications/ingestStatus/rundownContentObserver.ts new file mode 100644 index 0000000000..2aa048be36 --- /dev/null +++ b/meteor/server/publications/ingestStatus/rundownContentObserver.ts @@ -0,0 +1,150 @@ +import { Meteor } from 'meteor/meteor' +import { RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { logger } from '../../logging' +import { + ContentCache, + partFieldSpecifier, + partInstanceFieldSpecifier, + playlistFieldSpecifier, + rundownFieldSpecifier, + // segmentFieldSpecifier, +} from './reactiveContentCache' +import { NrcsIngestDataCache, PartInstances, Parts, RundownPlaylists, Rundowns } from '../../collections' +import { waitForAllObserversReady } from '../lib/lib' +import _ from 'underscore' +import { ReactiveMongoObserverGroup, ReactiveMongoObserverGroupHandle } from '../lib/observerGroup' +import { equivalentArrays } from '@sofie-automation/shared-lib/dist/lib/lib' + +const REACTIVITY_DEBOUNCE = 20 + +export class RundownContentObserver { + #observers: Meteor.LiveQueryHandle[] = [] + readonly #cache: ContentCache + + #playlistIds: RundownPlaylistId[] = [] + #playlistIdObserver!: ReactiveMongoObserverGroupHandle + + #disposed = false + + private constructor(cache: ContentCache) { + this.#cache = cache + } + + static async create(rundownIds: RundownId[], cache: ContentCache): Promise { + logger.silly(`Creating RundownContentObserver for rundowns "${rundownIds.join(',')}"`) + + const observer = new RundownContentObserver(cache) + + observer.#playlistIdObserver = await ReactiveMongoObserverGroup(async () => { + // Clear already cached data + cache.Playlists.remove({}) + + return [ + RundownPlaylists.observe( + { + // We can use the `this.#playlistIds` here, as this is restarted every time that property changes + _id: { $in: observer.#playlistIds }, + }, + { + added: (doc) => { + cache.Playlists.upsert(doc._id, doc) + }, + changed: (doc) => { + cache.Playlists.upsert(doc._id, doc) + }, + removed: (doc) => { + cache.Playlists.remove(doc._id) + }, + }, + { + projection: playlistFieldSpecifier, + } + ), + ] + }) + + observer.#observers = await waitForAllObserversReady([ + Rundowns.observeChanges( + { + _id: { + $in: rundownIds, + }, + }, + cache.Rundowns.link(), + { + projection: rundownFieldSpecifier, + }, + { + nonMutatingCallbacks: true, + } + ), + Parts.observeChanges( + { + rundownId: { + $in: rundownIds, + }, + }, + cache.Parts.link(), + { + projection: partFieldSpecifier, + }, + { + nonMutatingCallbacks: true, + } + ), + PartInstances.observeChanges( + { + rundownId: { $in: rundownIds }, + reset: { $ne: true }, + orphaned: { $exists: false }, + }, + cache.PartInstances.link(), + { fields: partInstanceFieldSpecifier }, + { + nonMutatingCallbacks: true, + } + ), + NrcsIngestDataCache.observeChanges( + { + rundownId: { + $in: rundownIds, + }, + }, + cache.NrcsIngestData.link(), + {}, + { + nonMutatingCallbacks: true, + } + ), + + observer.#playlistIdObserver, + ]) + + return observer + } + + public checkPlaylistIds = _.debounce( + Meteor.bindEnvironment(() => { + if (this.#disposed) return + + const playlistIds = Array.from(new Set(this.#cache.Rundowns.find({}).map((rundown) => rundown.playlistId))) + + if (!equivalentArrays(playlistIds, this.#playlistIds)) { + this.#playlistIds = playlistIds + // trigger the playlist group to restart + this.#playlistIdObserver.restart() + } + }), + REACTIVITY_DEBOUNCE + ) + + public get cache(): ContentCache { + return this.#cache + } + + public dispose = (): void => { + this.#disposed = true + + this.#observers.forEach((observer) => observer.stop()) + } +} diff --git a/meteor/server/publications/lib/__tests__/rundownsObserver.test.ts b/meteor/server/publications/lib/__tests__/rundownsObserver.test.ts index ffeb44577b..06760d6c94 100644 --- a/meteor/server/publications/lib/__tests__/rundownsObserver.test.ts +++ b/meteor/server/publications/lib/__tests__/rundownsObserver.test.ts @@ -1,4 +1,9 @@ -import { RundownId, RundownPlaylistId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { + PeripheralDeviceId, + RundownId, + RundownPlaylistId, + StudioId, +} from '@sofie-automation/corelib/dist/dataModel/Ids' import { protectString } from '@sofie-automation/corelib/dist/protectedString' import { Rundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import { Rundowns } from '../../../collections' @@ -25,7 +30,7 @@ describe('RundownsObserver', () => { // should not be any observers yet expect(RundownsMock.observers).toHaveLength(0) - const observer = await RundownsObserver.create(studioId, playlistId, onChanged) + const observer = await RundownsObserver.createForPlaylist(studioId, playlistId, onChanged) try { // should now be an observer expect(RundownsMock.observers).toHaveLength(1) @@ -78,7 +83,7 @@ describe('RundownsObserver', () => { // should not be any observers yet expect(RundownsMock.observers).toHaveLength(0) - const observer = await RundownsObserver.create(studioId, playlistId, onChanged) + const observer = await RundownsObserver.createForPlaylist(studioId, playlistId, onChanged) try { // ensure starts correct await waitUntil(async () => { @@ -132,7 +137,7 @@ describe('RundownsObserver', () => { // should not be any observers yet expect(RundownsMock.observers).toHaveLength(0) - const observer = await RundownsObserver.create(studioId, playlistId, onChanged) + const observer = await RundownsObserver.createForPlaylist(studioId, playlistId, onChanged) try { // ensure starts correct // ensure starts correct @@ -186,7 +191,7 @@ describe('RundownsObserver', () => { // should not be any observers yet expect(RundownsMock.observers).toHaveLength(0) - const observer = await RundownsObserver.create(studioId, playlistId, onChanged) + const observer = await RundownsObserver.createForPlaylist(studioId, playlistId, onChanged) try { // ensure starts correct // ensure starts correct @@ -263,4 +268,56 @@ describe('RundownsObserver', () => { observer.stop() } }) + + test('create and destroy observer - for peripheraldevice', async () => { + const deviceId = protectString('device0') + + const onChangedCleanup = jest.fn() + const onChanged = jest.fn(async () => onChangedCleanup) + + // should not be any observers yet + expect(RundownsMock.observers).toHaveLength(0) + + const observer = await RundownsObserver.createForPeripheralDevice(deviceId, onChanged) + try { + // should now be an observer + expect(RundownsMock.observers).toHaveLength(1) + + // Before debounce + expect(onChanged).toHaveBeenCalledTimes(0) + + // After debounce + await waitUntil(async () => { + // Run timers, so that promises in the observer has a chance to resolve: + await runAllTimers() + expect(onChanged).toHaveBeenCalledTimes(1) + expect(onChangedCleanup).toHaveBeenCalledTimes(0) + }, MAX_WAIT_TIME) + + // still got an observer + expect(RundownsMock.observers).toHaveLength(1) + + // get the mock observer, and ensure to looks sane + expect(RundownsMock.observers).toHaveLength(1) + const mockObserver = RundownsMock.observers[0] + expect(mockObserver).toBeTruthy() + expect(mockObserver.callbacksChanges).toBeFalsy() + expect(mockObserver.callbacksObserve).toBeTruthy() + expect(mockObserver.callbacksObserve?.added).toBeTruthy() + expect(mockObserver.callbacksObserve?.changed).toBeTruthy() + expect(mockObserver.callbacksObserve?.removed).toBeTruthy() + expect(mockObserver.query).toEqual({ + 'source.peripheralDeviceId': 'device0', + 'source.type': 'nrcs', + }) + } finally { + // Make sure to cleanup + observer.stop() + + // Check it stopped + expect(onChanged).toHaveBeenCalledTimes(1) + expect(onChangedCleanup).toHaveBeenCalledTimes(1) + expect(RundownsMock.observers).toHaveLength(0) + } + }) }) diff --git a/meteor/server/publications/lib/rundownsObserver.ts b/meteor/server/publications/lib/rundownsObserver.ts index 7e87546a84..2c5efea890 100644 --- a/meteor/server/publications/lib/rundownsObserver.ts +++ b/meteor/server/publications/lib/rundownsObserver.ts @@ -1,7 +1,14 @@ import { Meteor } from 'meteor/meteor' -import { RundownId, RundownPlaylistId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import type { + PeripheralDeviceId, + RundownId, + RundownPlaylistId, + StudioId, +} from '@sofie-automation/corelib/dist/dataModel/Ids' import { Rundowns } from '../../collections' import { PromiseDebounce } from './PromiseDebounce' +import type { MongoQuery } from '@sofie-automation/corelib/dist/mongo' +import type { Rundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' const REACTIVITY_DEBOUNCE = 20 @@ -34,24 +41,39 @@ export class RundownsObserver implements Meteor.LiveQueryHandle { this.#changed = onChanged } - static async create( + static async createForPlaylist( studioId: StudioId, playlistId: RundownPlaylistId, onChanged: ChangedHandler ): Promise { const observer = new RundownsObserver(onChanged) - await observer.init(studioId, playlistId) + await observer.init({ + playlistId, + studioId, + }) return observer } - private async init(studioId: StudioId, playlistId: RundownPlaylistId) { + static async createForPeripheralDevice( + // studioId: StudioId, // TODO - this? + deviceId: PeripheralDeviceId, + onChanged: ChangedHandler + ): Promise { + const observer = new RundownsObserver(onChanged) + + await observer.init({ + 'source.type': 'nrcs', + 'source.peripheralDeviceId': deviceId, + }) + + return observer + } + + private async init(query: MongoQuery) { this.#rundownsLiveQuery = await Rundowns.observe( - { - playlistId, - studioId, - }, + query, { added: (doc) => { this.#rundownIds.add(doc._id) diff --git a/meteor/server/publications/partInstancesUI/publication.ts b/meteor/server/publications/partInstancesUI/publication.ts index ede1616f19..318b088f2a 100644 --- a/meteor/server/publications/partInstancesUI/publication.ts +++ b/meteor/server/publications/partInstancesUI/publication.ts @@ -66,60 +66,64 @@ async function setupUIPartInstancesPublicationObservers( )) as Pick | undefined if (!playlist) throw new Error(`RundownPlaylist with activationId="${args.playlistActivationId}" not found!`) - const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => { - logger.silly(`Creating new RundownContentObserver`) - - const cache = createReactiveContentCache() - - // Push update - triggerUpdate({ newCache: cache }) - - const obs1 = await RundownContentObserver.create( - playlist.studioId, - args.playlistActivationId, - rundownIds, - cache - ) + const rundownsObserver = await RundownsObserver.createForPlaylist( + playlist.studioId, + playlist._id, + async (rundownIds) => { + logger.silly(`Creating new RundownContentObserver`) + + const cache = createReactiveContentCache() + + // Push update + triggerUpdate({ newCache: cache }) + + const obs1 = await RundownContentObserver.create( + playlist.studioId, + args.playlistActivationId, + rundownIds, + cache + ) - const innerQueries = [ - cache.Segments.find({}).observeChanges({ - added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - }), - cache.PartInstances.find({}).observe({ - added: (doc) => triggerUpdate({ invalidatePartInstanceIds: [doc._id] }), - changed: (doc, oldDoc) => { - if (doc.part._rank !== oldDoc.part._rank) { - // with part rank change we need to invalidate the entire segment, - // as the order may affect which unchanged parts are/aren't in quickLoop - triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }) - } else { - triggerUpdate({ invalidatePartInstanceIds: [doc._id] }) - } - }, - removed: (doc) => triggerUpdate({ invalidatePartInstanceIds: [doc._id] }), - }), - cache.RundownPlaylists.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateQuickLoop: true }), - changed: () => triggerUpdate({ invalidateQuickLoop: true }), - removed: () => triggerUpdate({ invalidateQuickLoop: true }), - }), - cache.StudioSettings.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateQuickLoop: true }), - changed: () => triggerUpdate({ invalidateQuickLoop: true }), - removed: () => triggerUpdate({ invalidateQuickLoop: true }), - }), - ] - - return () => { - obs1.dispose() - - for (const query of innerQueries) { - query.stop() + const innerQueries = [ + cache.Segments.find({}).observeChanges({ + added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + }), + cache.PartInstances.find({}).observe({ + added: (doc) => triggerUpdate({ invalidatePartInstanceIds: [doc._id] }), + changed: (doc, oldDoc) => { + if (doc.part._rank !== oldDoc.part._rank) { + // with part rank change we need to invalidate the entire segment, + // as the order may affect which unchanged parts are/aren't in quickLoop + triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }) + } else { + triggerUpdate({ invalidatePartInstanceIds: [doc._id] }) + } + }, + removed: (doc) => triggerUpdate({ invalidatePartInstanceIds: [doc._id] }), + }), + cache.RundownPlaylists.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateQuickLoop: true }), + changed: () => triggerUpdate({ invalidateQuickLoop: true }), + removed: () => triggerUpdate({ invalidateQuickLoop: true }), + }), + cache.StudioSettings.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateQuickLoop: true }), + changed: () => triggerUpdate({ invalidateQuickLoop: true }), + removed: () => triggerUpdate({ invalidateQuickLoop: true }), + }), + ] + + return () => { + obs1.dispose() + + for (const query of innerQueries) { + query.stop() + } } } - }) + ) // Set up observers: return [rundownsObserver] diff --git a/meteor/server/publications/partsUI/publication.ts b/meteor/server/publications/partsUI/publication.ts index 24460ab13c..6776e6af31 100644 --- a/meteor/server/publications/partsUI/publication.ts +++ b/meteor/server/publications/partsUI/publication.ts @@ -57,55 +57,59 @@ async function setupUIPartsPublicationObservers( })) as Pick | undefined if (!playlist) throw new Error(`RundownPlaylist "${args.playlistId}" not found!`) - const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => { - logger.silly(`Creating new RundownContentObserver`) - - const cache = createReactiveContentCache() - - // Push update - triggerUpdate({ newCache: cache }) - - const obs1 = await RundownContentObserver.create(playlist.studioId, playlist._id, rundownIds, cache) - - const innerQueries = [ - cache.Segments.find({}).observeChanges({ - added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - }), - cache.Parts.find({}).observe({ - added: (doc) => triggerUpdate({ invalidatePartIds: [doc._id] }), - changed: (doc, oldDoc) => { - if (doc._rank !== oldDoc._rank) { - // with part rank change we need to invalidate the entire segment, - // as the order may affect which unchanged parts are/aren't in quickLoop - triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }) - } else { - triggerUpdate({ invalidatePartIds: [doc._id] }) - } - }, - removed: (doc) => triggerUpdate({ invalidatePartIds: [doc._id] }), - }), - cache.RundownPlaylists.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateQuickLoop: true }), - changed: () => triggerUpdate({ invalidateQuickLoop: true }), - removed: () => triggerUpdate({ invalidateQuickLoop: true }), - }), - cache.StudioSettings.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateQuickLoop: true }), - changed: () => triggerUpdate({ invalidateQuickLoop: true }), - removed: () => triggerUpdate({ invalidateQuickLoop: true }), - }), - ] - - return () => { - obs1.dispose() - - for (const query of innerQueries) { - query.stop() + const rundownsObserver = await RundownsObserver.createForPlaylist( + playlist.studioId, + playlist._id, + async (rundownIds) => { + logger.silly(`Creating new RundownContentObserver`) + + const cache = createReactiveContentCache() + + // Push update + triggerUpdate({ newCache: cache }) + + const obs1 = await RundownContentObserver.create(playlist.studioId, playlist._id, rundownIds, cache) + + const innerQueries = [ + cache.Segments.find({}).observeChanges({ + added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + }), + cache.Parts.find({}).observe({ + added: (doc) => triggerUpdate({ invalidatePartIds: [doc._id] }), + changed: (doc, oldDoc) => { + if (doc._rank !== oldDoc._rank) { + // with part rank change we need to invalidate the entire segment, + // as the order may affect which unchanged parts are/aren't in quickLoop + triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }) + } else { + triggerUpdate({ invalidatePartIds: [doc._id] }) + } + }, + removed: (doc) => triggerUpdate({ invalidatePartIds: [doc._id] }), + }), + cache.RundownPlaylists.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateQuickLoop: true }), + changed: () => triggerUpdate({ invalidateQuickLoop: true }), + removed: () => triggerUpdate({ invalidateQuickLoop: true }), + }), + cache.StudioSettings.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateQuickLoop: true }), + changed: () => triggerUpdate({ invalidateQuickLoop: true }), + removed: () => triggerUpdate({ invalidateQuickLoop: true }), + }), + ] + + return () => { + obs1.dispose() + + for (const query of innerQueries) { + query.stop() + } } } - }) + ) // Set up observers: return [rundownsObserver] diff --git a/meteor/server/publications/pieceContentStatusUI/rundown/publication.ts b/meteor/server/publications/pieceContentStatusUI/rundown/publication.ts index 0103802d91..6f78d87ed2 100644 --- a/meteor/server/publications/pieceContentStatusUI/rundown/publication.ts +++ b/meteor/server/publications/pieceContentStatusUI/rundown/publication.ts @@ -125,86 +125,90 @@ async function setupUIPieceContentStatusesPublicationObservers( })) as Pick | undefined if (!playlist) throw new Error(`RundownPlaylist "${args.rundownPlaylistId}" not found!`) - const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => { - logger.silly(`Creating new RundownContentObserver`) - - // TODO - can this be done cheaper? - const contentCache = createReactiveContentCache() - triggerUpdate({ newCache: contentCache }) - - const obs1 = await RundownContentObserver.create(rundownIds, contentCache) - - const innerQueries = [ - contentCache.Segments.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), - }), - contentCache.Parts.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), - }), - contentCache.Pieces.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), - }), - contentCache.PartInstances.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), - }), - contentCache.PieceInstances.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), - }), - contentCache.AdLibPieces.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), - }), - contentCache.AdLibActions.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), - }), - contentCache.BaselineAdLibPieces.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), - }), - contentCache.BaselineAdLibActions.find({}).observeChanges({ - added: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), - }), - contentCache.Rundowns.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateAll: true }), - changed: () => triggerUpdate({ invalidateAll: true }), - removed: () => triggerUpdate({ invalidateAll: true }), - }), - contentCache.Blueprints.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateAll: true }), - changed: () => triggerUpdate({ invalidateAll: true }), - removed: () => triggerUpdate({ invalidateAll: true }), - }), - contentCache.ShowStyleSourceLayers.find({}).observeChanges({ - added: () => triggerUpdate({ invalidateAll: true }), - changed: () => triggerUpdate({ invalidateAll: true }), - removed: () => triggerUpdate({ invalidateAll: true }), - }), - ] - - return () => { - obs1.dispose() - - for (const query of innerQueries) { - query.stop() + const rundownsObserver = await RundownsObserver.createForPlaylist( + playlist.studioId, + playlist._id, + async (rundownIds) => { + logger.silly(`Creating new RundownContentObserver`) + + // TODO - can this be done cheaper? + const contentCache = createReactiveContentCache() + triggerUpdate({ newCache: contentCache }) + + const obs1 = await RundownContentObserver.create(rundownIds, contentCache) + + const innerQueries = [ + contentCache.Segments.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedSegmentIds: [protectString(id)] }), + }), + contentCache.Parts.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedPartIds: [protectString(id)] }), + }), + contentCache.Pieces.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedPieceIds: [protectString(id)] }), + }), + contentCache.PartInstances.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedPartInstanceIds: [protectString(id)] }), + }), + contentCache.PieceInstances.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedPieceInstanceIds: [protectString(id)] }), + }), + contentCache.AdLibPieces.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedAdlibPieceIds: [protectString(id)] }), + }), + contentCache.AdLibActions.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedAdlibActionIds: [protectString(id)] }), + }), + contentCache.BaselineAdLibPieces.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedBaselineAdlibPieceIds: [protectString(id)] }), + }), + contentCache.BaselineAdLibActions.find({}).observeChanges({ + added: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ updatedBaselineAdlibActionIds: [protectString(id)] }), + }), + contentCache.Rundowns.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateAll: true }), + changed: () => triggerUpdate({ invalidateAll: true }), + removed: () => triggerUpdate({ invalidateAll: true }), + }), + contentCache.Blueprints.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateAll: true }), + changed: () => triggerUpdate({ invalidateAll: true }), + removed: () => triggerUpdate({ invalidateAll: true }), + }), + contentCache.ShowStyleSourceLayers.find({}).observeChanges({ + added: () => triggerUpdate({ invalidateAll: true }), + changed: () => triggerUpdate({ invalidateAll: true }), + removed: () => triggerUpdate({ invalidateAll: true }), + }), + ] + + return () => { + obs1.dispose() + + for (const query of innerQueries) { + query.stop() + } } } - }) + ) // Set up observers: return [ diff --git a/meteor/server/publications/segmentPartNotesUI/publication.ts b/meteor/server/publications/segmentPartNotesUI/publication.ts index d01a55c66a..05d4d86a3e 100644 --- a/meteor/server/publications/segmentPartNotesUI/publication.ts +++ b/meteor/server/publications/segmentPartNotesUI/publication.ts @@ -64,48 +64,54 @@ async function setupUISegmentPartNotesPublicationObservers( })) as Pick | undefined if (!playlist) throw new Error(`RundownPlaylist "${args.playlistId}" not found!`) - const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => { - logger.silly(`Creating new RundownContentObserver`) - - // TODO - can this be done cheaper? - const cache = createReactiveContentCache() - - // Push update - triggerUpdate({ newCache: cache }) - - const obs1 = await RundownContentObserver.create(rundownIds, cache) - - const innerQueries = [ - cache.Segments.find({}).observeChanges({ - added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), - }), - cache.Parts.find({}).observe({ - added: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), - changed: (doc, oldDoc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId, oldDoc.segmentId] }), - removed: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), - }), - cache.DeletedPartInstances.find({}).observe({ - added: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), - changed: (doc, oldDoc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId, oldDoc.segmentId] }), - removed: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), - }), - cache.Rundowns.find({}).observeChanges({ - added: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), - changed: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), - removed: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), - }), - ] - - return () => { - obs1.dispose() - - for (const query of innerQueries) { - query.stop() + const rundownsObserver = await RundownsObserver.createForPlaylist( + playlist.studioId, + playlist._id, + async (rundownIds) => { + logger.silly(`Creating new RundownContentObserver`) + + // TODO - can this be done cheaper? + const cache = createReactiveContentCache() + + // Push update + triggerUpdate({ newCache: cache }) + + const obs1 = await RundownContentObserver.create(rundownIds, cache) + + const innerQueries = [ + cache.Segments.find({}).observeChanges({ + added: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ invalidateSegmentIds: [protectString(id)] }), + }), + cache.Parts.find({}).observe({ + added: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), + changed: (doc, oldDoc) => + triggerUpdate({ invalidateSegmentIds: [doc.segmentId, oldDoc.segmentId] }), + removed: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), + }), + cache.DeletedPartInstances.find({}).observe({ + added: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), + changed: (doc, oldDoc) => + triggerUpdate({ invalidateSegmentIds: [doc.segmentId, oldDoc.segmentId] }), + removed: (doc) => triggerUpdate({ invalidateSegmentIds: [doc.segmentId] }), + }), + cache.Rundowns.find({}).observeChanges({ + added: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), + changed: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), + removed: (id) => triggerUpdate({ invalidateRundownIds: [protectString(id)] }), + }), + ] + + return () => { + obs1.dispose() + + for (const query of innerQueries) { + query.stop() + } } } - }) + ) // Set up observers: return [rundownsObserver] diff --git a/packages/blueprints-integration/src/documents/part.ts b/packages/blueprints-integration/src/documents/part.ts index 31b030869b..3981536c44 100644 --- a/packages/blueprints-integration/src/documents/part.ts +++ b/packages/blueprints-integration/src/documents/part.ts @@ -61,6 +61,12 @@ export interface IBlueprintMutatablePart + /** Classes to set on the TimelineGroupObj for this part */ classes?: string[] /** Classes to set on the TimelineGroupObj for the following part */ diff --git a/packages/corelib/src/dataModel/Rundown.ts b/packages/corelib/src/dataModel/Rundown.ts index 2359297f15..4801083b0b 100644 --- a/packages/corelib/src/dataModel/Rundown.ts +++ b/packages/corelib/src/dataModel/Rundown.ts @@ -57,9 +57,6 @@ export interface Rundown { */ orphaned?: RundownOrphanedReason - /** Last sent storyStatus to ingestDevice (MOS) */ - notifiedCurrentPlayingPartExternalId?: string - /** Holds notes (warnings / errors) thrown by the blueprints during creation */ notes?: Array diff --git a/packages/corelib/src/worker/events.ts b/packages/corelib/src/worker/events.ts index 84ad373958..33ddf4cbf2 100644 --- a/packages/corelib/src/worker/events.ts +++ b/packages/corelib/src/worker/events.ts @@ -3,7 +3,6 @@ import { PartInstanceId, RundownId, RundownPlaylistId, StudioId } from '../dataM export enum EventsJobs { PartInstanceTimings = 'partInstanceTimings', RundownDataChanged = 'rundownDataChanged', - NotifyCurrentlyPlayingPart = 'notifyCurrentlyPlayingPart', } export interface PartInstanceTimingsProps { @@ -16,12 +15,6 @@ export interface RundownDataChangedProps { rundownId: RundownId } -export interface NotifyCurrentlyPlayingPartProps { - rundownId: RundownId - isRehearsal: boolean - partExternalId: string | null -} - /** * Set of valid functions, of form: * `id: (data) => return` @@ -29,7 +22,6 @@ export interface NotifyCurrentlyPlayingPartProps { export type EventsJobFunc = { [EventsJobs.PartInstanceTimings]: (data: PartInstanceTimingsProps) => void [EventsJobs.RundownDataChanged]: (data: RundownDataChangedProps) => void - [EventsJobs.NotifyCurrentlyPlayingPart]: (data: NotifyCurrentlyPlayingPartProps) => void } export function getEventsQueueName(id: StudioId): string { diff --git a/packages/job-worker/src/blueprints/context/lib.ts b/packages/job-worker/src/blueprints/context/lib.ts index 8d3c68708d..a7edc7b58a 100644 --- a/packages/job-worker/src/blueprints/context/lib.ts +++ b/packages/job-worker/src/blueprints/context/lib.ts @@ -119,6 +119,8 @@ export const PlayoutMutatablePartSampleKeys = allKeysOfObject): IBlueprintP expectedDuration: part.expectedDuration, holdMode: part.holdMode, shouldNotifyCurrentPlayingPart: part.shouldNotifyCurrentPlayingPart, + ingestNotifyPartReady: part.ingestNotifyPartReady, + ingestNotifyItemsReady: clone(part.ingestNotifyItemsReady), classes: clone(part.classes), classesForNext: clone(part.classesForNext), displayDurationGroup: part.displayDurationGroup, diff --git a/packages/job-worker/src/events/handle.ts b/packages/job-worker/src/events/handle.ts index e195e1641e..0ac22dd80d 100644 --- a/packages/job-worker/src/events/handle.ts +++ b/packages/job-worker/src/events/handle.ts @@ -1,8 +1,4 @@ -import { - NotifyCurrentlyPlayingPartProps, - PartInstanceTimingsProps, - RundownDataChangedProps, -} from '@sofie-automation/corelib/dist/worker/events' +import { PartInstanceTimingsProps, RundownDataChangedProps } from '@sofie-automation/corelib/dist/worker/events' import { getCurrentTime } from '../lib' import { JobContext } from '../jobs' import { logger } from '../logging' @@ -17,16 +13,7 @@ import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyE import { ExternalMessageQueueObj } from '@sofie-automation/corelib/dist/dataModel/ExternalMessageQueue' import { ICollection, MongoModifier } from '../db' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' -import { ExternalMessageQueueObjId, PeripheralDeviceId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { runWithRundownLock } from '../ingest/lock' -import { - PeripheralDevice, - PeripheralDeviceCategory, - PeripheralDeviceType, -} from '@sofie-automation/corelib/dist/dataModel/PeripheralDevice' -import { MOS } from '@sofie-automation/corelib' -import { executePeripheralDeviceFunction } from '../peripheralDevice' -import { DEFAULT_MOS_TIMEOUT_TIME } from '@sofie-automation/shared-lib/dist/core/constants' +import { ExternalMessageQueueObjId } from '@sofie-automation/corelib/dist/dataModel/Ids' async function getBlueprintAndDependencies(context: JobContext, rundown: ReadonlyDeep) { const pShowStyle = context.getShowStyleCompound(rundown.showStyleVariantId, rundown.showStyleBaseId) @@ -226,120 +213,3 @@ export async function handleRundownDataHasChanged(context: JobContext, data: Run logger.error(`Error in showStyleBlueprint.onRundownDataChangedEvent: ${stringifyError(err)}`) } } - -export async function handleNotifyCurrentlyPlayingPart( - context: JobContext, - data: NotifyCurrentlyPlayingPartProps -): Promise { - const rundown = await context.directCollections.Rundowns.findOne(data.rundownId) - if (!rundown) { - logger.warn(`Rundown "${data.rundownId} is missing. Skipping notifyCurrentPlayingPart`) - return - } - - if (rundown.source.type !== 'nrcs') { - logger.warn(`Rundown "${rundown._id} has no peripheralDevice. Skipping notifyCurrentPlayingPart`) - return - } - - const device = await context.directCollections.PeripheralDevices.findOne({ - _id: rundown.source.peripheralDeviceId, - // Future: we really should be constraining this to the studio, but that is often only defined on the parent of this device - // studioId: context.studioId, - parentDeviceId: { $exists: true }, - }) - if (!device || !device.parentDeviceId) { - logger.warn( - `PeripheralDevice "${rundown.source.peripheralDeviceId}" for Rundown "${rundown._id} not found. Skipping notifyCurrentPlayingPart` - ) - return - } - const parentDevice = await context.directCollections.PeripheralDevices.findOne({ - _id: device.parentDeviceId, - 'studioAndConfigId.studioId': context.studioId, - parentDeviceId: { $exists: false }, - }) - if (!parentDevice) { - logger.warn( - `PeripheralDevice "${rundown.source.peripheralDeviceId}" for Rundown "${rundown._id} not found. Skipping notifyCurrentPlayingPart` - ) - return - } - - const previousPlayingPartExternalId: string | null = rundown.notifiedCurrentPlayingPartExternalId || null - const currentPlayingPartExternalId: string | null = data.isRehearsal ? null : data.partExternalId - - // Lock the rundown so that we are allowed to write to it - // This is technically a bit of a race condition, but is really low risk and low impact if it does - await runWithRundownLock(context, rundown._id, async (rundown0) => { - if (rundown0) { - if (currentPlayingPartExternalId) { - await context.directCollections.Rundowns.update(rundown._id, { - $set: { - notifiedCurrentPlayingPartExternalId: currentPlayingPartExternalId, - }, - }) - } else { - await context.directCollections.Rundowns.update(rundown._id, { - $unset: { - notifiedCurrentPlayingPartExternalId: 1, - }, - }) - } - } - }) - - // TODO: refactor this to be non-mos centric - if (device.category === PeripheralDeviceCategory.INGEST && device.type === PeripheralDeviceType.MOS) { - // Note: rundown may not be up to date anymore - await notifyCurrentPlayingPartMOS( - context, - device, - rundown.externalId, - previousPlayingPartExternalId, - currentPlayingPartExternalId - ) - } -} - -async function notifyCurrentPlayingPartMOS( - context: JobContext, - peripheralDevice: PeripheralDevice, - rundownExternalId: string, - oldPlayingPartExternalId: string | null, - newPlayingPartExternalId: string | null -): Promise { - if (oldPlayingPartExternalId !== newPlayingPartExternalId) { - // New implementation 2022 only sends PLAY, never stop, after getting advice from AP - // Reason 1: NRK ENPS "sendt tid" (elapsed time) stopped working in ENPS 8/9 when doing STOP prior to PLAY - // Reason 2: there's a delay between the STOP (yellow line disappears) and PLAY (yellow line re-appears), which annoys the users - if (newPlayingPartExternalId) { - try { - await setStoryStatusMOS( - context, - peripheralDevice._id, - rundownExternalId, - newPlayingPartExternalId, - MOS.IMOSObjectStatus.PLAY - ) - } catch (error) { - logger.error(`Error in setStoryStatus PLAY: ${stringifyError(error)}`) - } - } - } -} - -async function setStoryStatusMOS( - context: JobContext, - deviceId: PeripheralDeviceId, - rundownExternalId: string, - storyId: string, - status: MOS.IMOSObjectStatus -): Promise { - logger.debug('setStoryStatus', { deviceId, externalId: rundownExternalId, storyId, status }) - return executePeripheralDeviceFunction(context, deviceId, DEFAULT_MOS_TIMEOUT_TIME + 1000, 'setStoryStatus', [ - rundownExternalId, - storyId, - status, - ]) -} diff --git a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts index 15f33f777d..8f19c87309 100644 --- a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts +++ b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts @@ -462,7 +462,6 @@ export class IngestModelImpl implements IngestModel, DatabasePersistedModel { // owned by elsewhere airStatus: this.#rundownImpl?.airStatus, status: this.#rundownImpl?.status, - notifiedCurrentPlayingPartExternalId: this.#rundownImpl?.notifiedCurrentPlayingPartExternalId, }) deleteAllUndefinedProperties(newRundown) diff --git a/packages/job-worker/src/playout/activePlaylistActions.ts b/packages/job-worker/src/playout/activePlaylistActions.ts index a1594b3ec9..28dde7ec88 100644 --- a/packages/job-worker/src/playout/activePlaylistActions.ts +++ b/packages/job-worker/src/playout/activePlaylistActions.ts @@ -145,8 +145,6 @@ export async function deactivateRundownPlaylistInner( let rundown: ReadonlyDeep | undefined if (currentPartInstance) { rundown = playoutModel.getRundown(currentPartInstance.partInstance.rundownId)?.rundown - - playoutModel.queueNotifyCurrentlyPlayingPartEvent(currentPartInstance.partInstance.rundownId, null) } else if (nextPartInstance) { rundown = playoutModel.getRundown(nextPartInstance.partInstance.rundownId)?.rundown } diff --git a/packages/job-worker/src/playout/model/PlayoutModel.ts b/packages/job-worker/src/playout/model/PlayoutModel.ts index f93bae234d..43a9c0f653 100644 --- a/packages/job-worker/src/playout/model/PlayoutModel.ts +++ b/packages/job-worker/src/playout/model/PlayoutModel.ts @@ -287,13 +287,6 @@ export interface PlayoutModel extends PlayoutModelReadonly, StudioPlayoutModelBa */ queuePartInstanceTimingEvent(partInstanceId: PartInstanceId): void - /** - * Queue a `NotifyCurrentlyPlayingPart` operation to be performed upon completion of this Playout operation - * @param rundownId The Rundown to report the notification to - * @param partInstance The PartInstance the event is in relation to - */ - queueNotifyCurrentlyPlayingPartEvent(rundownId: RundownId, partInstance: PlayoutPartInstanceModel | null): void - /** * Remove all loaded PartInstances marked as `rehearsal` from this RundownPlaylist */ diff --git a/packages/job-worker/src/playout/model/implementation/PlayoutModelImpl.ts b/packages/job-worker/src/playout/model/implementation/PlayoutModelImpl.ts index 1d788967f1..fbbe010cf1 100644 --- a/packages/job-worker/src/playout/model/implementation/PlayoutModelImpl.ts +++ b/packages/job-worker/src/playout/model/implementation/PlayoutModelImpl.ts @@ -56,11 +56,9 @@ import { DatabasePersistedModel } from '../../../modelBase' import { ExpectedPackageDBFromStudioBaselineObjects } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' import { ExpectedPlayoutItemStudio } from '@sofie-automation/corelib/dist/dataModel/ExpectedPlayoutItem' import { StudioBaselineHelper } from '../../../studio/model/StudioBaselineHelper' -import { EventsJobs } from '@sofie-automation/corelib/dist/worker/events' import { QuickLoopService } from '../services/QuickLoopService' import { calculatePartTimings, PartCalculatedTimings } from '@sofie-automation/corelib/dist/playout/timings' import { PieceInstanceWithTimings } from '@sofie-automation/corelib/dist/playout/processAndPrune' -import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError' import { NotificationsModelHelper } from '../../../notifications/NotificationsModelHelper' export class PlayoutModelReadonlyImpl implements PlayoutModelReadonly { @@ -283,7 +281,6 @@ export class PlayoutModelImpl extends PlayoutModelReadonlyImpl implements Playou #timelineHasChanged = false #pendingPartInstanceTimingEvents = new Set() - #pendingNotifyCurrentlyPlayingPartEvent = new Map() get hackDeletedPartInstanceIds(): PartInstanceId[] { const result: PartInstanceId[] = [] @@ -528,14 +525,6 @@ export class PlayoutModelImpl extends PlayoutModelReadonlyImpl implements Playou this.#pendingPartInstanceTimingEvents.add(partInstanceId) } - queueNotifyCurrentlyPlayingPartEvent(rundownId: RundownId, partInstance: PlayoutPartInstanceModel | null): void { - if (partInstance && partInstance.partInstance.part.shouldNotifyCurrentPlayingPart) { - this.#pendingNotifyCurrentlyPlayingPartEvent.set(rundownId, partInstance.partInstance.part.externalId) - } else if (!partInstance) { - this.#pendingNotifyCurrentlyPlayingPartEvent.set(rundownId, null) - } - } - removeAllRehearsalPartInstances(): void { const partInstancesToRemove: PartInstanceId[] = [] @@ -703,21 +692,6 @@ export class PlayoutModelImpl extends PlayoutModelReadonlyImpl implements Playou } this.#pendingPartInstanceTimingEvents.clear() - for (const [rundownId, partExternalId] of this.#pendingNotifyCurrentlyPlayingPartEvent) { - // This is low-prio, defer so that it's executed well after publications has been updated, - // so that the playout gateway has had the chance to learn about the timeline changes - this.context - .queueEventJob(EventsJobs.NotifyCurrentlyPlayingPart, { - rundownId: rundownId, - isRehearsal: !!this.playlist.rehearsal, - partExternalId: partExternalId, - }) - .catch((e) => { - logger.warn(`Failed to queue NotifyCurrentlyPlayingPart job: ${stringifyError(e)}`) - }) - } - this.#pendingNotifyCurrentlyPlayingPartEvent.clear() - if (span) span.end() } diff --git a/packages/job-worker/src/playout/snapshot.ts b/packages/job-worker/src/playout/snapshot.ts index 44aa55005e..66b7ae59cc 100644 --- a/packages/job-worker/src/playout/snapshot.ts +++ b/packages/job-worker/src/playout/snapshot.ts @@ -181,7 +181,6 @@ export async function handleRestorePlaylistSnapshot( rundownId: rd._id, } rd.studioId = snapshot.playlist.studioId - rd.notifiedCurrentPlayingPartExternalId = undefined } // TODO: This is too naive. Ideally we should unset it if it isnt valid, as anything other than a match is likely to have issues. diff --git a/packages/job-worker/src/playout/take.ts b/packages/job-worker/src/playout/take.ts index d5b737ce26..7d265d12d1 100644 --- a/packages/job-worker/src/playout/take.ts +++ b/packages/job-worker/src/playout/take.ts @@ -555,7 +555,7 @@ export function updatePartInstanceOnTake( export async function afterTake( context: JobContext, playoutModel: PlayoutModel, - takePartInstance: PlayoutPartInstanceModel + _takePartInstance: PlayoutPartInstanceModel ): Promise { const span = context.startSpan('afterTake') // This function should be called at the end of a "take" event (when the Parts have been updated) @@ -563,8 +563,6 @@ export async function afterTake( await updateTimeline(context, playoutModel) - playoutModel.queueNotifyCurrentlyPlayingPartEvent(takePartInstance.partInstance.rundownId, takePartInstance) - if (span) span.end() } diff --git a/packages/job-worker/src/workers/events/jobs.ts b/packages/job-worker/src/workers/events/jobs.ts index 680b6f1c2c..5ded9b93ac 100644 --- a/packages/job-worker/src/workers/events/jobs.ts +++ b/packages/job-worker/src/workers/events/jobs.ts @@ -1,10 +1,6 @@ import { JobContext } from '../../jobs' import { EventsJobFunc, EventsJobs } from '@sofie-automation/corelib/dist/worker/events' -import { - handleNotifyCurrentlyPlayingPart, - handlePartInstanceTimings, - handleRundownDataHasChanged, -} from '../../events/handle' +import { handlePartInstanceTimings, handleRundownDataHasChanged } from '../../events/handle' type ExecutableFunction = ( context: JobContext, @@ -18,5 +14,4 @@ export type EventsJobHandlers = { export const eventJobHandlers: EventsJobHandlers = { [EventsJobs.PartInstanceTimings]: handlePartInstanceTimings, [EventsJobs.RundownDataChanged]: handleRundownDataHasChanged, - [EventsJobs.NotifyCurrentlyPlayingPart]: handleNotifyCurrentlyPlayingPart, } diff --git a/packages/mos-gateway/src/$schemas/devices.json b/packages/mos-gateway/src/$schemas/devices.json index ae44a97199..08f3c143c3 100644 --- a/packages/mos-gateway/src/$schemas/devices.json +++ b/packages/mos-gateway/src/$schemas/devices.json @@ -63,18 +63,11 @@ "default": 10542 } }, - "required": [ - "lower", - "upper", - "query" - ], + "required": ["lower", "upper", "query"], "additionalProperties": false } }, - "required": [ - "id", - "host" - ], + "required": ["id", "host"], "additionalProperties": false }, "secondary": { @@ -141,23 +134,42 @@ "default": 10542 } }, - "required": [ - "lower", - "upper", - "query" - ], + "required": ["lower", "upper", "query"], "additionalProperties": false } }, - "required": [ - "id", - "host" - ], + "required": ["id", "host"], + "additionalProperties": false + }, + "statuses": { + "type": "object", + "ui:title": "Statuses", + "title": "MosDeviceStatusesConfig", + "properties": { + "enabled": { + "type": "boolean", + "ui:title": "Write Statuses to NRCS", + "ui:description": "", + "ui:summaryTitle": "Statuses", + "default": true + }, + "sendInRehearsal": { + "type": "boolean", + "ui:title": "Send when in Rehearsal mode", + "ui:description": "", + "default": false + }, + "onlySendPlay": { + "type": "boolean", + "ui:title": "Only send PLAY statuses", + "ui:description": "", + "default": false + } + }, + "required": ["enabled"], "additionalProperties": false } }, - "required": [ - "primary" - ], + "required": ["primary", "statuses"], "additionalProperties": false -} \ No newline at end of file +} diff --git a/packages/mos-gateway/src/CoreMosDeviceHandler.ts b/packages/mos-gateway/src/CoreMosDeviceHandler.ts index 1ce92433dc..498727687f 100644 --- a/packages/mos-gateway/src/CoreMosDeviceHandler.ts +++ b/packages/mos-gateway/src/CoreMosDeviceHandler.ts @@ -4,6 +4,7 @@ import { protectString, Observer, PeripheralDevicePubSub, + stringifyError, } from '@sofie-automation/server-core-integration' import { IMOSConnectionStatus, @@ -21,7 +22,6 @@ import { IMOSItem, IMOSROReadyToAir, IMOSROFullStory, - IMOSObjectStatus, IMOSROAck, getMosTypes, MosTypes, @@ -112,9 +112,7 @@ export class CoreMosDeviceHandler { deviceName: this._mosDevice.idPrimary, }) this.core.on('error', (err) => { - this._coreParentHandler.logger.error( - 'Core Error: ' + (typeof err === 'string' ? err : err.message || err.toString()) - ) + this._coreParentHandler.logger.error(`Core Error: ${stringifyError(err)}`) }) this.setupSubscriptionsAndObservers() @@ -138,7 +136,7 @@ export class CoreMosDeviceHandler { Promise.all([ this.core.autoSubscribe(PeripheralDevicePubSub.peripheralDeviceCommands, this.core.deviceId), ]).catch((e) => { - this._coreParentHandler.logger.error(e) + this._coreParentHandler.logger.error(stringifyError(e)) }) this._coreParentHandler.logger.info('CoreMos: Setting up observers..') @@ -349,42 +347,6 @@ export class CoreMosDeviceHandler { // console.log('GOT REPLY', results) return this.fixMosData(ro) } - async setROStatus(roId: string, status: IMOSObjectStatus): Promise { - // console.log('setStoryStatus') - const result = await this._mosDevice.sendRunningOrderStatus({ - ID: this.mosTypes.mosString128.create(roId), - Status: status, - Time: this.mosTypes.mosTime.create(new Date()), - }) - - // console.log('got result', result) - return this.fixMosData(result) - } - async setStoryStatus(roId: string, storyId: string, status: IMOSObjectStatus): Promise { - // console.log('setStoryStatus') - const result = await this._mosDevice.sendStoryStatus({ - RunningOrderId: this.mosTypes.mosString128.create(roId), - ID: this.mosTypes.mosString128.create(storyId), - Status: status, - Time: this.mosTypes.mosTime.create(new Date()), - }) - - // console.log('got result', result) - return this.fixMosData(result) - } - async setItemStatus(roId: string, storyId: string, itemId: string, status: IMOSObjectStatus): Promise { - // console.log('setStoryStatus') - const result = await this._mosDevice.sendItemStatus({ - RunningOrderId: this.mosTypes.mosString128.create(roId), - StoryId: this.mosTypes.mosString128.create(storyId), - ID: this.mosTypes.mosString128.create(itemId), - Status: status, - Time: this.mosTypes.mosTime.create(new Date()), - }) - - // console.log('got result', result) - return this.fixMosData(result) - } async replaceStoryItem( roID: string, storyID: string, diff --git a/packages/mos-gateway/src/connector.ts b/packages/mos-gateway/src/connector.ts index 82ccb6f035..adf3c2d081 100644 --- a/packages/mos-gateway/src/connector.ts +++ b/packages/mos-gateway/src/connector.ts @@ -5,6 +5,7 @@ import { PeripheralDeviceId, loadCertificatesFromDisk, CertificatesConfig, + stringifyError, } from '@sofie-automation/server-core-integration' export interface Config { @@ -36,18 +37,23 @@ export class Connector { this._logger.info('Process initialized') this._logger.info('Initializing Core...') - await this.initCore(certificates) + this.coreHandler = await CoreHandler.create( + this._logger, + this._config.core, + certificates, + this._config.device + ) this._logger.info('Initializing Mos...') - await this.initMos() + this.mosHandler = await MosHandler.create(this._logger, this._config.mos, this.coreHandler) this._logger.info('Initialization done') } catch (e: any) { - this._logger.error('Error during initialization:', e, e.stack) + this._logger.error(`Error during initialization: ${stringifyError(e)}`, e.stack) this._logger.info('Shutting down in 10 seconds!') - this.dispose().catch((e2) => this._logger.error(e2)) + this.dispose().catch((e2) => this._logger.error(stringifyError(e2))) setTimeout(() => { // eslint-disable-next-line no-process-exit @@ -55,32 +61,7 @@ export class Connector { }, 10 * 1000) } } - async initCore(certificates: Buffer[]): Promise { - if (!this._config) { - throw Error('_config is undefined!') - } - - this.coreHandler = new CoreHandler(this._logger, this._config.device) - - if (!this.coreHandler) { - throw Error('coreHandler is undefined!') - } - - return this.coreHandler.init(this._config.core, certificates) - } - async initMos(): Promise { - this.mosHandler = new MosHandler(this._logger) - if (!this._config) { - throw Error('_config is undefined!') - } - - if (!this.coreHandler) { - throw Error('coreHandler is undefined!') - } - - return this.mosHandler.init(this._config.mos, this.coreHandler) - } async dispose(): Promise { if (this.mosHandler) await this.mosHandler.dispose() diff --git a/packages/mos-gateway/src/coreHandler.ts b/packages/mos-gateway/src/coreHandler.ts index 2f578fb61a..fcc366ee42 100644 --- a/packages/mos-gateway/src/coreHandler.ts +++ b/packages/mos-gateway/src/coreHandler.ts @@ -41,12 +41,23 @@ export class CoreHandler { private _coreConfig?: CoreConfig private _certificates?: Buffer[] - constructor(logger: Winston.Logger, deviceOptions: DeviceConfig) { + public static async create( + logger: Winston.Logger, + config: CoreConfig, + certificates: Buffer[], + deviceOptions: DeviceConfig + ): Promise { + const handler = new CoreHandler(logger, deviceOptions) + await handler.init(config, certificates) + return handler + } + + private constructor(logger: Winston.Logger, deviceOptions: DeviceConfig) { this.logger = logger this._deviceOptions = deviceOptions } - async init(config: CoreConfig, certificates: Buffer[]): Promise { + private async init(config: CoreConfig, certificates: Buffer[]): Promise { // this.logger.info('========') this._coreConfig = config this._certificates = certificates @@ -224,7 +235,7 @@ export class CoreHandler { // console.log('cb done') }) .catch((e) => { - this.logger.error(e) + this.logger.error(stringifyError(e)) }) } // eslint-disable-next-line @typescript-eslint/ban-ts-comment diff --git a/packages/mos-gateway/src/mosHandler.ts b/packages/mos-gateway/src/mosHandler.ts index def8f15811..b9765b1b84 100644 --- a/packages/mos-gateway/src/mosHandler.ts +++ b/packages/mos-gateway/src/mosHandler.ts @@ -1,6 +1,5 @@ import { MosConnection, - IMOSDevice, IMOSConnectionStatus, IMOSRunningOrder, IMOSROAck, @@ -16,7 +15,6 @@ import { IMOSROReadyToAir, IMOSROFullStory, IConnectionConfig, - IMOSDeviceConnectionOptions, MosDevice, IMOSListMachInfo, IMOSString128, @@ -27,7 +25,11 @@ import { import * as Winston from 'winston' import { CoreHandler } from './coreHandler' import { CoreMosDeviceHandler } from './CoreMosDeviceHandler' -import { Observer, PeripheralDevicePubSubCollectionsNames } from '@sofie-automation/server-core-integration' +import { + Observer, + PeripheralDevicePubSubCollectionsNames, + stringifyError, +} from '@sofie-automation/server-core-integration' import { DEFAULT_MOS_TIMEOUT_TIME, DEFAULT_MOS_HEARTBEAT_INTERVAL, @@ -35,12 +37,15 @@ import { import { MosGatewayConfig } from '@sofie-automation/shared-lib/dist/generated/MosGatewayOptionsTypes' import { MosDeviceConfig } from '@sofie-automation/shared-lib/dist/generated/MosGatewayDevicesTypes' import { PeripheralDeviceForDevice } from '@sofie-automation/server-core-integration' +import _ = require('underscore') +import { MosStatusHandler } from './mosStatus/handler' +import { isPromise } from 'util/types' export interface MosConfig { self: IConnectionConfig // devices: Array } -export type MosSubDeviceSettings = Record< +type MosSubDeviceSettings = Record< string, { type: '' @@ -48,29 +53,53 @@ export type MosSubDeviceSettings = Record< } > +/** + * Represents a connection in mos-connection, paired with some additional data + */ +interface MosDeviceHandle { + readonly deviceId: string + readonly mosDevice: MosDevice + readonly deviceOptions: Readonly + + // Once connected, a core handler is setup + coreMosHandler?: CoreMosDeviceHandler | Promise + + // If writing back story/item status is enabled, the setup handler + statusHandler?: MosStatusHandler +} + export class MosHandler { public mos: MosConnection | undefined public mosOptions: MosConfig | undefined public debugLogging = false - private allMosDevices: { [id: string]: { mosDevice: IMOSDevice; coreMosHandler?: CoreMosDeviceHandler } } = {} - private _ownMosDevices: { [deviceId: string]: MosDevice } = {} + /** Map of mos devices that have been created */ + private readonly _allMosDevices = new Map() + private _logger: Winston.Logger private _disposed = false private _settings?: MosGatewayConfig - private _openMediaHotStandby: Record private _coreHandler: CoreHandler | undefined private _observers: Array> = [] private _triggerupdateDevicesTimeout: any = null private mosTypes: MosTypes - constructor(logger: Winston.Logger) { + public static async create( + logger: Winston.Logger, + config: MosConfig, + coreHandler: CoreHandler + ): Promise { + const handler = new MosHandler(logger) + await handler.init(config, coreHandler) + return handler + } + + private constructor(logger: Winston.Logger) { this._logger = logger - this._openMediaHotStandby = {} this.mosTypes = getMosTypes(this.strict) // temporary, another will be set upon init() } - async init(config: MosConfig, coreHandler: CoreHandler): Promise { + private async init(config: MosConfig, coreHandler: CoreHandler): Promise { this.mosOptions = config this._coreHandler = coreHandler /*{ @@ -121,11 +150,9 @@ export class MosHandler { return Promise.resolve() } } - setupObservers(): void { + private setupObservers(): void { if (this._observers.length) { - this._observers.forEach((obs) => { - obs.stop() - }) + this._observers.forEach((obs) => obs.stop()) this._observers = [] } this._logger.info('Renewing observers') @@ -141,15 +168,9 @@ export class MosHandler { const deviceObserver = this._coreHandler.core.observe( PeripheralDevicePubSubCollectionsNames.peripheralDeviceForDevice ) - deviceObserver.added = () => { - this._deviceOptionsChanged() - } - deviceObserver.changed = () => { - this._deviceOptionsChanged() - } - deviceObserver.removed = () => { - this._deviceOptionsChanged() - } + deviceObserver.added = () => this._deviceOptionsChanged() + deviceObserver.changed = () => this._deviceOptionsChanged() + deviceObserver.removed = () => this._deviceOptionsChanged() this._observers.push(deviceObserver) this._deviceOptionsChanged() @@ -193,7 +214,7 @@ export class MosHandler { } this._triggerupdateDevicesTimeout = setTimeout(() => { this._updateDevices().catch((e) => { - this._logger.error(e) + this._logger.error(stringifyError(e)) }) }, 20) } @@ -224,171 +245,203 @@ export class MosHandler { } this.debugLog('rawMessage', source, type, message) }) - this.mos.on('info', (message: any) => { - this._logger.info(message) + this.mos.on('info', (message, data) => { + this._logger.info(message, data) }) - this.mos.on('error', (error: any) => { - this._logger.error(error) + this.mos.on('error', (error) => { + this._logger.error(stringifyError(error)) }) - this.mos.on('warning', (warning: any) => { - this._logger.error(warning) + this.mos.on('warning', (warning) => { + this._logger.error(stringifyError(warning)) + }) + + this.mos.onConnection((mosDevice: MosDevice): void => { + this.setupMosDevice(mosDevice).catch((e) => { + this._logger.error(stringifyError(e)) + }) }) - // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.mos.onConnection(async (mosDevice: IMOSDevice): Promise => { - // a new connection to a device has been made - this._logger.info('new mosConnection established: ' + mosDevice.idPrimary + ', ' + mosDevice.idSecondary) - try { - this.allMosDevices[mosDevice.idPrimary] = { mosDevice: mosDevice } + // Open mos-server for connections: + await this.mos.init() + } + private async setupMosDevice(mosDevice: MosDevice): Promise { + // a new connection to a device has been made + this._logger.info('new mosConnection established: ' + mosDevice.idPrimary + ', ' + mosDevice.idSecondary) + try { + const deviceEntry = Array.from(this._allMosDevices.values()).find( + (d) => + d.mosDevice.idPrimary === mosDevice.idPrimary && d.mosDevice.idSecondary === mosDevice.idSecondary + ) - if (!this._coreHandler) throw Error('_coreHandler is undefined!') + if (!deviceEntry) { + // We got a connection for a connection which shouldn't exist.. + this._logger.error(`Got connection for mosDevice "${mosDevice.idPrimary}" which doesn't exist!`) + return + } - const coreMosHandler = await this._coreHandler.registerMosDevice(mosDevice, this, { - openMediaHotStandby: mosDevice.idSecondary - ? this._openMediaHotStandby[mosDevice.idSecondary] - : false, - }) - // this._logger.info('mosDevice registered -------------') - // Setup message flow between the devices: - - this.allMosDevices[mosDevice.idPrimary].coreMosHandler = coreMosHandler - - // Initial Status check: - const connectionStatus = mosDevice.getConnectionStatus() - coreMosHandler.onMosConnectionChanged(connectionStatus) // initial check - // Profile 0: ------------------------------------------------- - mosDevice.onConnectionChange((newStatus: IMOSConnectionStatus) => { - // MOSDevice >>>> Core - coreMosHandler.onMosConnectionChanged(newStatus) - }) - coreMosHandler.onMosConnectionChanged(mosDevice.getConnectionStatus()) - mosDevice.onRequestMachineInfo(async () => { - // MOSDevice >>>> Core - return coreMosHandler.getMachineInfo() - }) + if (deviceEntry.mosDevice !== mosDevice) { + // Our state doesn't match, don't try to use the connection it could be from a previous connection attempt + this._logger.error( + `Got connection for mosDevice "${mosDevice.idPrimary}" which differs to the one setup!` + ) + return + } + + // This is either a promise, if a handler is currently being setup, or the handler itself + if (deviceEntry.coreMosHandler) { + this._logger.error(`Got connection for mosDevice "${mosDevice.idPrimary}" which is already setup!`) + return + } + + if (!this._coreHandler) throw Error('_coreHandler is undefined!') - // Profile 1: ------------------------------------------------- - /* + const openMediaHotStandby = deviceEntry.deviceOptions.secondary?.openMediaHotStandby || false + + const coreMosHandler = await this._coreHandler.registerMosDevice(mosDevice, this, { + openMediaHotStandby: mosDevice.idSecondary ? openMediaHotStandby : false, + }) + // this._logger.info('mosDevice registered -------------') + // Setup message flow between the devices: + + deviceEntry.coreMosHandler = coreMosHandler + + // Initial Status check: + // Profile 0: ------------------------------------------------- + mosDevice.onConnectionChange((newStatus: IMOSConnectionStatus) => { + // MOSDevice >>>> Core + coreMosHandler.onMosConnectionChanged(newStatus) + + // Setup the status handler upon first connection to the NRCS + const isConnected = newStatus.PrimaryConnected || newStatus.SecondaryConnected + if (deviceEntry.deviceOptions.statuses?.enabled && !deviceEntry.statusHandler && isConnected) { + // Creating the handler at this point avoids sending status messages before the connection is established, + // allowing for a sync at startup without needing manual queueing + deviceEntry.statusHandler = new MosStatusHandler( + this._logger, + mosDevice, + coreMosHandler, + deviceEntry.deviceOptions.statuses, + this.strict + ) + } + }) + coreMosHandler.onMosConnectionChanged(mosDevice.getConnectionStatus()) + mosDevice.onRequestMachineInfo(async () => { + // MOSDevice >>>> Core + return coreMosHandler.getMachineInfo() + }) + + // Profile 1: ------------------------------------------------- + /* mosDevice.onRequestMOSObject((objId: string) => { // coreMosHandler.fetchMosObject(objId) // return Promise }) */ - // onRequestMOSObject: (cb: (objId: string) => Promise) => void - // onRequestAllMOSObjects: (cb: () => Promise>) => void - // getMOSObject: (objId: string) => Promise - // getAllMOSObjects: () => Promise> - // Profile 2: ------------------------------------------------- - mosDevice.onCreateRunningOrder(async (ro: IMOSRunningOrder) => { - // MOSDevice >>>> Core - return this._getROAck(ro.ID, coreMosHandler.mosRoCreate(ro)) - }) - mosDevice.onReplaceRunningOrder(async (ro: IMOSRunningOrder) => { - // MOSDevice >>>> Core - return this._getROAck(ro.ID, coreMosHandler.mosRoReplace(ro)) - }) - mosDevice.onDeleteRunningOrder(async (runningOrderId: IMOSString128) => { - // MOSDevice >>>> Core - return this._getROAck(runningOrderId, coreMosHandler.mosRoDelete(runningOrderId)) - }) - mosDevice.onMetadataReplace(async (ro: IMOSRunningOrderBase) => { - // MOSDevice >>>> Core - return this._getROAck(ro.ID, coreMosHandler.mosRoMetadata(ro)) - }) - mosDevice.onRunningOrderStatus(async (status: IMOSRunningOrderStatus) => { - // MOSDevice >>>> Core - return this._getROAck(status.ID, coreMosHandler.mosRoStatus(status)) - }) - mosDevice.onStoryStatus(async (status: IMOSStoryStatus) => { - // MOSDevice >>>> Core - return this._getROAck(status.RunningOrderId, coreMosHandler.mosRoStoryStatus(status)) - }) - mosDevice.onItemStatus(async (status: IMOSItemStatus) => { - // MOSDevice >>>> Core - return this._getROAck(status.RunningOrderId, coreMosHandler.mosRoItemStatus(status)) - }) - mosDevice.onROInsertStories(async (Action: IMOSStoryAction, Stories: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryInsert(Action, Stories)) - }) - mosDevice.onROInsertItems(async (Action: IMOSItemAction, Items: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemInsert(Action, Items)) - }) - mosDevice.onROReplaceStories(async (Action: IMOSStoryAction, Stories: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryReplace(Action, Stories)) - }) - mosDevice.onROReplaceItems(async (Action: IMOSItemAction, Items: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemReplace(Action, Items)) - }) - mosDevice.onROMoveStories(async (Action: IMOSStoryAction, Stories: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryMove(Action, Stories)) - }) - mosDevice.onROMoveItems(async (Action: IMOSItemAction, Items: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemMove(Action, Items)) - }) - mosDevice.onRODeleteStories(async (Action: IMOSROAction, Stories: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryDelete(Action, Stories)) - }) - mosDevice.onRODeleteItems(async (Action: IMOSStoryAction, Items: Array) => { - // MOSDevice >>>> Core - return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemDelete(Action, Items)) - }) - mosDevice.onROSwapStories( - async (Action: IMOSROAction, StoryID0: IMOSString128, StoryID1: IMOSString128) => { - // MOSDevice >>>> Core - return this._getROAck( - Action.RunningOrderID, - coreMosHandler.mosRoStorySwap(Action, StoryID0, StoryID1) - ) - } - ) - mosDevice.onROSwapItems( - async (Action: IMOSStoryAction, ItemID0: IMOSString128, ItemID1: IMOSString128) => { - // MOSDevice >>>> Core - return this._getROAck( - Action.RunningOrderID, - coreMosHandler.mosRoItemSwap(Action, ItemID0, ItemID1) - ) - } - ) - mosDevice.onReadyToAir(async (Action: IMOSROReadyToAir) => { + // onRequestMOSObject: (cb: (objId: string) => Promise) => void + // onRequestAllMOSObjects: (cb: () => Promise>) => void + // getMOSObject: (objId: string) => Promise + // getAllMOSObjects: () => Promise> + // Profile 2: ------------------------------------------------- + mosDevice.onCreateRunningOrder(async (ro: IMOSRunningOrder) => { + // MOSDevice >>>> Core + return this._getROAck(ro.ID, coreMosHandler.mosRoCreate(ro)) + }) + mosDevice.onReplaceRunningOrder(async (ro: IMOSRunningOrder) => { + // MOSDevice >>>> Core + return this._getROAck(ro.ID, coreMosHandler.mosRoReplace(ro)) + }) + mosDevice.onDeleteRunningOrder(async (runningOrderId: IMOSString128) => { + // MOSDevice >>>> Core + return this._getROAck(runningOrderId, coreMosHandler.mosRoDelete(runningOrderId)) + }) + mosDevice.onMetadataReplace(async (ro: IMOSRunningOrderBase) => { + // MOSDevice >>>> Core + return this._getROAck(ro.ID, coreMosHandler.mosRoMetadata(ro)) + }) + mosDevice.onRunningOrderStatus(async (status: IMOSRunningOrderStatus) => { + // MOSDevice >>>> Core + return this._getROAck(status.ID, coreMosHandler.mosRoStatus(status)) + }) + mosDevice.onStoryStatus(async (status: IMOSStoryStatus) => { + // MOSDevice >>>> Core + return this._getROAck(status.RunningOrderId, coreMosHandler.mosRoStoryStatus(status)) + }) + mosDevice.onItemStatus(async (status: IMOSItemStatus) => { + // MOSDevice >>>> Core + return this._getROAck(status.RunningOrderId, coreMosHandler.mosRoItemStatus(status)) + }) + mosDevice.onROInsertStories(async (Action: IMOSStoryAction, Stories: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryInsert(Action, Stories)) + }) + mosDevice.onROInsertItems(async (Action: IMOSItemAction, Items: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemInsert(Action, Items)) + }) + mosDevice.onROReplaceStories(async (Action: IMOSStoryAction, Stories: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryReplace(Action, Stories)) + }) + mosDevice.onROReplaceItems(async (Action: IMOSItemAction, Items: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemReplace(Action, Items)) + }) + mosDevice.onROMoveStories(async (Action: IMOSStoryAction, Stories: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryMove(Action, Stories)) + }) + mosDevice.onROMoveItems(async (Action: IMOSItemAction, Items: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemMove(Action, Items)) + }) + mosDevice.onRODeleteStories(async (Action: IMOSROAction, Stories: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoStoryDelete(Action, Stories)) + }) + mosDevice.onRODeleteItems(async (Action: IMOSStoryAction, Items: Array) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemDelete(Action, Items)) + }) + mosDevice.onROSwapStories( + async (Action: IMOSROAction, StoryID0: IMOSString128, StoryID1: IMOSString128) => { // MOSDevice >>>> Core - return this._getROAck(Action.ID, coreMosHandler.mosRoReadyToAir(Action)) - }) - // ---------------------------------------------------------------- - // Init actions - /* + return this._getROAck( + Action.RunningOrderID, + coreMosHandler.mosRoStorySwap(Action, StoryID0, StoryID1) + ) + } + ) + mosDevice.onROSwapItems(async (Action: IMOSStoryAction, ItemID0: IMOSString128, ItemID1: IMOSString128) => { + // MOSDevice >>>> Core + return this._getROAck(Action.RunningOrderID, coreMosHandler.mosRoItemSwap(Action, ItemID0, ItemID1)) + }) + mosDevice.onReadyToAir(async (Action: IMOSROReadyToAir) => { + // MOSDevice >>>> Core + return this._getROAck(Action.ID, coreMosHandler.mosRoReadyToAir(Action)) + }) + // ---------------------------------------------------------------- + // Init actions + /* mosDevice.getMachineInfo() .then((machineInfo: IMOSListMachInfo) => { }) */ - // Profile 3: ------------------------------------------------- - // Profile 4: ------------------------------------------------- - // onStory: (cb: (story: IMOSROFullStory) => Promise) => void - mosDevice.onRunningOrderStory(async (story: IMOSROFullStory) => { - // MOSDevice >>>> Core - return this._getROAck(story.RunningOrderId, coreMosHandler.mosRoFullStory(story)) - }) - } catch (e) { - this._logger.error('Error:', e) - } - }) - - // Open mos-server for connections: - await this.mos.init() + // Profile 3: ------------------------------------------------- + // Profile 4: ------------------------------------------------- + // onStory: (cb: (story: IMOSROFullStory) => Promise) => void + mosDevice.onRunningOrderStory(async (story: IMOSROFullStory) => { + // MOSDevice >>>> Core + return this._getROAck(story.RunningOrderId, coreMosHandler.mosRoFullStory(story)) + }) + } catch (e) { + this._logger.error(stringifyError(e)) + } } private sendStatusOfAllMosDevices() { // Send an update to Core of the status of all mos devices - for (const handler of Object.values<{ mosDevice: IMOSDevice; coreMosHandler?: CoreMosDeviceHandler }>( - this.allMosDevices - )) { - if (handler.coreMosHandler) { + for (const handler of this._allMosDevices.values()) { + if (handler.coreMosHandler && !isPromise(handler.coreMosHandler)) { handler.coreMosHandler.onMosConnectionChanged(handler.mosDevice.getConnectionStatus()) } } @@ -424,25 +477,18 @@ export class MosHandler { for (const [deviceId, device] of Object.entries<{ options: MosDeviceConfig }>(devices)) { if (device) { if (device.options.secondary) { - this._openMediaHotStandby[device.options.secondary.id] = - device.options.secondary?.openMediaHotStandby || false // If the host isn't set, don't use secondary: if (!device.options.secondary.host || !device.options.secondary.id) delete device.options.secondary } - const oldDevice: MosDevice | null = this._getDevice(deviceId) + const oldDevice = this._allMosDevices.get(deviceId) if (!oldDevice) { this._logger.info('Initializing new device: ' + deviceId) devicesToAdd[deviceId] = device } else { - if ( - (oldDevice.primaryId || '') !== device.options.primary?.id || - (oldDevice.primaryHost || '') !== device.options.primary?.host || - (oldDevice.secondaryId || '') !== (device.options.secondary?.id || '') || - (oldDevice.secondaryHost || '') !== (device.options.secondary?.host || '') - ) { + if (!_.isEqual(oldDevice.deviceOptions, device.options)) { this._logger.info('Re-initializing device: ' + deviceId) devicesToRemove[deviceId] = true devicesToAdd[deviceId] = device @@ -451,7 +497,7 @@ export class MosHandler { } } - for (const [deviceId, oldDevice] of Object.entries(this._ownMosDevices)) { + for (const [deviceId, oldDevice] of this._allMosDevices.entries()) { if (oldDevice && !devices[deviceId]) { this._logger.info('Un-initializing device: ' + deviceId) devicesToRemove[deviceId] = true @@ -471,29 +517,26 @@ export class MosHandler { ) } } - private async _addDevice(deviceId: string, deviceOptions: IMOSDeviceConnectionOptions): Promise { - if (this._getDevice(deviceId)) { + private async _addDevice(deviceId: string, deviceOptions0: MosDeviceConfig): Promise { + if (this._allMosDevices.has(deviceId)) { // the device is already there throw new Error('Unable to add device "' + deviceId + '", because it already exists!') } - if (!this.mos) { - throw Error('mos is undefined, call _initMosConnection first!') - } - - deviceOptions = JSON.parse(JSON.stringify(deviceOptions)) // deep clone - - deviceOptions.primary.timeout = deviceOptions.primary.timeout || DEFAULT_MOS_TIMEOUT_TIME - - deviceOptions.primary.heartbeatInterval = - deviceOptions.primary.heartbeatInterval || DEFAULT_MOS_HEARTBEAT_INTERVAL + if (!this.mos) throw Error('mos is undefined, call _initMosConnection first!') - if (deviceOptions.secondary?.id && this._openMediaHotStandby[deviceOptions.secondary.id]) { - deviceOptions.secondary.openMediaHotStandby = true - } + const deviceOptions: MosDeviceConfig = JSON.parse(JSON.stringify(deviceOptions0)) // deep clone + deviceOptions.primary.timeout ||= DEFAULT_MOS_TIMEOUT_TIME + deviceOptions.primary.heartbeatInterval ||= DEFAULT_MOS_HEARTBEAT_INTERVAL const mosDevice: MosDevice = await this.mos.connect(deviceOptions) - this._ownMosDevices[deviceId] = mosDevice + this._allMosDevices.set(deviceId, { + deviceId: deviceId, + mosDevice: mosDevice, + deviceOptions, + }) + + await this.setupMosDevice(mosDevice) try { const getMachineInfoUntilConnected = async (): Promise => @@ -534,23 +577,29 @@ export class MosHandler { return mosDevice } catch (e) { // something went wrong during init: - if (!this.mos) { - throw Error('mos is undefined!') - } + if (!this.mos) throw Error('mos is undefined!') this.mos.disposeMosDevice(mosDevice).catch((e2) => { - this._logger.error(e2) + this._logger.error(stringifyError(e2)) }) throw e } } private async _removeDevice(deviceId: string): Promise { - const mosDevice = this._getDevice(deviceId) as MosDevice + const deviceEntry = this._allMosDevices.get(deviceId) + this._allMosDevices.delete(deviceId) - delete this._ownMosDevices[deviceId] - if (mosDevice) { - if (!this._coreHandler) throw Error('_coreHandler is undefined!') - await this._coreHandler.unRegisterMosDevice(mosDevice) + if (deviceEntry) { + const mosDevice = deviceEntry.mosDevice + + // Cleanup the coreMosHandler from the device + if (this._coreHandler) await this._coreHandler.unRegisterMosDevice(mosDevice) + + // Stop the status handler, if enabled + if (deviceEntry.statusHandler) { + deviceEntry.statusHandler.dispose() + delete deviceEntry.statusHandler + } if (!this.mos) { throw Error('mos is undefined!') @@ -570,9 +619,6 @@ export class MosHandler { } return Promise.resolve() } - private _getDevice(deviceId: string): MosDevice | null { - return this._ownMosDevices[deviceId] || null - } private async _getROAck(roId: IMOSString128, p: Promise): Promise { return p .then(() => { @@ -584,7 +630,7 @@ export class MosHandler { return roAck }) .catch((err) => { - this._logger.error('ROAck error:', err) + this._logger.error(`ROAck error: ${stringifyError(err)}`) const roAck: IMOSROAck = { ID: roId, Status: this.mosTypes.mosString128.create('Error: ' + err.toString()), diff --git a/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts b/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts new file mode 100644 index 0000000000..624d7eb9c1 --- /dev/null +++ b/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts @@ -0,0 +1,317 @@ +import { protectString } from '@sofie-automation/server-core-integration' +import { + IngestPartPlaybackStatus, + IngestRundownActiveStatus, + IngestRundownStatus, +} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' +import { diffStatuses, ItemStatusEntry, MOS_STATUS_UNKNOWN, StoryStatusEntry } from '../diff' +import type { MosDeviceStatusesConfig } from '../../generated/devices' +import { IMOSObjectStatus } from '@mos-connection/connector' + +describe('diffStatuses', () => { + const defaultConfig: MosDeviceStatusesConfig = { + enabled: true, + sendInRehearsal: true, + onlySendPlay: false, + } + const singlePartRundown: IngestRundownStatus = { + _id: protectString('rundown0'), + externalId: 'external0', + active: IngestRundownActiveStatus.ACTIVE, + segments: [ + { + externalId: 'segment0', + parts: [ + { + externalId: 'part0', + isReady: true, + itemsReady: {}, + playbackStatus: IngestPartPlaybackStatus.UNKNOWN, + }, + ], + }, + ], + } + + test('diff no changes', () => { + const diff = diffStatuses(defaultConfig, singlePartRundown, singlePartRundown) + expect(diff).toHaveLength(0) + }) + + test('part playback changes', () => { + const partPlayingState = structuredClone(singlePartRundown) + partPlayingState.segments[0].parts[0].playbackStatus = IngestPartPlaybackStatus.PLAY + + { + // change to play + const diff = diffStatuses(defaultConfig, singlePartRundown, partPlayingState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.PLAY, + } satisfies StoryStatusEntry) + } + + { + const partStoppedState = structuredClone(partPlayingState) + partStoppedState.segments[0].parts[0].playbackStatus = IngestPartPlaybackStatus.STOP + + // change to stop + const diff = diffStatuses(defaultConfig, partPlayingState, partStoppedState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.STOP, + } satisfies StoryStatusEntry) + } + + { + const partClearState = structuredClone(partPlayingState) + partClearState.segments[0].parts[0].playbackStatus = IngestPartPlaybackStatus.UNKNOWN + + // change to clear + const diff = diffStatuses(defaultConfig, partPlayingState, partClearState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.READY, + } satisfies StoryStatusEntry) + } + }) + + test('part ready changes', () => { + const partNotReadyState = structuredClone(singlePartRundown) + partNotReadyState.segments[0].parts[0].isReady = false + + { + // change to not ready + const diff = diffStatuses(defaultConfig, singlePartRundown, partNotReadyState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.NOT_READY, + } satisfies StoryStatusEntry) + } + + { + // change to ready + const diff = diffStatuses(defaultConfig, partNotReadyState, singlePartRundown) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.READY, + } satisfies StoryStatusEntry) + } + + { + const partClearState = structuredClone(partNotReadyState) + partClearState.segments[0].parts[0].isReady = null + + // change to unknown + const diff = diffStatuses(defaultConfig, partNotReadyState, partClearState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: MOS_STATUS_UNKNOWN, + } satisfies StoryStatusEntry) + } + }) + + test('part added to rundown', () => { + const extraPartState = structuredClone(singlePartRundown) + extraPartState.segments[0].parts.push({ + externalId: 'part1', + isReady: false, + itemsReady: {}, + playbackStatus: IngestPartPlaybackStatus.UNKNOWN, + }) + + { + const diff = diffStatuses(defaultConfig, singlePartRundown, extraPartState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part1', + mosStatus: IMOSObjectStatus.NOT_READY, + } satisfies StoryStatusEntry) + } + }) + + test('part removed from rundown', () => { + const extraPartState = structuredClone(singlePartRundown) + extraPartState.segments[0].parts.push({ + externalId: 'part1', + isReady: false, + itemsReady: {}, + playbackStatus: IngestPartPlaybackStatus.UNKNOWN, + }) + + { + const diff = diffStatuses(defaultConfig, extraPartState, singlePartRundown) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part1', + mosStatus: MOS_STATUS_UNKNOWN, + } satisfies StoryStatusEntry) + } + }) + + test('rundown becomes inactive', () => { + const inactiveState = structuredClone(singlePartRundown) + inactiveState.active = IngestRundownActiveStatus.INACTIVE + + { + const diff = diffStatuses(defaultConfig, singlePartRundown, inactiveState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: MOS_STATUS_UNKNOWN, + } satisfies StoryStatusEntry) + } + }) + + test('rundown becomes active', () => { + const inactiveState = structuredClone(singlePartRundown) + inactiveState.active = IngestRundownActiveStatus.INACTIVE + + { + const diff = diffStatuses(defaultConfig, inactiveState, singlePartRundown) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.READY, + } satisfies StoryStatusEntry) + } + }) + + test('rundown becomes rehearsal', () => { + const inactiveState = structuredClone(singlePartRundown) + inactiveState.active = IngestRundownActiveStatus.INACTIVE + const rehearsalState = structuredClone(singlePartRundown) + rehearsalState.active = IngestRundownActiveStatus.REHEARSAL + + { + // send during rehearsal + const diff = diffStatuses(defaultConfig, inactiveState, rehearsalState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'story', + rundownExternalId: 'external0', + storyId: 'part0', + mosStatus: IMOSObjectStatus.READY, + } satisfies StoryStatusEntry) + } + + { + // no send during rehearsal + const disableRehearsalConfig = { + ...defaultConfig, + sendInRehearsal: false, + } + const diff = diffStatuses(disableRehearsalConfig, inactiveState, rehearsalState) + expect(diff).toHaveLength(0) + } + }) + + test('add items', () => { + { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = true + + const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'item', + rundownExternalId: 'external0', + storyId: 'part0', + itemId: 'item0', + mosStatus: IMOSObjectStatus.READY, + } satisfies ItemStatusEntry) + } + + { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = false + + const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'item', + rundownExternalId: 'external0', + storyId: 'part0', + itemId: 'item0', + mosStatus: IMOSObjectStatus.NOT_READY, + } satisfies ItemStatusEntry) + } + + { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = undefined + + const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) + expect(diff).toHaveLength(0) + } + }) + + test('remove items', () => { + { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = true + + const diff = diffStatuses(defaultConfig, itemsState, singlePartRundown) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'item', + rundownExternalId: 'external0', + storyId: 'part0', + itemId: 'item0', + mosStatus: MOS_STATUS_UNKNOWN, + } satisfies ItemStatusEntry) + } + + { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = undefined + + const diff = diffStatuses(defaultConfig, itemsState, singlePartRundown) + expect(diff).toHaveLength(0) + } + }) + + test('change item state', () => { + const itemsState = structuredClone(singlePartRundown) + itemsState.segments[0].parts[0].itemsReady.item0 = true + + const items2State = structuredClone(itemsState) + items2State.segments[0].parts[0].itemsReady.item0 = false + + const diff = diffStatuses(defaultConfig, itemsState, items2State) + expect(diff).toHaveLength(1) + expect(diff[0]).toEqual({ + type: 'item', + rundownExternalId: 'external0', + storyId: 'part0', + itemId: 'item0', + mosStatus: IMOSObjectStatus.NOT_READY, + } satisfies ItemStatusEntry) + }) +}) diff --git a/packages/mos-gateway/src/mosStatus/diff.ts b/packages/mos-gateway/src/mosStatus/diff.ts new file mode 100644 index 0000000000..9eaa6904f7 --- /dev/null +++ b/packages/mos-gateway/src/mosStatus/diff.ts @@ -0,0 +1,161 @@ +import { IMOSObjectStatus } from '@mos-connection/connector' +import type { MosDeviceStatusesConfig } from '../generated/devices' +import { + IngestPartPlaybackStatus, + type IngestPartStatus, + type IngestRundownStatus, +} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' + +export const MOS_STATUS_UNKNOWN = '' as IMOSObjectStatus // Force the status to be empty, which isn't a valid state in the enum + +export type SomeStatusEntry = StoryStatusEntry | ItemStatusEntry + +export interface ItemStatusEntry { + type: 'item' + rundownExternalId: string + storyId: string + itemId: string + mosStatus: IMOSObjectStatus +} + +export interface StoryStatusEntry { + type: 'story' + rundownExternalId: string + storyId: string + mosStatus: IMOSObjectStatus +} + +export function diffStatuses( + config: MosDeviceStatusesConfig, + previousStatuses: IngestRundownStatus | undefined, + newStatuses: IngestRundownStatus | undefined +): SomeStatusEntry[] { + const rundownExternalId = previousStatuses?.externalId ?? newStatuses?.externalId + + if ((!previousStatuses && !newStatuses) || !rundownExternalId) return [] + + const statuses: SomeStatusEntry[] = [] + + const previousStories = buildStoriesMap(previousStatuses) + const newStories = buildStoriesMap(newStatuses) + + // Process any removed stories first + for (const [storyId, story] of previousStories) { + if (!newStories.has(storyId)) { + // The story has been removed + statuses.push({ + type: 'story', + rundownExternalId, + storyId, + mosStatus: MOS_STATUS_UNKNOWN, + }) + + // Clear any items too + for (const [itemId, isReady] of Object.entries(story.itemsReady)) { + if (isReady === undefined) continue + + statuses.push({ + type: 'item', + rundownExternalId, + storyId, + itemId: itemId, + mosStatus: MOS_STATUS_UNKNOWN, + }) + } + } + } + + // Then any remaining stories in order + for (const [storyId, status] of newStories) { + const previousStatus = previousStories.get(storyId) + + const newMosStatus = buildMosStatus(config, status.playbackStatus, status.isReady, newStatuses?.active) + if ( + newMosStatus !== null && + (!previousStatus || + buildMosStatus( + config, + previousStatus.playbackStatus, + previousStatus.isReady, + previousStatuses?.active + ) !== newMosStatus) + ) { + statuses.push({ + type: 'story', + rundownExternalId, + storyId, + mosStatus: newMosStatus, + }) + } + + // Diff each item in the story + const previousItemStatuses = previousStatus?.itemsReady ?? {} + const allItemIds = new Set([...Object.keys(status.itemsReady), ...Object.keys(previousItemStatuses)]) + + for (const itemId of allItemIds) { + const newReady = status.itemsReady[itemId] + const previousReady = previousItemStatuses[itemId] + + const newMosStatus = + newReady !== undefined + ? buildMosStatus(config, status.playbackStatus, newReady, newStatuses?.active) + : null + const previousMosStatus = + previousReady !== undefined && previousStatus + ? buildMosStatus(config, previousStatus.playbackStatus, previousReady, previousStatuses?.active) + : null + + if ((newMosStatus !== null || previousMosStatus !== null) && previousMosStatus !== newMosStatus) { + statuses.push({ + type: 'item', + rundownExternalId, + storyId, + itemId, + mosStatus: newMosStatus ?? MOS_STATUS_UNKNOWN, + }) + } + } + } + + return statuses +} + +function buildStoriesMap(state: IngestRundownStatus | undefined): Map { + const stories = new Map() + + if (state) { + for (const segment of state.segments) { + for (const part of segment.parts) { + stories.set(part.externalId, part) + } + } + } + + return stories +} + +function buildMosStatus( + config: MosDeviceStatusesConfig, + playbackStatus: IngestPartPlaybackStatus, + isReady: boolean | null | undefined, + active: IngestRundownStatus['active'] | undefined +): IMOSObjectStatus | null { + if (active === 'inactive') return MOS_STATUS_UNKNOWN + if (active === 'rehearsal' && !config.sendInRehearsal) return null + + switch (playbackStatus) { + case IngestPartPlaybackStatus.PLAY: + return IMOSObjectStatus.PLAY + case IngestPartPlaybackStatus.STOP: + return IMOSObjectStatus.STOP + default: + switch (isReady) { + case true: + return IMOSObjectStatus.READY + case false: + return IMOSObjectStatus.NOT_READY + default: + return MOS_STATUS_UNKNOWN + } + } +} diff --git a/packages/mos-gateway/src/mosStatus/handler.ts b/packages/mos-gateway/src/mosStatus/handler.ts new file mode 100644 index 0000000000..689c74c231 --- /dev/null +++ b/packages/mos-gateway/src/mosStatus/handler.ts @@ -0,0 +1,163 @@ +import { + getMosTypes, + type IMOSItemStatus, + IMOSObjectStatus, + type IMOSStoryStatus, + type MosTypes, + type IMOSDevice, +} from '@mos-connection/connector' +import type { MosDeviceStatusesConfig } from '../generated/devices' +import type { CoreMosDeviceHandler } from '../CoreMosDeviceHandler' +import { + assertNever, + type Observer, + PeripheralDevicePubSub, + PeripheralDevicePubSubCollectionsNames, + stringifyError, + SubscriptionId, +} from '@sofie-automation/server-core-integration' +import type { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' +import type { RundownId } from '@sofie-automation/shared-lib/dist/core/model/Ids' +import type winston = require('winston') +import { Queue } from '@sofie-automation/server-core-integration/dist/lib/queue' +import { diffStatuses } from './diff' + +export class MosStatusHandler { + readonly #logger: winston.Logger + readonly #mosDevice: IMOSDevice + readonly #coreMosHandler: CoreMosDeviceHandler + readonly #config: MosDeviceStatusesConfig + readonly #mosTypes: MosTypes + + readonly #messageQueue = new Queue() + + #subId: SubscriptionId | undefined + #observer: Observer | undefined + + #destroyed = false + + readonly #lastStatuses = new Map() + + constructor( + logger: winston.Logger, + mosDevice: IMOSDevice, + coreMosHandler: CoreMosDeviceHandler, + config: MosDeviceStatusesConfig, + strictMosTypes: boolean + ) { + if (!config.enabled) throw new Error('MosStatusHandler is not enabled') + + this.#logger = logger + this.#mosDevice = mosDevice + this.#coreMosHandler = coreMosHandler + this.#config = config + this.#mosTypes = getMosTypes(strictMosTypes) + + coreMosHandler.core + .autoSubscribe(PeripheralDevicePubSub.ingestDeviceRundownStatus, coreMosHandler.core.deviceId) + .then((subId) => { + this.#subId = subId + + if (this.#destroyed) coreMosHandler.core.unsubscribe(subId) + }) + .catch((e) => { + this.#logger.error(`Error subscribing to ingestDeviceRundownStatus: ${stringifyError(e)}`) + }) + + // Setup the observer immediately, which will trigger a resync upon the documents being added + this.#observer = coreMosHandler.core.observe(PeripheralDevicePubSubCollectionsNames.ingestRundownStatus) + this.#observer.added = (id) => this.#rundownChanged(id) + this.#observer.changed = (id) => this.#rundownChanged(id) + this.#observer.removed = (id) => this.#rundownChanged(id) + + this.#logger.info(`MosStatusHandler initialized for ${coreMosHandler.core.deviceId}`) + } + + #rundownChanged(id: RundownId): void { + const collection = this.#coreMosHandler.core.getCollection( + PeripheralDevicePubSubCollectionsNames.ingestRundownStatus + ) + + const newStatuses = collection.findOne(id) + const previousStatuses = this.#lastStatuses.get(id) + + // Update the last statuses store + if (newStatuses) { + this.#lastStatuses.set(id, newStatuses) + } else { + this.#lastStatuses.delete(id) + } + + const statusDiff = diffStatuses(this.#config, previousStatuses, newStatuses) + if (statusDiff.length === 0) return + + const diffTime = this.#mosTypes.mosTime.create(Date.now()) + + // Future: should this be done with some concurrency? + for (const status of statusDiff) { + // New implementation 2022 only sends PLAY, never stop, after getting advice from AP + // Reason 1: NRK ENPS "sendt tid" (elapsed time) stopped working in ENPS 8/9 when doing STOP prior to PLAY + // Reason 2: there's a delay between the STOP (yellow line disappears) and PLAY (yellow line re-appears), which annoys the users + if (this.#config.onlySendPlay && status.mosStatus !== IMOSObjectStatus.PLAY) continue + + this.#messageQueue + .putOnQueue(async () => { + if (this.#isDeviceConnected()) { + if (status.type === 'item') { + const newStatus: IMOSItemStatus = { + RunningOrderId: this.#mosTypes.mosString128.create(status.rundownExternalId), + StoryId: this.#mosTypes.mosString128.create(status.storyId), + ID: this.#mosTypes.mosString128.create(status.itemId), + Status: status.mosStatus, + Time: diffTime, + } + this.#logger.info(`Sending Story status: ${JSON.stringify(newStatus)}`) + + // Send status + await this.#mosDevice.sendItemStatus(newStatus) + } else if (status.type === 'story') { + const newStatus: IMOSStoryStatus = { + RunningOrderId: this.#mosTypes.mosString128.create(status.rundownExternalId), + ID: this.#mosTypes.mosString128.create(status.storyId), + Status: status.mosStatus, + Time: diffTime, + } + this.#logger.info(`Sending Story status: ${JSON.stringify(newStatus)}`) + + // Send status + await this.#mosDevice.sendStoryStatus(newStatus) + } else { + this.#logger.debug(`Discarding unknown queued status: ${JSON.stringify(status)}`) + assertNever(status) + } + } else if (this.#config.onlySendPlay) { + // No need to do anything. + this.#logger.info(`Not connected, skipping play status: ${JSON.stringify(status)}`) + } else { + this.#logger.info(`Not connected, discarding status: ${JSON.stringify(status)}`) + } + }) + .catch((e) => { + this.#logger.error( + `Error sending of "${status.rundownExternalId}"-"${ + status.storyId + }" status to MOS device: ${stringifyError(e)}` + ) + }) + } + } + + #isDeviceConnected(): boolean { + return ( + this.#mosDevice.getConnectionStatus().PrimaryConnected || + this.#mosDevice.getConnectionStatus().SecondaryConnected + ) + } + + dispose(): void { + this.#destroyed = true + + this.#observer?.stop() + if (this.#subId) this.#coreMosHandler.core.unsubscribe(this.#subId) + } +} diff --git a/packages/mos-gateway/src/versions.ts b/packages/mos-gateway/src/versions.ts index 8cacd8223a..435a9fa418 100644 --- a/packages/mos-gateway/src/versions.ts +++ b/packages/mos-gateway/src/versions.ts @@ -20,7 +20,7 @@ export function getVersions(logger: Winston.Logger): { [packageName: string]: st } } } catch (e) { - logger.error(e) + logger.error(stringifyError(e)) } return versions } diff --git a/packages/shared-lib/src/generated/MosGatewayDevicesTypes.ts b/packages/shared-lib/src/generated/MosGatewayDevicesTypes.ts index f192cf7614..b6ebdc5665 100644 --- a/packages/shared-lib/src/generated/MosGatewayDevicesTypes.ts +++ b/packages/shared-lib/src/generated/MosGatewayDevicesTypes.ts @@ -31,4 +31,10 @@ export interface MosDeviceConfig { query: number } } + statuses: MosDeviceStatusesConfig +} +export interface MosDeviceStatusesConfig { + enabled: boolean + sendInRehearsal?: boolean + onlySendPlay?: boolean } diff --git a/packages/shared-lib/src/ingest/rundownStatus.ts b/packages/shared-lib/src/ingest/rundownStatus.ts new file mode 100644 index 0000000000..535473e014 --- /dev/null +++ b/packages/shared-lib/src/ingest/rundownStatus.ts @@ -0,0 +1,42 @@ +import type { RundownId } from '../core/model/Ids' + +export interface IngestRundownStatus { + _id: RundownId + + /** Rundown external id */ + externalId: string + + active: IngestRundownActiveStatus + + segments: IngestSegmentStatus[] +} + +export enum IngestRundownActiveStatus { + ACTIVE = 'active', + REHEARSAL = 'rehearsal', + INACTIVE = 'inactive', +} + +export interface IngestSegmentStatus { + /** Segment external id */ + externalId: string + + parts: IngestPartStatus[] +} + +export interface IngestPartStatus { + /** Part external id */ + externalId: string + + isReady: boolean | null + + itemsReady: Record + + playbackStatus: IngestPartPlaybackStatus +} + +export enum IngestPartPlaybackStatus { + UNKNOWN = 'unknown', + PLAY = 'play', + STOP = 'stop', +} diff --git a/packages/shared-lib/src/pubsub/peripheralDevice.ts b/packages/shared-lib/src/pubsub/peripheralDevice.ts index d170835290..762c97275d 100644 --- a/packages/shared-lib/src/pubsub/peripheralDevice.ts +++ b/packages/shared-lib/src/pubsub/peripheralDevice.ts @@ -10,6 +10,7 @@ import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '../core/model/ import { PeripheralDeviceCommand } from '../core/model/PeripheralDeviceCommand' import { ExpectedPlayoutItemPeripheralDevice } from '../expectedPlayoutItem' import { DeviceTriggerMountedAction, PreviewWrappedAdLib } from '../input-gateway/deviceTriggerPreviews' +import type { IngestRundownStatus } from '../ingest/rundownStatus' /** * Ids of possible DDP subscriptions for any PeripheralDevice. @@ -51,6 +52,13 @@ export enum PeripheralDevicePubSub { packageManagerPackageContainers = 'packageManagerPackageContainers', /** Package manager: The expected packages in the Studio of the PeripheralDevice */ packageManagerExpectedPackages = 'packageManagerExpectedPackages', + + // Ingest gateway: + + /** + * Ingest status of rundowns for a PeripheralDevice + */ + ingestDeviceRundownStatus = 'ingestDeviceRundownStatus', } /** @@ -114,6 +122,11 @@ export interface PeripheralDevicePubSubTypes { filterPlayoutDeviceIds: PeripheralDeviceId[] | undefined, token?: string ) => PeripheralDevicePubSubCollectionsNames.packageManagerExpectedPackages + + [PeripheralDevicePubSub.ingestDeviceRundownStatus]: ( + deviceId: PeripheralDeviceId, + token?: string | undefined + ) => PeripheralDevicePubSubCollectionsNames.ingestRundownStatus } export enum PeripheralDevicePubSubCollectionsNames { @@ -134,6 +147,8 @@ export enum PeripheralDevicePubSubCollectionsNames { packageManagerPlayoutContext = 'packageManagerPlayoutContext', packageManagerPackageContainers = 'packageManagerPackageContainers', packageManagerExpectedPackages = 'packageManagerExpectedPackages', + + ingestRundownStatus = 'ingestRundownStatus', } export type PeripheralDevicePubSubCollections = { @@ -154,4 +169,6 @@ export type PeripheralDevicePubSubCollections = { [PeripheralDevicePubSubCollectionsNames.packageManagerPlayoutContext]: PackageManagerPlayoutContext [PeripheralDevicePubSubCollectionsNames.packageManagerPackageContainers]: PackageManagerPackageContainers [PeripheralDevicePubSubCollectionsNames.packageManagerExpectedPackages]: PackageManagerExpectedPackage + + [PeripheralDevicePubSubCollectionsNames.ingestRundownStatus]: IngestRundownStatus } From 2734084a599e2a1da14903779dd8bc4c6d3847c4 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 20 Jan 2025 12:53:04 +0000 Subject: [PATCH 02/11] fix: change part mos item statuses to be an array, to avoid minimongo key errors --- .../ingestStatus/createIngestRundownStatus.ts | 2 +- .../src/documents/part.ts | 3 +- .../job-worker/src/blueprints/context/lib.ts | 3 +- packages/mos-gateway/src/mosStatus/diff.ts | 43 ++++++++++++------- .../shared-lib/src/ingest/rundownStatus.ts | 7 ++- 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts index 75857614fe..853fec26f0 100644 --- a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts +++ b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts @@ -127,7 +127,7 @@ function createIngestPartStatus( externalId: ingestPart.externalId, isReady: isReady ?? null, - itemsReady: itemsReady ?? {}, + itemsReady: itemsReady ?? [], playbackStatus, } diff --git a/packages/blueprints-integration/src/documents/part.ts b/packages/blueprints-integration/src/documents/part.ts index 3981536c44..29bef476b3 100644 --- a/packages/blueprints-integration/src/documents/part.ts +++ b/packages/blueprints-integration/src/documents/part.ts @@ -1,6 +1,7 @@ import { UserEditingDefinition, UserEditingProperties } from '../userEditing' import type { NoteSeverity } from '../lib' import type { ITranslatableMessage } from '../translations' +import type { IngestPartNotifyItemReady } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' /** Timings for the inTransition, when supported and allowed */ export interface IBlueprintPartInTransition { @@ -65,7 +66,7 @@ export interface IBlueprintMutatablePart + ingestNotifyItemsReady?: IngestPartNotifyItemReady[] /** Classes to set on the TimelineGroupObj for this part */ classes?: string[] diff --git a/packages/job-worker/src/blueprints/context/lib.ts b/packages/job-worker/src/blueprints/context/lib.ts index a7edc7b58a..c6130a03d7 100644 --- a/packages/job-worker/src/blueprints/context/lib.ts +++ b/packages/job-worker/src/blueprints/context/lib.ts @@ -64,6 +64,7 @@ import { } from '@sofie-automation/blueprints-integration/dist/userEditing' import type { PlayoutMutatablePart } from '../../playout/model/PlayoutPartInstanceModel' import { BlueprintQuickLookInfo } from '@sofie-automation/blueprints-integration/dist/context/quickLoopInfo' +import { IngestPartNotifyItemReady } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' /** * Convert an object to have all the values of all keys (including optionals) be 'true' @@ -283,7 +284,7 @@ export function convertPartToBlueprints(part: ReadonlyDeep): IBlueprintP holdMode: part.holdMode, shouldNotifyCurrentPlayingPart: part.shouldNotifyCurrentPlayingPart, ingestNotifyPartReady: part.ingestNotifyPartReady, - ingestNotifyItemsReady: clone(part.ingestNotifyItemsReady), + ingestNotifyItemsReady: clone(part.ingestNotifyItemsReady), classes: clone(part.classes), classesForNext: clone(part.classesForNext), displayDurationGroup: part.displayDurationGroup, diff --git a/packages/mos-gateway/src/mosStatus/diff.ts b/packages/mos-gateway/src/mosStatus/diff.ts index 9eaa6904f7..16799044ef 100644 --- a/packages/mos-gateway/src/mosStatus/diff.ts +++ b/packages/mos-gateway/src/mosStatus/diff.ts @@ -1,6 +1,7 @@ import { IMOSObjectStatus } from '@mos-connection/connector' import type { MosDeviceStatusesConfig } from '../generated/devices' import { + IngestPartNotifyItemReady, IngestPartPlaybackStatus, type IngestPartStatus, type IngestRundownStatus, @@ -51,14 +52,12 @@ export function diffStatuses( }) // Clear any items too - for (const [itemId, isReady] of Object.entries(story.itemsReady)) { - if (isReady === undefined) continue - + for (const itemStatus of story.itemsReady) { statuses.push({ type: 'item', rundownExternalId, storyId, - itemId: itemId, + itemId: itemStatus.externalId, mosStatus: MOS_STATUS_UNKNOWN, }) } @@ -88,21 +87,35 @@ export function diffStatuses( }) } - // Diff each item in the story - const previousItemStatuses = previousStatus?.itemsReady ?? {} - const allItemIds = new Set([...Object.keys(status.itemsReady), ...Object.keys(previousItemStatuses)]) + const allItemIds = new Set() + const previousItemStatuses = new Map() + const newItemStatuses = new Map() + for (const itemStatus of previousStatus?.itemsReady ?? []) { + previousItemStatuses.set(itemStatus.externalId, itemStatus) + allItemIds.add(itemStatus.externalId) + } + for (const itemStatus of status.itemsReady) { + newItemStatuses.set(itemStatus.externalId, itemStatus) + allItemIds.add(itemStatus.externalId) + } + + // Diff each item in the story for (const itemId of allItemIds) { - const newReady = status.itemsReady[itemId] - const previousReady = previousItemStatuses[itemId] + const newItemStatus = newItemStatuses.get(itemId) + const previousItemStatus = previousItemStatuses.get(itemId) - const newMosStatus = - newReady !== undefined - ? buildMosStatus(config, status.playbackStatus, newReady, newStatuses?.active) - : null + const newMosStatus = newItemStatus + ? buildMosStatus(config, status.playbackStatus, newItemStatus.ready, newStatuses?.active) + : null const previousMosStatus = - previousReady !== undefined && previousStatus - ? buildMosStatus(config, previousStatus.playbackStatus, previousReady, previousStatuses?.active) + previousItemStatus && previousStatus + ? buildMosStatus( + config, + previousStatus.playbackStatus, + previousItemStatus.ready, + previousStatuses?.active + ) : null if ((newMosStatus !== null || previousMosStatus !== null) && previousMosStatus !== newMosStatus) { diff --git a/packages/shared-lib/src/ingest/rundownStatus.ts b/packages/shared-lib/src/ingest/rundownStatus.ts index 535473e014..166b35765e 100644 --- a/packages/shared-lib/src/ingest/rundownStatus.ts +++ b/packages/shared-lib/src/ingest/rundownStatus.ts @@ -30,7 +30,7 @@ export interface IngestPartStatus { isReady: boolean | null - itemsReady: Record + itemsReady: IngestPartNotifyItemReady[] playbackStatus: IngestPartPlaybackStatus } @@ -40,3 +40,8 @@ export enum IngestPartPlaybackStatus { PLAY = 'play', STOP = 'stop', } + +export interface IngestPartNotifyItemReady { + externalId: string + ready: boolean +} From b5406a0fec1017d60a4ab511103539c1350da96b Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 20 Jan 2025 14:50:50 +0000 Subject: [PATCH 03/11] chore: fix unit tests --- .../src/mosStatus/__tests__/diff.spec.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts b/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts index 624d7eb9c1..912e19e03b 100644 --- a/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts +++ b/packages/mos-gateway/src/mosStatus/__tests__/diff.spec.ts @@ -25,7 +25,7 @@ describe('diffStatuses', () => { { externalId: 'part0', isReady: true, - itemsReady: {}, + itemsReady: [], playbackStatus: IngestPartPlaybackStatus.UNKNOWN, }, ], @@ -134,7 +134,7 @@ describe('diffStatuses', () => { extraPartState.segments[0].parts.push({ externalId: 'part1', isReady: false, - itemsReady: {}, + itemsReady: [], playbackStatus: IngestPartPlaybackStatus.UNKNOWN, }) @@ -155,7 +155,7 @@ describe('diffStatuses', () => { extraPartState.segments[0].parts.push({ externalId: 'part1', isReady: false, - itemsReady: {}, + itemsReady: [], playbackStatus: IngestPartPlaybackStatus.UNKNOWN, }) @@ -235,7 +235,7 @@ describe('diffStatuses', () => { test('add items', () => { { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = true + itemsState.segments[0].parts[0].itemsReady.push({ externalId: 'item0', ready: true }) const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) expect(diff).toHaveLength(1) @@ -250,7 +250,7 @@ describe('diffStatuses', () => { { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = false + itemsState.segments[0].parts[0].itemsReady.push({ externalId: 'item0', ready: false }) const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) expect(diff).toHaveLength(1) @@ -265,7 +265,7 @@ describe('diffStatuses', () => { { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = undefined + // itemsState.segments[0].parts[0].itemsReady.item0 = undefined const diff = diffStatuses(defaultConfig, singlePartRundown, itemsState) expect(diff).toHaveLength(0) @@ -275,7 +275,7 @@ describe('diffStatuses', () => { test('remove items', () => { { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = true + itemsState.segments[0].parts[0].itemsReady.push({ externalId: 'item0', ready: true }) const diff = diffStatuses(defaultConfig, itemsState, singlePartRundown) expect(diff).toHaveLength(1) @@ -290,7 +290,7 @@ describe('diffStatuses', () => { { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = undefined + // itemsState.segments[0].parts[0].itemsReady.item0 = undefined const diff = diffStatuses(defaultConfig, itemsState, singlePartRundown) expect(diff).toHaveLength(0) @@ -299,10 +299,10 @@ describe('diffStatuses', () => { test('change item state', () => { const itemsState = structuredClone(singlePartRundown) - itemsState.segments[0].parts[0].itemsReady.item0 = true + itemsState.segments[0].parts[0].itemsReady.push({ externalId: 'item0', ready: true }) const items2State = structuredClone(itemsState) - items2State.segments[0].parts[0].itemsReady.item0 = false + items2State.segments[0].parts[0].itemsReady[0].ready = false const diff = diffStatuses(defaultConfig, itemsState, items2State) expect(diff).toHaveLength(1) From a2a6556b07c4dc16dd24165a5b1a581b75b6d674 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 3 Feb 2025 11:15:50 +0000 Subject: [PATCH 04/11] chore: fix double cleanup in publication --- meteor/server/publications/lib/rundownsObserver.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/meteor/server/publications/lib/rundownsObserver.ts b/meteor/server/publications/lib/rundownsObserver.ts index 2c5efea890..44bd0fc40b 100644 --- a/meteor/server/publications/lib/rundownsObserver.ts +++ b/meteor/server/publications/lib/rundownsObserver.ts @@ -30,11 +30,15 @@ export class RundownsObserver implements Meteor.LiveQueryHandle { if (this.#disposed) return if (!this.#changed) return this.#cleanup?.() + this.#cleanup = undefined const changed = this.#changed this.#cleanup = await changed(this.rundownIds) - if (this.#disposed) this.#cleanup?.() + if (this.#disposed) { + this.#cleanup?.() + this.#cleanup = undefined + } }, REACTIVITY_DEBOUNCE) private constructor(onChanged: ChangedHandler) { @@ -109,5 +113,6 @@ export class RundownsObserver implements Meteor.LiveQueryHandle { this.#rundownsLiveQuery.stop() this.#changed = undefined this.#cleanup?.() + this.#cleanup = undefined } } From 47efb95f521274b447713dfb7a069ac1958fef94 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 24 Feb 2025 10:42:43 +0000 Subject: [PATCH 05/11] feat: add ingest statuses debug page --- .../publications/ingestStatus/publication.ts | 6 +- .../client/ui/TestTools/DeviceTriggers.tsx | 15 +-- .../ui/TestTools/IngestRundownStatus.tsx | 127 ++++++++++++++++++ .../src/client/ui/TestTools/Mappings.tsx | 7 +- .../src/client/ui/TestTools/Timeline.tsx | 7 +- .../client/ui/TestTools/TimelineDatastore.tsx | 5 +- .../src/client/ui/TestTools/collections.ts | 30 +++++ .../webui/src/client/ui/TestTools/index.tsx | 12 +- 8 files changed, 178 insertions(+), 31 deletions(-) create mode 100644 packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx create mode 100644 packages/webui/src/client/ui/TestTools/collections.ts diff --git a/meteor/server/publications/ingestStatus/publication.ts b/meteor/server/publications/ingestStatus/publication.ts index 59057b19fa..26fa5830fb 100644 --- a/meteor/server/publications/ingestStatus/publication.ts +++ b/meteor/server/publications/ingestStatus/publication.ts @@ -21,6 +21,7 @@ import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/ru import { protectString } from '@sofie-automation/corelib/dist/protectedString' import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import { createIngestRundownStatus } from './createIngestRundownStatus' +import { Settings } from '../../Settings' interface IngestRundownStatusArgs { readonly deviceId: PeripheralDeviceId @@ -176,7 +177,10 @@ meteorCustomPublish( async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) { check(deviceId, String) - await checkAccessAndGetPeripheralDevice(deviceId, token, this) + if (Settings.enableHeaderAuth) { + // This is used in a testTool, so only check when auth is enabled + await checkAccessAndGetPeripheralDevice(deviceId, token, this) + } await setUpCollectionOptimizedObserver< IngestRundownStatus, diff --git a/packages/webui/src/client/ui/TestTools/DeviceTriggers.tsx b/packages/webui/src/client/ui/TestTools/DeviceTriggers.tsx index 52ee2d4ae1..2e64e21bc3 100644 --- a/packages/webui/src/client/ui/TestTools/DeviceTriggers.tsx +++ b/packages/webui/src/client/ui/TestTools/DeviceTriggers.tsx @@ -1,7 +1,5 @@ import React, { Fragment, useState } from 'react' import { useSubscription, useTracker } from '../../lib/ReactMeteorData/react-meteor-data' -import { Mongo } from 'meteor/mongo' -import {} from '@sofie-automation/meteor-lib/dist/api/pubsub' import { protectString, unprotectString } from '@sofie-automation/corelib/dist/protectedString' import { useTranslation } from 'react-i18next' import { Link, useParams } from 'react-router-dom' @@ -9,21 +7,12 @@ import { PeripheralDeviceId } from '@sofie-automation/corelib/dist/dataModel/Ids import { DeviceTriggerMountedAction, PreviewWrappedAdLib } from '@sofie-automation/meteor-lib/dist/api/MountedTriggers' import { PeripheralDevices } from '../../collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { - PeripheralDevicePubSub, - PeripheralDevicePubSubCollectionsNames, -} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import { PeripheralDevicePubSub } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import { MountedTriggers, MountedTriggersPreviews } from './collections' import Row from 'react-bootstrap/Row' import Col from 'react-bootstrap/Col' import Form from 'react-bootstrap/Form' -const MountedTriggers = new Mongo.Collection( - PeripheralDevicePubSubCollectionsNames.mountedTriggers -) -const MountedTriggersPreviews = new Mongo.Collection( - PeripheralDevicePubSubCollectionsNames.mountedTriggersPreviews -) - interface DeviceTriggersViewRouteParams { peripheralDeviceId: string } diff --git a/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx new file mode 100644 index 0000000000..3f126a2225 --- /dev/null +++ b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx @@ -0,0 +1,127 @@ +import { useSubscription, useTracker } from '../../lib/ReactMeteorData/react-meteor-data' +import { unprotectString } from '../../lib/tempLib' +import { makeTableOfObject } from '../../lib/utilComponents' +import { PeripheralDeviceId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { PeripheralDevicePubSub } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import { useTranslation } from 'react-i18next' +import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' +import { PeripheralDevices } from '../../collections' +import { Link } from 'react-router-dom' +import { PeripheralDeviceCategory } from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI' +import { IngestRundownStatuses } from './collections' +import { IngestPartStatus, IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' + +interface IMappingsViewProps { + match?: { + params?: { + peripheralDeviceId: PeripheralDeviceId + } + } +} +function IngestRundownStatusView(props: Readonly): JSX.Element { + const { t } = useTranslation() + + return ( +
+
+

{t('Ingest Rundown Status')}

+
+
+ {props.match && props.match.params && ( + + )} +
+
+ ) +} + +interface ComponentMappingsTableProps { + peripheralDeviceId: PeripheralDeviceId +} +function ComponentMappingsTable({ peripheralDeviceId }: Readonly): JSX.Element { + useSubscription(PeripheralDevicePubSub.ingestDeviceRundownStatus, peripheralDeviceId) + + const rundowns = useTracker(() => IngestRundownStatuses.find({}).fetch(), [], []) + + return ( + <> + {rundowns.map((rundown) => ( + + ))} + + ) +} + +function StatusesForRundown({ rundown }: { rundown: IngestRundownStatus }): JSX.Element { + return ( +
+

+ {rundown.externalId} ({unprotectString(rundown._id)}) +

+ +

Status: {rundown.active}

+ + + + + + + + + + + {rundown.segments.flatMap((segment) => + segment.parts.map((part) => ( + + )) + )} + +
Segment IdPart IdReadyStatusItems
+ +

+
+ ) +} + +interface StatusesForSegmentRowProps { + segmentId: string + part: IngestPartStatus +} +function StatusesForSegmentRow({ segmentId, part }: Readonly) { + return ( + + {segmentId} + {part.externalId} + {JSON.stringify(part.isReady)} + {part.playbackStatus} + {makeTableOfObject(part.itemsReady)} + + ) +} + +function IngestRundownStatusSelect(): JSX.Element | null { + const { t } = useTranslation() + + useSubscription(CorelibPubSub.peripheralDevices, null) + const devices = useTracker(() => PeripheralDevices.find({ category: PeripheralDeviceCategory.INGEST }).fetch(), []) + + return ( +
+
+

{t('Ingest Rundown Statuses')}

+
+
+ Peripheral Device +
    + {devices?.map((device) => ( +
  • + {device.name} +
  • + ))} +
+
+
+ ) +} + +export { IngestRundownStatusView, IngestRundownStatusSelect } diff --git a/packages/webui/src/client/ui/TestTools/Mappings.tsx b/packages/webui/src/client/ui/TestTools/Mappings.tsx index 341fc0542e..910e65ad75 100644 --- a/packages/webui/src/client/ui/TestTools/Mappings.tsx +++ b/packages/webui/src/client/ui/TestTools/Mappings.tsx @@ -6,16 +6,11 @@ import { makeTableOfObject } from '../../lib/utilComponents' import { StudioSelect } from './StudioSelect' import { MappingExt } from '@sofie-automation/corelib/dist/dataModel/Studio' import { LookaheadMode, TSR } from '@sofie-automation/blueprints-integration' -import { createSyncPeripheralDeviceCustomPublicationMongoCollection } from '../../collections/lib' import { StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { PeripheralDevicePubSubCollectionsNames } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' import { useTranslation } from 'react-i18next' import Row from 'react-bootstrap/Row' import Col from 'react-bootstrap/Col' - -const StudioMappings = createSyncPeripheralDeviceCustomPublicationMongoCollection( - PeripheralDevicePubSubCollectionsNames.studioMappings -) +import { StudioMappings } from './collections' interface IMappingsViewProps { match?: { diff --git a/packages/webui/src/client/ui/TestTools/Timeline.tsx b/packages/webui/src/client/ui/TestTools/Timeline.tsx index 9a6349e486..857da8b578 100644 --- a/packages/webui/src/client/ui/TestTools/Timeline.tsx +++ b/packages/webui/src/client/ui/TestTools/Timeline.tsx @@ -20,17 +20,12 @@ import { useTranslation } from 'react-i18next' import { useParams } from 'react-router-dom' import { useCallback, useEffect, useMemo, useState } from 'react' import Classnames from 'classnames' -import { createSyncPeripheralDeviceCustomPublicationMongoCollection } from '../../collections/lib' import { StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { PeripheralDevicePubSubCollectionsNames } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' import Row from 'react-bootstrap/Row' import Col from 'react-bootstrap/Col' import Button from 'react-bootstrap/Button' import Form from 'react-bootstrap/Form' - -export const StudioTimeline = createSyncPeripheralDeviceCustomPublicationMongoCollection( - PeripheralDevicePubSubCollectionsNames.studioTimeline -) +import { StudioTimeline } from './collections' interface TimelineViewRouteParams { studioId: string | undefined diff --git a/packages/webui/src/client/ui/TestTools/TimelineDatastore.tsx b/packages/webui/src/client/ui/TestTools/TimelineDatastore.tsx index b75f060849..a46b4a2509 100644 --- a/packages/webui/src/client/ui/TestTools/TimelineDatastore.tsx +++ b/packages/webui/src/client/ui/TestTools/TimelineDatastore.tsx @@ -1,7 +1,5 @@ import { useSubscription, useTracker } from '../../lib/ReactMeteorData/react-meteor-data' import { StudioSelect } from './StudioSelect' -import { Mongo } from 'meteor/mongo' -import { DBTimelineDatastoreEntry } from '@sofie-automation/corelib/dist/dataModel/TimelineDatastore' import { protectString, unprotectString } from '@sofie-automation/corelib/dist/protectedString' import { useTranslation } from 'react-i18next' import { useParams } from 'react-router-dom' @@ -9,8 +7,7 @@ import { StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' import Row from 'react-bootstrap/Row' import Col from 'react-bootstrap/Col' - -const TimelineDatastore = new Mongo.Collection('timelineDatastore') +import { TimelineDatastore } from './collections' interface TimelineDatastoreViewRouteParams { studioId: string diff --git a/packages/webui/src/client/ui/TestTools/collections.ts b/packages/webui/src/client/ui/TestTools/collections.ts new file mode 100644 index 0000000000..553d0a333f --- /dev/null +++ b/packages/webui/src/client/ui/TestTools/collections.ts @@ -0,0 +1,30 @@ +import { PeripheralDevicePubSubCollectionsNames } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' +import { createSyncPeripheralDeviceCustomPublicationMongoCollection } from '../../collections/lib' + +/** + * These collections are not public and are for the use of the TestTools only. + * They are defined in this file, as hot reloading them is not supported + */ + +export const IngestRundownStatuses = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.ingestRundownStatus +) + +export const MountedTriggers = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.mountedTriggers +) +export const MountedTriggersPreviews = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.mountedTriggersPreviews +) + +export const StudioMappings = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.studioMappings +) + +export const StudioTimeline = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.studioTimeline +) + +export const TimelineDatastore = createSyncPeripheralDeviceCustomPublicationMongoCollection( + PeripheralDevicePubSubCollectionsNames.timelineDatastore +) diff --git a/packages/webui/src/client/ui/TestTools/index.tsx b/packages/webui/src/client/ui/TestTools/index.tsx index 4c87c50de4..7640c12429 100644 --- a/packages/webui/src/client/ui/TestTools/index.tsx +++ b/packages/webui/src/client/ui/TestTools/index.tsx @@ -9,6 +9,7 @@ import { DeviceTriggersDeviceSelect, DeviceTriggersView } from './DeviceTriggers import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' import Row from 'react-bootstrap/Row' import Col from 'react-bootstrap/Col' +import { IngestRundownStatusSelect, IngestRundownStatusView } from './IngestRundownStatus' function StatusMenu() { const { t } = useTranslation() @@ -43,6 +44,13 @@ function StatusMenu() { >

{t('Device Triggers')}

+ +

{t('Ingest Rundown Statuses')}

+
) } @@ -68,7 +76,9 @@ export default function Status(): JSX.Element { - {' '} + + + From cd9583f389c9e5f537662443d5cb79b29e121b6a Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 24 Feb 2025 15:27:22 +0000 Subject: [PATCH 06/11] feat: allow translating part externalId for mos statuses --- .../ingestStatus/createIngestRundownStatus.ts | 127 +++++++++++++----- .../ingestStatus/reactiveContentCache.ts | 2 + .../src/documents/part.ts | 3 + .../job-worker/src/blueprints/context/lib.ts | 2 + 4 files changed, 100 insertions(+), 34 deletions(-) diff --git a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts index 853fec26f0..c321ce75ee 100644 --- a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts +++ b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts @@ -1,11 +1,11 @@ -import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import type { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache' -import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { IngestRundownStatus, IngestPartPlaybackStatus, IngestRundownActiveStatus, IngestPartStatus, + IngestPartNotifyItemReady, } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' import type { ReadonlyDeep } from 'type-fest' import _ from 'underscore' @@ -15,6 +15,7 @@ import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection' import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances' import { IngestPart } from '@sofie-automation/blueprints-integration' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' +import { unprotectString } from '@sofie-automation/corelib/dist/protectedString' export function createIngestRundownStatus( cache: ReadonlyDeep, @@ -41,9 +42,6 @@ export function createIngestRundownStatus( newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE } - // Find the most important part instance for each part - const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances) - const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch() for (const nrcsSegment of nrcsSegments) { const nrcsParts = cache.NrcsIngestData.find({ @@ -58,10 +56,25 @@ export function createIngestRundownStatus( nrcsParts.map((nrcsPart) => { if (!nrcsPart.partId || !nrcsPart.segmentId) return null - const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId }) - const partInstance = partInstanceMap.get(nrcsPart.partId) - - return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart) + const parts = cache.Parts.find({ + rundownId: rundownId, + $or: [ + { + externalId: nrcsPart.data.externalId, + }, + { + ingestNotifyPartExternalId: nrcsPart.data.externalId, + }, + ], + }).fetch() + const partInstances = findPartInstancesForIngestPart( + playlist, + rundownId, + cache.PartInstances, + nrcsPart.data.externalId + ) + + return createIngestPartStatus(playlist, partInstances, parts, nrcsPart.data as IngestPart) }) ), }) @@ -70,64 +83,110 @@ export function createIngestRundownStatus( return newDoc } -function findPartInstanceForEachPart( +function findPartInstancesForIngestPart( playlist: Pick | undefined, rundownId: RundownId, - partInstancesCache: ReadonlyDeep>> + partInstancesCache: ReadonlyDeep>>, + partExternalId: string ) { - const partInstanceMap = new Map>() - if (!playlist) return partInstanceMap + const result: Record> = {} + if (!playlist) return result + + const candidatePartInstances = partInstancesCache + .find({ + rundownId: rundownId, + $or: [ + { + 'part.externalId': partExternalId, + }, + { + 'part.ingestNotifyPartExternalId': partExternalId, + }, + ], + }) + .fetch() - for (const partInstance of partInstancesCache.find({}).fetch()) { + for (const partInstance of candidatePartInstances) { if (partInstance.rundownId !== rundownId) continue // Ignore the next partinstance if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue + const partId = unprotectString(partInstance.part._id) + // The current part instance is the most important if (partInstance._id === playlist.currentPartInfo?.partInstanceId) { - partInstanceMap.set(partInstance.part._id, partInstance) + result[partId] = partInstance continue } // Take the part with the highest takeCount - const existingEntry = partInstanceMap.get(partInstance.part._id) + const existingEntry = result[partId] if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) { - partInstanceMap.set(partInstance.part._id, partInstance) + result[partId] = partInstance } } - return partInstanceMap + return result } function createIngestPartStatus( playlist: Pick | undefined, - partInstance: Pick | undefined, - part: Pick | undefined, + partInstances: Record>, + parts: Pick[], ingestPart: IngestPart ): IngestPartStatus { // Determine the playback status from the PartInstance let playbackStatus = IngestPartPlaybackStatus.UNKNOWN - if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) { - const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id - - if (isCurrentPartInstance) { - // If the current, it is playing - playbackStatus = IngestPartPlaybackStatus.PLAY - } else { - // If not the current, but has been played, it is stopped - playbackStatus = IngestPartPlaybackStatus.STOP + + let isReady: boolean | null = null // Start off as null, the first value will make this true or false + + const itemsReady: IngestPartNotifyItemReady[] = [] + + const updateStatusWithPart = (part: Pick) => { + // If the part affects the ready status, update it + if (typeof part.ingestNotifyPartReady === 'boolean') { + isReady = (isReady ?? true) && part.ingestNotifyPartReady + } + + // Include the items + if (part.ingestNotifyItemsReady) { + itemsReady.push(...part.ingestNotifyItemsReady) } } - // Determine the ready status from the PartInstance or Part - const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady - const itemsReady = partInstance ? partInstance.part.ingestNotifyItemsReady : part?.ingestNotifyItemsReady + // Loop through the partInstances, starting off the state + if (playlist) { + for (const partInstance of Object.values>(partInstances)) { + if (!partInstance) continue + + if (partInstance.part.shouldNotifyCurrentPlayingPart) { + const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id + + if (isCurrentPartInstance) { + // If the current, it is playing + playbackStatus = IngestPartPlaybackStatus.PLAY + } else if (playbackStatus === IngestPartPlaybackStatus.UNKNOWN) { + // If not the current, but has been played, it is stopped + playbackStatus = IngestPartPlaybackStatus.STOP + } + } + + updateStatusWithPart(partInstance.part) + } + } + + for (const part of parts) { + // Check if the part has already been handled by a partInstance + if (partInstances[unprotectString(part._id)]) continue + + updateStatusWithPart(part) + } return { externalId: ingestPart.externalId, - isReady: isReady ?? null, - itemsReady: itemsReady ?? [], + isReady: isReady, + itemsReady: itemsReady, playbackStatus, } diff --git a/meteor/server/publications/ingestStatus/reactiveContentCache.ts b/meteor/server/publications/ingestStatus/reactiveContentCache.ts index 9e2a026cea..b1515926eb 100644 --- a/meteor/server/publications/ingestStatus/reactiveContentCache.ts +++ b/meteor/server/publications/ingestStatus/reactiveContentCache.ts @@ -32,6 +32,7 @@ export type PartFields = | 'shouldNotifyCurrentPlayingPart' | 'ingestNotifyPartReady' | 'ingestNotifyItemsReady' + | 'ingestNotifyPartExternalId' export const partFieldSpecifier = literal>>({ _id: 1, rundownId: 1, @@ -40,6 +41,7 @@ export const partFieldSpecifier = literal): IBlueprintP expectedDuration: part.expectedDuration, holdMode: part.holdMode, shouldNotifyCurrentPlayingPart: part.shouldNotifyCurrentPlayingPart, + ingestNotifyPartExternalId: part.ingestNotifyPartExternalId, ingestNotifyPartReady: part.ingestNotifyPartReady, ingestNotifyItemsReady: clone(part.ingestNotifyItemsReady), classes: clone(part.classes), From 7d9ec305b0749fa1185dd4dfa88527622ee99e80 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 24 Feb 2025 15:36:03 +0000 Subject: [PATCH 07/11] fix: ignore `externalId` when `ingestNotifyPartExternalId` is set --- .../publications/ingestStatus/createIngestRundownStatus.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts index c321ce75ee..e5cac7bbf4 100644 --- a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts +++ b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts @@ -61,6 +61,7 @@ export function createIngestRundownStatus( $or: [ { externalId: nrcsPart.data.externalId, + ingestNotifyPartExternalId: { $exists: false }, }, { ingestNotifyPartExternalId: nrcsPart.data.externalId, @@ -98,6 +99,7 @@ function findPartInstancesForIngestPart( $or: [ { 'part.externalId': partExternalId, + 'part.ingestNotifyPartExternalId': { $exists: false }, }, { 'part.ingestNotifyPartExternalId': partExternalId, From 194340f7a777c329283a91b9effaed60db711cfa Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 10 Mar 2025 15:34:40 +0000 Subject: [PATCH 08/11] fix: handle permissions for ingest rundown status test tool with header auth properly --- .../publications/ingestStatus/publication.ts | 54 ++++++++++++------- packages/meteor-lib/src/api/pubsub.ts | 11 ++++ packages/webui/package.json | 2 +- .../ui/TestTools/IngestRundownStatus.tsx | 4 +- packages/webui/vite.config.mts | 1 + 5 files changed, 51 insertions(+), 21 deletions(-) diff --git a/meteor/server/publications/ingestStatus/publication.ts b/meteor/server/publications/ingestStatus/publication.ts index 26fa5830fb..a28dd3844f 100644 --- a/meteor/server/publications/ingestStatus/publication.ts +++ b/meteor/server/publications/ingestStatus/publication.ts @@ -1,6 +1,7 @@ import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { ReadonlyDeep } from 'type-fest' import { + CustomPublish, CustomPublishCollection, meteorCustomPublish, setUpCollectionOptimizedObserver, @@ -21,7 +22,8 @@ import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/ru import { protectString } from '@sofie-automation/corelib/dist/protectedString' import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import { createIngestRundownStatus } from './createIngestRundownStatus' -import { Settings } from '../../Settings' +import { assertConnectionHasOneOfPermissions } from '../../security/auth' +import { MeteorPubSub } from '@sofie-automation/meteor-lib/dist/api/pubsub' interface IngestRundownStatusArgs { readonly deviceId: PeripheralDeviceId @@ -171,29 +173,45 @@ async function manipulateIngestRundownStatusPublicationData( } } +async function startOrJoinIngestStatusPublication( + pub: CustomPublish, + deviceId: PeripheralDeviceId +) { + await setUpCollectionOptimizedObserver< + IngestRundownStatus, + IngestRundownStatusArgs, + IngestRundownStatusState, + IngestRundownStatusUpdateProps + >( + `pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`, + { deviceId }, + setupIngestRundownStatusPublicationObservers, + manipulateIngestRundownStatusPublicationData, + pub, + 100 + ) +} + meteorCustomPublish( PeripheralDevicePubSub.ingestDeviceRundownStatus, PeripheralDevicePubSubCollectionsNames.ingestRundownStatus, async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) { check(deviceId, String) - if (Settings.enableHeaderAuth) { - // This is used in a testTool, so only check when auth is enabled - await checkAccessAndGetPeripheralDevice(deviceId, token, this) - } + await checkAccessAndGetPeripheralDevice(deviceId, token, this) + + await startOrJoinIngestStatusPublication(pub, deviceId) + } +) + +meteorCustomPublish( + MeteorPubSub.ingestDeviceRundownStatusTestTool, + PeripheralDevicePubSubCollectionsNames.ingestRundownStatus, + async function (pub, deviceId: PeripheralDeviceId) { + check(deviceId, String) + + assertConnectionHasOneOfPermissions(this.connection, 'testing') - await setUpCollectionOptimizedObserver< - IngestRundownStatus, - IngestRundownStatusArgs, - IngestRundownStatusState, - IngestRundownStatusUpdateProps - >( - `pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`, - { deviceId }, - setupIngestRundownStatusPublicationObservers, - manipulateIngestRundownStatusPublicationData, - pub, - 100 - ) + await startOrJoinIngestStatusPublication(pub, deviceId) } ) diff --git a/packages/meteor-lib/src/api/pubsub.ts b/packages/meteor-lib/src/api/pubsub.ts index 69ec8a1d3e..2088b54efc 100644 --- a/packages/meteor-lib/src/api/pubsub.ts +++ b/packages/meteor-lib/src/api/pubsub.ts @@ -2,6 +2,7 @@ import { BucketId, OrganizationId, PartId, + PeripheralDeviceId, RundownId, RundownPlaylistActivationId, RundownPlaylistId, @@ -118,6 +119,11 @@ export enum MeteorPubSub { */ timelineForStudio = 'timelineForStudio', + /** + * Ingest status of rundowns for a PeripheralDevice + */ + ingestDeviceRundownStatusTestTool = 'ingestDeviceRundownStatusTestTool', + /** * Fetch the simplified playout UI view of the specified ShowStyleBase */ @@ -218,6 +224,11 @@ export interface MeteorPubSubTypes { studioId: StudioId, token?: string ) => PeripheralDevicePubSubCollectionsNames.studioTimeline + + [MeteorPubSub.ingestDeviceRundownStatusTestTool]: ( + peripheralDeviceId: PeripheralDeviceId + ) => PeripheralDevicePubSubCollectionsNames.ingestRundownStatus + [MeteorPubSub.uiShowStyleBase]: (showStyleBaseId: ShowStyleBaseId) => CustomCollectionName.UIShowStyleBase /** Subscribe to one or all studios */ [MeteorPubSub.uiStudio]: (studioId: StudioId | null) => CustomCollectionName.UIStudio diff --git a/packages/webui/package.json b/packages/webui/package.json index 50bbb748ba..50af51cba2 100644 --- a/packages/webui/package.json +++ b/packages/webui/package.json @@ -14,7 +14,7 @@ }, "homepage": "https://github.com/nrkno/sofie-core/blob/master/packages/webui#readme", "scripts": { - "dev": "vite --port=3005", + "dev": "vite --port=3005 --force", "build": "tsc -b && vite build", "build:main": "tsc -p tsconfig.app.json --noEmit", "check-types": "tsc -p tsconfig.app.json --noEmit", diff --git a/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx index 3f126a2225..ba2ae15511 100644 --- a/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx +++ b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx @@ -2,7 +2,6 @@ import { useSubscription, useTracker } from '../../lib/ReactMeteorData/react-met import { unprotectString } from '../../lib/tempLib' import { makeTableOfObject } from '../../lib/utilComponents' import { PeripheralDeviceId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { PeripheralDevicePubSub } from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice' import { useTranslation } from 'react-i18next' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' import { PeripheralDevices } from '../../collections' @@ -10,6 +9,7 @@ import { Link } from 'react-router-dom' import { PeripheralDeviceCategory } from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI' import { IngestRundownStatuses } from './collections' import { IngestPartStatus, IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' +import { MeteorPubSub } from '@sofie-automation/meteor-lib/dist/api/pubsub' interface IMappingsViewProps { match?: { @@ -39,7 +39,7 @@ interface ComponentMappingsTableProps { peripheralDeviceId: PeripheralDeviceId } function ComponentMappingsTable({ peripheralDeviceId }: Readonly): JSX.Element { - useSubscription(PeripheralDevicePubSub.ingestDeviceRundownStatus, peripheralDeviceId) + useSubscription(MeteorPubSub.ingestDeviceRundownStatusTestTool, peripheralDeviceId) const rundowns = useTracker(() => IngestRundownStatuses.find({}).fetch(), [], []) diff --git a/packages/webui/vite.config.mts b/packages/webui/vite.config.mts index 13f53d5ea0..b0d151994f 100644 --- a/packages/webui/vite.config.mts +++ b/packages/webui/vite.config.mts @@ -69,6 +69,7 @@ export default defineConfig({ ws: true, }, }, + allowedHosts: true, }, // TODO: old meteor recompile instructions? From 2238ae39866b487742d1a36c3794447899866b3d Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Wed, 26 Mar 2025 11:23:24 +0000 Subject: [PATCH 09/11] fix: update new page --- .../ui/TestTools/IngestRundownStatus.tsx | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx index ba2ae15511..dfdfea8c78 100644 --- a/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx +++ b/packages/webui/src/client/ui/TestTools/IngestRundownStatus.tsx @@ -10,6 +10,8 @@ import { PeripheralDeviceCategory } from '@sofie-automation/shared-lib/dist/peri import { IngestRundownStatuses } from './collections' import { IngestPartStatus, IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' import { MeteorPubSub } from '@sofie-automation/meteor-lib/dist/api/pubsub' +import Row from 'react-bootstrap/Row' +import Col from 'react-bootstrap/Col' interface IMappingsViewProps { match?: { @@ -22,11 +24,11 @@ function IngestRundownStatusView(props: Readonly): JSX.Eleme const { t } = useTranslation() return ( -
-
+
+

{t('Ingest Rundown Status')}

-
+
{props.match && props.match.params && ( )} @@ -54,32 +56,32 @@ function ComponentMappingsTable({ peripheralDeviceId }: Readonly -

- {rundown.externalId} ({unprotectString(rundown._id)}) -

+ + +

+ {rundown.externalId} ({unprotectString(rundown._id)}) +

-

Status: {rundown.active}

+

Status: {rundown.active}

- - - - - - - - - - {rundown.segments.flatMap((segment) => - segment.parts.map((part) => ( - - )) - )} - -
Segment IdPart IdReadyStatusItems
- -

-
+ + + + + + + + + + {rundown.segments.flatMap((segment) => + segment.parts.map((part) => ( + + )) + )} + +
Segment IdPart IdReadyStatusItems
+ + ) } @@ -106,11 +108,11 @@ function IngestRundownStatusSelect(): JSX.Element | null { const devices = useTracker(() => PeripheralDevices.find({ category: PeripheralDeviceCategory.INGEST }).fetch(), []) return ( -
-
+
+

{t('Ingest Rundown Statuses')}

-
+
Peripheral Device
    {devices?.map((device) => ( From a3e97c0cc8b40f1dfc977c085b96ec7b9a8544ec Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Wed, 26 Mar 2025 11:27:20 +0000 Subject: [PATCH 10/11] chore: fix build --- packages/mos-gateway/src/mosStatus/diff.ts | 2 +- packages/mos-gateway/src/mosStatus/handler.ts | 2 +- packages/webui/vite.config.mts | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/mos-gateway/src/mosStatus/diff.ts b/packages/mos-gateway/src/mosStatus/diff.ts index 16799044ef..37e06b4655 100644 --- a/packages/mos-gateway/src/mosStatus/diff.ts +++ b/packages/mos-gateway/src/mosStatus/diff.ts @@ -1,5 +1,5 @@ import { IMOSObjectStatus } from '@mos-connection/connector' -import type { MosDeviceStatusesConfig } from '../generated/devices' +import type { MosDeviceStatusesConfig } from '@sofie-automation/shared-lib/dist/generated/MosGatewayDevicesTypes' import { IngestPartNotifyItemReady, IngestPartPlaybackStatus, diff --git a/packages/mos-gateway/src/mosStatus/handler.ts b/packages/mos-gateway/src/mosStatus/handler.ts index 689c74c231..a4b57f49f6 100644 --- a/packages/mos-gateway/src/mosStatus/handler.ts +++ b/packages/mos-gateway/src/mosStatus/handler.ts @@ -6,7 +6,7 @@ import { type MosTypes, type IMOSDevice, } from '@mos-connection/connector' -import type { MosDeviceStatusesConfig } from '../generated/devices' +import type { MosDeviceStatusesConfig } from '@sofie-automation/shared-lib/dist/generated/MosGatewayDevicesTypes' import type { CoreMosDeviceHandler } from '../CoreMosDeviceHandler' import { assertNever, diff --git a/packages/webui/vite.config.mts b/packages/webui/vite.config.mts index b0d151994f..13f53d5ea0 100644 --- a/packages/webui/vite.config.mts +++ b/packages/webui/vite.config.mts @@ -69,7 +69,6 @@ export default defineConfig({ ws: true, }, }, - allowedHosts: true, }, // TODO: old meteor recompile instructions? From a5c9e145ecb0ba114a2d8587fb337b1f9c5537da Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Wed, 9 Apr 2025 14:03:04 +0100 Subject: [PATCH 11/11] fix: limit scope of loading of NrcsIngestData for ingestDeviceRundownStatus publication --- .../ingestStatus/createIngestRundownStatus.ts | 28 ++++----- .../ingestStatus/reactiveContentCache.ts | 59 ++++++++++++------- .../ingestStatus/rundownContentObserver.ts | 5 +- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts index e5cac7bbf4..b90c14b12f 100644 --- a/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts +++ b/meteor/server/publications/ingestStatus/createIngestRundownStatus.ts @@ -9,12 +9,8 @@ import { } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus' import type { ReadonlyDeep } from 'type-fest' import _ from 'underscore' -import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache' -import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' +import type { ContentCache, PartCompact, PartInstanceCompact, PlaylistCompact } from './reactiveContentCache' import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection' -import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances' -import { IngestPart } from '@sofie-automation/blueprints-integration' -import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' import { unprotectString } from '@sofie-automation/corelib/dist/protectedString' export function createIngestRundownStatus( @@ -75,7 +71,7 @@ export function createIngestRundownStatus( nrcsPart.data.externalId ) - return createIngestPartStatus(playlist, partInstances, parts, nrcsPart.data as IngestPart) + return createIngestPartStatus(playlist, partInstances, parts, nrcsPart.data.externalId) }) ), }) @@ -85,12 +81,12 @@ export function createIngestRundownStatus( } function findPartInstancesForIngestPart( - playlist: Pick | undefined, + playlist: PlaylistCompact | undefined, rundownId: RundownId, - partInstancesCache: ReadonlyDeep>>, + partInstancesCache: ReadonlyDeep>, partExternalId: string ) { - const result: Record> = {} + const result: Record = {} if (!playlist) return result const candidatePartInstances = partInstancesCache @@ -132,10 +128,10 @@ function findPartInstancesForIngestPart( } function createIngestPartStatus( - playlist: Pick | undefined, - partInstances: Record>, - parts: Pick[], - ingestPart: IngestPart + playlist: PlaylistCompact | undefined, + partInstances: Record, + parts: PartCompact[], + ingestPartExternalId: string ): IngestPartStatus { // Determine the playback status from the PartInstance let playbackStatus = IngestPartPlaybackStatus.UNKNOWN @@ -144,7 +140,7 @@ function createIngestPartStatus( const itemsReady: IngestPartNotifyItemReady[] = [] - const updateStatusWithPart = (part: Pick) => { + const updateStatusWithPart = (part: PartCompact) => { // If the part affects the ready status, update it if (typeof part.ingestNotifyPartReady === 'boolean') { isReady = (isReady ?? true) && part.ingestNotifyPartReady @@ -158,7 +154,7 @@ function createIngestPartStatus( // Loop through the partInstances, starting off the state if (playlist) { - for (const partInstance of Object.values>(partInstances)) { + for (const partInstance of Object.values(partInstances)) { if (!partInstance) continue if (partInstance.part.shouldNotifyCurrentPlayingPart) { @@ -185,7 +181,7 @@ function createIngestPartStatus( } return { - externalId: ingestPart.externalId, + externalId: ingestPartExternalId, isReady: isReady, itemsReady: itemsReady, diff --git a/meteor/server/publications/ingestStatus/reactiveContentCache.ts b/meteor/server/publications/ingestStatus/reactiveContentCache.ts index b1515926eb..a755ee4f02 100644 --- a/meteor/server/publications/ingestStatus/reactiveContentCache.ts +++ b/meteor/server/publications/ingestStatus/reactiveContentCache.ts @@ -8,8 +8,11 @@ import type { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' import type { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import type { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' -export type PlaylistFields = '_id' | 'activationId' | 'rehearsal' | 'currentPartInfo' | 'nextPartInfo' -export const playlistFieldSpecifier = literal>>({ +export type PlaylistCompact = Pick< + DBRundownPlaylist, + '_id' | 'activationId' | 'rehearsal' | 'currentPartInfo' | 'nextPartInfo' +> +export const playlistFieldSpecifier = literal>({ _id: 1, activationId: 1, rehearsal: 1, @@ -17,14 +20,15 @@ export const playlistFieldSpecifier = literal>>({ +export type RundownCompact = Pick +export const rundownFieldSpecifier = literal>({ _id: 1, externalId: 1, playlistId: 1, }) -export type PartFields = +export type PartCompact = Pick< + DBPart, | '_id' | 'rundownId' | 'segmentId' @@ -33,7 +37,8 @@ export type PartFields = | 'ingestNotifyPartReady' | 'ingestNotifyItemsReady' | 'ingestNotifyPartExternalId' -export const partFieldSpecifier = literal>>({ +> +export const partFieldSpecifier = literal>({ _id: 1, rundownId: 1, segmentId: 1, @@ -44,10 +49,8 @@ export const partFieldSpecifier = literal> ->({ +export type PartInstanceCompact = Pick +export const partInstanceFieldSpecifier = literal>({ _id: 1, rundownId: 1, segmentId: 1, @@ -55,25 +58,41 @@ export const partInstanceFieldSpecifier = literal< takeCount: 1, }) +export type NrcsIngestDataCacheObjCompact = Pick< + NrcsIngestDataCacheObj, + '_id' | 'type' | 'rundownId' | 'segmentId' | 'partId' +> & { data: { externalId: string } } +export const nrcsIngestDataCacheObjSpecifier = literal>({ + _id: 1, + type: 1, + rundownId: 1, + segmentId: 1, + partId: 1, + data: { + // We need to be very selective here, as the payload portion could contain data not safe for minimongo + externalId: 1, + }, +}) + export interface ContentCache { RundownIds: RundownId[] - Playlists: ReactiveCacheCollection> - Rundowns: ReactiveCacheCollection> - NrcsIngestData: ReactiveCacheCollection - Parts: ReactiveCacheCollection> - PartInstances: ReactiveCacheCollection> + Playlists: ReactiveCacheCollection + Rundowns: ReactiveCacheCollection + NrcsIngestData: ReactiveCacheCollection + Parts: ReactiveCacheCollection + PartInstances: ReactiveCacheCollection } export function createReactiveContentCache(rundownIds: RundownId[]): ContentCache { const cache: ContentCache = { RundownIds: rundownIds, - Playlists: new ReactiveCacheCollection>('playlists'), - Rundowns: new ReactiveCacheCollection>('rundowns'), - NrcsIngestData: new ReactiveCacheCollection('nrcsIngestData'), // TODO - is this needed? - Parts: new ReactiveCacheCollection>('parts'), - PartInstances: new ReactiveCacheCollection>('partInstances'), + Playlists: new ReactiveCacheCollection('playlists'), + Rundowns: new ReactiveCacheCollection('rundowns'), + NrcsIngestData: new ReactiveCacheCollection('nrcsIngestData'), + Parts: new ReactiveCacheCollection('parts'), + PartInstances: new ReactiveCacheCollection('partInstances'), } return cache diff --git a/meteor/server/publications/ingestStatus/rundownContentObserver.ts b/meteor/server/publications/ingestStatus/rundownContentObserver.ts index 2aa048be36..bc95137c8f 100644 --- a/meteor/server/publications/ingestStatus/rundownContentObserver.ts +++ b/meteor/server/publications/ingestStatus/rundownContentObserver.ts @@ -3,6 +3,7 @@ import { RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dat import { logger } from '../../logging' import { ContentCache, + nrcsIngestDataCacheObjSpecifier, partFieldSpecifier, partInstanceFieldSpecifier, playlistFieldSpecifier, @@ -111,7 +112,9 @@ export class RundownContentObserver { }, }, cache.NrcsIngestData.link(), - {}, + { + projection: nrcsIngestDataCacheObjSpecifier, + }, { nonMutatingCallbacks: true, }