From d12589cd17063e5b53ad7d00b45787bcf65a0c46 Mon Sep 17 00:00:00 2001 From: Mathias Bogaert Date: Sat, 29 Mar 2025 12:17:39 +0000 Subject: [PATCH] Optimize SSE build event streaming - Replace fmt.Sscanf with strconv.ParseUint for more efficient header parsing - Replace fmt.Sprintf("%d", id) with strconv.FormatUint for more efficient string conversion - Use Header().Set() to prevent duplicate headers - Add validation for http.Flusher type assertion - Improve error handling and logging context - Enhance code readability with consistent structure and formatting - Refactor event writing for better maintainability Signed-off-by: Mathias Bogaert --- atc/api/buildserver/eventhandler.go | 82 ++++++++++++++++------------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/atc/api/buildserver/eventhandler.go b/atc/api/buildserver/eventhandler.go index fb044323231..2d9b96b5ff2 100644 --- a/atc/api/buildserver/eventhandler.go +++ b/atc/api/buildserver/eventhandler.go @@ -2,76 +2,82 @@ package buildserver import ( "encoding/json" - "fmt" "io" "net/http" + "strconv" "code.cloudfoundry.org/lager/v3" "github.com/concourse/concourse/atc/db" "github.com/vito/go-sse/sse" ) -const ProtocolVersionHeader = "X-ATC-Stream-Version" -const CurrentProtocolVersion = "2.0" +const ( + ProtocolVersionHeader = "X-ATC-Stream-Version" + CurrentProtocolVersion = "2.0" +) func NewEventHandler(logger lager.Logger, build db.BuildForAPI) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var eventID uint = 0 - if r.Header.Get("Last-Event-ID") != "" { - startString := r.Header.Get("Last-Event-ID") - _, err := fmt.Sscanf(startString, "%d", &eventID) + + // Parse Last-Event-ID header + if lastEventID := r.Header.Get("Last-Event-ID"); lastEventID != "" { + parsedID, err := strconv.ParseUint(lastEventID, 10, 64) if err != nil { - logger.Info("failed-to-parse-last-event-id", lager.Data{"last-event-id": startString}) + logger.Info("failed-to-parse-last-event-id", lager.Data{"last-event-id": lastEventID}) w.WriteHeader(http.StatusBadRequest) return } - - eventID++ + eventID = uint(parsedID) + 1 } - w.Header().Add("Content-Type", "text/event-stream; charset=utf-8") - w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Add("X-Accel-Buffering", "no") - w.Header().Add(ProtocolVersionHeader, CurrentProtocolVersion) + // Set response headers + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("X-Accel-Buffering", "no") + w.Header().Set(ProtocolVersionHeader, CurrentProtocolVersion) + + flusher, ok := w.(http.Flusher) + if !ok { + logger.Error("streaming-not-supported", nil) + w.WriteHeader(http.StatusInternalServerError) + return + } writer := eventWriter{ responseWriter: w, - responseFlusher: w.(http.Flusher), + responseFlusher: flusher, } events, err := build.Events(eventID) if err != nil { - logger.Error("failed-to-get-build-events", err, lager.Data{"build-id": build.ID(), "start": eventID}) + logger.Error("failed-to-get-build-events", err, lager.Data{ + "build-id": build.ID(), + "start": eventID, + }) w.WriteHeader(http.StatusInternalServerError) return } - defer db.Close(events) for { - logger = logger.WithData(lager.Data{"id": eventID}) + contextLogger := logger.WithData(lager.Data{"id": eventID}) ev, err := events.Next() if err != nil { if err == db.ErrEndOfBuildEventStream { - err := writer.WriteEnd(eventID) - if err != nil { - logger.Info("failed-to-write-end", lager.Data{"error": err.Error()}) - return + if err := writer.WriteEnd(eventID); err != nil { + contextLogger.Info("failed-to-write-end", lager.Data{"error": err.Error()}) } - <-r.Context().Done() } else { - logger.Error("failed-to-get-next-build-event", err) - return + contextLogger.Error("failed-to-get-next-build-event", err) } - return } - err = writer.WriteEvent(eventID, ev) - if err != nil { - logger.Info("failed-to-write-event", lager.Data{"error": err.Error()}) + if err := writer.WriteEvent(eventID, ev); err != nil { + contextLogger.Info("failed-to-write-event", lager.Data{"error": err.Error()}) return } @@ -91,30 +97,30 @@ func (writer eventWriter) WriteEvent(id uint, envelope any) error { return err } - err = sse.Event{ - ID: fmt.Sprintf("%d", id), + event := sse.Event{ + ID: strconv.FormatUint(uint64(id), 10), Name: "event", Data: payload, - }.Write(writer.responseWriter) - if err != nil { + } + + if err := event.Write(writer.responseWriter); err != nil { return err } writer.responseFlusher.Flush() - return nil } func (writer eventWriter) WriteEnd(id uint) error { - err := sse.Event{ - ID: fmt.Sprintf("%d", id), + event := sse.Event{ + ID: strconv.FormatUint(uint64(id), 10), Name: "end", - }.Write(writer.responseWriter) - if err != nil { + } + + if err := event.Write(writer.responseWriter); err != nil { return err } writer.responseFlusher.Flush() - return nil }