Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 34 additions & 46 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ type ClientOptions struct {
type Client struct {
mu sync.RWMutex
options ClientOptions
dsn *Dsn
dsn *protocol.Dsn
eventProcessors []EventProcessor
integrations []Integration
externalTraceResolver externalContextTraceResolver
Expand Down Expand Up @@ -395,10 +395,10 @@ func NewClient(options ClientOptions) (*Client, error) {
}
}

var dsn *Dsn
var dsn *protocol.Dsn
if options.Dsn != "" {
var err error
dsn, err = NewDsn(options.Dsn)
dsn, err = protocol.NewDsn(options.Dsn)
if err != nil {
return nil, err
}
Expand All @@ -419,25 +419,29 @@ func NewClient(options ClientOptions) (*Client, error) {
client.reportProvider = a
}

client.setupTransport()

// noop Telemetry Buffers and Processor fow now
// if !options.DisableTelemetryBuffer {
// client.setupTelemetryProcessor()
// } else
if options.EnableLogs {
client.batchLogger = newLogBatchProcessor(&client)
client.batchLogger.Start()
}
// We currently disallow using custom Transport with the new Telemetry Processor, due to the difference in transport signatures.
// The option should be enabled when the new Transport interface signature changes.
if !options.DisableTelemetryBuffer && client.options.Transport == nil {
client.setupTelemetryProcessor()
Comment thread
sentry[bot] marked this conversation as resolved.
} else {
if client.options.Transport != nil {
debuglog.Println("Cannot enable Telemetry Processor with custom Transport: fallback to old transport")
}
client.setupTransport()

if !options.DisableMetrics {
client.batchMeter = newMetricBatchProcessor(&client)
client.batchMeter.Start()
if options.EnableLogs {
client.batchLogger = newLogBatchProcessor(&client)
client.batchLogger.Start()
}
if !options.DisableMetrics {
client.batchMeter = newMetricBatchProcessor(&client)
client.batchMeter.Start()
}
}
client.setupIntegrations()
if options.OrgID != 0 && client.dsn != nil {
client.dsn.SetOrgID(options.OrgID)
}
client.setupIntegrations()

return &client, nil
}
Expand Down Expand Up @@ -474,31 +478,19 @@ func (client *Client) setupTransport() {
client.Transport = transport
}

func (client *Client) setupTelemetryProcessor() { // nolint: unused
if client.options.DisableTelemetryBuffer {
return
}

if client.dsn == nil {
debuglog.Println("Telemetry buffer disabled: no DSN configured")
return
}

// We currently disallow using custom Transport with the new Telemetry Processor, due to the difference in transport signatures.
// The option should be enabled when the new Transport interface signature changes.
if client.options.Transport != nil {
debuglog.Println("Cannot enable Telemetry Processor/Buffers with custom Transport: fallback to old transport")
if client.options.EnableLogs {
client.batchLogger = newLogBatchProcessor(client)
client.batchLogger.Start()
}
if !client.options.DisableMetrics {
client.batchMeter = newMetricBatchProcessor(client)
client.batchMeter.Start()
}
return
func (client *Client) sdkInfo() *protocol.SdkInfo {
return &protocol.SdkInfo{
Name: client.GetSDKIdentifier(),
Version: SDKVersion,
Integrations: client.listIntegrations(),
Comment thread
cursor[bot] marked this conversation as resolved.
Packages: []SdkPackage{{
Name: "sentry-go",
Version: SDKVersion,
}},
}
}

func (client *Client) setupTelemetryProcessor() {
transport := httpInternal.NewAsyncTransport(httpInternal.TransportOptions{
Dsn: client.options.Dsn,
HTTPClient: client.options.HTTPClient,
Expand All @@ -508,6 +500,7 @@ func (client *Client) setupTelemetryProcessor() { // nolint: unused
CaCerts: client.options.CaCerts,
Recorder: client.reportRecorder,
Provider: client.reportProvider,
SdkInfo: client.sdkInfo,
})
client.Transport = &internalAsyncTransportAdapter{transport: transport}

Expand All @@ -519,12 +512,7 @@ func (client *Client) setupTelemetryProcessor() { // nolint: unused
ratelimit.CategoryTraceMetric: telemetry.NewRingBuffer[protocol.TelemetryItem](ratelimit.CategoryTraceMetric, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second, client.reportRecorder),
}

sdkInfo := &protocol.SdkInfo{
Name: client.sdkIdentifier,
Version: client.sdkVersion,
}

client.telemetryProcessor = telemetry.NewProcessor(buffers, transport, &client.dsn.Dsn, sdkInfo, client.reportRecorder)
client.telemetryProcessor = telemetry.NewProcessor(buffers, transport, client.dsn, client.sdkInfo, client.reportRecorder)
}

func (client *Client) setupIntegrations() {
Expand Down
34 changes: 21 additions & 13 deletions client_reports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/getsentry/sentry-go/internal/ratelimit"
"github.com/getsentry/sentry-go/internal/testutils"
"github.com/getsentry/sentry-go/report"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
)

// TestClientReports_Integration tests that client reports are properly generated
// and sent when events are dropped for various reasons.
func TestClientReports_Integration(t *testing.T) {
var receivedBodies [][]byte
var mu sync.Mutex
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
mu.Lock()
receivedBodies = append(receivedBodies, body)
mu.Unlock()
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"test-event-id"}`))
}))
Expand Down Expand Up @@ -73,21 +79,23 @@ func TestClientReports_Integration(t *testing.T) {
}

var got report.ClientReport
found := false
for _, b := range receivedBodies {
for _, line := range bytes.Split(b, []byte("\n")) {
if json.Unmarshal(line, &got) == nil && len(got.DiscardedEvents) > 0 {
found = true
break
require.Eventually(t, func() bool {
mu.Lock()
bodies := make([][]byte, len(receivedBodies))
copy(bodies, receivedBodies)
mu.Unlock()

for _, b := range bodies {
for _, line := range bytes.Split(b, []byte("\n")) {
var report report.ClientReport
if json.Unmarshal(line, &report) == nil && len(report.DiscardedEvents) > 0 {
got = report
return true
}
}
}
if found {
break
}
}
if !found {
t.Fatal("no client report found in envelope bodies")
}
return false
}, time.Second, 10*time.Millisecond, "no client report found in envelope bodies with: %v", got)

if got.Timestamp.IsZero() {
t.Error("client report missing timestamp")
Expand Down
72 changes: 64 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/getsentry/sentry-go/internal/debuglog"
internalHttp "github.com/getsentry/sentry-go/internal/http"
"github.com/getsentry/sentry-go/internal/testutils"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
pkgErrors "github.com/pkg/errors"
Expand Down Expand Up @@ -1063,9 +1068,63 @@ func TestClientSetsUpTransport(t *testing.T) {
Transport: &MockTransport{},
})
require.IsType(t, &MockTransport{}, client.Transport)
}

type namedIntegration struct{ name string }

func (n *namedIntegration) Name() string { return n.name }
func (n *namedIntegration) SetupOnce(_ *Client) {}

func TestTelemetryEnvelopeCarriesIntegrations(t *testing.T) {
var bodies [][]byte
requestReceived := make(chan struct{}, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodies = append(bodies, b)
w.WriteHeader(http.StatusOK)
select {
case requestReceived <- struct{}{}:
default:
}
}))
Comment thread
giortzisg marked this conversation as resolved.
defer srv.Close()

dsn := strings.Replace(srv.URL, "//", "//pubkey@", 1) + "/1"
client, err := NewClient(ClientOptions{
Dsn: dsn,
Integrations: func(defaults []Integration) []Integration {
return append(defaults, &namedIntegration{name: "CustomRegressionIntegration"})
},
})
require.NoError(t, err)
t.Cleanup(func() { client.Close() })

client.CaptureMessage("ping", nil, &MockScope{})
require.True(t, client.Flush(testutils.FlushTimeout()), "flush timed out")

select {
case <-requestReceived:
case <-time.After(testutils.FlushTimeout()):
t.Fatal("server never received an envelope")
}

require.NotEmpty(t, bodies, "expected at least one envelope to be sent")

client, _ = NewClient(ClientOptions{})
require.IsType(t, &noopTransport{}, client.Transport)
body := bodies[0]
nl := bytes.IndexByte(body, '\n')
require.Positive(t, nl, "envelope body missing header newline")
var header struct {
Sdk struct {
Name string `json:"name"`
Integrations []string `json:"integrations"`
} `json:"sdk"`
}
require.NoError(t, json.Unmarshal(body[:nl], &header))

assert.Equal(t, sdkIdentifier, header.Sdk.Name)
assert.Contains(t, header.Sdk.Integrations, "CustomRegressionIntegration")
assert.Contains(t, header.Sdk.Integrations, "ContextifyFrames")
}

func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) {
Expand All @@ -1078,13 +1137,10 @@ func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}

if client.telemetryProcessor != nil {
t.Fatal("expected telemetryProcessor to be nil when DSN is missing")
}

if _, ok := client.Transport.(*noopTransport); !ok {
t.Fatalf("expected noopTransport, got %T", client.Transport)
if client.telemetryProcessor == nil {
t.Fatal("expected telemetryProcessor to not be nil when DSN is missing")
}
require.IsType(t, &internalHttp.NoopTransport{}, client.Transport.(*internalAsyncTransportAdapter).transport)
}

type multiClientEnv struct {
Expand Down
3 changes: 2 additions & 1 deletion dynamic_sampling_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/getsentry/sentry-go/internal/protocol"
"github.com/getsentry/sentry-go/internal/testutils"
)

Expand Down Expand Up @@ -192,7 +193,7 @@ func TestDynamicSamplingContextFromScope(t *testing.T) {
},
},
client: func() *Client {
dsn, _ := NewDsn("http://public@example.com/sentry/1")
dsn, _ := protocol.NewDsn("http://public@example.com/sentry/1")
return &Client{
options: ClientOptions{
Dsn: dsn.String(),
Expand Down
2 changes: 1 addition & 1 deletion interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func TestProcessor_MutationAfterAdd(t *testing.T) {
),
}

proc := telemetry.NewProcessor(buffers, transport, dsn, sdk, nil)
proc := telemetry.NewProcessor(buffers, transport, dsn, func() *protocol.SdkInfo { return sdk }, nil)

extra := map[string]interface{}{
"request_id": "original-123",
Expand Down
Loading
Loading