Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ on:
jobs:
build:
name: Build
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
Comment thread
fals marked this conversation as resolved.
Outdated
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup Go 1.18
- name: Setup Go 1.24.3
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.24.3

- name: Cache Go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
.vscode
17 changes: 14 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,25 @@ type WriteResponse struct {

// Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
func (p *Client) Write(ctx context.Context, req *WriteRequest, options ...WriteOption) (*WriteResponse, error) {
return p.internalWrite(ctx, &prompb.WriteRequest{
Timeseries: toProtoTimeSeries(req.TimeSeries),
}, options...)
}

// WriteProto sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
// The difference between Write and WriteProto is that WriteProto allows you to write the full prompb.WriteRequest, which supports all metrics.
func (p *Client) WriteProto(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) {
return p.internalWrite(ctx, req, options...)
}

// Write sends HTTP requests to Prometheus Remote Write compatible API endpoint including Prometheus, Cortex and VictoriaMetrics.
func (p *Client) internalWrite(ctx context.Context, req *prompb.WriteRequest, options ...WriteOption) (*WriteResponse, error) {
opts := writeOptions{}
for _, opt := range options {
opt(&opts)
}
// Marshal proto and compress.
pbBytes, err := proto.Marshal(&prompb.WriteRequest{
Timeseries: toProtoTimeSeries(req.TimeSeries),
})
pbBytes, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("promwrite: marshaling remote write request proto: %w", err)
}
Expand Down
176 changes: 175 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/castai/promwrite"
)

func TestClient(t *testing.T) {
func TestWrite(t *testing.T) {
t.Run("write with default options", func(t *testing.T) {
r := require.New(t)

Expand Down Expand Up @@ -202,6 +202,180 @@ func TestClient(t *testing.T) {
})
}

func TestWriteProto(t *testing.T) {
t.Run("write with default options", func(t *testing.T) {
r := require.New(t)

receivedWriteRequest := make(chan *prompb.WriteRequest, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
b, _ := io.ReadAll(req.Body)
parsed, err := parseWriteRequest(b)
r.NoError(err)
receivedWriteRequest <- parsed
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

client := promwrite.NewClient(srv.URL)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

nowUnixMilli := time.Now().UTC().UnixMilli()
req := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
{
Name: "custom_label_a",
Value: "custom_value_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
},
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_b",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 456},
},
},
},
}
_, err := client.WriteProto(ctx, req)
r.NoError(err)

res := <-receivedWriteRequest
r.Len(res.Timeseries, 2)
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
{
Name: "custom_label_a",
Value: "custom_value_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
}, res.Timeseries[0])
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_b",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 456},
},
}, res.Timeseries[1])
})

t.Run("write with custom options", func(t *testing.T) {
r := require.New(t)

receivedWriteRequest := make(chan *prompb.WriteRequest, 1)
receivedHeaders := make(chan http.Header, 1)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
b, _ := ioutil.ReadAll(req.Body)
parsed, err := parseWriteRequest(b)
r.NoError(err)
receivedWriteRequest <- parsed
receivedHeaders <- req.Header
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

sentRequest := make(chan *http.Request, 1)
client := promwrite.NewClient(
srv.URL,
promwrite.HttpClient(&http.Client{
Timeout: 5 * time.Second,
Transport: &customTestHttpClientTransport{
reqChan: sentRequest,
next: http.DefaultTransport,
},
}),
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

nowUnixMilli := time.Now().UTC().UnixMilli()
req := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
},
Samples: []prompb.Sample{
{Timestamp: nowUnixMilli, Value: 123},
},
},
},
}
_, err := client.WriteProto(ctx, req, promwrite.WriteHeaders(map[string]string{"X-Scope-OrgID": "tenant1"}))
r.NoError(err)

res := <-receivedWriteRequest
r.Len(res.Timeseries, 1)
r.Equal(prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric_a",
},
},
Samples: []prompb.Sample{
{
Timestamp: nowUnixMilli,
Value: 123,
},
},
}, res.Timeseries[0])

sentReq := <-sentRequest
reqHeaders := sentReq.Header
recvHeaders := <-receivedHeaders
r.Equal("tenant1", reqHeaders.Get("X-Scope-OrgID"))
r.Equal("tenant1", recvHeaders.Get("X-Scope-OrgID"))
})

t.Run("write error", func(t *testing.T) {
r := require.New(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("ups"))
}))
defer srv.Close()

client := promwrite.NewClient(srv.URL)

_, err := client.WriteProto(context.Background(), &prompb.WriteRequest{})
r.EqualError(err, "promwrite: expected status 200, got 400: ups")
var writeErr *promwrite.WriteError
r.True(errors.As(err, &writeErr))
r.Equal(http.StatusBadRequest, writeErr.StatusCode())
})
}

func parseWriteRequest(src []byte) (*prompb.WriteRequest, error) {
decompressed, err := snappy.Decode(nil, src)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
module github.com/castai/promwrite

go 1.18
go 1.24.3

require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/prometheus/prometheus v0.40.3
github.com/stretchr/testify v1.8.1
github.com/golang/snappy v1.0.0
github.com/prometheus/prometheus v0.304.1
github.com/stretchr/testify v1.10.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.63.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
47 changes: 29 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/prometheus v0.40.3 h1:oMw1vVyrxHTigXAcFY6QHrGUnQEbKEOKo737cPgYBwY=
github.com/prometheus/prometheus v0.40.3/go.mod h1:/UhsWkOXkO11wqTW2Bx5YDOwRweSDcaFBlTIzFe7P0Y=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
github.com/prometheus/prometheus v0.304.1 h1:e4kpJMb2Vh/PcR6LInake+ofcvFYHT+bCfmBvOkaZbY=
github.com/prometheus/prometheus v0.304.1/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -45,6 +50,8 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand All @@ -53,8 +60,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=