From 1455ec341244b591b9f71aea1be5a3e75345d3c9 Mon Sep 17 00:00:00 2001 From: yohamta Date: Wed, 27 Apr 2022 12:12:37 +0900 Subject: [PATCH 1/2] Fix header nav type --- internal/admin/handlers/web/templates/base.gohtml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/admin/handlers/web/templates/base.gohtml b/internal/admin/handlers/web/templates/base.gohtml index 0a5bddff6..2070e3c82 100644 --- a/internal/admin/handlers/web/templates/base.gohtml +++ b/internal/admin/handlers/web/templates/base.gohtml @@ -82,7 +82,7 @@ From 1a7e9afc18d3842adaffcdd9b618d9cb4c6b84e0 Mon Sep 17 00:00:00 2001 From: yohamta Date: Wed, 27 Apr 2022 13:51:24 +0900 Subject: [PATCH 2/2] Add function to update status --- internal/admin/handlers/dag.go | 79 ++++++++++++++++++ .../admin/handlers/web/templates/dag.gohtml | 80 ++++++++++++++++++- internal/controller/controller.go | 50 ++++++++++++ internal/controller/controller_test.go | 30 +++++++ internal/database/database.go | 3 + tests/testdata/controller_update_status.yaml | 4 + 6 files changed, 243 insertions(+), 3 deletions(-) create mode 100644 tests/testdata/controller_update_status.yaml diff --git a/internal/admin/handlers/dag.go b/internal/admin/handlers/dag.go index f3cfcb6bc..840f02b79 100644 --- a/internal/admin/handlers/dag.go +++ b/internal/admin/handlers/dag.go @@ -115,10 +115,12 @@ func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc { switch params.Tab { case DAG_TabType_Status: data.Graph = models.StepGraph(dag.Status.Nodes, params.Tab != DAG_TabType_Config) + case DAG_TabType_Config: steps := models.FromSteps(dag.Config.Steps) data.Graph = models.StepGraph(steps, params.Tab != DAG_TabType_Config) data.Definition, _ = config.ReadConfig(path.Join(hc.DAGsDir, params.Group, cfg)) + case DAG_TabType_History: logs, err := controller.New(dag.Config).GetStatusHist(30) if err != nil { @@ -126,6 +128,7 @@ func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc { return } data.LogData = buildLog(logs) + case DAG_TabType_StepLog: if isJsonRequest(r) { data.StepLog, err = readStepLog(c, params.File, params.Step, hc.LogEncodingCharset) @@ -134,6 +137,7 @@ func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc { return } } + case DAG_TabType_ScLog: if isJsonRequest(r) { data.ScLog, err = readSchedulerLog(c, params.File) @@ -142,6 +146,7 @@ func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc { return } } + default: } @@ -169,6 +174,7 @@ func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { action := r.FormValue("action") group := r.FormValue("group") reqId := r.FormValue("request-id") + step := r.FormValue("step") cfg, err := getPathParameter(r) if err != nil { @@ -197,6 +203,7 @@ func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { w.Write([]byte(err.Error())) return } + case "stop": if dag.Status.Status != scheduler.SchedulerStatus_Running { w.WriteHeader(http.StatusBadRequest) @@ -209,6 +216,7 @@ func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { w.Write([]byte(err.Error())) return } + case "retry": if reqId == "" { w.WriteHeader(http.StatusBadRequest) @@ -221,6 +229,57 @@ func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { w.Write([]byte(err.Error())) return } + + case "mark-success": + if dag.Status.Status == scheduler.SchedulerStatus_Running { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("DAG is running.")) + return + } + if reqId == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("request-id is required.")) + return + } + if step == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("step is required.")) + return + } + + err = updateStatus(c, reqId, step, scheduler.NodeStatus_Success) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + return + + case "mark-failed": + if dag.Status.Status == scheduler.SchedulerStatus_Running { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("DAG is running.")) + return + } + if reqId == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("request-id is required.")) + return + } + if step == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("step is required.")) + return + } + + err = updateStatus(c, reqId, step, scheduler.NodeStatus_Error) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + default: encodeError(w, errInvalidArgs) return @@ -230,6 +289,26 @@ func HandlePostDAGAction(hc *PostDAGHandlerConfig) http.HandlerFunc { } } +func updateStatus(c controller.Controller, reqId, step string, to scheduler.NodeStatus) error { + status, err := c.GetStatusByRequestId(reqId) + if err != nil { + return err + } + found := false + for i := range status.Nodes { + if status.Nodes[i].Step.Name == step { + status.Nodes[i].Status = to + status.Nodes[i].StatusText = to.String() + found = true + break + } + } + if !found { + return fmt.Errorf("step %s not found", step) + } + return c.UpdateStatus(status) +} + func readSchedulerLog(c controller.Controller, file string) (*schedulerLog, error) { logFile := "" if file == "" { diff --git a/internal/admin/handlers/web/templates/dag.gohtml b/internal/admin/handlers/web/templates/dag.gohtml index e715fb202..14cec8a59 100644 --- a/internal/admin/handlers/web/templates/dag.gohtml +++ b/internal/admin/handlers/web/templates/dag.gohtml @@ -507,6 +507,27 @@ {}, ] function NodeTable({ nodes, file = "", dag }) { + const [modal, setModal] = React.useState(false); + const [current, setCurrent] = React.useState(null); + const requireModal = (step) => { + if (dag.Status.Status == SCHEDULER_STATUS__RUNNING || dag.Status.Status == SCHEDULER_STATUS__NONE) { + return; + } + setCurrent(step); + setModal(true); + } + const dismissModal = () => { setModal(false); } + React.useEffect(() => { + document.addEventListener('keydown', (event) => { + const e = event || window.event; + if (e.keyCode === 27) { // Escape key + setModal(false); + } + }); + return () => { + document.removeEventListener('keydown'); + }; + }, []) const tableStyle = { "tableLayout": "fixed", "wordWrap": "break-word", @@ -515,6 +536,10 @@ "overflow-x": "auto", }; const styles = stepTabColStyles; + const modalbuttonStyle = {}; + const modalStyle = { + display: modal ? "flex" : "none", + } let i = 0; if (!nodes.length) { return null; @@ -537,9 +562,49 @@ - {nodes.map((n, idx) => )} + {nodes.map((n, idx) => )} + + {current ? ( +
+
+
+
+

Update status of "{current.Name}"

+ +
+
+
+
+ + + + +
+
+ + + + +
+
+
+
+ +
+
+
+ + ) : null} ) } @@ -550,8 +615,15 @@ ) } - function NodeTableRow({ rownum, node, file }) { + function NodeTableRow({ rownum, node, file, onRequireModal }) { const url = encodeURI("?t=" + TAB_ID__STEPLOG + "&group={{.Group}}&file=" + file + "&step=" + node.Step.Name) + const buttonStyle = { + "margin": "0px", + "padding": "0px", + "border": "0px", + "background": "none", + "outline": "none", + }; return ( {rownum} @@ -561,7 +633,9 @@ {node.Step.Args ? node.Step.Args.join(" ") : ""} {node.StartedAt} {node.FinishedAt} - {node.StatusText} + {node.Error} {node.Log} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 88acdc391..2f071c336 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -25,7 +25,9 @@ type Controller interface { Retry(bin string, workDir string, reqId string) error GetStatus() (*models.Status, error) GetLastStatus() (*models.Status, error) + GetStatusByRequestId(requestId string) (*models.Status, error) GetStatusHist(n int) ([]*models.StatusFile, error) + UpdateStatus(*models.Status) error } func GetDAGs(dir string) (dags []*DAG, errs []string, err error) { @@ -151,6 +153,15 @@ func (s *controller) GetLastStatus() (*models.Status, error) { return status, nil } +func (s *controller) GetStatusByRequestId(requestId string) (*models.Status, error) { + db := database.New(database.DefaultConfig()) + ret, err := db.FindByRequestId(s.cfg.ConfigPath, requestId) + if err != nil { + return nil, err + } + return ret.Status, nil +} + func (s *controller) GetStatusHist(n int) ([]*models.StatusFile, error) { db := database.New(database.DefaultConfig()) ret, err := db.ReadStatusHist(s.cfg.ConfigPath, n) @@ -160,6 +171,45 @@ func (s *controller) GetStatusHist(n int) ([]*models.StatusFile, error) { return ret, nil } +func (s *controller) UpdateStatus(status *models.Status) error { + client := sock.Client{Addr: sock.GetSockAddr(s.cfg.ConfigPath)} + res, err := client.Request("GET", "/status") + if err != nil { + if errors.Is(err, sock.ErrTimeout) { + return err + } + } + if err == nil { + ss, err := models.StatusFromJson(res) + if err != nil { + return err + } + if ss.RequestId == status.RequestId && ss.Status == scheduler.SchedulerStatus_Running { + return fmt.Errorf("the DAG is running") + } + } + db := database.New(database.DefaultConfig()) + toUpdate, err := db.FindByRequestId(s.cfg.ConfigPath, status.RequestId) + if err != nil { + return err + } + w, err := db.NewWriterFor(s.cfg.ConfigPath, toUpdate.File) + if err != nil { + return err + } + if err := w.Open(); err != nil { + return err + } + defer w.Close() + if err := w.Write(status); err != nil { + return err + } + if err := w.Close(); err != nil { + return err + } + return nil +} + func defaultStatus(cfg *config.Config) *models.Status { return models.NewStatus( cfg, diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 3758b7bcc..88d6bfc10 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -85,3 +85,33 @@ func TestGetDAGList(t *testing.T) { matches, _ := filepath.Glob(path.Join(testsDir, "*.yaml")) assert.Equal(t, len(matches), len(dags)) } + +func TestUpdateStatus(t *testing.T) { + file := testConfig("controller_update_status.yaml") + + dag, err := controller.FromConfig(file) + require.NoError(t, err) + + a := agent.Agent{Config: &agent.Config{ + DAG: dag.Config, + }} + + err = a.Run() + require.NoError(t, err) + + st, err := controller.New(dag.Config).GetLastStatus() + require.NoError(t, err) + + require.Equal(t, 1, len(st.Nodes)) + require.Equal(t, scheduler.NodeStatus_Success, st.Nodes[0].Status) + + st.Nodes[0].Status = scheduler.NodeStatus_Error + err = controller.New(dag.Config).UpdateStatus(st) + require.NoError(t, err) + + updated, err := controller.New(dag.Config).GetLastStatus() + require.NoError(t, err) + + require.Equal(t, 1, len(st.Nodes)) + require.Equal(t, scheduler.NodeStatus_Error, updated.Nodes[0].Status) +} diff --git a/internal/database/database.go b/internal/database/database.go index c2543684a..16f3ee283 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -116,6 +116,9 @@ func (db *Database) ReadStatusToday(configPath string) (*models.Status, error) { } func (db *Database) FindByRequestId(configPath string, requestId string) (*models.StatusFile, error) { + if requestId == "" { + return nil, fmt.Errorf("requestId is empty") + } pattern := db.pattern(configPath) + "*.dat" matches, err := filepath.Glob(pattern) if err != nil { diff --git a/tests/testdata/controller_update_status.yaml b/tests/testdata/controller_update_status.yaml new file mode 100644 index 000000000..3b54fbb68 --- /dev/null +++ b/tests/testdata/controller_update_status.yaml @@ -0,0 +1,4 @@ +name: "update status" +steps: + - name: "1" + command: "true" \ No newline at end of file