From 7f76b6e65e0bcb76e18ec416355846902bc49e2b Mon Sep 17 00:00:00 2001
From: Wolf480pl
Date: Fri, 23 Feb 2018 12:06:01 +0100
Subject: [PATCH 01/11] Make queue printing work on PrioritizingScheduler

Add __repr__() to various classes in scheduler/prioritizing.py
so that printing the queue works.
---
 sio/sioworkersd/scheduler/prioritizing.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sio/sioworkersd/scheduler/prioritizing.py b/sio/sioworkersd/scheduler/prioritizing.py
index 0aca465..0d3cf62 100644
--- a/sio/sioworkersd/scheduler/prioritizing.py
+++ b/sio/sioworkersd/scheduler/prioritizing.py
@@ -100,6 +100,9 @@ def popleft(self):
     def getTasksRequiredRam(self):
         return self._tasks_required_ram
 
+    def __repr__(self):
+        return '<_WaitingTasksQueue %r>' % self.dict.keys()
+
 
 class WorkerInfo(object):
     """A class responsible for tracking state of a single worker.
@@ -202,6 +205,11 @@ def __init__(self, env, contest):
         # Mutable data
         self.assigned_worker = None
 
+    def __repr__(self):
+        return '<TaskInfo %s %s priority=%s contest=%s worker=%s>' % (self.id,
+            'realCPU' if self.real_cpu else 'vCPU', self.priority,
+            self.contest, self.assigned_worker)
+
 
 class ContestInfo(object):
     """Tracks priority and weight of a contest.
@@ -222,6 +230,10 @@ def __init__(self, contest_uid, priority, weight):
         self.priority = priority
         self.weight = weight
 
+    def __repr__(self):
+        return '<ContestInfo %s priority=%s weight=%s>' % (self.uid,
+            self.priority, self.weight)
+
 
 class TasksQueues(object):
     """Per-contest priority queues of tasks.
@@ -298,6 +310,9 @@ def chooseTask(self):
 
         return self.queues[best_contest][-1]
 
+    def __repr__(self):
+        return '<TasksQueues %s>' % self.queues
+
 
 class PrioritizingScheduler(Scheduler):
     """The prioritizing scheduler main class, implementing scheduler interface.

From 32482bb5b4e26462d36424a17dc574f319edbcbc Mon Sep 17 00:00:00 2001
From: Wolf480pl
Date: Fri, 23 Feb 2018 13:55:28 +0100
Subject: [PATCH 02/11] Make get_queue output dict representation of queues

A dict representation of queues that can be easily serialized to JSON
is way more admin-friendly than a unicode representation based on
repr().

- add a dump() method to the scheduler interface, and make it fall
  back to unicode(self) by default
- implement the dump() method for the prioritizing scheduler and all
  its internal objects
- use the dump() method instead of unicode() in taskmanager.getQueue()
---
 sio/sioworkersd/scheduler/__init__.py     |  5 +++++
 sio/sioworkersd/scheduler/prioritizing.py | 19 +++++++++++++++++++
 sio/sioworkersd/taskmanager.py            |  2 +-
 3 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sio/sioworkersd/scheduler/__init__.py b/sio/sioworkersd/scheduler/__init__.py
index 370ed27..bb79a6f 100644
--- a/sio/sioworkersd/scheduler/__init__.py
+++ b/sio/sioworkersd/scheduler/__init__.py
@@ -34,6 +34,11 @@ def schedule(self):
         (task_id, worker_id)."""
         raise NotImplementedError()
 
+    def dump(self):
+        """Return a dict representation of the scheduler state."""
+        # use string representation as a fallback
+        return unicode(self)
+
 
 def getDefaultSchedulerClassName():
     return 'sio.sioworkersd.scheduler.prioritizing.PrioritizingScheduler'

diff --git a/sio/sioworkersd/scheduler/prioritizing.py b/sio/sioworkersd/scheduler/prioritizing.py
index 0d3cf62..5b36a54 100644
--- a/sio/sioworkersd/scheduler/prioritizing.py
+++ b/sio/sioworkersd/scheduler/prioritizing.py
@@ -103,6 +103,9 @@ def getTasksRequiredRam(self):
     def __repr__(self):
         return '<_WaitingTasksQueue %r>' % self.dict.keys()
 
+    def __iter__(self):
+        return iter(self.dict.keys())
+
 
 class WorkerInfo(object):
     """A class responsible for tracking state of a single worker.
@@ -210,6 +213,11 @@ def __repr__(self):
             'realCPU' if self.real_cpu else 'vCPU', self.priority,
             self.contest, self.assigned_worker)
 
+    def dump(self):
+        return {'id': self.id, 'real_cpu': self.real_cpu,
+                'priority': self.priority, 'contest': self.contest.dump(),
+                'assigned_worker': self.assigned_worker.id if self.assigned_worker else None}
+
 
 class ContestInfo(object):
     """Tracks priority and weight of a contest.
@@ -234,6 +242,10 @@ def __repr__(self):
         return '<ContestInfo %s priority=%s weight=%s>' % (self.uid,
             self.priority, self.weight)
 
+    def dump(self):
+        return {'uid': self.uid, 'priority': self.priority,
+                'weight': self.weight}
+
 
 class TasksQueues(object):
     """Per-contest priority queues of tasks.
@@ -313,6 +325,9 @@ def chooseTask(self):
     def __repr__(self):
         return '<TasksQueues %s>' % self.queues
 
+    def dump(self):
+        return {('%s:%s' % k.uid): [x.dump() for x in v] for k, v in self.queues.items()}
+
 
 class PrioritizingScheduler(Scheduler):
     """The prioritizing scheduler main class, implementing scheduler interface.
@@ -371,6 +386,10 @@ def __unicode__(self):
         """
         return unicode((self.tasks_queues, self.waiting_real_cpu_tasks))
 
+    def dump(self):
+        return {'tasks_queues': {k: v.dump() for k, v in self.tasks_queues.items()},
+                'waiting_real_cpu_tasks': [x.dump() for x in self.waiting_real_cpu_tasks]}
+
 
     # Worker scheduling
 
     def _insertWorkerToQueue(self, worker):

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index b0b4d65..a2d65bb 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -188,7 +188,7 @@ def _deferTask(self, env):
         return d
 
     def getQueue(self):
-        return unicode(self.scheduler)
+        return self.scheduler.dump()
 
     def _addGroup(self, group_env):
         singleTasks = []

From 7f7ef4ddd36fea159317f4c80f654344a3d2591b Mon Sep 17 00:00:00 2001
From: Wolf480pl
Date: Sun, 25 Oct 2020 22:21:04 +0100
Subject: [PATCH 03/11] fixup! Make queue printing work on PrioritizingScheduler
---
 sio/sioworkersd/scheduler/prioritizing.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sio/sioworkersd/scheduler/prioritizing.py b/sio/sioworkersd/scheduler/prioritizing.py
index 5b36a54..4810fe0 100644
--- a/sio/sioworkersd/scheduler/prioritizing.py
+++ b/sio/sioworkersd/scheduler/prioritizing.py
@@ -101,7 +101,7 @@ def getTasksRequiredRam(self):
         return self._tasks_required_ram
 
     def __repr__(self):
-        return '<_WaitingTasksQueue %r>' % self.dict.keys()
+        return '<_WaitingTasksQueue %r>' % self._dict.keys()
 
     def __iter__(self):
         return iter(self.dict.keys())

From 87d3db7e3bd70408186d260155d70040f71752ea Mon Sep 17 00:00:00 2001
From: Wolf480pl
Date: Sun, 25 Oct 2020 22:22:13 +0100
Subject: [PATCH 04/11] fixup! Make get_queue output dict representation of queues
---
 sio/sioworkersd/scheduler/prioritizing.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sio/sioworkersd/scheduler/prioritizing.py b/sio/sioworkersd/scheduler/prioritizing.py
index 4810fe0..414cfa3 100644
--- a/sio/sioworkersd/scheduler/prioritizing.py
+++ b/sio/sioworkersd/scheduler/prioritizing.py
@@ -104,7 +104,7 @@ def __repr__(self):
         return '<_WaitingTasksQueue %r>' % self._dict.keys()
 
     def __iter__(self):
-        return iter(self.dict.keys())
+        return iter(self._dict.keys())

From 4406a4868fba86dd067bc57465dbd112f8796397 Mon Sep 17 00:00:00 2001
From: Wolf480pl
Date: Sun, 25 Oct 2020 22:22:59 +0100
Subject: [PATCH 05/11] (no-ticket) Fix unicode handling in remote exceptions

If the remote exception contains non-ASCII unicode characters, the
str() used by MultiException would try to encode it as ASCII and fail.
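For example, under Python 2 (a hypothetical reproduction -- the
exception text here is made up):

    >>> e = RuntimeError(u'b\u0142\u0105d zdalnego workera')
    >>> str(e)   # Exception.__str__ implicitly encodes with the ascii codec
    UnicodeEncodeError: 'ascii' codec can't encode character u'\u0142' ...
    >>> (u"Exception: %s" % e).encode('utf-8')   # stays unicode, then UTF-8
    'Exception: b\xc5\x82\xc4\x85d zdalnego workera'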
To avoid that, use a unicode format string, and then explicitly encode
it as UTF-8, so as to avoid implicit encoding by another str() deep in
Twisted's bowels.
---
 sio/sioworkersd/taskmanager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index a2d65bb..cf6c43a 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -36,7 +36,7 @@ def __init__(self, desc, excs):
         s = desc + '\n\n'
         l = []
         for (e, tb) in excs:
-            l.append("Exception: %s\n%s" % (str(e), tb))
+            l.append((u"Exception: %s\n%s" % (e, tb)).encode('utf-8'))
         s += ('='*80 + '\n').join(l)
         super(MultiException, self).__init__(s)

From 966ebcd8268a9ae3e213b10fcdc77b9ebbe05944 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 26 Oct 2020 11:48:58 +0100
Subject: [PATCH 06/11] (no-ticket) Increase checker memory limit a bit
---
 sio/sioworkersd/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sio/sioworkersd/utils.py b/sio/sioworkersd/utils.py
index 6c60a63..e993403 100644
--- a/sio/sioworkersd/utils.py
+++ b/sio/sioworkersd/utils.py
@@ -7,7 +7,7 @@
     'inwer': 256 * 1024,
     'compile': 512 * 1024,
     'exec': 64 * 1024,
-    'checker': 256 * 1024,
+    'checker': 268 * 1024,
     'default': 256 * 1024,
 }

From 4bc3257ec3f4527c4925cd9ed29acd44cdaeb8cb Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 26 Oct 2020 11:53:24 +0100
Subject: [PATCH 07/11] (no-ticket) [sioworkersd] Handle corrupt db entries gracefully

When loading jobs from the database on startup, if some of them are
not valid JSON, ignore/delete the invalid ones instead of crashing the
whole daemon.
---
 sio/sioworkersd/taskmanager.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index cf6c43a..8b20aa8 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -62,7 +62,18 @@ def restart_db_sync_task(failure, task):
                 task=self.db_sync_task)
 
     def get_items(self):
-        return [json.loads(self.db[k]) for k in self.db.keys()]
+        items = []
+        error = []
+        for k in self.db.keys():
+            try:
+                items.append(json.loads(self.db[k]))
+            except ValueError:
+                error.append(k)
+                log.error("Failed to decode {key}", key=k)
+        for k in error:
+            log.error("Removing {key}", key=k)
+            #del self.db[k]
+        return items
 
     def update(self, job_id, dict_update, sync=True):
         job = json.loads(self.db.get(job_id, '{}'))

From e89f569446ac44f7618d594579850b226f5c107d Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 26 Oct 2020 11:57:01 +0100
Subject: [PATCH 08/11] (no-ticket) [sioworkersd] Limit startup returnToSio concurrency

At startup, when returning old 'to_return' jobs from the database,
don't return them all in parallel, as that overwhelms sio2's
receive_from_workers, causing return failures above the retry limit,
stuck jobs, and overall sadness.

Instead, use a fixed-size pool of job-returning loops, each returning
its share of the jobs serially.
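The serial-drain pattern the diff below builds, distilled into a
standalone sketch (a minimal illustration under Twisted -- the names
are made up and differ from the actual code):

    def drain_serially(jobs, return_one):
        # return_one() is assumed to return a twisted Deferred; chain the
        # next return only after the previous one fires, success and
        # failure alike, so each slice is drained strictly one at a time.
        def step(_result):
            if jobs:
                return_one(jobs.pop()).addBoth(step)
        step(None)

    # split round-robin into a fixed number of slices, drain each slice:
    # for i in range(16): drain_serially(all_jobs[i::16], self.returnToSio)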
---
 sio/sioworkersd/taskmanager.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index 8b20aa8..566fb6f 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -111,6 +111,10 @@ def startService(self):
         if len(all_jobs) > 0:
             log.info("Unfinished jobs found in database, resuming them...")
 
+        return_old_task_concurrency = 16
+        jobs_to_return = [[] for _ in range(return_old_task_concurrency)]
+        j = 0
+
         for job in all_jobs:
             if job['status'] == 'to_judge':
                 d = self._addGroup(job['env'])
@@ -118,11 +122,22 @@ def startService(self):
                 d.addBoth(self.returnToSio, url=job['env']['return_url'],
                           orig_env=job['env'], tid=job['id'])
             elif job['status'] == 'to_return':
-                log.warn("Trying again to return old task {tid}",
-                         tid=job['id'])
-                self.returnToSio(job['env'], url=job['env']['return_url'],
-                                 orig_env=job['env'], tid=job['id'],
-                                 count=job['retry_cnt'])
+                jobs_to_return[j].append(job)
+                j = (j + 1) % return_old_task_concurrency
+
+        for i in range(return_old_task_concurrency):
+            log.warn("Returning {n} tasks", n=len(jobs_to_return[i]))
+            def return_old_task(x, i, jobs):
+                if len(jobs) != 0:
+                    job = jobs.pop()
+                    log.warn("Trying again to return old task {tid} from {qid}",
+                             tid=job['id'], qid=i)
+                    d = self.returnToSio(job['env'], url=job['env']['return_url'],
+                                         orig_env=job['env'], tid=job['id'],
+                                         count=job['retry_cnt'])
+                    d.addBoth(return_old_task, i=i, jobs=jobs)
+            return_old_task(None, i=i, jobs=jobs_to_return[i])
+
         self.workerm.notifyOnNewWorker(self._newWorker)
         self.workerm.notifyOnLostWorker(self._lostWorker)
         self._tryExecute()

From 8fc9442dd40de394b32a96f5a2ade2baff22b308 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 26 Oct 2020 12:04:34 +0100
Subject: [PATCH 09/11] [TMP] Add some log statements for easier debugging of
 stuck tasks
---
 sio/sioworkersd/scheduler/prioritizing.py | 16 ++++++++++++++++
 sio/sioworkersd/taskmanager.py            |  4 ++++
 2 files changed, 20 insertions(+)

diff --git a/sio/sioworkersd/scheduler/prioritizing.py b/sio/sioworkersd/scheduler/prioritizing.py
index 414cfa3..972e8e7 100644
--- a/sio/sioworkersd/scheduler/prioritizing.py
+++ b/sio/sioworkersd/scheduler/prioritizing.py
@@ -51,6 +51,9 @@
 from sio.sioworkersd.scheduler import Scheduler
 from sio.sioworkersd.utils import get_required_ram_for_job
 
+from twisted.logger import Logger
+
+log = Logger()
 
 
 class _WaitingTasksQueue(object):
@@ -404,6 +407,7 @@ def _removeWorkerFromQueue(self, worker):
 
     def addWorker(self, worker_id):
         """Will be called when a new worker appears."""
+        log.warn("addWorker, workers {}".format(len(self.workers)))
         worker = WorkerInfo(worker_id, self.manager.getWorkers()[worker_id])
         self.workers[worker_id] = worker
@@ -693,10 +697,22 @@ def schedule(self):
         """Return a list of tasks to be executed now, as a list of pairs
         (task_id, worker_id).
""" + if self.tasks: + log.warn("{} tasks availible, tasks queues:".format(len(self.tasks))) + for q, v in self.tasks_queues.iteritems(): + log.warn(" {} {}".format(q, bool(v))) + log.warn("workers {}, queues:".format(len(self.workers))) + for q, v in self.workers_queues.iteritems(): + log.warn(" {} {}".format(q, len(v))) + result = [] while True: association = self._scheduleOnce() if association is None: break result.append(association) + + if result: + log.warn("{} tasks scheduled".format(len(result))) + return result diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py index 566fb6f..3f06216 100644 --- a/sio/sioworkersd/taskmanager.py +++ b/sio/sioworkersd/taskmanager.py @@ -155,6 +155,9 @@ def _tryExecute(self, x=None): # a performance problem for complex schedulers, especially during # rejudges. A solution exists, but it is a bit complex. jobs = self.scheduler.schedule() + if len(jobs) > 0: + log.warn("jobs: {}, inProgress: {}".format(len(jobs), len(self.inProgress))) + for (task_id, worker) in jobs: task = self.inProgress[task_id] d = self.workerm.runOnWorker(worker, task.env) @@ -208,6 +211,7 @@ def _deferTask(self, env): if tid in self.inProgress: raise RuntimeError('Tried to add same task twice') d = defer.Deferred() + log.warn("adding task, inProgress {}".format(len(self.inProgress))) self.inProgress[tid] = Task(env=env, d=d) d.addBoth(self._taskDone, tid=tid) From a8b082a1e42813dadc87fef99e3acbcbc801c556 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 26 Oct 2020 12:05:51 +0100 Subject: [PATCH 10/11] [tmphack] Add a bunch of str() to avoid some crash I frogot Whoever wrote this (probably me) was fixing some issue caused by task_id not being a string. The placement of str() calls and the presumed reliability of uuid4().run returing a str suggests that the bogus task_ids must've come from database. 
---
 sio/sioworkersd/taskmanager.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index 3f06216..b66ebd0 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -76,9 +76,9 @@ def get_items(self):
         return items
 
     def update(self, job_id, dict_update, sync=True):
-        job = json.loads(self.db.get(job_id, '{}'))
+        job = json.loads(self.db.get(str(job_id), '{}'))
         job.update(dict_update)
-        self.db[job_id] = json.dumps(job)
+        self.db[str(job_id)] = json.dumps(job)
         if sync:
             self.db.sync()
@@ -120,7 +120,7 @@ def startService(self):
                 d = self._addGroup(job['env'])
                 log.debug("added again unfinished task {tid}", tid=job['id'])
                 d.addBoth(self.returnToSio, url=job['env']['return_url'],
-                          orig_env=job['env'], tid=job['id'])
+                          orig_env=job['env'], tid=str(job['id']))
             elif job['status'] == 'to_return':
                 jobs_to_return[j].append(job)
                 j = (j + 1) % return_old_task_concurrency
@@ -133,7 +133,7 @@ def return_old_task(x, i, jobs):
                     log.warn("Trying again to return old task {tid} from {qid}",
                              tid=job['id'], qid=i)
                     d = self.returnToSio(job['env'], url=job['env']['return_url'],
-                                         orig_env=job['env'], tid=job['id'],
+                                         orig_env=job['env'], tid=str(job['id']),
                                          count=job['retry_cnt'])
                     d.addBoth(return_old_task, i=i, jobs=jobs)
             return_old_task(None, i=i, jobs=jobs_to_return[i])
@@ -181,6 +181,7 @@ def _retry_on_disconnect(failure, task_id=task_id, task=task):
         return x
 
     def _taskDone(self, x, tid):
+        tid = str(tid)
         if isinstance(x, Failure):
             self.inProgress[tid].env['error'] = {
                 'message': x.getErrorMessage(),
@@ -208,6 +209,7 @@ def _taskDone(self, x, tid):
 
     def _deferTask(self, env):
         tid = env['task_id']
+        tid = str(tid)
         if tid in self.inProgress:
             raise RuntimeError('Tried to add same task twice')
         d = defer.Deferred()
         log.warn("adding task, inProgress {}".format(len(self.inProgress)))
@@ -360,7 +362,7 @@ def retry(err, retry_cnt):
         return ret
 
     def _returnDone(self, _, tid):
-        self.database.delete(tid, sync=False)
+        self.database.delete(str(tid), sync=False)
         # No db sync here, because we are allowing some jobs to be done
         # multiple times in case of server failure for better performance.
         # It should be synced soon with other task

From ae44f9aee3b1dff3d167e10bffb68290ef365aa8 Mon Sep 17 00:00:00 2001
From: SIO2 Instance - Morality Core
Date: Fri, 10 Dec 2021 23:02:54 +0100
Subject: [PATCH 11/11] sioworkersd: add RPC method to dump in-progress task
 envs
---
 sio/sioworkersd/siorpc.py      | 3 +++
 sio/sioworkersd/taskmanager.py | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/sio/sioworkersd/siorpc.py b/sio/sioworkersd/siorpc.py
index 947303a..e74abeb 100644
--- a/sio/sioworkersd/siorpc.py
+++ b/sio/sioworkersd/siorpc.py
@@ -44,6 +44,9 @@ def xmlrpc_get_workers(self):
     def xmlrpc_get_queue(self):
         return self.taskm.getQueue()
 
+    def xmlrpc_get_tasks(self):
+        return self.taskm.getTasks()
+
     def _prepare_group(self, env):
         tasks = env['workers_jobs']
         group_id = 'GROUP_' + uuid4().urn

diff --git a/sio/sioworkersd/taskmanager.py b/sio/sioworkersd/taskmanager.py
index b66ebd0..9f7182f 100644
--- a/sio/sioworkersd/taskmanager.py
+++ b/sio/sioworkersd/taskmanager.py
@@ -222,6 +222,9 @@ def _deferTask(self, env):
     def getQueue(self):
         return self.scheduler.dump()
 
+    def getTasks(self):
+        return {k: t.env for k, t in self.inProgress.iteritems()}
+
     def _addGroup(self, group_env):
         singleTasks = []
         idMap = {}
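Usage note: a quick way to poke at the new method (and get_queue from
patch 02) is an XML-RPC client in a Python 2 shell. The URL below is an
assumption -- point it at wherever your sioworkersd RPC endpoint
actually listens:

    import xmlrpclib
    server = xmlrpclib.ServerProxy('http://localhost:7889')
    print server.get_queue()   # scheduler state, via Scheduler.dump()
    print server.get_tasks()   # in-progress task envs, keyed by task id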