8000 Queue improvement by tc-mccarthy · Pull Request #36 · tc-mccarthy/torrent-docker-rig · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Queue improvement #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2b11518
Removes unsupported dolby video versions
tc-mccarthy Jun 27, 2025
5735426
Adds logic for getting/storing the computeScore
tc-mccarthy Jun 27, 2025
c559cbf
Adds logic for getting/storing the computeScore
tc-mccarthy Jun 27, 2025
030db63
Corrects compute score logic
tc-mccarthy Jun 27, 2025
a7ffcd2
Implements transcode queue
tc-mccarthy Jun 27, 2025
8448229
Implements transcode queue
tc-mccarthy Jun 27, 2025
ec42437
Another attempt at implementing the transcodeQueue
tc-mccarthy Jun 27, 2025
2fd0554
Adds transcodeQueue start call
tc-mccarthy Jun 27, 2025
1e55b58
Fixes computeScore custom getter
tc-mccarthy Jun 27, 2025
499e7bb
Moves integrity check to debug
tc-mccarthy Jun 27, 2025
461eefc
Adds available compute logging
tc-mccarthy Jun 27, 2025
20ea8ef
Updates interface colors to be dark. Adds compute score to table
tc-mccarthy Jun 28, 2025
e24dfb8
Adds computeScore to filelist.json
tc-mccarthy Jun 28, 2025
8250135
Moves variables to an independent file and makes nav dark
tc-mccarthy Jun 28, 2025
67fdea6
Progress labels have color styles now too
tc-mccarthy Jun 28, 2025
664adfb
Progress labels have color styles now too
tc-mccarthy Jun 28, 2025
9ae851a
Updates package.json version
tc-mccarthy Jun 28, 2025
037cfaa
Another attempt at committing
tc-mccarthy Jun 28, 2025
8678fe9
Adds important to progress-label override
tc-mccarthy Jun 28, 2025
1e8973b
Cache busts bundle request
tc-mccarthy Jun 28, 2025
77b02cf
Adds logic for reducing the compute score if we decide not to transco…
tc-mccarthy Jun 28, 2025
0436899
Improves the scheduleJobs method to allow the process queue to open u…
tc-mccarthy Jun 28, 2025
45fe965
Adds compute score to front end. Reorganizes blocks
tc-mccarthy Jun 28, 2025
793b2e4
More output improvements
tc-mccarthy Jun 28, 2025
22e56af
Move UI improvements
tc-mccarthy Jun 28, 2025
854c6a3
Moves file progress to the top
tc-mccarthy Jun 28, 2025
de6c249
Some more style improvements
tc-mccarthy Jun 28, 2025
9f7daf2
One more style change
tc-mccarthy Jun 28, 2025
264df89
Removes all memcached dependency
tc-mccarthy Jun 28, 2025
368d443
Removes unneeded return from setLock method
tc-mccarthy Jun 28, 2025
bc3125b
Moves integrity check away from a loop and to a queue
tc-mccarthy Jun 28, 2025
c3de33d
Improved comments
tc-mccarthy Jun 28, 2025
16e27ab
Extends transcodeQueue poll delay to 10 seconds. Adds qualifier to av…
tc-mccarthy Jun 28, 2025
a134b3f
Adds logging to indicate when jobs are being delayed to allow for com…
tc-mccarthy Jun 28, 2025
9f97172
More logging
tc-mccarthy Jun 28, 2025
2ccb47b
Corrects prioritization logic
tc-mccarthy Jun 28, 2025
5a118bb
Reduces verbosity
tc-mccarthy Jun 28, 2025
ad3c022
Improves data refresh logic
tc-mccarthy Jun 28, 2025
5f907ca
Adds priority to output
tc-mccarthy Jun 28, 2025
d38f32a
Moves date to ETA
tc-mccarthy Jun 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions transcode/eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export default [{
"new-parens": "error",
"no-array-constructor": "error",
"no-async-promise-executor": 0,
"no-await-in-loop": 0,
"no-caller": "error",
"no-case-declarations": "error",
"no-class-assign": "error",
Expand Down
35 changes: 14 additions & 21 deletions transcode/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import dayjs from 'dayjs';
import mongo_connect from './lib/mongo_connection';
import update_active from './lib/update_active';
import update_queue from './lib/update_queue';
import transcode_loop from './lib/transcode_loop';
import fs_monitor from './lib/fs_monitor';
import redisClient from './lib/redis';
import logger from './lib/logger';
Expand All @@ -12,7 +11,8 @@ import pre_sanitize from './lib/pre_sanitize';
import { create_scratch_disks } from './lib/fs';
import config from './lib/config';
import generate_filelist from './lib/generate_filelist';
import integrity_loop from './lib/integrity_check_loop';
import IntegrityQueue from './lib/integrityQueue';
import TranscodeQueue from './lib/transcodeQueue';

const {
concurrent_transcodes,
Expand Down Expand Up @@ -55,38 +55,31 @@ async function run () {
// update the transcode queue
update_queue();

// start the transcode loops
logger.info(`Starting ${concurrent_transcodes} transcode loops...`);
const transcodeQueue = new TranscodeQueue({ maxScore: concurrent_transcodes, pollDelay: 10000 });
transcodeQueue.start();

Array.from({ length: concurrent_transcodes }).forEach((val, idx) => {
transcode_loop(idx);
});
const integrityQueue = new IntegrityQueue({ maxScore: concurrent_integrity_checks });

const currentHourLocalTime = dayjs().tz(process.env.TZ).hour();
logger.info(
`Current local time is ${currentHourLocalTime}`
);
if (currentHourLocalTime >= 0 && currentHourLocalTime < 9) {
logger.info(
'Starting integrity check loop immediately since it is before 9 AM'
);
integrity_loop();
integrityQueue.start();
}

// start the integrity check loops every day at midnight
// start the integrity check queue every day at midnight
cron.schedule('0 0 * * *', () => {
logger.info(
`Starting ${concurrent_integrity_checks} integrity check loops...`
);
Array.from({ length: concurrent_integrity_checks }).forEach(
(val, idx) => {
integrity_loop(idx);
}
);
integrityQueue.start();
});

// pause the integrity check queue every day at 9am
cron.schedule('0 9 * * *', () => {
integrityQueue.stop();
});

// generate the filelist every 2 minutes
cron.schedule('*/5 * * * *', async () => {
cron.schedule('*/2 * * * *', async () => {
generate_filelist({ limit: 1000, writeToFile: true });
});

Expand Down
10 changes: 10 additions & 0 deletions transcode/lib/ffprobe.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,21 @@ export default async function ffprobe_func (file) {
video.aspect = aspect_round(video.width / video.height);
}

// mark the video as unsupported if it has a dv_profile value and that value is less than 8
if (video.side_data_list?.dv_profile && video.side_data_list?.dv_profile < 8) {
throw new Error(`Video not supported: Dolby Vision profile version is less than 8`);
}

return data;
} catch (e) {
if (/command\s+failed/gi.test(e.message)) {
trash(file);
}

if (/video\s+not\s+supported/gi.test(e.message)) {
trash(file);
}

logger.error('FFPROBE FAILED', e);
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion transcode/lib/generate_filelist.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export default async function generate_filelist ({ limit = 1, writeToFile = fals
codec: `${
f.probe.streams.find((v) => v.codec_type === 'video')?.codec_name
}/${f.probe.streams.find((v) => v.codec_type === 'audio')?.codec_name}`,
encode_version: f.encode_version
encode_version: f.encode_version,
computeScore: f.computeScore
}))
)
);
Expand Down
10 changes: 1 addition & 9 deletions transcode/lib/integrityCheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,11 @@ export default function integrityCheck (file) {
return new Promise(async (resolve, reject) => {
try {
// mongo record of the video
logger.info(file, { label: 'INTEGRITY CHECKING FILE' });
logger.debug(file, { label: 'INTEGRITY CHECKING FILE' });

const video_record = file;
file = file.path;

// if the file is locked, short circuit
if (await video_record.hasLock()) {
logger.info(
`File is locked. Skipping integrity check: ${file} - ${video_record._id}`
);
return resolve();
}

await video_record.setLock('integrity');

const exists = fs.existsSync(file);
Expand Down
95 changes: 95 additions & 0 deletions transcode/lib/integrityQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { setTimeout as delay } from 'timers/promises';
import integrityCheck from './integrityCheck';
import logger from './logger';
import generate_integrity_filelist from './generate_integrity_filelist';

/**
 * Polling scheduler that runs integrity checks while keeping the summed
 * `computeScore` of in-flight jobs at or below a fixed budget.
 *
 * On every poll it fetches up to 50 candidates from
 * `generate_integrity_filelist`, starts at most one job that fits the
 * remaining budget, and then waits `pollDelay` milliseconds.
 */
export default class IntegrityQueue {
  /**
   * @param {object} [options]
   * @param {number} [options.maxScore=4] Max compute units allowed simultaneously.
   * @param {number} [options.pollDelay=2000] Delay between scheduling attempts (ms).
   */
  constructor ({ maxScore = 4, pollDelay = 2000 } = {}) {
    logger.info(`Initiating integrity check queue for a max compute of ${maxScore}...`);
    this.maxScore = maxScore;
    this.pollDelay = pollDelay;
    this.runningJobs = []; // In-memory list of currently active jobs
    this._isRunning = false; // Flag for controlling the loop
  }

  /**
   * Starts the scheduling loop. Calling it while the queue is already
   * running is a no-op.
   *
   * @returns {Promise<void>} Settles once the loop has exited (after `stop()`).
   */
  async start () {
    if (this._isRunning) return;
    this._isRunning = true;
    logger.info('Integrity check queue started.');
    await this.loop();
  }

  /**
   * Asks the loop to exit after its current iteration. Jobs that are
   * already running are left to finish.
   */
  stop () {
    this._isRunning = false;
    logger.info('Integrity queue stopped.');
  }

  /**
   * @returns {number} Sum of `computeScore` across the running jobs.
   */
  getUsedCompute () {
    return this.runningJobs.reduce((sum, job) => sum + job.computeScore, 0);
  }

  /**
   * @returns {number} Budget that is still free (`maxScore` minus used compute).
   */
  getAvailableCompute () {
    return this.maxScore - this.getUsedCompute();
  }

  /**
   * Main loop: tries to schedule a job, then waits before the next attempt.
   *
   * @returns {Promise<void>}
   */
  async loop () {
    while (this._isRunning) {
      try {
        await this.scheduleJobs();
      } catch (err) {
        // One failed poll must not end the loop: `_isRunning` would stay true,
        // so every later start() call would return early without restarting it.
        logger.error(`Integrity queue scheduling failed: ${err.message}`);
      }
      await delay(this.pollDelay); // Wait before checking again
    }
  }

  /**
   * Attempts to find and start one job that fits within available compute.
   *
   * @returns {Promise<void>}
   */
  async scheduleJobs () {
    const computeBeforeFetch = this.getAvailableCompute();
    logger.info(`Available integrity check compute: ${computeBeforeFetch}.`);

    if (computeBeforeFetch <= 0) return;

    logger.info('Checking for new jobs to run...');
    const jobs = await generate_integrity_filelist({ limit: 50 });

    // Jobs may have started or finished while the fetch was pending, so read
    // the budget again instead of deciding on the pre-fetch figure.
    const availableCompute = this.getAvailableCompute();
    if (availableCompute <= 0) return;

    // Build the running-id lookup once rather than rescanning runningJobs per candidate.
    const runningIds = new Set(this.runningJobs.map((job) => job._id.toString()));
    const candidates = jobs.filter((job) => !runningIds.has(job._id.toString()));

    // First candidate in list order that cannot start because it needs more compute than is free.
    const blockedHighPriorityJob = candidates.find((job) => job.computeScore > availableCompute);

    const nextJob = candidates.find((job) => {
      if (job.computeScore > availableCompute) return false;

      // If a higher-priority job is blocked, don't schedule lower-priority jobs
      // (numerically larger priority values); let the queue open up for the
      // blocked job instead.
      if (blockedHighPriorityJob && job.sortFields.priority > blockedHighPriorityJob.sortFields.priority) return false;

      return true;
    });

    if (nextJob) {
      // Deliberately not awaited: the job runs in the background while the
      // loop keeps polling. runJob() handles its own errors.
      this.runJob(nextJob);
    }
  }

  /**
   * Runs a single integrity check and keeps `runningJobs` in sync with it.
   *
   * @param {object} job Record passed to `integrityCheck`; `_id`, `path` and
   *   `computeScore` are read from it here.
   * @returns {Promise<void>} Never rejects because of a failed check; failures are logged.
   */
  async runJob (job) {
    try {
      this.runningJobs.push(job);
      await integrityCheck(job);
    } catch (err) {
      logger.error(`Integrity check failed for ${job.path}: ${err.message}`);
    } finally {
      // Always clean up the memory queue
      this.runningJobs = this.runningJobs.filter(
        (j) => j._id.toString() !== job._id.toString()
      );
    }
  }
}
40 changes: 0 additions & 40 deletions transcode/lib/integrity_check_loop.js

This file was deleted.

3 changes: 3 additions & 0 deletions transcode/lib/round-to-nearest-quarter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/**
 * Snaps a number to the closest multiple of 0.25.
 *
 * The value is scaled to whole quarters, rounded with `Math.round`, and
 * scaled back.
 *
 * @param {number} number Value to round.
 * @returns {number} The nearest multiple of 0.25.
 */
export default function roundToNearestQuarter (number) {
  const quartersPerUnit = 4;
  const wholeQuarters = Math.round(number * quartersPerUnit);

  return wholeQuarters / quartersPerUnit;
}
20 changes: 11 additions & 9 deletions transcode/lib/transcode.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ export default function transcode (file) {
throw new Error(`Video record not found for file: ${file}`);
}

// if the file is locked, short circuit
if (await video_record.hasLock('transcode')) {
logger.info(
`File is locked. Skipping transcode: ${file} - ${video_record._id}`
);
return resolve({ locked: true });
}

const { profiles } = config;
const exists = fs.existsSync(file);

Expand Down Expand Up @@ -169,7 +161,7 @@ export default function transcode (file) {
ffprobe_data.format.size <= 350000
) {
logger.debug(
'Video stream codec is HEVC and size is less than 1GB. Not transcoding'
'Video stream codec is h264 and size is less than 350mb. Not transcoding'
);
transcode_video = false;
}
Expand Down Expand Up @@ -202,6 +194,14 @@ export default function transcode (file) {
await integrityCheck(video_record);
}

if (!transcode_video) {
video_record.computeScore = 0.2; // set the compute score to 0.2 because we're not transcoding
}

if (!transcode_audio) {
video_record.computeScore -= 0.1; // reduce 0.1 from the compute score because we're not transcoding audio
}

let cmd = ffmpeg(file);

cmd = cmd
Expand Down Expand Up @@ -293,6 +293,8 @@ export default function transcode (file) {
pct_remaining,
time_remaining,
est_completed_seconds,
computeScore: video_record.computeScore,
priority: video_record.sortFields.priority,
size: {
progress: {
kb: progress.targetSize,
Expand Down
Loading
0