feat: mos status flow rework by Julusian · Pull Request #1356 · nrkno/sofie-core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: mos status flow rework #1356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: release53
Choose a base branch
from
9 changes: 6 additions & 3 deletions .github/workflows/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,19 @@ jobs:
- node-version: 22.x
package-name: job-worker
send-coverage: true
# No tests for the gateways yet
# No tests for some gateways yet
# - node-version: 22.x
# package-name: playout-gateway
# - node-version: 22.x
# package-name: mos-gateway
# send-coverage: true
- node-version: 22.x
package-name: mos-gateway
send-coverage: true
- node-version: 22.x
package-name: live-status-gateway
send-coverage: true
- node-version: 22.x
package-name: webui
send-coverage: true
# manual meteor-lib as it only needs a couple of versions
- node-version: 22.x
package-name: meteor-lib
Expand Down
3 changes: 2 additions & 1 deletion meteor/server/collections/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
async observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle> {
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
if (span) {
Expand All @@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
}
try {
const res = await this._collection
.find((selector ?? {}) as any, options as any)
.observeChangesAsync(callbacks)
.find((selector ?? {}) as any, findOptions as any)
.observeChangesAsync(callbacks, callbackOptions)
if (span) span.end()
return res
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'

Check warning on line 6 in meteor/server/publications/_publications.ts

View check run for this annotation

Codecov / codecov/patch

meteor/server/publications/_publications.ts#L6

Added line #L6 was not covered by tests
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
191 changes: 191 additions & 0 deletions meteor/server/publications/ingestStatus/createIngestRundownStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import type { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
import {
IngestRundownStatus,
IngestPartPlaybackStatus,
IngestRundownActiveStatus,
IngestPartStatus,
IngestPartNotifyItemReady,
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import type { ReadonlyDeep } from 'type-fest'
import _ from 'underscore'
import type { ContentCache, PartCompact, PartInstanceCompact, PlaylistCompact } from './reactiveContentCache'
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
import { unprotectString } from '@sofie-automation/corelib/dist/protectedString'

/**
 * Assemble the ingest status document for a single rundown out of the content cache.
 *
 * The document lists every NRCS segment cached for the rundown, and for each segment the
 * status of every NRCS part that has both a `partId` and a `segmentId`.
 *
 * @param cache - Cache collections to read the rundown, playlist, NRCS data, parts and part instances from
 * @param rundownId - Id of the rundown to describe
 * @returns The status document, or `null` when the rundown is not present in the cache
 */
export function createIngestRundownStatus(
	cache: ReadonlyDeep<ContentCache>,
	rundownId: RundownId
): IngestRundownStatus | null {
	const rundown = cache.Rundowns.findOne(rundownId)
	if (!rundown) return null

	// Only a playlist carrying an activationId is looked up; anything else is treated as inactive
	const activePlaylist = cache.Playlists.findOne({
		_id: rundown.playlistId,
		activationId: { $exists: true },
	})

	let activeStatus = IngestRundownActiveStatus.INACTIVE
	if (activePlaylist) {
		activeStatus = activePlaylist.rehearsal
			? IngestRundownActiveStatus.REHEARSAL
			: IngestRundownActiveStatus.ACTIVE
	}

	const status: IngestRundownStatus = {
		_id: rundownId,
		externalId: rundown.externalId,

		active: activeStatus,

		segments: [],
	}

	const segmentDocs = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
	for (const segmentDoc of segmentDocs) {
		const partDocs = cache.NrcsIngestData.find({
			rundownId,
			segmentId: segmentDoc.segmentId,
			type: NrcsIngestCacheType.PART,
		}).fetch()

		const partStatuses = partDocs.map((partDoc) => {
			// Entries lacking either id yield null and are stripped by the compact below
			if (!partDoc.partId || !partDoc.segmentId) return null

			const partExternalId = partDoc.data.externalId

			// A part matches through its own externalId unless it declares an
			// ingestNotifyPartExternalId, in which case only that field is compared
			const matchingParts = cache.Parts.find({
				rundownId: rundownId,
				$or: [
					{
						externalId: partExternalId,
						ingestNotifyPartExternalId: { $exists: false },
					},
					{
						ingestNotifyPartExternalId: partExternalId,
					},
				],
			}).fetch()

			const matchingPartInstances = findPartInstancesForIngestPart(
				activePlaylist,
				rundownId,
				cache.PartInstances,
				partExternalId
			)

			return createIngestPartStatus(activePlaylist, matchingPartInstances, matchingParts, partExternalId)
		})

		status.segments.push({
			externalId: segmentDoc.data.externalId,
			parts: _.compact(partStatuses),
		})
	}

	return status
}

/**
 * Pick, for each Part belonging to an ingest part, the single PartInstance that should
 * represent it in the ingest status.
 *
 * Selection rules, per part id:
 * - the playlist's next PartInstance is never used
 * - the playlist's current PartInstance always wins, regardless of the order in which the
 *   candidates are returned or of the takeCount of the other candidates
 * - otherwise the candidate with the highest takeCount is used
 *
 * @param playlist - The active playlist, or undefined when the rundown is not active
 * @param rundownId - Rundown the PartInstances must belong to
 * @param partInstancesCache - Collection to search for candidate PartInstances
 * @param partExternalId - External id of the ingest part to match against
 * @returns Map from unprotected part id to the chosen PartInstance; empty when there is no playlist
 */
function findPartInstancesForIngestPart(
	playlist: PlaylistCompact | undefined,
	rundownId: RundownId,
	partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<PartInstanceCompact>>,
	partExternalId: string
) {
	const result: Record<string, PartInstanceCompact> = {}
	if (!playlist) return result

	const currentPartInstanceId = playlist.currentPartInfo?.partInstanceId
	const nextPartInstanceId = playlist.nextPartInfo?.partInstanceId

	// A part matches through its own externalId unless it declares an
	// ingestNotifyPartExternalId, in which case only that field is compared
	const candidatePartInstances = partInstancesCache
		.find({
			rundownId: rundownId,
			$or: [
				{
					'part.externalId': partExternalId,
					'part.ingestNotifyPartExternalId': { $exists: false },
				},
				{
					'part.ingestNotifyPartExternalId': partExternalId,
				},
			],
		})
		.fetch()

	for (const partInstance of candidatePartInstances) {
		if (partInstance.rundownId !== rundownId) continue
		// Ignore the next partinstance
		if (partInstance._id === nextPartInstanceId) continue

		const partId = unprotectString(partInstance.part._id)

		// The current part instance is the most important
		if (partInstance._id === currentPartInstanceId) {
			result[partId] = partInstance
			continue
		}

		const existingEntry = result[partId]
		if (!existingEntry) {
			result[partId] = partInstance
			continue
		}

		// Never let another candidate displace the current part instance, even if it
		// reports a higher takeCount or is returned later by the query
		if (existingEntry._id === currentPartInstanceId) continue

		// Take the part with the highest takeCount
		if (existingEntry.takeCount < partInstance.takeCount) {
			result[partId] = partInstance
		}
	}

	return result
}

/**
 * Combine the selected PartInstances and the remaining Parts of one ingest part into a
 * single status entry.
 *
 * @param playlist - The active playlist, or undefined; without it the PartInstances are not consulted
 * @param partInstances - Chosen PartInstance per unprotected part id
 * @param parts - Parts matching the ingest part; those whose id has an entry in `partInstances` are skipped
 * @param ingestPartExternalId - External id reported back on the status entry
 * @returns The status entry; `isReady` stays `null` when no contributing part holds a boolean
 *   `ingestNotifyPartReady`
 */
function createIngestPartStatus(
	playlist: PlaylistCompact | undefined,
	partInstances: Record<string, PartInstanceCompact>,
	parts: PartCompact[],
	ingestPartExternalId: string
): IngestPartStatus {
	let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
	// null until the first part with a boolean ready flag is seen, afterwards the AND of all flags
	let isReady: boolean | null = null
	const itemsReady: IngestPartNotifyItemReady[] = []

	/** Fold the ready flag and the ready items of one part into the running totals */
	const mergePart = (part: PartCompact) => {
		const partReady = part.ingestNotifyPartReady
		if (typeof partReady === 'boolean') {
			isReady = isReady === null ? partReady : isReady && partReady
		}

		const partItems = part.ingestNotifyItemsReady
		if (partItems) {
			itemsReady.push(...partItems)
		}
	}

	// PartInstances go first: they drive the playback status and contribute their part data
	if (playlist) {
		const currentPartInstanceId = playlist.currentPartInfo?.partInstanceId

		for (const partInstance of Object.values<PartInstanceCompact>(partInstances)) {
			if (!partInstance) continue

			if (partInstance.part.shouldNotifyCurrentPlayingPart) {
				if (currentPartInstanceId === partInstance._id) {
					// The current PartInstance means the part is playing; this overrides a STOP
					playbackStatus = IngestPartPlaybackStatus.PLAY
				} else if (playbackStatus === IngestPartPlaybackStatus.UNKNOWN) {
					// Any other PartInstance only marks it stopped while nothing else has been decided
					playbackStatus = IngestPartPlaybackStatus.STOP
				}
			}

			mergePart(partInstance.part)
		}
	}

	// Then every Part that has no entry in partInstances
	for (const part of parts) {
		const hasPartInstance = Boolean(partInstances[unprotectString(part._id)])
		if (!hasPartInstance) {
			mergePart(part)
		}
	}

	return {
		externalId: ingestPartExternalId,

		isReady: isReady,
		itemsReady: itemsReady,

		playbackStatus,
	}
}

Check warning on line 191 in meteor/server/publications/ingestStatus/createIngestRundownStatus.ts

View check run for this annotation

Codecov / codecov/patch

meteor/server/publications/ingestStatus/createIngestRundownStatus.ts#L2-L191

Added lines #L2 - L191 were not covered by tests
Loading
Loading
0