Future DagRun rarely triggered by Race Condition when max_active_runs has reached its upper limit #31407

Open
2 tasks done
doiken opened this issue May 19, 2023 · 4 comments · Fixed by #31414
Labels
affected_version:2.2 Issues Reported for 2.2 area:core good first issue kind:bug This is a clearly a bug

Comments

@doiken
Contributor
doiken commented May 19, 2023

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

On rare occasions, the scheduler triggers a DagRun that is not due until a future time.
As far as I can tell, this happens under the following conditions:

  • max_active_runs is set and its upper limit has been reached
  • The preceding DagRun completes only slightly earlier than the following DagRun

See "Anything else" for details.

What you think should happen instead

The DagRun should wait until its scheduled time.

How to reproduce

I have reproduced this in Airflow 2.2.2 with the following code.
In my environment, it took about half a day of running before the problem occurred.

import copy
import logging
import time
from datetime import datetime, timedelta

import pendulum
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.sensors.python import PythonSensor
from airflow.utils import timezone

logger = logging.getLogger(__name__)

# A very small min_file_process_interval (e.g. min_file_process_interval=3) may make the race easier to reproduce.
def create_dag(interval):
    with DAG(
        dag_id=f"example_reproduce_{interval:0>2}",
        schedule_interval=f"*/{interval} * * * *",
        start_date=datetime(2021, 1, 1),
        catchup=False,
        max_active_runs=2,
        tags=["example_race_condition"],
    ) as dag:
        target_s = 10

        def raise_if_future(context):
            now = timezone.utcnow() - timedelta(seconds=30)
            if context["data_interval_start"] > now:
                raise AirflowException("DagRun scheduled for the future was triggered early")

        def wait_sync():
            now_dt = pendulum.now()
            if now_dt.minute % (interval * 2) == 0:
                # wait until target time to synchronize end time with the preceding job
                target_dt = copy.copy(now_dt).replace(second=target_s + 2)
                wait_sec = (target_dt - now_dt).total_seconds()
                logger.info(f"sleep {now_dt} -> {target_dt} in {wait_sec} seconds")
                if wait_sec > 0:
                    time.sleep(wait_sec)
                return True

        PythonSensor(
            task_id="t2",
            python_callable=wait_sync,
            # To avoid getting stuck in SequentialExecutor, try to re-poke after the next job starts
            poke_interval=interval * 60 + target_s,
            mode="reschedule",
            pre_execute=raise_if_future,
        )

        return dag

# Expose each DAG as a module-level global under a string name.
for i in [1, 2]:
    globals()[f"example_reproduce_{i:0>2}"] = create_dag(i)

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

MWAA 2.2.2

Anything else

The assumed flow and the corresponding query log for max_active_runs=2 are shown below.

The assumed flow

  1. The first DagRun (DR1) starts
  2. The subsequent DagRun (DR2) starts
  3. DR2 completes; the scheduler sets next_dagrun_create_after = NULL because max_active_runs has been reached
  4. DR1 completes; the scheduler calls dag_model.calculate_dagrun_date_fields() in SchedulerJobRunner._schedule_dag_run(). The session is NOT committed yet
  5. Meanwhile, DagFileProcessorProcess updates next_dagrun_create_after and commits it in its own transaction
  6. The scheduler writes the values calculated in step 4 to the database via guard.commit(), overwriting the value written in step 5
  7. The scheduler triggers a future DagRun because the current time already satisfies the next_dagrun_create_after written in step 6
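Steps 3 to 7 describe a lost update: the scheduler commits a value it computed before the DAG file processor's commit, and nothing checks whether the row changed in between. The minimal sketch below replays that interleaving in plain Python. It assumes a toy session that, like an ORM unit of work, writes only the columns it changed and only on commit; the `Session` class and `ts` helper are hypothetical, and the timestamps are taken from the query log below. The step 7 condition `next_dagrun_create_after <= now` is an assumption about how the scheduler picks DAGs that need a new run.

```python
from datetime import datetime, timezone


def ts(hh, mm=0, ss=0):
    """Hypothetical helper: a UTC timestamp on 2023-03-08, the date in the log."""
    return datetime(2023, 3, 8, hh, mm, ss, tzinfo=timezone.utc)


class Session:
    """Toy unit of work (hypothetical): stages column changes, writes only those on commit."""

    def __init__(self, table):
        self.table = table
        self.pending = {}

    def set(self, column, value):
        self.pending[column] = value

    def commit(self):
        # Blind write of the staged columns: no check that the row changed meanwhile.
        self.table.update(self.pending)
        self.pending.clear()


# Relevant columns of the `dag` row after step 3 (earlier values are not in the log).
dag_row = {
    "next_dagrun": None,
    "next_dagrun_data_interval_end": None,
    "next_dagrun_create_after": None,  # NULL: max_active_runs reached (step 3)
}

# Step 4: the scheduler stages its result; the session is not committed yet.
# 22:00 is the value the log shows the scheduler writing in step 6.
scheduler = Session(dag_row)
scheduler.set("next_dagrun_create_after", ts(22))

# Step 5: the DAG file processor commits its own calculation in a separate transaction.
processor = Session(dag_row)
processor.set("next_dagrun", ts(22))
processor.set("next_dagrun_data_interval_end", ts(23))
processor.set("next_dagrun_create_after", ts(23))
processor.commit()

# Step 6: the scheduler commits, overwriting the value written in step 5.
scheduler.commit()

# Step 7 (assumed condition): a run is created once next_dagrun_create_after <= now.
now = ts(22, 0, 10)
should_create = dag_row["next_dagrun_create_after"] <= now
interval_not_over = dag_row["next_dagrun_data_interval_end"] > now
print(should_create, interval_not_over)  # True True
```

Both values print `True`: the row says a run for the 22:00 to 23:00 interval may be created, even though that interval has not ended yet. Swapping the order of the two `commit()` calls leaves `next_dagrun_create_after` at 23:00 and `should_create` becomes `False`, which is why the problem only appears when the commits interleave this way.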

The associated query log

bb55c5b0bdce:/# grep "future_dagrun_00" /var/lib/postgresql/data/log/postgresql-2023-03-08_210056.log | grep "next_dagrun"
2023-03-08 22:00:01.678 UTC[57378] LOG: statement: UPDATE dag SET next_dagrun_create_after = NULL WHERE dag.dag_id = 'future_dagrun_0' # set in step 3
2023-03-08 22:00:08.162 UTC[57472] LOG: statement: UPDATE dag SET last_parsed_time = '2023-03-08T22:00:07.683786+00:00'::timestamptz, next_dagrun = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-08T23:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 5
2023-03-08 22:00:09.137 UTC[57475] LOG: statement: UPDATE dag SET next_dagrun_create_after = '2023-03-08T22:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00'  # set in step 6
2023-03-08 22:00:10.418 UTC[57479] LOG: statement: UPDATE dag SET next_dagrun = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-09T00:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-09T00:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 7
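To put a number on how early the run was created, this small calculation subtracts the time of the step 7 statement from the next_dagrun_create_after value the DAG file processor wrote in step 5. Both timestamps are copied from the log above; using the step 7 statement time as the run's creation time is an approximation.

```python
from datetime import datetime

# next_dagrun_create_after written by the DAG file processor in step 5.
expected_create_after = datetime.fromisoformat("2023-03-08T23:00:00+00:00")
# Time of the step 7 UPDATE, used as a proxy for when the run was actually created.
actual_creation = datetime.fromisoformat("2023-03-08T22:00:10.418+00:00")

early_by = expected_create_after - actual_creation
print(early_by)  # 0:59:49.582000
```

For this hourly DAG, the run for the 22:00 to 23:00 interval was created almost a full interval ahead of time.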

From my reading of the relevant code in the latest release, v2.6.1, I believe the problem still exists.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@doiken doiken added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels May 19, 2023
@boring-cyborg
boring-cyborg bot commented May 19, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@hussein-awala
Member

The scheduler rarely triggers a DagRun to be executed in the future even if allow_trigger_in_future=False.

Regarding allow_trigger_in_future: when it is true, Airflow allows externally triggered DagRuns for execution dates in the future. It only takes effect if schedule_interval is set to None in the DAG. So I don't understand how this config relates to your issue, especially since the DAG you provide has a non-None schedule_interval and the issue description is about scheduled DagRuns.
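The rule described in this comment can be written as a small predicate. The function name and parameters are hypothetical; this is a sketch of the behavior as described here, not Airflow's implementation. It makes explicit why, per this description, the setting does not apply to the reproduction DAGs, whose schedule_interval is a cron string.

```python
from datetime import datetime, timezone
from typing import Optional


def future_trigger_allowed(
    execution_date: datetime,
    now: datetime,
    allow_trigger_in_future: bool,
    schedule_interval: Optional[str],
) -> bool:
    """Hypothetical predicate encoding the described rule (not Airflow's code)."""
    if execution_date <= now:
        return True  # not a future date, so the setting is irrelevant
    # A future execution date is accepted only when the setting is on
    # and the DAG has no schedule (schedule_interval=None).
    return allow_trigger_in_future and schedule_interval is None


now = datetime(2023, 5, 19, 12, 0, tzinfo=timezone.utc)
future = datetime(2023, 5, 19, 13, 0, tzinfo=timezone.utc)
print(future_trigger_allowed(future, now, True, None))  # True
print(future_trigger_allowed(future, now, True, "*/1 * * * *"))  # False
```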

@doiken
Contributor Author
doiken commented May 19, 2023

For allow_trigger_in_future, when it's true, Airflow allows externally triggered DagRuns for Execution Dates in the future.

Thank you for letting me know. I was mistaken.
In that case, allow_trigger_in_future has nothing to do with this issue.

I'd like to remove allow_trigger_in_future from the description as well to avoid confusion.

@eladkal eladkal added good first issue affected_version:2.2 Issues Reported for 2.2 and removed needs-triage label for new issues that we didn't triage yet labels Jul 13, 2023
@ephraimbuddy
Contributor

Reopening this because I have reverted the fix.
@doiken, I'm having difficulty understanding this issue. Could you add screenshots showing the date differences?

@ephraimbuddy ephraimbuddy reopened this Feb 22, 2024