feat(workers): many small RQ worker features by ankush · Pull Request #18995 · frappe/frappe · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(workers): many small RQ worker features #18995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions frappe/commands/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,34 @@ def start_scheduler():


@click.command("worker")
@click.option("--queue", type=str)
@click.option(
"--queue",
type=str,
help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.",
)
@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs")
@click.option("-u", "--rq-username", default=None, help="Redis ACL user")
@click.option("-p", "--rq-password", default=None, help="Redis ACL user password")
def start_worker(queue, quiet=False, rq_username=None, rq_password=None):
"""Site is used to find redis credentials."""
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
@click.option(
"--strategy",
required=False,
type=click.Choice(["round_robbin", "random"]),
help="Dequeuing strategy to use",
)
def start_worker(
queue, quiet=False, rq_username=None, rq_password=None, burst=False, strategy=None
):
from frappe.utils.background_jobs import start_worker

start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password)
start_worker(
queue,
quiet=quiet,
rq_username=rq_username,
rq_password=rq_password,
burst=burst,
strategy=strategy,
)


@click.command("ready-for-migration")
Expand Down
2 changes: 1 addition & 1 deletion frappe/commands/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ def build_search_index(context):


@click.command("clear-log-table")
@click.option("--doctype", default="text", type=click.Choice(LOG_DOCTYPES), help="Log DocType")
@click.option("--doctype", required=True, type=click.Choice(LOG_DOCTYPES), help="Log DocType")
@click.option("--days", type=int, help="Keep records for days")
@click.option("--no-backup", is_flag=True, default=False, help="Do not backup the table")
@pass_context
Expand Down
10 changes: 10 additions & 0 deletions frappe/core/doctype/rq_job/test_rq_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import frappe
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job
from frappe.tests.utils import FrappeTestCase, timeout
from frappe.utils import cstr, execute_in_shell
from frappe.utils.background_jobs import is_job_queued


Expand Down Expand Up @@ -92,6 +93,15 @@ def test_is_enqueued(self):
self.check_status(actual_job, "finished")
self.assertFalse(is_job_queued(job_name))

@timeout(20)
def test_multi_queue_burst_consumption(self):
# Checks that one worker started with a comma-separated --queue list and
# --burst runs against both queues and reports that it is quitting.
# Enqueue three background jobs on each of the "default" and "short" queues.
for _ in range(3):
for q in ["default", "short"]:
frappe.enqueue(self.BG_JOB, sleep=1, queue=q)

# Start a single worker over both queues in burst mode via the bench CLI.
# NOTE(review): check_exit_code=True presumably makes a non-zero exit fail the
# test - confirm against execute_in_shell.
_, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True)
# The worker's stderr output is expected to contain "quitting" once it stops.
self.assertIn("quitting", cstr(stderr))


def test_func(fail=False, sleep=0):
if fail:
Expand Down
6 changes: 3 additions & 3 deletions frappe/core/doctype/rq_worker/rq_worker.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@
{
"fieldname": "queue",
"fieldtype": "Data",
"label": "Queue"
"label": "Queue(s)"
},
{
"fieldname": "queue_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Queue Type",
"label": "Queue Type(s)",
"options": "default\nlong\nshort"
},
{
Expand Down Expand Up @@ -113,7 +113,7 @@
"in_create": 1,
"is_virtual": 1,
"links": [],
"modified": "2022-11-14 15:35:32.786012",
"modified": "2022-11-24 14:50:48.511706",
"modified_by": "Administrator",
"module": "Core",
"name": "RQ Worker",
Expand Down
13 changes: 9 additions & 4 deletions frappe/core/doctype/rq_worker/rq_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ class RQWorker(Document):
def load_from_db(self):

all_workers = get_workers()
worker = [w for w in all_workers if w.pid == cint(self.name)][0]
d = serialize_worker(worker)
workers = [w for w in all_workers if w.pid == cint(self.name)]
if not workers:
raise frappe.DoesNotExistError
d = serialize_worker(workers[0])

super(Document, self).__init__(d)

Expand Down Expand Up @@ -51,12 +53,15 @@ def delete(self):


def serialize_worker(worker: Worker) -> frappe._dict:
queue = ", ".join(worker.queue_names())
queue_names = worker.queue_names()

queue = ", ".join(queue_names)
queue_types = ",".join(q.rsplit(":", 1)[1] for q in queue_names)

return frappe._dict(
name=worker.pid,
queue=queue,
queue_type=queue.rsplit(":", 1)[1],
queue_type=queue_types,
worker_name=worker.name,
status=worker.get_state(),
pid=worker.pid,
Expand Down
28 changes: 24 additions & 4 deletions frappe/utils/background_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import time
from collections import defaultdict
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Union
from typing import TYPE_CHECKING, Any, Literal, NoReturn, Union
from uuid import uuid4

import redis
from redis.exceptions import BusyLoadingError, ConnectionError
from rq import Connection, Queue, Worker
from rq.logutils import setup_loghandlers
from rq.worker import RandomWorker, RoundRobinWorker
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

import frappe
Expand All @@ -34,9 +35,11 @@ def get_queues_timeout():
custom_workers_config = common_site_config.get("workers", {})
default_timeout = 300

# Note: Order matters here
# If no queues are specified then RQ prioritizes queues in specified order
return {
"default": default_timeout,
"short": default_timeout,
"default": default_timeout,
"long": 1500,
**{
worker: config.get("timeout", default_timeout)
Expand Down Expand Up @@ -209,22 +212,37 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
frappe.destroy()


def start_worker(queue=None, quiet=False, rq_username=None, rq_password=None):
def start_worker(
queue: str | None = None,
quiet: bool = False,
rq_username: str | None = None,
rq_password: str | None = None,
burst: bool = False,
strategy: Literal["round_robbin", "random"] | None = None,
) -> NoReturn | None:
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
DEQUEUE_STRATEGIES = {"round_robbin": RoundRobinWorker, "random": RandomWorker}

with frappe.init_site():
# empty init is required to get redis_queue from common_site_config.json
redis_connection = get_redis_conn(username=rq_username, password=rq_password)

if queue:
queue = [q.strip() for q in queue.split(",")]
queues = get_queue_list(queue, build_queue_name=True)
queue_name = queue and generate_qname(queue)

if os.environ.get("CI"):
setup_loghandlers("ERROR")

WorkerKlass = DEQUEUE_STRATEGIES.get(strategy, Worker)

with Connection(redis_connection):
logging_level = "INFO"
if quiet:
logging_level = "WARNING"
Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level)
worker = WorkerKlass(queues, name=get_worker_name(queue_name))
worker.work(logging_level=logging_level, burst=burst)


def get_worker_name(queue):
Expand Down Expand Up @@ -367,6 +385,8 @@ def generate_qname(qtype: str) -> str:

qnames are useful to define namespaces of customers.
"""
if isinstance(qtype, list):
qtype = ",".join(qtype)
return f"{get_bench_id()}:{qtype}"


Expand Down
0