From ffc83f5df9727c97de7f7c1670c7a68c32c5c0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Thu, 14 Mar 2024 22:32:37 +0100 Subject: [PATCH 1/3] feature: flightrecorder to enable Go trace config: allow configuration for Go runtime/trace.FlightRecorder pass default period to define what means skipper to be slow config: set defaults to 100ms is slow and max 16MB trace file size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- config/config.go | 4 + proxy/breaker_test.go | 2 +- proxy/flightrecorder_test.go | 93 +++++++++++++++++++++ proxy/proxy.go | 152 ++++++++++++++++++++++++++++++++++- skipper.go | 61 +++++++++++++- 5 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 proxy/flightrecorder_test.go diff --git a/config/config.go b/config/config.go index 68de45d094..bbf9c4892c 100644 --- a/config/config.go +++ b/config/config.go @@ -89,6 +89,7 @@ type Config struct { BlockProfileRate int `yaml:"block-profile-rate"` MutexProfileFraction int `yaml:"mutex-profile-fraction"` MemProfileRate int `yaml:"memory-profile-rate"` + FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"` DebugGcMetrics bool `yaml:"debug-gc-metrics"` RuntimeMetrics bool `yaml:"runtime-metrics"` ServeRouteMetrics bool `yaml:"serve-route-metrics"` @@ -439,6 +440,7 @@ func NewConfig() *Config { // logging, metrics, tracing: flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. 
Switch to Prometheus metrics format to expose metrics") + flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation") flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span") flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.") @@ -454,6 +456,7 @@ func NewConfig() *Config { flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction") flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.MemProfileRate, keeps default 512 kB") flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") + flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.") flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats") flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats") flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route") @@ -919,6 +922,7 @@ func (c *Config) ToOptions() skipper.Options { EnableProfile: c.EnableProfile, BlockProfileRate: 
c.BlockProfileRate, MutexProfileFraction: c.MutexProfileFraction, + FlightRecorderTargetURL: c.FlightRecorderTargetURL, EnableDebugGcMetrics: c.DebugGcMetrics, EnableRuntimeMetrics: c.RuntimeMetrics, EnableServeRouteMetrics: c.ServeRouteMetrics, diff --git a/proxy/breaker_test.go b/proxy/breaker_test.go index 5d1c61d2c6..0b80403375 100644 --- a/proxy/breaker_test.go +++ b/proxy/breaker_test.go @@ -543,7 +543,7 @@ func TestBreakerMultipleHosts(t *testing.T) { checkBackendHostCounter("foo", testRateWindow), checkBackendHostCounter("bar", testRateWindow), setBackendFail, - trace("setting fail"), + traceBreakerTest("setting fail"), setBackendHostFail("foo"), setBackendHostFail("bar"), times(testRateFailures, diff --git a/proxy/flightrecorder_test.go b/proxy/flightrecorder_test.go new file mode 100644 index 0000000000..d05aadb08b --- /dev/null +++ b/proxy/flightrecorder_test.go @@ -0,0 +1,93 @@ +package proxy_test + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httptest" + "runtime/trace" + "testing" + "time" + + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/diag" + "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/proxy/proxytest" +) + +func TestFlightRecorder(t *testing.T) { + ch := make(chan int) + service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "PUT" { + w.WriteHeader(http.StatusMethodNotAllowed) + w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed))) + ch <- http.StatusMethodNotAllowed + return + } + + var buf bytes.Buffer + n, err := io.Copy(&buf, r.Body) + if err != nil { + t.Fatalf("Failed to copy data: %v", err) + } + if n < 100 { + t.Fatalf("Failed to write enough data: %d bytes", n) + } + w.WriteHeader(http.StatusCreated) + w.Write([]byte(http.StatusText(http.StatusCreated))) + ch <- http.StatusCreated + })) + defer service.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(http.StatusText(http.StatusOK))) + })) + defer backend.Close() + + flightRecorder := trace.NewFlightRecorder(trace.FlightRecorderConfig{ + MinAge: time.Second, + }) + flightRecorder.Start() + + spec := diag.NewLatency() + fr := make(filters.Registry) + fr.Register(spec) + + doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL) + rr := eskip.MustParse(doc) + + pr := proxytest.WithParams(fr, proxy.Params{ + FlightRecorder: flightRecorder, + FlightRecorderTargetURL: service.URL, + FlightRecorderPeriod: 90 * time.Millisecond, + }, rr...) + defer pr.Close() + + rsp, err := pr.Client().Get(pr.URL) + if err != nil { + t.Fatalf("Failed to GET %q: %v", pr.URL, err) + } + defer rsp.Body.Close() + _, err = io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + + switch rsp.StatusCode { + case http.StatusOK: + // ok + default: + t.Fatalf("Failed to get status OK: %d", rsp.StatusCode) + } + + statusCode := <-ch + switch statusCode { + case http.StatusCreated: + // ok + default: + t.Fatalf("Failed to get status Created: %d", statusCode) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index e326f80ca2..d94ddd1637 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -17,6 +17,7 @@ import ( "os" "runtime" "runtime/pprof" + "runtime/trace" "strconv" "strings" "sync" @@ -377,6 +378,18 @@ type Params struct { // PassiveHealthCheck defines the parameters for the healthy endpoints checker. PassiveHealthCheck *PassiveHealthCheck + + // FlightRecorder is a started instance of https://pkg.go.dev/runtime/trace#FlightRecorder + FlightRecorder *trace.FlightRecorder + + // FlightRecorderTargetURL is the target to write the trace + // to. Supported targets are http URL and file URL. + FlightRecorderTargetURL string + + // FlightRecorderPeriod is the time.Duration that is used to detect + // a slow skipper. 
If skipper is detected to be slow it tries + // to write out a trace as configured by the FlightRecorderTargetURL. + FlightRecorderPeriod time.Duration } type ( @@ -472,6 +485,10 @@ type Proxy struct { clientTLS *tls.Config hostname string onPanicSometimes rate.Sometimes + flightRecorder *trace.FlightRecorder + flightRecorderURL *url.URL + flightRecorderPeriod time.Duration + flightRecorderCH chan struct{} } // proxyError is used to wrap errors during proxying and to indicate @@ -874,6 +891,52 @@ func WithParams(p Params) *Proxy { maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio, } } + + log := &logging.DefaultLog{} + + var ( + frURL *url.URL + // buffered channel size 10k to allow unblocked requests + frChannel = make(chan struct{}, 10240) + ) + if p.FlightRecorder != nil { + var err error + frURL, err = url.Parse(p.FlightRecorderTargetURL) + if err != nil { + p.FlightRecorder.Stop() + p.FlightRecorder = nil + } else { + // decouple writing a trace from data-plane work + go func() { + foreverHang := 365 * 24 * time.Hour + timer := time.NewTimer(foreverHang) + defer timer.Stop() + + last := time.Now().Add(-time.Hour) + + for { + select { + case <-frChannel: + // range through all notifications until 1ms there is no notification + // reset timer to write trace after handling all the notifications + timer.Reset(time.Millisecond) + continue + case <-quit: + p.FlightRecorder.Stop() + return + case <-timer.C: + if time.Since(last) >= time.Hour { + writeGoTrace(p.FlightRecorder, frURL, log, tr) + } + last = time.Now() + + timer.Reset(foreverHang) + } + } + }() + } + } + return &Proxy{ routing: p.Routing, registry: p.EndpointRegistry, @@ -892,7 +955,7 @@ func WithParams(p Params) *Proxy { maxLoops: p.MaxLoopbacks, breakers: p.CircuitBreakers, limiters: p.RateLimiters, - log: &logging.DefaultLog{}, + log: log, defaultHTTPStatus: defaultHTTPStatus, tracing: newProxyTracing(p.OpenTracing), copyStreamPoolEnabled: p.EnableCopyStreamPoolExperimental, 
@@ -902,6 +965,87 @@ func WithParams(p Params) *Proxy { clientTLS: tr.TLSClientConfig, hostname: hostname, onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute}, + flightRecorder: p.FlightRecorder, + flightRecorderURL: frURL, + flightRecorderPeriod: p.FlightRecorderPeriod, + flightRecorderCH: frChannel, + } +} + +func (p *Proxy) writeTraceIfTooSlow(took time.Duration, span ot.Span) { + if span != nil { span.SetTag("proxy.took", took) } + + if p.flightRecorder == nil { + return + } + + if p.flightRecorderPeriod < 1*time.Millisecond || p.flightRecorderPeriod > took { + return + } + + // signal too slow + p.flightRecorderCH <- struct{}{} +} + +func writeGoTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) { + n, err := flightRecorder.WriteTo(w) + if err != nil { + return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err) + } else { + log.Infof("FlightRecorder wrote %d bytes", n) + } + + return int(n), err +} + +func writeGoTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) { + if flightRecorder == nil || flightRecorderURL == nil { + return + } + + switch flightRecorderURL.Scheme { + case "file": + fd, err := os.Create(flightRecorderURL.Path) + if err != nil { + log.Errorf("Failed to create file %q: %v", flightRecorderURL.Path, err) + return + } + defer fd.Close() + + _, err = writeGoTraceTo(log, flightRecorder, fd) + if err != nil { + log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err) + } + + case "http", "https": + var b bytes.Buffer + _, err := writeGoTraceTo(log, flightRecorder, &b) + if err != nil { + log.Errorf("Failed to write trace into in-memory buffer: %v", err) + return + } + + req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b) + if err != nil { + log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err) + return + } + + rsp, err := roundTripper.RoundTrip(req) + if 
err != nil { + log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err) + return + } + + rsp.Body.Close() + + switch rsp.StatusCode { + case 200, 201, 204: + log.Infof("Successful send of a trace to %q", flightRecorderURL.String()) + default: + log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status) + } + default: + log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme) } } @@ -1055,7 +1199,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co proxySpanOpts := []ot.StartSpanOption{ot.Tags{ SpanKindTag: SpanKindClient, }} - if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil { + parentSpan := ot.SpanFromContext(req.Context()) + if parentSpan != nil { proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context())) } ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...) @@ -1082,12 +1227,11 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co req = injectClientTraceByEvent(req, ctx.proxySpan) } + p.writeTraceIfTooSlow(requestStopWatch.elapsed, parentSpan) p.metrics.MeasureBackendRequestHeader(ctx.metricsHost(), snet.SizeOfRequestHeader(req)) requestStopWatch.Stop() - response, err := roundTripper.RoundTrip(req) - responseStopWatch.Start() if endpointMetrics != nil { diff --git a/skipper.go b/skipper.go index 3acf96055a..b4546b2a11 100644 --- a/skipper.go +++ b/skipper.go @@ -12,6 +12,7 @@ import ( "os/signal" "path" "regexp" + "runtime/trace" "strconv" "strings" "syscall" @@ -24,7 +25,7 @@ import ( log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" otBridge "go.opentelemetry.io/otel/bridge/opentracing" - "go.opentelemetry.io/otel/trace" + oteltrace "go.opentelemetry.io/otel/trace" "github.com/zalando/skipper/circuit" "github.com/zalando/skipper/dataclients/kubernetes" @@ -81,7 +82,11 @@ import ( const ( 
defaultSourcePollTimeout = 30 * time.Millisecond defaultRoutingUpdateBuffer = 1 << 5 - otelTracerName = "skipper" + + defaultFlightRecorderPeriod = 100 * time.Millisecond + defaultFlightRecorderMaxBytes = 1 << 24 // 16 MB + + otelTracerName = "skipper" ) const DefaultPluginDir = "./plugins" @@ -520,6 +525,28 @@ type Options struct { // MemProfileRate calls runtime.MemProfileRate(MemProfileRate) if non zero value, deactivate with <0 MemProfileRate int + // FlightRecorderMaxBytes sets MaxBytes of the FlightRecorderConfig https://pkg.go.dev/runtime/trace#FlightRecorderConfig + FlightRecorderMaxBytes int + + // FlightRecorderPeriod sets the time.Duration that is the + // threshold of skipper being slow to write down a recorded a + // trace. + // + // FlightRecorderPeriod is used to compute MinAge of the FlightRecorderConfig https://pkg.go.dev/runtime/trace#FlightRecorderConfig. + // + // https://go.dev/blog/flight-recorder: + // MinAge configures the duration for which trace data is reliably retained, and we suggest setting it to around 2x the time window of the event. + FlightRecorderPeriod time.Duration + + // FlightRecorderTargetURL is the target to write the trace + // to. Supported targets are http URL and file URL. Skipper + // will try to upload the trace data by an http PUT request to + // this http URL. This is required to set if you want to have + // trace.FlightRecorder + // https://pkg.go.dev/runtime/trace#FlightRecorder + // enabled to support Go tool trace. 
+ FlightRecorderTargetURL string + // Flag that enables reporting of the Go garbage collector statistics exported in debug.GCStats EnableDebugGcMetrics bool @@ -1758,7 +1785,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { var ( tracer ot.Tracer - otelTracer trace.Tracer + otelTracer oteltrace.Tracer ) if o.OpenTelemetry != nil { @@ -2348,6 +2375,29 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { routing := routing.New(ro) defer routing.Close() + var fr *trace.FlightRecorder + if o.FlightRecorderTargetURL != "" { + if o.FlightRecorderPeriod == 0 { + o.FlightRecorderPeriod = defaultFlightRecorderPeriod + } + if o.FlightRecorderMaxBytes <= 0 { + o.FlightRecorderMaxBytes = defaultFlightRecorderMaxBytes + } + fr = trace.NewFlightRecorder(trace.FlightRecorderConfig{ + MinAge: 2 * o.FlightRecorderPeriod, + MaxBytes: uint64(o.FlightRecorderMaxBytes), + }) + + err := fr.Start() + if err != nil { + log.Errorf("Failed to start FlightRecorder: %v", err) + fr.Stop() + fr = nil + } else { + log.Infof("FlightRecorder started with FlightRecorderConfig (%s, %d) and target: %s", o.FlightRecorderPeriod*2, o.FlightRecorderMaxBytes, o.FlightRecorderTargetURL) + } + } + proxyFlags := proxy.Flags(o.ProxyOptions) | o.ProxyFlags proxyParams := proxy.Params{ Routing: routing, @@ -2366,17 +2416,20 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { ExpectContinueTimeout: o.ExpectContinueTimeoutBackend, KeepAlive: o.KeepAliveBackend, DualStack: o.DualStackBackend, + EnableCopyStreamPoolExperimental: o.EnableCopyStreamPoolExperimental, TLSHandshakeTimeout: o.TLSHandshakeTimeoutBackend, MaxIdleConns: o.MaxIdleConnsBackend, DisableHTTPKeepalives: o.DisableHTTPKeepalives, AccessLogDisabled: o.AccessLogDisabled, - EnableCopyStreamPoolExperimental: o.EnableCopyStreamPoolExperimental, ClientTLS: o.ClientTLS, CustomHttpRoundTripperWrap: o.CustomHttpRoundTripperWrap, RateLimiters: ratelimitRegistry, 
EndpointRegistry: endpointRegistry, EnablePassiveHealthCheck: passiveHealthCheckEnabled, PassiveHealthCheck: passiveHealthCheck, + FlightRecorder: fr, + FlightRecorderTargetURL: o.FlightRecorderTargetURL, + FlightRecorderPeriod: o.FlightRecorderPeriod, } if o.EnableBreakers || len(o.BreakerSettings) > 0 { From ce327e95af6b45bf9c237c6ce6db7cefad26bdf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Fri, 27 Mar 2026 12:28:58 +0100 Subject: [PATCH 2/3] remove tracer function that was used for CB tests, but it spams only test output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- proxy/breaker_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/proxy/breaker_test.go b/proxy/breaker_test.go index 0b80403375..fe30211408 100644 --- a/proxy/breaker_test.go +++ b/proxy/breaker_test.go @@ -280,13 +280,6 @@ func wait(d time.Duration) scenarioStep { } } -func trace(msg string) scenarioStep { - return func(c *breakerTestContext) { - c.t.Helper() - println(msg) - } -} - func TestBreakerConsecutive(t *testing.T) { for _, s := range []breakerScenario{{ title: "no breaker", @@ -543,7 +536,6 @@ func TestBreakerMultipleHosts(t *testing.T) { checkBackendHostCounter("foo", testRateWindow), checkBackendHostCounter("bar", testRateWindow), setBackendFail, - traceBreakerTest("setting fail"), setBackendHostFail("foo"), setBackendHostFail("bar"), times(testRateFailures, From f14d937e88dc5397a5b24b2cf2d463bbbfaa6120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Sat, 28 Mar 2026 12:54:08 +0100 Subject: [PATCH 3/3] fix: merge error added a second -enable-prometheus-start-label MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- config/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/config/config.go b/config/config.go index bbf9c4892c..aa99428294 100644 --- a/config/config.go +++ 
b/config/config.go @@ -455,7 +455,6 @@ func NewConfig() *Config { flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate") flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction") flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.MemProfileRate, keeps default 512 kB") - flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.") flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats") flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")