Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Config struct {
BlockProfileRate int `yaml:"block-profile-rate"`
MutexProfileFraction int `yaml:"mutex-profile-fraction"`
MemProfileRate int `yaml:"memory-profile-rate"`
FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"`
DebugGcMetrics bool `yaml:"debug-gc-metrics"`
RuntimeMetrics bool `yaml:"runtime-metrics"`
ServeRouteMetrics bool `yaml:"serve-route-metrics"`
Expand Down Expand Up @@ -439,6 +440,7 @@ func NewConfig() *Config {

// logging, metrics, tracing:
flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
Comment thread
szuecs marked this conversation as resolved.
flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation")
flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span")
flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.")
Expand All @@ -453,7 +455,7 @@ func NewConfig() *Config {
flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate")
flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction")
flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.MemProfileRate, keeps default 512 kB")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.")
flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats")
flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")
flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route")
Expand Down Expand Up @@ -919,6 +921,7 @@ func (c *Config) ToOptions() skipper.Options {
EnableProfile: c.EnableProfile,
BlockProfileRate: c.BlockProfileRate,
MutexProfileFraction: c.MutexProfileFraction,
FlightRecorderTargetURL: c.FlightRecorderTargetURL,
EnableDebugGcMetrics: c.DebugGcMetrics,
EnableRuntimeMetrics: c.RuntimeMetrics,
EnableServeRouteMetrics: c.ServeRouteMetrics,
Expand Down
8 changes: 0 additions & 8 deletions proxy/breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,6 @@ func wait(d time.Duration) scenarioStep {
}
}

func trace(msg string) scenarioStep {
return func(c *breakerTestContext) {
c.t.Helper()
println(msg)
}
}

func TestBreakerConsecutive(t *testing.T) {
for _, s := range []breakerScenario{{
title: "no breaker",
Expand Down Expand Up @@ -543,7 +536,6 @@ func TestBreakerMultipleHosts(t *testing.T) {
checkBackendHostCounter("foo", testRateWindow),
checkBackendHostCounter("bar", testRateWindow),
setBackendFail,
trace("setting fail"),
setBackendHostFail("foo"),
setBackendHostFail("bar"),
times(testRateFailures,
Expand Down
93 changes: 93 additions & 0 deletions proxy/flightrecorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package proxy_test

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"runtime/trace"
"testing"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/diag"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/proxy/proxytest"
)

func TestFlightRecorder(t *testing.T) {
ch := make(chan int)
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
ch <- http.StatusMethodNotAllowed
return
}

var buf bytes.Buffer
n, err := io.Copy(&buf, r.Body)
if err != nil {
t.Fatalf("Failed to copy data: %v", err)
}
if n < 100 {
t.Fatalf("Failed to write enough data: %d bytes", n)
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte(http.StatusText(http.StatusCreated)))
ch <- http.StatusCreated
}))
defer service.Close()

backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
}))
defer backend.Close()

flightRecorder := trace.NewFlightRecorder(trace.FlightRecorderConfig{
MinAge: time.Second,
})
flightRecorder.Start()

spec := diag.NewLatency()
fr := make(filters.Registry)
fr.Register(spec)

doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL)
rr := eskip.MustParse(doc)

pr := proxytest.WithParams(fr, proxy.Params{
FlightRecorder: flightRecorder,
FlightRecorderTargetURL: service.URL,
FlightRecorderPeriod: 90 * time.Millisecond,
}, rr...)
defer pr.Close()

rsp, err := pr.Client().Get(pr.URL)
if err != nil {
t.Fatalf("Failed to GET %q: %v", pr.URL, err)
}
defer rsp.Body.Close()
_, err = io.ReadAll(rsp.Body)
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

switch rsp.StatusCode {
case http.StatusOK:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}

statusCode := <-ch
switch statusCode {
case http.StatusCreated:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}
}
152 changes: 148 additions & 4 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"runtime"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -377,6 +378,18 @@ type Params struct {

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck

// FlightRecorder is a started instance of https://pkg.go.dev/runtime/trace#FlightRecorder
FlightRecorder *trace.FlightRecorder

// FlightRecorderTargetURL is the target to write the trace
// to. Supported targets are http URL and file URL.
FlightRecorderTargetURL string

// FlightRecorderPeriod is the time.Duration that is used to detect
// a slow skipper. If skipper is detected to be slow it tries
// to write out a trace as configured by the FlightRecorderTargetURL.
FlightRecorderPeriod time.Duration
}

type (
Expand Down Expand Up @@ -472,6 +485,10 @@ type Proxy struct {
clientTLS *tls.Config
hostname string
onPanicSometimes rate.Sometimes
flightRecorder *trace.FlightRecorder
flightRecorderURL *url.URL
flightRecorderPeriod time.Duration
flightRecorderCH chan struct{}
}

// proxyError is used to wrap errors during proxying and to indicate
Expand Down Expand Up @@ -874,6 +891,52 @@ func WithParams(p Params) *Proxy {
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
}
}

log := &logging.DefaultLog{}

var (
frURL *url.URL
// buffered channel size 10k to allow unblocked requests
frChannel = make(chan struct{}, 10240)
)
if p.FlightRecorder != nil {
var err error
frURL, err = url.Parse(p.FlightRecorderTargetURL)
if err != nil {
p.FlightRecorder.Stop()
p.FlightRecorder = nil
} else {
// decouple writing a trace from data-plane work
go func() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forever runs and tries to be an efficient way of handling multiple goroutines fire "we are slow" and write only once in an hour trace data to check.

foreverHang := 365 * 24 * time.Hour
timer := time.NewTimer(foreverHang)
defer timer.Stop()

last := time.Now().Add(-time.Hour)

for {
select {
case <-frChannel:
// range through all notifications until 1ms there is no notification
// reset timer to write trace after handling all the notifications
timer.Reset(time.Millisecond)
continue
case <-quit:
p.FlightRecorder.Stop()
return
case <-timer.C:
if time.Since(last) >= time.Hour {
writeGoTrace(p.FlightRecorder, frURL, log, tr)
}
last = time.Now()

timer.Reset(foreverHang)
}
}
}()
}
}

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
Expand All @@ -892,7 +955,7 @@ func WithParams(p Params) *Proxy {
maxLoops: p.MaxLoopbacks,
breakers: p.CircuitBreakers,
limiters: p.RateLimiters,
log: &logging.DefaultLog{},
log: log,
defaultHTTPStatus: defaultHTTPStatus,
tracing: newProxyTracing(p.OpenTracing),
copyStreamPoolEnabled: p.EnableCopyStreamPoolExperimental,
Expand All @@ -902,6 +965,87 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
flightRecorder: p.FlightRecorder,
flightRecorderURL: frURL,
flightRecorderPeriod: p.FlightRecorderPeriod,
flightRecorderCH: frChannel,
}
}

func (p *Proxy) writeTraceIfTooSlow(took time.Duration, span ot.Span) {
span.SetTag("proxy.took", took)

if p.flightRecorder == nil {
return
}

if p.flightRecorderPeriod < 1*time.Millisecond && p.flightRecorderPeriod > took {
return
}

// signal too slow
p.flightRecorderCH <- struct{}{}
}

func writeGoTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
n, err := flightRecorder.WriteTo(w)
if err != nil {
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
} else {
log.Infof("FlightRecorder wrote %d bytes", n)
}

return int(n), err
}

func writeGoTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
if flightRecorder == nil || flightRecorderURL == nil {
return
}

switch flightRecorderURL.Scheme {
case "file":
fd, err := os.Create(flightRecorderURL.Path)
if err != nil {
log.Errorf("Failed to create file %q: %v", err, flightRecorderURL.Path)
return
}
defer fd.Close()

_, err = writeGoTraceTo(log, flightRecorder, fd)
if err != nil {
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
}

case "http", "https":
var b bytes.Buffer
_, err := writeGoTraceTo(log, flightRecorder, &b)
if err != nil {
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
return
}

req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
if err != nil {
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
}

rsp, err := roundTripper.RoundTrip(req)
if err != nil {
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
return
}

rsp.Body.Close()

switch rsp.StatusCode {
case 200, 201, 204:
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
default:
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
default:
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
}
}

Expand Down Expand Up @@ -1055,7 +1199,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindClient,
}}
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
parentSpan := ot.SpanFromContext(req.Context())
if parentSpan != nil {
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
}
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
Expand All @@ -1082,12 +1227,11 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
req = injectClientTraceByEvent(req, ctx.proxySpan)
}

p.writeTraceIfTooSlow(requestStopWatch.elapsed, parentSpan)
p.metrics.MeasureBackendRequestHeader(ctx.metricsHost(), snet.SizeOfRequestHeader(req))

requestStopWatch.Stop()

response, err := roundTripper.RoundTrip(req)

responseStopWatch.Start()

if endpointMetrics != nil {
Expand Down
Loading
Loading