add support for prometheus metrics listener in engine by sipsma · Pull Request #10555 · dagger/dagger · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

add support for prometheus metrics listener in engine #10555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/engine/.dagger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ replace (
)

require (
dagger.io/dagger v0.18.10
github.com/99designs/gqlgen v0.17.74
github.com/Khan/genqlient v0.8.1
github.com/containerd/platforms v1.0.0-rc.1
Expand All @@ -39,12 +40,14 @@ require (
)

require (
github.com/adrg/xdg v0.5.3 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sosodev/duration v1.3.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions cmd/engine/.dagger/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
dagger.io/dagger v0.18.10 h1:Ibyz5LqxjjEHfLMlaU9PJ3xt3ju7p29RWy0lVfvSNU0=
dagger.io/dagger v0.18.10/go.mod h1:VSj+2HMd/EnaCVt7gTY70p8LBW+oQDYjA1XTadr8vBE=
github.com/99designs/gqlgen v0.17.74 h1:1FuVtkXxOc87xpKio3f6sohREmec+Jvy86PcYOuwgWo=
github.com/99designs/gqlgen v0.17.74/go.mod h1:a+iR6mfRLNRp++kDpooFHiPWYiWX3Yu1BIilQRHgh10=
github.com/Khan/genqlient v0.8.1 h1:wtOCc8N9rNynRLXN3k3CnfzheCUNKBcvXmVv5zt6WCs=
github.com/Khan/genqlient v0.8.1/go.mod h1:R2G6DzjBvCbhjsEajfRjbWdVglSH/73kSivC9TLWVjU=
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand All @@ -26,6 +30,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/moby/buildkit v0.21.0 h1:+z4vVqgt0spLrOSxi4DLedRbIh2gbNVlZ5q4rsnNp60=
github.com/moby/buildkit v0.21.0/go.mod h1:mBq0D44uCyz2PdX8T/qym5LBbkBO3GGv0wqgX9ABYYw=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down
9 changes: 9 additions & 0 deletions cmd/engine/.dagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (e *DaggerEngine) Service(
gpuSupport bool,
// +optional
sharedCache bool,
// +optional
metrics bool,
) (*dagger.Service, error) {
cacheVolumeName := "dagger-dev-engine-state"
if !sharedCache {
Expand All @@ -156,6 +158,7 @@ func (e *DaggerEngine) Service(
if err != nil {
return nil, err
}

devEngine = devEngine.
WithExposedPort(1234, dagger.ContainerWithExposedPortOpts{Protocol: dagger.NetworkProtocolTcp}).
WithMountedCache(distconsts.EngineDefaultStateDir, dag.CacheVolume(cacheVolumeName), dagger.ContainerWithMountedCacheOpts{
Expand All @@ -165,6 +168,12 @@ func (e *DaggerEngine) Service(
Sharing: dagger.CacheSharingModePrivate,
})

if metrics {
devEngine = devEngine.
WithEnvVariable("_EXPERIMENTAL_DAGGER_METRICS_ADDR", "0.0.0.0:9090").
WithEnvVariable("_EXPERIMENTAL_DAGGER_METRICS_CACHE_UPDATE_INTERVAL", "10s")
}

return devEngine.AsService(dagger.ContainerAsServiceOpts{
Args: []string{
"--addr", "tcp://0.0.0.0:1234",
Expand Down
7 changes: 7 additions & 0 deletions cmd/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ func main() { //nolint:gocyclo
}
defer srv.Close()

// start Prometheus metrics server if configured
if metricsAddr := os.Getenv("_EXPERIMENTAL_DAGGER_METRICS_ADDR"); metricsAddr != "" {
if err := setupMetricsServer(ctx, srv, metricsAddr); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
}

go logMetrics(context.Background(), bkcfg.Root, srv)
if bkcfg.Trace {
go logTraceMetrics(context.Background())
Expand Down
99 changes: 99 additions & 0 deletions cmd/engine/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"context"
"net/http"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/dagger/dagger/engine/server"
"github.com/dagger/dagger/engine/slog"
)

// Prometheus gauges exported by the engine's metrics endpoint. They are
// registered with the default Prometheus registry in setupMetricsServer.
var (
// connectedClientsGauge reports the number of currently connected clients.
// It is refreshed on every request to the /metrics endpoint.
connectedClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "dagger_connected_clients",
Help: "Number of currently connected clients",
})

// localCacheTotalDiskSizeGauge reports the disk space used by the local
// cache, in bytes. It is refreshed periodically by a background goroutine.
localCacheTotalDiskSizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "dagger_local_cache_total_disk_size_bytes",
Help: "Total disk space consumed by the local cache in bytes",
})

// localCacheEntriesGauge reports the number of entries in the local cache.
// It is refreshed together with localCacheTotalDiskSizeGauge.
localCacheEntriesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "dagger_local_cache_entries",
Help: "Number of entries in the local cache",
})
)

// setupMetricsServer registers the engine's Prometheus gauges, starts a
// background goroutine that periodically refreshes the local cache gauges, and
// starts an HTTP server on addr that exposes the metrics at /metrics.
//
// The returned error only covers gauge registration. The HTTP listener is
// started in its own goroutine, so a failure to bind addr is logged rather than
// returned.
//
// The cache refresh goroutine stops when ctx is done.
func setupMetricsServer(ctx context.Context, srv *server.Server, addr string) error {
	for _, gauge := range []prometheus.Collector{
		connectedClientsGauge,
		localCacheTotalDiskSizeGauge,
		localCacheEntriesGauge,
	} {
		if err := prometheus.Register(gauge); err != nil {
			return err
		}
	}

	// Only update local cache metrics at most every 5 minutes to avoid excessive holding
	// of buildkit's DiskUsage lock.
	// Support an override of the default 5 minute interval; mainly used now so integ tests
	// don't have to wait 5 minutes for the metrics to be updated.
	cacheMetricsInterval := 5 * time.Minute
	if intervalStr, ok := os.LookupEnv("_EXPERIMENTAL_DAGGER_METRICS_CACHE_UPDATE_INTERVAL"); ok {
		interval, err := time.ParseDuration(intervalStr)
		switch {
		case err != nil:
			slog.Warn("invalid _EXPERIMENTAL_DAGGER_METRICS_CACHE_UPDATE_INTERVAL value, using default 5 minutes", "error", err)
		case interval <= 0:
			// time.NewTicker panics on a non-positive duration, so a value such
			// as "0s" must not reach it.
			slog.Warn("non-positive _EXPERIMENTAL_DAGGER_METRICS_CACHE_UPDATE_INTERVAL value, using default 5 minutes", "value", intervalStr)
		default:
			cacheMetricsInterval = interval
		}
	}

	go func() {
		updateMetrics := func() {
			entrySet, err := srv.EngineLocalCacheEntries(ctx)
			if err != nil {
				slog.Error("failed to get local cache entries for prometheus metrics", "error", err)
				return
			}
			localCacheTotalDiskSizeGauge.Set(float64(entrySet.DiskSpaceBytes))
			localCacheEntriesGauge.Set(float64(entrySet.EntryCount))
		}

		// do an initial update immediately
		updateMetrics()

		ticker := time.NewTicker(cacheMetricsInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				slog.Info("metrics server context done, stopping cache metrics updates")
				return
			case <-ticker.C:
				updateMetrics()
			}
		}
	}()

	// Serve from a dedicated mux rather than http.DefaultServeMux so that the
	// metrics port exposes only /metrics and a repeated call cannot panic on a
	// duplicate pattern registration.
	metricsHandler := promhttp.Handler()
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		// The connected clients count is cheap to read, so sample it per scrape.
		connectedClientsGauge.Set(float64(srv.ConnectedClients()))

		metricsHandler.ServeHTTP(w, r)
	})

	httpSrv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Bound how long a client may take to send request headers so that
		// slow or stalled connections cannot pile up on the listener.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Start server in a goroutine
	go func() {
		if err := httpSrv.ListenAndServe(); err != nil {
			slog.Error("metrics server failed", "error", err)
		}
	}()

	slog.Info("metrics server started", "address", addr)
	return nil
}
118 changes: 118 additions & 0 deletions core/integration/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package core
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"testing"
Expand All @@ -17,6 +19,7 @@ import (
bkconfig "github.com/moby/buildkit/cmd/buildkitd/config"
"github.com/moby/buildkit/identity"
"github.com/pelletier/go-toml"
"golang.org/x/sync/errgroup"

"dagger.io/dagger"
"github.com/dagger/dagger/engine"
Expand Down Expand Up @@ -765,3 +768,118 @@ func (EngineSuite) TestConcurrentCallContextCanceled(ctx context.Context, t *tes
t.Fatal("timed out waiting for errCh2")
}
}

// TestPrometheusMetrics starts a dev engine with the experimental metrics
// listener enabled on port 9090, keeps one client connected to it via
// `dagger listen`, and polls the /metrics endpoint until all expected gauges
// are present with plausible values (up to 30 attempts, one second apart).
func (EngineSuite) TestPrometheusMetrics(ctx context.Context, t *testctx.T) {
	c := connect(ctx, t)

	devEngineCtr := devEngineContainer(c, func(c *dagger.Container) *dagger.Container {
		return c.
			WithEnvVariable("_EXPERIMENTAL_DAGGER_METRICS_ADDR", "0.0.0.0:9090").
			WithEnvVariable("_EXPERIMENTAL_DAGGER_METRICS_CACHE_UPDATE_INTERVAL", "3s").
			WithExposedPort(9090, dagger.ContainerWithExposedPortOpts{
				Protocol: dagger.NetworkProtocolTcp,
			})
	})
	devEngine := devEngineContainerAsService(devEngineCtr)

	clientCtr := engineClientContainer(ctx, t, c, devEngine)

	var eg errgroup.Group
	clientCtx, clientCancel := context.WithCancel(ctx)
	t.Cleanup(clientCancel)
	eg.Go(func() error {
		_, err := clientCtr.
			With(daggerNonNestedExec("listen")).
			Sync(clientCtx)
		// Check for nil first: calling err.Error() on a nil error would panic.
		if err == nil {
			return nil
		}
		if strings.Contains(err.Error(), "context canceled") {
			return nil // expected, we cancel it later
		}
		t.Logf("error running dagger listen: %v", err)
		return err
	})

	var foundAll bool
	for range 30 {
		out, err := clientCtr.
			WithExec([]string{"apk", "add", "curl"}).
			WithEnvVariable("CACHEBUST", rand.Text()).
			WithExec([]string{"sh", "-c", "curl -s http://dev-engine:9090/metrics"}).
			Stdout(ctx)
		if err != nil {
			t.Logf("error fetching metrics: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}

		// find the lines with metrics we care about testing
		soughtMetrics := map[string]struct{}{
			"dagger_connected_clients":                 {},
			"dagger_local_cache_total_disk_size_bytes": {},
			"dagger_local_cache_entries":               {},
		}
		foundMetrics := map[string]float64{}
		for _, line := range strings.Split(out, "\n") {
			line = strings.TrimSpace(line)

			for metricName := range soughtMetrics {
				numStr, found := strings.CutPrefix(line, metricName+" ")
				if !found {
					continue
				}
				// Sample values in the Prometheus text format are floats and
				// large values may be rendered in exponent notation, so an
				// integer parse is not sufficient.
				num, err := strconv.ParseFloat(numStr, 64)
				require.NoError(t, err)

				delete(soughtMetrics, metricName)
				foundMetrics[metricName] = num
			}

			if len(soughtMetrics) == 0 {
				break
			}
		}

		if len(soughtMetrics) != 0 {
			t.Logf("did not find all sought metrics in output: %v", soughtMetrics)
			time.Sleep(1 * time.Second)
			continue
		}

		// found everything, but validate values
		validatedAll := true
		for metricName, num := range foundMetrics {
			switch metricName {
			case "dagger_connected_clients":
				if num != 1 {
					t.Logf("expected dagger_connected_clients = 1, got %v", num)
					validatedAll = false
				}
			case "dagger_local_cache_total_disk_size_bytes":
				if num <= 0 {
					t.Logf("expected dagger_local_cache_total_disk_size_bytes > 0, got %v", num)
					validatedAll = false
				}
			case "dagger_local_cache_entries":
				if num <= 0 {
					t.Logf("expected dagger_local_cache_entries > 0, got %v", num)
					validatedAll = false
				}
			default:
				t.Fatalf("unexpected metric %q found in output", metricName)
			}
		}

		if validatedAll {
			foundAll = true
			break // everything found + validated, exit retry loop
		}

		// retry again in a second
		time.Sleep(1 * time.Second)
	}
	require.True(t, foundAll, "did not find all expected metrics in output after 30 attempts")

	clientCancel()
	require.NoError(t, eg.Wait(), "error from client exec")
}
7 changes: 7 additions & 0 deletions engine/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,13 @@ func (srv *Server) Register(server *grpc.Server) {
controlapi.RegisterControlServer(server, srv)
}

// ConnectedClients reports how many dagger sessions the server currently
// tracks. The count is read while holding the sessions read lock.
func (srv *Server) ConnectedClients() int {
	srv.daggerSessionsMu.RLock()
	sessionCount := len(srv.daggerSessions)
	srv.daggerSessionsMu.RUnlock()
	return sessionCount
}

// Locker returns the locker held by the server.
func (srv *Server) Locker() *locker.Locker {
return srv.locker
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ require (
github.com/pelletier/go-toml v1.9.5
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/procfs v0.16.1
github.com/psanford/memfs v0.0.0-20230130182539-4dbf7e3e865e
github.com/rs/cors v1.11.1
Expand Down Expand Up @@ -288,7 +289,6 @@ require (
github.com/pjbgf/sha1cd v0.3.2 // indirect
github.com/pkg/profile v1.7.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand Down
4 changes: 4 additions & 0 deletions modules/metrics/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/dagger.gen.go linguist-generated
/internal/dagger/** linguist-generated
/internal/querybuilder/** linguist-generated
/internal/telemetry/** linguist-generated
5 changes: 5 additions & 0 deletions modules/metrics/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/dagger.gen.go
/internal/dagger
/internal/querybuilder
/internal/telemetry
/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: 1

datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: false
jsonData:
httpMethod: POST
Loading
0