Added graceful shutdown timeout by almostinf · Pull Request #1825 · grafana/beyla · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Added graceful shutdown timeout #1825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 16, 2025
1 change: 1 addition & 0 deletions docs/sources/configure/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The following sections explain the global configuration properties that apply to
| -------------------------------------------------------------- | ----------------------------------------------------------------------------- 10000 ---------------------------------------- | ------- | --------------------- |
| `executable_name`<br>`BEYLA_EXECUTABLE_NAME` | Selects the process to instrument by regular expression matching against the full executable path. | string | unset |
| `open_port`<br>`BEYLA_OPEN_PORT` | Selects a process to instrument by open ports. Accepts comma-separated lists of ports and port ranges. | string | unset |
| `shutdown_timeout`<br>`BEYLA_SHUTDOWN_TIMEOUT` | Sets the timeout for a graceful shutdown. Accepts a duration string such as `30s` or `1m`. | string | `10s` |
| `service_name`<br>`BEYLA_SERVICE_NAME` | **Deprecated** Overrides the name of the instrumented service for metrics export. | string | see service discovery |
| `service_namespace`<br>`BEYLA_SERVICE_NAMESPACE` | **Deprecated** Assigns a namespace for the selected service. | string | see service discovery |
| `log_level`<br>`BEYLA_LOG_LEVEL` | Sets process logger verbosity. Valid values: `DEBUG`, `INFO`, `WARN`, `ERROR`. | string | `INFO` |
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
golang.org/x/arch v0.7.0
golang.org/x/mod v0.23.0
golang.org/x/net v0.37.0
golang.org/x/sync v0.12.0
golang.org/x/sys v0.31.0
google.golang.org/grpc v1.71.0
google.golang.org/protobuf v1.36.5
Expand Down
4 changes: 4 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
var DefaultConfig = Config{
ChannelBufferLen: 10,
LogLevel: "INFO",
ShutdownTimeout: 10 * time.Second,
EnforceSysCaps: false,
EBPF: config.EBPFTracer{
BatchLength: 100,
Expand Down Expand Up @@ -182,6 +183,9 @@ type Config struct {

LogLevel string `yaml:"log_level" env:"BEYLA_LOG_LEVEL"`

// Timeout for a graceful shutdown
ShutdownTimeout time.Duration `yaml:"shutdown_timeout" env:"BEYLA_SHUTDOWN_TIMEOUT"`

// Check for required system capabilities and bail if they are not
// present. If set to 'false', Beyla will still print a list of missing
// capabilities, but the execution will continue
Expand Down
11 changes: 10 additions & 1 deletion pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type envMap map[string]string
func TestConfig_Overrides(t *testing.T) {
userConfig := bytes.NewBufferString(`
trace_printer: json
shutdown_timeout: 30s
channel_buffer_len: 33
ebpf:
functions:
Expand Down Expand Up @@ -116,6 +117,7 @@ network:
ServiceName: "svc-name",
ChannelBufferLen: 33,
LogLevel: "INFO",
ShutdownTimeout: 30 * time.Second,
EnforceSysCaps: false,
TracePrinter: "json",
EBPF: config.EBPFTracer{
Expand Down Expand Up @@ -236,12 +238,19 @@ func TestConfig_ServiceName(t *testing.T) {
assert.Equal(t, "some-svc-name", cfg.ServiceName)
}

func TestConfig_ShutdownTimeout(t *testing.T) {
require.NoError(t, os.Setenv("BEYLA_SHUTDOWN_TIMEOUT", "1m"))
cfg, err := LoadConfig(bytes.NewReader(nil))
require.NoError(t, err)
assert.Equal(t, time.Minute, cfg.ShutdownTimeout)
}

func TestConfigValidate(t *testing.T) {
testCases := []envMap{
{"OTEL_EXPORTER_OTLP_ENDPOINT": "localhost:1234", "BEYLA_EXECUTABLE_NAME": "foo", "INSTRUMENT_FUNC_NAME": "bar"},
{"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": "localhost:1234", "BEYLA_EXECUTABLE_NAME": "foo", "INSTRUMENT_FUNC_NAME": "bar"},
{"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "localhost:1234", "BEYLA_EXECUTABLE_NAME": "foo", "INSTRUMENT_FUNC_NAME": "bar"},
{"BEYLA_TRACE_PRINTER": "text", "BEYLA_EXECUTABLE_NAME": "foo"},
{"BEYLA_TRACE_PRINTER": "text", "BEYLA_SHUTDOWN_TIMEOUT": "1m", "BEYLA_EXECUTABLE_NAME": "foo"},
{"BEYLA_TRACE_PRINTER": "json", "BEYLA_EXECUTABLE_NAME": "foo"},
{"BEYLA_TRACE_PRINTER": "json_indent", "BEYLA_EXECUTABLE_NAME": "foo"},
{"BEYLA_TRACE_PRINTER": "counter", "BEYLA_EXECUTABLE_NAME": "foo"},
Expand Down
83 changes: 40 additions & 43 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log/slog"
"sync"

"golang.org/x/sync/errgroup"

"github.com/grafana/beyla/v2/pkg/beyla"
"github.com/grafana/beyla/v2/pkg/export/attributes"
"github.com/grafana/beyla/v2/pkg/export/otel"
Expand All @@ -26,67 +28,58 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error {
return fmt.Errorf("can't build common context info: %w", err)
}

wg := sync.WaitGroup{}
app := cfg.Enabled(beyla.FeatureAppO11y)
if app {
wg.Add(1)
}
net := cfg.Enabled(beyla.FeatureNetO11y)
if net {
wg.Add(1)
}

// of one of both nodes fail, the other should stop
ctx, cancel := context.WithCancel(ctx)
errs := make(chan error, 2)
// if one of the nodes fails, the other should stop
g, ctx := errgroup.WithContext(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!!! Didn't know this tool.


if app {
go func() {
defer wg.Done()
g.Go(func() error {
if err := setupAppO11y(ctx, ctxInfo, cfg); err != nil {
cancel()
errs <- err
return fmt.Errorf("setupAppO11y: %w", err)
}
}()
return nil
})
}

if net {
go func() {
defer wg.Done()
g.Go(func() error {
if err := setupNetO11y(ctx, ctxInfo, cfg); err != nil {
cancel()
errs <- err
return fmt.Errorf("setupNetO11y: %w", err)
}
}()
return nil
})
}
wg.Wait()
cancel()
select {
case err := <-errs:

if err := g.Wait(); err != nil {
return err
default:
return nil
}

return nil
}

func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error {
slog.Info("starting Beyla in Application Observability mode")

ctx, cancel := context.WithCancel(ctx)
defer cancel()
instr := appolly.New(ctx, ctxInfo, config)
if finderDone, err := instr.FindAndInstrument(ctx); err != nil {
slog.Debug("can't find target process", "error", err)
instr, err := appolly.New(ctx, ctxInfo, config)
if err != nil {
slog.Debug("can't create new instrumenter", "error", err)
return fmt.Errorf("can't create new instrumenter: %w", err)
}

err = instr.FindAndInstrument(ctx)
if err != nil {
slog.Debug("can't find target process", "error", err)
return fmt.Errorf("can't find target process: %w", err)
} else {
defer func() {
// before exiting, waits for all the resources to be freed
<-finderDone
}()
}
if err := instr.ReadAndForward(ctx); err != nil {
cancel()
slog.Debug("can't start read and forwarding", "error", err)
return fmt.Errorf("can't start read and forwarding: %w", err)

err = instr.ReadAndForward(ctx)
if err != nil {
slog.Debug("can't read and forward auto-instrumenter", "error", err)
return fmt.Errorf("can't read and forward auto-instrumente: %w", err)
}

return nil
}

Expand All @@ -95,16 +88,20 @@ func setupNetO11y(ctx context.Context, ctxInfo *global.ContextInfo, cfg *beyla.C
slog.Warn(msg + ". Skipping Network metrics component")
return nil
}

slog.Info("starting Beyla in Network metrics mode")
flowsAgent, err := agent.FlowsAgent(ctxInfo, cfg)
if err != nil {
slog.Debug("can't start network metrics capture", "error", err)
return fmt.Errorf("can't start network metrics capture: %w", err)
}
if err := flowsAgent.Run(ctx); err != nil {
slog.Debug("can't start network metrics capture", "error", err)
return fmt.Errorf("can't start network metrics capture: %w", err)

err = flowsAgent.Run(ctx)
if err != nil {
slog.Debug("can't run network metrics capture", "error", err)
return fmt.Errorf("can't run network metrics capture: %w", err)
}

return nil
}

Expand Down
77 changes: 55 additions & 22 deletions pkg/internal/appolly/appolly.go
10000
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ package appolly

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/grafana/beyla/v2/pkg/beyla"
"github.com/grafana/beyla/v2/pkg/internal/discover"
Expand All @@ -16,54 +18,63 @@ import (
"github.com/grafana/beyla/v2/pkg/pipe/msg"
)

var errShutdownTimeout = errors.New("graceful shutdown has timed out")

func log() *slog.Logger {
return slog.With("component", "beyla.Instrumenter")
}

// Instrumenter finds and instruments a service/process, and forwards the traces as
// configured by the user
type Instrumenter struct {
config *beyla.Config
ctxInfo *global.ContextInfo
config *beyla.Config
ctxInfo *global.ContextInfo
tracersWg *sync.WaitGroup
bp *pipe.Instrumenter

// tracesInput is used to communicate the found traces between the ProcessFinder and
// the ProcessTracer.
tracesInput *msg.Queue[[]request.Span]
}

// New Instrumenter, given a Config
func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {
func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) (*Instrumenter, error) {
setupFeatureContextInfo(ctx, ctxInfo, config)

tracesInput := msg.NewQueue[[]request.Span](msg.ChannelBufferLen(config.ChannelBufferLen))

bp, err := pipe.Build(ctx, config, ctxInfo, tracesInput)
if err != nil {
return nil, fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
}

return &Instrumenter{
config: config,
ctxInfo: ctxInfo,
tracesInput: msg.NewQueue[[]request.Span](msg.ChannelBufferLen(config.ChannelBufferLen)),
}
tracersWg: &sync.WaitGroup{},
tracesInput: tracesInput,
bp: bp,
}, nil
}

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
// It returns an error only if the Process Finder can't be started. Otherwise it
// returns nil and keeps listening in the background; the tracers it launches are
// tracked so that the Instrumenter can wait for them to finish at shutdown.
func (i *Instrumenter) FindAndInstrument(ctx context.Context) (<-chan struct{}, error) {
func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
finder := discover.NewProcessFinder(i.config, i.ctxInfo, i.tracesInput)
processEvents, err := finder.Start(ctx)
if err != nil {
return nil, fmt.Errorf("couldn't start Process Finder: %w", err)
return fmt.Errorf("couldn't start Process Finder: %w", err)
}
done := make(chan struct{})

// In background, listen indefinitely for each new process and run its
// associated ebpf.ProcessTracer once it is found.
wg := sync.WaitGroup{}
go func() {
log := log()
for {
select {
case <-ctx.Done():
log.Debug("stopped searching for new processes to instrument. Waiting for the eBPF tracers to be unloaded")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd anyway log the messages, for debugging purposes.

Copy link
Contributor Author
@almostinf almostinf Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is also still there, just moved to Instrumenter.stop() :)

wg.Wait()
close(done)
log.Debug("tracers unloaded, exiting FindAndInstrument")
return
case ev := <-processEvents:
switch ev.Type {
Expand All @@ -72,9 +83,9 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) (<-chan struct{},
log.Debug("running tracer for new process",
"inode", pt.FileInfo.Ino, "pid", pt.FileInfo.Pid, "exec", pt.FileInfo.CmdExePath)
if pt.Tracer != nil {
wg.Add(1)
i.tracersWg.Add(1)
go func() {
defer wg.Done()
defer i.tracersWg.Done()
pt.Tracer.Run(ctx, i.tracesInput)
}()
}
Expand All @@ -91,8 +102,9 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) (<-chan struct{},
}
}
}()

// TODO: wait until all the resources have been freed/unmounted
return done, nil
return nil
}

// ReadAndForward keeps listening for traces in the BPF map, then reads,
Expand All @@ -101,20 +113,41 @@ func (i *Instrumenter) ReadAndForward(ctx context.Context) error {
log := log()
log.Debug("creating instrumentation pipeline")

bp, err := pipe.Build(ctx, i.config, i.ctxInfo, i.tracesInput)
if err != nil {
return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
}

log.Info("Starting main node")

bp.Run(ctx)
i.bp.Run(ctx)

<-ctx.Done()

log.Info("exiting auto-instrumenter")

err := i.stop()
if err != nil {
return fmt.Errorf("failed to stop auto-instrumenter: %w", err)
}

return nil
}

// stop waits for every tracer goroutine registered in tracersWg to finish,
// giving up once the configured ShutdownTimeout has elapsed.
//
// It returns nil if all the tracers finished in time, or errShutdownTimeout
// otherwise.
func (i *Instrumenter) stop() error {
	log := log()

	stopped := make(chan struct{})
	go func() {
		log.Debug("stopped searching for new processes to instrument. Waiting for the eBPF tracers to be unloaded")
		i.tracersWg.Wait()
		// Closing the channel never blocks, so this goroutine can always
		// finish (and log) even if stop already returned on timeout and
		// nobody is receiving anymore. An unbuffered send would leak it.
		close(stopped)
		log.Debug("tracers unloaded, exiting FindAndInstrument")
	}()

	// Use an explicit timer so it is released as soon as stop returns.
	timeout := time.NewTimer(i.config.ShutdownTimeout)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		return errShutdownTimeout
	case <-stopped:
		return nil
	}
}

func setupFeatureContextInfo(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) {
ctxInfo.AppO11y.ReportRoutes = config.Routes != nil
setupKubernetes(ctx, ctxInfo)
Expand Down
Loading
Loading
0