diff --git a/cmd/stop.go b/cmd/stop.go index b9893f306..aec196f2a 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -42,7 +42,7 @@ func stop(cfg *config.Config) error { log.Printf("DAG is not running.") return nil } - syscall.Kill(int(status.Pid), syscall.SIGINT) + syscall.Kill(int(status.Pid), syscall.SIGTERM) for { time.Sleep(time.Second * 3) s, err := controller.New(cfg).GetStatus() diff --git a/internal/admin/handlers/dag.go b/internal/admin/handlers/dag.go index 5eb6e0e45..552da5683 100644 --- a/internal/admin/handlers/dag.go +++ b/internal/admin/handlers/dag.go @@ -450,7 +450,7 @@ func buildLog(logs []*models.StatusFile) *Log { } func getPathParameter(r *http.Request) (string, error) { - re := regexp.MustCompile("/dags/([^/\\?]+)/?$") + re := regexp.MustCompile(`/dags/([^/\?]+)/?$`) m := re.FindStringSubmatch(r.URL.Path) if len(m) < 2 { return "", fmt.Errorf("invalid URL") diff --git a/internal/agent/agent.go b/internal/agent/agent.go index eb85d5624..8354ccdc8 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -101,7 +101,7 @@ func (a *Agent) Status() *models.Status { } // Signal sends the signal to the processes running -// if processes do not terminate for 60 seconds, +// if processes do not terminate for 120 seconds, // cancel all processes which will send signal -1 to the processes. func (a *Agent) Signal(sig os.Signal) { log.Printf("Sending %s signal to running child processes.", sig) @@ -112,7 +112,7 @@ func (a *Agent) Signal(sig os.Signal) { select { case <-done: log.Printf("All child processes have been terminated.") - case <-time.After(time.Second * 60): + case <-time.After(time.Second * 120): a.Cancel(sig) default: log.Printf("Waiting for child processes to exit...") @@ -121,7 +121,7 @@ func (a *Agent) Signal(sig os.Signal) { } // Cancel sends signal -1 to all child processes. -// then it waits another 20 seconds before therminating the +// then it waits another 60 seconds before therminating the // parent process. func (a *Agent) Cancel(sig os.Signal) { log.Printf("Sending -1 signal to running child processes.") @@ -132,7 +132,7 @@ func (a *Agent) Cancel(sig os.Signal) { select { case <-done: log.Printf("All child processes have been terminated.") - case <-time.After(time.Second * 20): + case <-time.After(time.Second * 60): log.Printf("Terminating the controller process.") a.Kill(done) default: @@ -154,7 +154,7 @@ func (a *Agent) init() { &scheduler.Config{ LogDir: path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")), MaxActiveRuns: a.DAG.MaxActiveRuns, - DelaySec: a.DAG.DelaySec, + Delay: a.DAG.Delay, Dry: a.Dry, OnExit: a.DAG.HandlerOn.Exit, OnSuccess: a.DAG.HandlerOn.Success, diff --git a/internal/config/config.go b/internal/config/config.go index 3bf08e44c..193df6331 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,7 +26,7 @@ type Config struct { ErrorMail *MailConfig InfoMail *MailConfig Smtp *SmtpConfig - DelaySec time.Duration + Delay time.Duration HistRetentionDays int Preconditions []*Condition MaxActiveRuns int @@ -135,7 +135,7 @@ func buildFromDefinition(def *configDefinition, file string, globalConfig *Confi c.Description = def.Description c.MailOn.Failure = def.MailOn.Failure c.MailOn.Success = def.MailOn.Success - c.DelaySec = time.Second * time.Duration(def.DelaySec) + c.Delay = time.Second * time.Duration(def.DelaySec) if opts != nil && opts.headOnly { return c, nil @@ -308,9 +308,11 @@ func buildStep(variables []string, def *stepDef) (*Step, error) { Limit: def.RetryPolicy.Limit, } } + if def.RepeatPolicy != nil { + step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat + step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec) + } step.MailOnError = def.MailOnError - step.Repeat = def.Repeat - step.RepeatInterval = time.Second * time.Duration(def.RepeatIntervalSec) step.Preconditions = loadPreCondition(def.Preconditions) return step, nil } diff --git a/internal/config/definition.go b/internal/config/definition.go index d8a1bf035..864df491c 100644 --- a/internal/config/definition.go +++ b/internal/config/definition.go @@ -31,17 +31,16 @@ type handerOnDef struct { } type stepDef struct { - Name string - Description string - Dir string - Command string - Depends []string - ContinueOn *continueOnDef - RetryPolicy *retryPolicyDef - MailOnError bool - Repeat bool - RepeatIntervalSec int - Preconditions []*conditionDef + Name string + Description string + Dir string + Command string + Depends []string + ContinueOn *continueOnDef + RetryPolicy *retryPolicyDef + RepeatPolicy *repeatPolicyDef + MailOnError bool + Preconditions []*conditionDef } type continueOnDef struct { @@ -49,6 +48,11 @@ type continueOnDef struct { Skipped bool } +type repeatPolicyDef struct { + Repeat bool + IntervalSec int +} + type retryPolicyDef struct { Limit int } diff --git a/internal/config/loader_test.go b/internal/config/loader_test.go index 26c61246f..1310ca9f6 100644 --- a/internal/config/loader_test.go +++ b/internal/config/loader_test.go @@ -40,6 +40,10 @@ func TestLoadConfig(t *testing.T) { RetryPolicy: &config.RetryPolicy{ Limit: 2, }, + RepeatPolicy: config.RepeatPolicy{ + Repeat: true, + Interval: time.Second * 10, + }, }, { Name: "2", @@ -90,7 +94,7 @@ func TestLoadConfig(t *testing.T) { Failure: true, Success: true, }, - DelaySec: time.Second * 1, + Delay: time.Second * 1, MaxActiveRuns: 1, Params: []string{"param1", "param2"}, DefaultParams: "param1 param2", diff --git a/internal/config/step.go b/internal/config/step.go index c9aeaa043..d73496e85 100644 --- a/internal/config/step.go +++ b/internal/config/step.go @@ -7,25 +7,29 @@ import ( ) type Step struct { - Name string - Description string - Variables []string - Dir string - Command string - Args []string - Depends []string - ContinueOn ContinueOn - RetryPolicy *RetryPolicy - MailOnError bool - Repeat bool - RepeatInterval time.Duration - Preconditions []*Condition + Name string + Description string + Variables []string + Dir string + Command string + Args []string + Depends []string + ContinueOn ContinueOn + RetryPolicy *RetryPolicy + RepeatPolicy RepeatPolicy + MailOnError bool + Preconditions []*Condition } type RetryPolicy struct { Limit int } +type RepeatPolicy struct { + Repeat bool + Interval time.Duration +} + type ContinueOn struct { Failure bool Skipped bool diff --git a/internal/models/status_test.go b/internal/models/status_test.go index df4de5366..404046e33 100644 --- a/internal/models/status_test.go +++ b/internal/models/status_test.go @@ -33,14 +33,14 @@ func TestStatusSerialization(t *testing.T) { Dir: "dir", Command: "echo 1", Args: []string{}, Depends: []string{}, ContinueOn: config.ContinueOn{}, RetryPolicy: &config.RetryPolicy{}, MailOnError: false, - Repeat: false, RepeatInterval: 0, Preconditions: []*config.Condition{}, + RepeatPolicy: config.RepeatPolicy{}, Preconditions: []*config.Condition{}, }, }, MailOn: config.MailOn{}, ErrorMail: &config.MailConfig{}, InfoMail: &config.MailConfig{}, Smtp: &config.SmtpConfig{}, - DelaySec: 0, + Delay: 0, HistRetentionDays: 0, Preconditions: []*config.Condition{}, MaxActiveRuns: 0, diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index 3c9ec363e..0506c2760 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -71,9 +71,7 @@ func (n *Node) Execute() error { cmd := exec.CommandContext(ctx, n.Command, n.Args...) n.cmd = cmd cmd.Dir = n.Dir - for _, v := range n.Variables { - cmd.Env = append(cmd.Env, v) - } + cmd.Env = append(cmd.Env, n.Variables...) if n.logWriter != nil { cmd.Stdout = n.logWriter diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 269c6e312..2ff4f3cff 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -52,7 +52,7 @@ type Scheduler struct { type Config struct { LogDir string MaxActiveRuns int - DelaySec time.Duration + Delay time.Duration Dry bool OnExit *config.Step OnSuccess *config.Step @@ -134,18 +134,22 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error { // nothing to do case NodeStatus_Error: sc.lastError = err - fallthrough - default: - if done != nil { - done <- node - } } - return } - if node.Repeat { + if node.ReadStatus() != NodeStatus_Cancel { node.incDoneCount() - time.Sleep(node.RepeatInterval) - continue + } + if node.RepeatPolicy.Repeat { + if err == nil || node.ContinueOn.Failure { + time.Sleep(node.RepeatPolicy.Interval) + continue + } + } + if err != nil { + if done != nil { + done <- node + } + return } break } @@ -155,7 +159,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error { } }(node) - time.Sleep(sc.DelaySec) + time.Sleep(sc.Delay) } time.Sleep(sc.pause) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 9e6ef6a4e..f2a228e9b 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -216,12 +216,10 @@ func TestSchedulerRetrySuccess(t *testing.T) { defer os.Remove(tmpDir) go func() { - select { - case <-time.After(time.Millisecond * 300): - f, err := os.Create(tmpFile) - require.NoError(t, err) - f.Close() - } + <-time.After(time.Millisecond * 300) + f, err := os.Create(tmpFile) + require.NoError(t, err) + f.Close() }() g, sc, err := testSchedule(t, @@ -402,6 +400,58 @@ func TestSchedulerOnFailure(t *testing.T) { assert.Equal(t, sc.HanderNode(constants.OnCancel).ReadStatus(), scheduler.NodeStatus_None) } +func TestRepeat(t *testing.T) { + g, _ := scheduler.NewExecutionGraph( + &config.Step{ + Name: "1", + Command: "sleep", + Args: []string{"1"}, + RepeatPolicy: config.RepeatPolicy{ + Repeat: true, + Interval: time.Millisecond * 300, + }, + }, + ) + sc := scheduler.New(&scheduler.Config{}) + + done := make(chan bool) + go func() { + <-time.After(time.Millisecond * 3000) + sc.Cancel(g, done) + }() + + err := sc.Schedule(g, nil) + <-done // Wait for canceling finished + require.NoError(t, err) + + nodes := g.Nodes() + + assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Cancel) + assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Cancel) + assert.Equal(t, nodes[0].DoneCount, 2) +} + +func TestRepeatFail(t *testing.T) { + g, _ := scheduler.NewExecutionGraph( + &config.Step{ + Name: "1", + Command: testCommandFail, + RepeatPolicy: config.RepeatPolicy{ + Repeat: true, + Interval: time.Millisecond * 300, + }, + }, + ) + sc := scheduler.New(&scheduler.Config{}) + err := sc.Schedule(g, nil) + require.Error(t, err) + + nodes := g.Nodes() + assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Error) + assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Error) + assert.Equal(t, nodes[0].DoneCount, 1) +} + func testSchedule(t *testing.T, steps ...*config.Step) ( *scheduler.ExecutionGraph, *scheduler.Scheduler, error, ) { diff --git a/tests/testdata/config_load.yaml b/tests/testdata/config_load.yaml index cfb122604..9107794b0 100644 --- a/tests/testdata/config_load.yaml +++ b/tests/testdata/config_load.yaml @@ -44,6 +44,9 @@ steps: skipped: true retryPolicy: limit: 2 + repeatPolicy: + repeat: true + intervalSec: 10 preconditions: - condition: "`echo test`" expected: test