8000 fix: `start` button to enqueue `QueueItem` by yottahmd · Pull Request #964 · dagu-org/dagu · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix: start button to enqueue QueueItem #964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

8000
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
461 changes: 101 additions & 360 deletions api/v2/api.gen.go

Large diffs are not rendered by default.

< 8000 div class="load-diff-retry text-center p-1" data-show-on-error hidden> Oops, something went wrong.
52 changes: 0 additions & 52 deletions api/v2/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -293,58 +293,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"

/dags/{fileName}/stop:
post:
summary: "Terminate a running DAG-run"
description: "Forcefully stops a running DAG-run created from this DAG"
operationId: "terminateDAGDAGRun"
tags:
- "dags"
parameters:
- $ref: "#/components/parameters/RemoteNode"
- $ref: "#/components/parameters/DAGFileName"
responses:
"200":
description: "A successful response"
default:
description: "Generic error response"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"

/dags/{fileName}/retry:
post:
summary: "Retry DAG-run execution"
description: "Creates a new DAG-run based on a previous execution"
operationId: "retryDAGDAGRun"
tags:
- "dags"
parameters:
- $ref: "#/components/parameters/RemoteNode"
- $ref: "#/components/parameters/DAGFileName"
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
dagRunId:
type: string
description: "ID of the DAG-run to retry"
required:
- dagRunId
responses:
"200":
description: "A successful response"
default:
description: "Generic error response"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"

/dags/{fileName}/dag-runs:
get:
summary: "Retrieve execution history of a DAG"
Expand Down
5 changes: 3 additions & 2 deletions internal/cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
hrOpts = append(hrOpts, localdagrun.WithHistoryFileCache(hc))
}

drs := localdagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
drm := dagrun.New(drs, cfg.Paths.Executable, cfg.Global.WorkDir)
ps := localproc.New(cfg.Paths.ProcDir)
drs := localdagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
drm := dagrun.New(drs, ps, cfg.Paths.Executable, cfg.Global.WorkDir)
qs := localqueue.New(cfg.Paths.QueueDir)

return &Context{
Expand All @@ -140,6 +140,7 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
func (c *Context) HistoryManager(drs models.DAGRunStore) dagrun.Manager {
return dagrun.New(
drs,
c.ProcStore,
c.Config.Paths.Executable,
c.Config.Global.WorkDir,
)
Expand Down
18 changes: 16 additions & 2 deletions internal/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,29 @@ func runStart(ctx *Context, args []string) error {
return handleChildDAGRun(ctx, dag, dagRunID, params, root, parent)
}

var disabledQueue bool
if os.Getenv("DISABLE_DAG_RUN_QUEUE") != "" {
disabledQueue = true
}

// Log root dag-run
logger.Info(ctx, "Executing root dag-run",
"dag", dag.Name,
"params", params,
"dagRunId", dagRunID,
)

// Execute the dag-run
return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root)
// Check if the DAG needs to be enqueued or executed directly
// We need to enqueue it unless if the queue is disabled
if dag.MaxActiveRuns < 0 || disabledQueue {
// MaxActiveRuns < 0 means queueing is disabled for this DAG
return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root)
}

dag.Location = "" // Queued dag-runs must not have a location

// Enqueue the DAG-run for execution
return enqueueDAGRun(ctx, dag, dagRunID)
}

// getDAGRunInfo extracts and validates dag-run ID and references from command flags
Expand Down
48 changes: 30 additions & 18 deletions internal/dagrun/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
// The Manager is used to interact with the DAG.
func New(
drs models.DAGRunStore,
ps models.ProcStore,
executable string,
workDir string,
) Manager {
return Manager{
dagRunStore: drs,
procStore: ps,
executable: executable,
workDir: workDir,
}
Expand All @@ -39,6 +41,7 @@ func New(
// through a socket interface and manages dag-run data.
type Manager struct {
dagRunStore models.DAGRunStore // Store interface for persisting run data
procStore models.ProcStore // Store interface for process management

executable string // Path to the executable used to run DAGs
workDir string // Working directory for executing commands
Expand Down Expand Up @@ -305,7 +308,6 @@ func (m *Manager) FindChildDAGRunStatus(ctx context.Context, rootDAGRun digraph.
// currentStatus retrieves the current status of a running DAG by querying its socket.
// This is a private method used internally by other status-related methods.
func (*Manager) currentStatus(_ context.Context, dag *digraph.DAG, dagRunID string) (*models.DAGRunStatus, error) {
// FIXME: Should handle the case of dynamic DAG
AE96 client := sock.NewClient(dag.SockAddr(dagRunID))
statusJSON, err := client.Request("GET", "/status")
if err != nil {
Expand All @@ -319,7 +321,18 @@ func (*Manager) currentStatus(_ context.Context, dag *digraph.DAG, dagRunID stri
// If the DAG is running, it attempts to get the current status from the socket.
// If that fails or no status exists, it returns an initial status or an error.
func (m *Manager) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (models.DAGRunStatus, error) {
var latestStatus *models.DAGRunStatus
var status *models.DAGRunStatus

// Find the proc store to check if the DAG is running
alive, _ := m.procStore.CountAlive(ctx, dag.Name)
if alive > 0 {
items, _ := m.dagRunStore.ListStatuses(
ctx, models.WithName(dag.Name), models.WithStatuses([]scheduler.Status{scheduler.StatusRunning}),
)
if len(items) > 0 {
return *items[0], nil
}
}

// Find the latest status by name
attempt, err := m.dagRunStore.LatestAttempt(ctx, dag.Name)
Expand All @@ -328,47 +341,46 @@ func (m *Manager) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (models
}

// Read the latest status
latestStatus, err = attempt.ReadStatus(ctx)
status, err = attempt.ReadStatus(ctx)
if err != nil {
goto handleError
}

// If the DAG is running, query the current status
if latestStatus.Status == scheduler.StatusRunning {
currentStatus, err := m.currentStatus(ctx, dag, latestStatus.DAGRunID)
if err == nil {
return *currentStatus, nil
} else {
logger.Debug(ctx, "Failed to get current status from socket", "error", err)
if status.Status == scheduler.StatusRunning {
dag, err = attempt.ReadDAG(ctx)
if err != nil {
currentStatus, err := m.currentStatus(ctx, dag, status.DAGRunID)
if err == nil {
status = currentStatus
} else {
logger.Debug(ctx, "Failed to get current status from socket", "error", err)
}
}
}

// If querying the current status fails, ensure if the status is running,
if latestStatus.Status == scheduler.StatusRunning {
if status.Status == scheduler.StatusRunning {
// Check the PID is still alive
pid := int(latestStatus.PID)
pid := int(status.PID)
if pid > 0 {
_, err := os.FindProcess(pid)
if err != nil {
// If we cannot find the process, mark the status as error
latestStatus.Status = scheduler.StatusError
status.Status = scheduler.StatusError
logger.Warn(ctx, "No PID set for running status, marking status as error")
}
}
}

return *latestStatus, nil
return *status, nil

handleError:

// If the latest status is not found, return the default status
ret := models.InitialStatus(dag)
if errors.Is(err, models.ErrNoStatusData) {
// No status for today
return ret, nil
}

return ret, err
return ret, nil
}

// ListRecentStatus retrieves the n most recent statuses for a DAG by name.
Expand Down
90 changes: 0 additions & 90 deletions internal/frontend/api/v2/dags.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,23 +391,6 @@ func (a *API) ExecuteDAG(ctx context.Context, request api.ExecuteDAGRequestObjec
}
}

status, err := a.dagRunMgr.GetLatestStatus(ctx, dag)
if err != nil {
return nil, &Error{
HTTPStatus: http.StatusNotFound,
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG %s not found", request.FileName),
}
}

if status.Status == scheduler.StatusRunning {
return nil, &Error{
HTTPStatus: http.StatusBadRequest,
Code: api.ErrorCodeAlreadyRunning,
Message: "DAG is already running",
}
}

dagRunId := valueOf(request.Body.DagRunId)
if dagRunId == "" {
var err error
Expand Down Expand Up @@ -564,79 +547,6 @@ waitLoop:
return nil
}

func (a *API) TerminateDAGDAGRun(ctx context.Context, request api.TerminateDAGDAGRunRequestObject) (api.TerminateDAGDAGRunResponseObject, error) {
if err := a.isAllowed(ctx, config.PermissionRunDAGs); err != nil {
return nil, err
}

dag, err := a.dagStore.GetMetadata(ctx, request.FileName)
if err != nil {
return nil, &Error{
HTTPStatus: http.StatusNotFound,
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG %s not found", request.FileName),
}
}

status, err := a.dagRunMgr.GetLatestStatus(ctx, dag)
if err != nil {
return nil, &Error{
HTTPStatus: http.StatusNotFound,
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG %s not found", request.FileName),
}
}
if status.Status != scheduler.StatusRunning {
return nil, &Error{
HTTPStatus: http.StatusBadRequest,
Code: api.ErrorCodeNotRunning,
Message: "DAG is not running",
}
}
if err := a.dagRunMgr.Stop(ctx, dag, status.DAGRunID); err != nil {
return nil, fmt.Errorf("error stopping DAG: %w", err)
}
return api.TerminateDAGDAGRun200Response{}, nil
}

func (a *API) RetryDAGDAGRun(ctx context.Context, request api.RetryDAGDAGRunRequestObject) (api.RetryDAGDAGRunResponseObject, error) {
if err := a.isAllowed(ctx, config.PermissionRunDAGs); err != nil {
return nil, err
}

dag, err := a.dagStore.GetMetadata(ctx, request.FileName)
if err != nil {
return nil, &Error{
HTTPStatus: http.StatusNotFound,
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG %s not found", request.FileName),
}
}

status, err := a.dagRunMgr.GetLatestStatus(ctx, dag)
if err != nil {
return nil, &Error{
HTTPStatus: http.StatusNotFound,
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG %s not found", request.FileName),
}
}

if status.Status == scheduler.StatusRunning {
return nil, &Error{
HTTPStatus: http.StatusBadRequest,
Code: api.ErrorCodeAlreadyRunning,
Message: "DAG is already running",
}
}

if err := a.dagRunMgr.RetryDAGRun(ctx, dag, request.Body.DagRunId); err != nil {
return nil, fmt.Errorf("error retrying DAG: %w", err)
}

return api.RetryDAGDAGRun200Response{}, nil
}

func (a *API) UpdateDAGSuspensionState(ctx context.Context, request api.UpdateDAGSuspensionStateRequestObject) (api.UpdateDAGSuspensionStateResponseObject, error) {
if err := a.isAllowed(ctx, config.PermissionRunDAGs); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func setupTest(t *testing.T) testHelper {
ps := localproc.New(cfg.Paths.ProcDir)
qs := localqueue.New(cfg.Paths.QueueDir)

drm := dagrun.New(drs, cfg.Paths.Executable, cfg.Global.WorkDir)
drm := dagrun.New(drs, ps, cfg.Paths.Executable, cfg.Global.WorkDir)
em := scheduler.NewEntryReader(testdataDir, ds, drm, "", "")

return testHelper{
Expand Down
5 changes: 4 additions & 1 deletion internal/test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func Setup(t *testing.T, opts ...HelperOption) Helper {
// Set the log level to debug
_ = os.Setenv("DEBUG", "true")

// Disable the DAG run queue for tests
_ = os.Setenv("DISABLE_DAG_RUN_QUEUE", "true")

random := uuid.New().String()
tmpDir := fileutil.MustTempDir(fmt.Sprintf("dagu-test-%s", random))
require.NoError(t, os.Setenv("DAGU_HOME", tmpDir))
Expand All @@ -99,7 +102,7 @@ func Setup(t *testing.T, opts ...HelperOption) Helper {
runStore := localdagrun.New(cfg.Paths.DAGRunsDir)
procStore := localproc.New(cfg.Paths.ProcDir)

drm := dagrun.New(runStore, cfg.Paths.Executable, cfg.Global.WorkDir)
drm := dagrun.New(runStore, procStore, cfg.Paths.Executable, cfg.Global.WorkDir)

helper := Helper{
Context: createDefaultContext(),
Expand Down
Loading
Loading
0