diff --git a/.gitignore b/.gitignore index 220d374d..c26604f2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.dll *.so *.dylib +/bktec # Test binary, built with `go test -c` *.test diff --git a/go.mod b/go.mod index 03c0406c..a3f349af 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/buildkite/test-engine-client -go 1.21 +go 1.23.0 -toolchain go1.22.4 +toolchain go1.24.0 require ( github.com/buildkite/roko v1.3.1 @@ -11,18 +11,22 @@ require ( require ( drjosh.dev/zzglob v0.4.0 + github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c + github.com/google/uuid v1.6.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/olekukonko/tablewriter v0.0.5 github.com/pact-foundation/pact-go/v2 v2.0.10 + golang.org/x/net v0.35.0 golang.org/x/sys v0.30.0 + google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.5 ) require ( github.com/hashicorp/logutils v1.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect - google.golang.org/grpc v1.67.3 // indirect - google.golang.org/protobuf v1.36.3 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect ) diff --git a/go.sum b/go.sum index 3ee73105..1eebf23f 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,21 @@ drjosh.dev/zzglob v0.4.0 h1:gOb46aIHyHG8BlYpvZZM4dqR2dpsbKtI82IbYVAYIj4= drjosh.dev/zzglob v0.4.0/go.mod h1:c3V3WPyfG+81h/bNOalEaba0jEQl16i9efSAmWOeOw8= +github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c h1:qLnyVD+ND7Ll3p9Lw0Z7Vk5HirKRZcBRJzHELYe5Z84= +github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c/go.mod h1:GHZ5lGzUtz9LQ2oHt8EweXn0zS8t2sCD9bNBw9R9s8E= github.com/buildkite/roko v1.3.1 h1:t7K30ceLLYn6k7hQP4oq1c7dVlhgD5nRcuSRDEEnY1s= github.com/buildkite/roko v1.3.1/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= @@ -20,18 +30,32 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8= -google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e h1:nsxey/MfoGzYNduN0NN/+hqP9iiCIYsrVbXb/8hjFM8= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e/go.mod h1:Xsh8gBVxGCcbV8ZeTB9wI5XPyZ5RvC6V3CTeeplHbiA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e h1:YA5lmSs3zc/5w+xsRcHqpETkaYyK63ivEPzNTcUUlSA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/internal/bes/bes.go b/internal/bes/bes.go new file mode 100644 index 00000000..6d247263 --- /dev/null +++ b/internal/bes/bes.go @@ -0,0 +1,151 @@ +// Package bes implements a Bazel Build Event Service gRPC listener: +// https://bazel.build/remote/bep#build-event-service +// It listens for TestResult events, and uploads their XML report to Test +// Engine. +package bes + +import ( + "context" + "fmt" + "io" + "sort" + + slog "github.com/buildkite/test-engine-client/internal/bes/quietslog" + + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/emptypb" +) + +type BuildEventServer struct { + handler *BuildEventHandler +} + +// PublishLifecycleEvent is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// +// PublishLifecycleEvent handles life cycle events. +func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { + slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) + return &emptypb.Empty{}, nil +} + +// PublishBuildToolEventStream is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// The BuildEventHandler and BuildEventChannel that it passes events to mimicks +// the expected interfaces, but provide a bktec-specific implementation. +// +// PublishBuildToolEventStream handles a build tool event stream. +// bktec thanks buildbarn/bb-portal for the basis of this :D +func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { + slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) + + // List of SequenceIds we've received. + // We'll want to ack these once all events are received, as we don't support resumption. + seqNrs := make([]int64, 0) + + ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { + if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ + StreamId: streamID, + SequenceNumber: sequenceNumber, + }); err != nil { + + // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete + // its not an error when the send fails. the bes gracefully terminated the close + // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) + // the context is processed in the background, so by the time we are acknowledging these + // requests, the client connection may have already timed out and these errors can be + // safely ignored + grpcErr := status.Convert(err) + if isClosing && + grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { + return + } + + slog.ErrorContext( + stream.Context(), + "Send failed", + "err", + err, + "streamid", + streamID, + "sequenceNumber", + sequenceNumber, + ) + } + } + + var streamID *build.StreamId + reqCh := make(chan *build.PublishBuildToolEventStreamRequest) + errCh := make(chan error) + var eventCh BuildEventChannel + + go func() { + for { + req, err := stream.Recv() + if err != nil { + errCh <- err + return + } + reqCh <- req + } + }() + + for { + select { + case err := <-errCh: + if err == io.EOF { + slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + + if eventCh == nil { + slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) + return nil + } + + // Validate that all events were received + sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) + + // TODO: Find out if initial sequence number can be != 1 + expected := int64(1) + for _, seqNr := range seqNrs { + if seqNr != expected { + return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) + } + expected++ + } + + err := eventCh.Finalize() + if err != nil { + return err + } + + // Ack all events + for _, seqNr := range seqNrs { + ack(streamID, seqNr, true) + } + + return nil + } + + slog.ErrorContext(stream.Context(), "Recv failed", "err", err) + return err + + case req := <-reqCh: + // First event + if streamID == nil { + streamID = req.OrderedBuildEvent.GetStreamId() + eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) + } + + seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) + + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { + slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) + return err + } + } + } +} diff --git a/internal/bes/bes_test.go b/internal/bes/bes_test.go new file mode 100644 index 00000000..57495e2a --- /dev/null +++ b/internal/bes/bes_test.go @@ -0,0 +1,14 @@ +package bes + +import "testing" + +func TestPathFromURI(t *testing.T) { + path, err := pathFromURI("file:///hello/world.txt") + if err != nil { + t.Errorf("pathFromURI error: %v", err) + } + + if want := "/hello/world.txt"; want != path { + t.Errorf("wanted %v got %v", want, path) + } +} diff --git a/internal/bes/channel.go b/internal/bes/channel.go new file mode 100644 index 00000000..621e6a5e --- /dev/null +++ b/internal/bes/channel.go @@ -0,0 +1,107 @@ +package bes + +import ( + "context" + "fmt" + "log/slog" + "net/url" + "time" + + "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventChannel in bktec mimics the bb-portal interface so that the +// BuildEventServer.PublishBuildEventServer code can be used verbatim. +// +// BuildEventChannel handles a single BuildEvent stream +type BuildEventChannel interface { + // HandleBuildEvent processes a single BuildEvent + // This method should be called for each received event. + HandleBuildEvent(event *build.BuildEvent) error + + // Finalize does post-processing of a stream of BuildEvents. + // This method should be called after receiving the EOF event. + Finalize() error +} + +type buildEventChannel struct { + ctx context.Context + streamID *build.StreamId + filenames chan<- string +} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + if event.GetBazelEvent() == nil { + return nil + } + var bazelEvent bes.BuildEvent + if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { + slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err) + return err + } + + payload := bazelEvent.GetPayload() + if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok { + r := testResult.TestResult + files := []string{} + for _, x := range r.GetTestActionOutput() { + if x.GetName() == "test.xml" { + path, err := pathFromURI(x.GetUri()) + if err != nil { + return err // maybe just a log a warning? + } + files = append(files, path) + c.filenames <- path + } + } + slog.Info("TestResult", + "status", r.GetStatus(), + "cached", r.GetCachedLocally(), + "dur", r.GetTestAttemptDuration().AsDuration().String(), + "files", files, + ) + } + + return nil +} + +func pathFromURI(uri string) (string, error) { + url, err := url.Parse(uri) + if err != nil { + return "", err + } + if url.Scheme != "file" { + return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme) + } + return url.Path, nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *buildEventChannel) Finalize() error { + // defer the ctx so its not reaped when the client closes the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) + defer cancel() + + slog.Info("finalizing build event channel") + _ = ctx + // TODO: finalize anything that needs finalizing? + + cancel() + return nil +} + +// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. +// It is used when receiving a stream of events that we wish to ack without processing. +type noOpBuildEventChannel struct{} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + return nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *noOpBuildEventChannel) Finalize() error { + return nil +} diff --git a/internal/bes/handler.go b/internal/bes/handler.go new file mode 100644 index 00000000..95134283 --- /dev/null +++ b/internal/bes/handler.go @@ -0,0 +1,39 @@ +package bes + +import ( + "context" + + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventHandler in bktec mimics the bb-portal handler so that the +// BuildEventServer.PublishBuildToolEventStream code can be used verbatim. +// +// BuildEventHandler orchestrates the handling of incoming Build Event streams. +// For each incoming stream, and BuildEventChannel is created, which handles that stream. +// BuildEventHandler is responsible for managing the things that are common to these event streams. +type BuildEventHandler struct { + filenames chan<- string +} + +// NewBuildEventHandler constructs a new BuildEventHandler +func NewBuildEventHandler(filenames chan<- string) *BuildEventHandler { + return &BuildEventHandler{ + filenames: filenames, + } +} + +// CreateEventChannel creates a new BuildEventChannel +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel { + // If the first event does not have sequence number 1, we have processed this + // invocation previously, and should skip all processing. + if initialEvent.SequenceNumber != 1 { + return &noOpBuildEventChannel{} + } + + return &buildEventChannel{ + ctx: ctx, + streamID: initialEvent.StreamId, + filenames: h.filenames, + } +} diff --git a/internal/bes/listen.go b/internal/bes/listen.go new file mode 100644 index 00000000..2a74452b --- /dev/null +++ b/internal/bes/listen.go @@ -0,0 +1,101 @@ +package bes + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/upload" + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc" +) + +func ListenCLI(argv []string, env env.Env) error { + flags := flag.NewFlagSet("bktec bazel listen", flag.ExitOnError) + portFlag := flags.Int("port", 0, "gRPC port to listen") + listenHostFlag := flags.String("listen-host", "127.0.0.1", "gRPC host to listen") + debugFlag := flags.Bool("debug", false, "debug logging") + flags.Parse(argv) + + if *debugFlag { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // a channel to propagate OS signals + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + // configure uploader + cfg, err := upload.ConfigFromEnv(env) + if err != nil { + return fmt.Errorf("uploader configuration: %w", err) + } + runEnv, err := upload.RunEnvFromEnv(env) + if err != nil { + return fmt.Errorf("uploader run_env configuration: %w", err) + } + uploader := NewUploader(cfg, runEnv, "junit") + go uploader.Start(ctx) + + // configure gRPC Bazel BES server + addr := fmt.Sprintf("%s:%d", *listenHostFlag, *portFlag) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("listening on %s: %w", addr, err) + } + opts := []grpc.ServerOption{} + srv := grpc.NewServer(opts...) + build.RegisterPublishBuildEventServer(srv, BuildEventServer{ + handler: &BuildEventHandler{ + filenames: uploader.Filenames, + }, + }) + slog.Info("Bazel BES listener", "addr", "grpc://"+listener.Addr().String()) + go serve(srv, listener) + + // main loop + run := true + sigCount := 0 + for run { + select { + case url, ok := <-uploader.Responses: + if !ok { + slog.Debug("Response channel closed") + run = false + continue + } + slog.Info("Uploaded", "url", url) + case err := <-uploader.Errs: + slog.Error("Upload error", "error", err) + case sig := <-signals: + sigCount++ + srv.Stop() + if sigCount == 1 { + slog.Info("Stopping (again to force)...", "signal", sig) + uploader.Stop() + } else { + slog.Info("Stopping forcefully...", "signal", sig) + cancel() + } + } + } + + slog.Debug("done") + return nil +} + +func serve(s *grpc.Server, listener net.Listener) { + err := s.Serve(listener) + if err != nil { + slog.Error("gRPC server error", "err", err) + } +} diff --git a/internal/bes/quietslog/quietslog.go b/internal/bes/quietslog/quietslog.go new file mode 100644 index 00000000..4bf11c62 --- /dev/null +++ b/internal/bes/quietslog/quietslog.go @@ -0,0 +1,26 @@ +// Package quietslog provides a replacement for slog which downgrades Info +// messages to Debug instead, so that the log output is quieter. This is done +// specifically so that bes.BuildEventServer.PublishBuildToolEventStream() +// source code can be kept unmodified from the bb-portal upstream it's copied +// from. +package quietslog + +import ( + "context" + "log/slog" +) + +// InfoContext delegates to DebugContext of the real logger, making this logger quiet. +func InfoContext(ctx context.Context, msg string, args ...any) { + slog.DebugContext(ctx, msg, args...) +} + +// WarnContext wraps the direct logger directly. +func WarnContext(ctx context.Context, msg string, args ...any) { + slog.WarnContext(ctx, msg, args...) +} + +// ErrorContext wraps the direct logger directly. +func ErrorContext(ctx context.Context, msg string, args ...any) { + slog.ErrorContext(ctx, msg, args...) +} diff --git a/internal/bes/uploader.go b/internal/bes/uploader.go new file mode 100644 index 00000000..8f7fc5a1 --- /dev/null +++ b/internal/bes/uploader.go @@ -0,0 +1,76 @@ +package bes + +import ( + "context" + "log/slog" + + "github.com/buildkite/test-engine-client/internal/upload" +) + +type Uploader struct { + Config upload.Config + RunEnv upload.RunEnvMap + Format string + Filenames chan string + Responses chan string + Errs chan error + + stopping bool +} + +func NewUploader(cfg upload.Config, runEnv upload.RunEnvMap, format string) *Uploader { + // a channel to pass filenames from BES server to uploader + filenames := make(chan string, 1024) + + // a channel to receive response upload URLs + responses := make(chan string) + + // a channel to receive errors from the uploader + errs := make(chan error) + + return &Uploader{ + Config: cfg, + RunEnv: runEnv, + Format: format, + Filenames: filenames, + Responses: responses, + Errs: errs, + } +} + +func (u *Uploader) Start(ctx context.Context) { + for filename := range u.Filenames { + if ctx.Err() != nil { + slog.Debug("Uploader context canceled") + break + } + resp, err := u.UploadFile(ctx, filename) + if err != nil { + u.Errs <- err + continue + } + u.Responses <- resp["upload_url"] + } + slog.Debug("Uploader finished") + close(u.Responses) +} + +// Stop closes the Filenames channel; filenames already buffered on the channel +// will be uploaded before finishing. +func (u *Uploader) Stop() { + if u.stopping { + slog.Warn("Uploader GracefulStop: already stopping") + return + } + slog.Debug("Uploader GracefulStop") + u.stopping = true + close(u.Filenames) +} + +func (u *Uploader) UploadFile(ctx context.Context, filename string) (map[string]string, error) { + resp, err := upload.Upload(ctx, u.Config, u.RunEnv, u.Format, filename) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index 8c8753fd..19200a14 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,5 +1,7 @@ package config +import "github.com/buildkite/test-engine-client/internal/env" + // Config is the internal representation of the complete test engine client configuration. type Config struct { // AccessToken is the access token for the API. @@ -36,17 +38,23 @@ type Config struct { Branch string // JobRetryCount is the count of the number of times the job has been retried. JobRetryCount int + // Env provides access to environment variables. + // It's public because many tests in other packages reference it (perhaps they should not). + Env env.Env // errs is a map of environment variables name and the validation errors associated with them. errs InvalidConfigError } // New wraps the readFromEnv and validate functions to create a new Config struct. // It returns Config struct and an InvalidConfigError if there is an invalid configuration. -func New() (Config, error) { - c := Config{errs: InvalidConfigError{}} +func New(env env.Env) (Config, error) { + c := Config{ + Env: env, + errs: InvalidConfigError{}, + } // TODO: remove error from readFromEnv and validate functions - _ = c.readFromEnv() + _ = c.readFromEnv(env) _ = c.validate() if len(c.errs) > 0 { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 03cc75aa..eb963300 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -2,34 +2,34 @@ package config import ( "errors" - "os" "testing" + "github.com/buildkite/test-engine-client/internal/env" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" ) -func setEnv(t *testing.T) { - t.Helper() - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "60") - os.Setenv("BUILDKITE_PARALLEL_JOB", "7") - os.Setenv("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN", "my_token") - os.Setenv("BUILDKITE_TEST_ENGINE_BASE_URL", "https://build.kite") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_CMD", "bin/rspec {{testExamples}}") - os.Setenv("BUILDKITE_ORGANIZATION_SLUG", "my_org") - os.Setenv("BUILDKITE_TEST_ENGINE_SUITE_SLUG", "my_suite") - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_STEP_ID", "456") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_RUNNER", "rspec") - os.Setenv("BUILDKITE_TEST_ENGINE_RESULT_PATH", "tmp/rspec.json") - os.Setenv("BUILDKITE_RETRY_COUNT", "0") +func getExampleEnv() env.Env { + return env.Map{ + "BUILDKITE_PARALLEL_JOB_COUNT": "60", + "BUILDKITE_PARALLEL_JOB": "7", + "BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN": "my_token", + "BUILDKITE_TEST_ENGINE_BASE_URL": "https://build.kite", + "BUILDKITE_TEST_ENGINE_TEST_CMD": "bin/rspec {{testExamples}}", + "BUILDKITE_ORGANIZATION_SLUG": "my_org", + "BUILDKITE_TEST_ENGINE_SUITE_SLUG": "my_suite", + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_STEP_ID": "456", + "BUILDKITE_TEST_ENGINE_TEST_RUNNER": "rspec", + "BUILDKITE_TEST_ENGINE_RESULT_PATH": "tmp/rspec.json", + "BUILDKITE_RETRY_COUNT": "0", + } } func TestNewConfig(t *testing.T) { - setEnv(t) - defer os.Clearenv() + env := getExampleEnv() - c, err := New() + c, err := New(env) if err != nil { t.Errorf("config.New() error = %v", err) } @@ -46,6 +46,7 @@ func TestNewConfig(t *testing.T) { SuiteSlug: "my_suite", TestRunner: "rspec", JobRetryCount: 0, + Env: env, errs: InvalidConfigError{}, } @@ -55,9 +56,7 @@ func TestNewConfig(t *testing.T) { } func TestNewConfig_EmptyConfig(t *testing.T) { - os.Clearenv() - - _, err := New() + _, err := New(env.Map{}) if !errors.As(err, new(InvalidConfigError)) { t.Errorf("config.Validate() error = %v, want InvalidConfigError", err) @@ -65,13 +64,12 @@ func TestNewConfig_EmptyConfig(t *testing.T) { } func TestNewConfig_MissingConfigWithDefault(t *testing.T) { - setEnv(t) - os.Unsetenv("BUILDKITE_TEST_ENGINE_MODE") - os.Unsetenv("BUILDKITE_TEST_ENGINE_BASE_URL") - os.Unsetenv("BUILDKITE_TEST_ENGINE_TEST_CMD") - defer os.Clearenv() + env := getExampleEnv() + env.Delete("BUILDKITE_TEST_ENGINE_MODE") + env.Delete("BUILDKITE_TEST_ENGINE_BASE_URL") + env.Delete("BUILDKITE_TEST_ENGINE_TEST_CMD") - c, err := New() + c, err := New(env) if err != nil { t.Errorf("config.New() error = %v", err) } @@ -87,6 +85,7 @@ func TestNewConfig_MissingConfigWithDefault(t *testing.T) { TestRunner: "rspec", ResultPath: "tmp/rspec.json", JobRetryCount: 0, + Env: env, } if diff := cmp.Diff(c, want, cmpopts.IgnoreUnexported(Config{})); diff != "" { @@ -95,12 +94,11 @@ func TestNewConfig_MissingConfigWithDefault(t *testing.T) { } func TestNewConfig_InvalidConfig(t *testing.T) { - setEnv(t) - os.Setenv("BUILDKITE_TEST_ENGINE_MODE", "dynamic") - os.Unsetenv("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN") - defer os.Clearenv() + env := getExampleEnv() + env.Set("BUILDKITE_TEST_ENGINE_MODE", "dynamic") + env.Delete("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN") - _, err := New() + _, err := New(env) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { diff --git a/internal/config/env.go b/internal/config/env.go index 596c50ba..9183d7d4 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -1,15 +1,16 @@ package config import ( - "os" "strconv" + + "github.com/buildkite/test-engine-client/internal/env" ) // getEnvWithDefault retrieves the value of the environment variable named by the key. // If the variable is present and not empty, the value is returned. // Otherwise the returned value will be the default value. -func getEnvWithDefault(key string, defaultValue string) string { - value, ok := os.LookupEnv(key) +func getEnvWithDefault(env env.Env, key string, defaultValue string) string { + value, ok := env.Lookup(key) if !ok { return defaultValue } @@ -19,8 +20,8 @@ func getEnvWithDefault(key string, defaultValue string) string { return value } -func getIntEnvWithDefault(key string, defaultValue int) (int, error) { - value := os.Getenv(key) +func getIntEnvWithDefault(env env.Env, key string, defaultValue int) (int, error) { + value := env.Get(key) // If the environment variable is not set, return the default value. if value == "" { return defaultValue, nil @@ -57,7 +58,7 @@ func (c Config) DumpEnv() map[string]string { envs := make(map[string]string) for _, key := range keys { - envs[key] = os.Getenv(key) + envs[key] = c.Env.Get(key) } envs["BUILDKITE_TEST_ENGINE_IDENTIFIER"] = c.Identifier diff --git a/internal/config/env_test.go b/internal/config/env_test.go index faa184af..fb4d1ed4 100644 --- a/internal/config/env_test.go +++ b/internal/config/env_test.go @@ -2,20 +2,18 @@ package config import ( "errors" - "os" "strconv" "testing" + + "github.com/buildkite/test-engine-client/internal/env" ) func TestGetIntEnvWithDefault(t *testing.T) { - os.Setenv("MY_KEY", "10") - defer os.Unsetenv("MY_KEY") - - os.Setenv("EMPTY_KEY", "") - defer os.Unsetenv("EMPTY_KEY") - - os.Setenv("INVALID_KEY", "invalid_value") - defer os.Unsetenv("INVALID_KEY") + env := env.Map{ + "MY_KEY": "10", + "EMPTY_KEY": "", + "INVALID_KEY": "invalid_value", + } tests := []struct { key string @@ -51,7 +49,7 @@ func TestGetIntEnvWithDefault(t *testing.T) { for _, tt := range tests { t.Run(tt.key, func(t *testing.T) { - got, err := getIntEnvWithDefault(tt.key, tt.defaultValue) + got, err := getIntEnvWithDefault(env, tt.key, tt.defaultValue) if err != nil && !errors.Is(err, tt.err) { t.Errorf("getIntEnvWithDefault(%q, %d) error = %v, want %v", tt.key, tt.defaultValue, err, tt.err) } @@ -63,14 +61,11 @@ func TestGetIntEnvWithDefault(t *testing.T) { } func TestGetEnvWithDefault(t *testing.T) { - os.Setenv("MY_KEY", "my_value") - defer os.Unsetenv("MY_KEY") - - os.Setenv("EMPTY_KEY", "") - defer os.Unsetenv("EMPTY_KEY") - - os.Setenv("OTHER_KEY", "other_value") - defer os.Unsetenv("OTHER_KEY") + env := env.Map{ + "MY_KEY": "my_value", + "EMPTY_KEY": "", + "OTHER_KEY": "other_value", + } tests := []struct { key string @@ -94,14 +89,14 @@ func TestGetEnvWithDefault(t *testing.T) { }, { key: "EMPTY_KEY", - defaultValue: os.Getenv("OTHER_KEY"), + defaultValue: env.Get("OTHER_KEY"), want: "other_value", }, } for _, tt := range tests { t.Run(tt.key, func(t *testing.T) { - if got := getEnvWithDefault(tt.key, tt.defaultValue); got != tt.want { + if got := getEnvWithDefault(env, tt.key, tt.defaultValue); got != tt.want { t.Errorf("getEnvWithDefault(%q, %q) = %q, want %q", tt.key, tt.defaultValue, got, tt.want) } }) diff --git a/internal/config/read.go b/internal/config/read.go index cfd10238..680a2b54 100644 --- a/internal/config/read.go +++ b/internal/config/read.go @@ -2,9 +2,10 @@ package config import ( "fmt" - "os" "strconv" "strings" + + "github.com/buildkite/test-engine-client/internal/env" ) // readFromEnv reads the configuration from environment variables and sets it to the Config struct. @@ -30,57 +31,56 @@ import ( // // If we are going to support other CI environment in the future, // we will need to change where we read the configuration from. -func (c *Config) readFromEnv() error { - - c.AccessToken = os.Getenv("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN") - c.OrganizationSlug = os.Getenv("BUILDKITE_ORGANIZATION_SLUG") - c.SuiteSlug = os.Getenv("BUILDKITE_TEST_ENGINE_SUITE_SLUG") +func (c *Config) readFromEnv(env env.Env) error { + c.AccessToken = env.Get("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN") + c.OrganizationSlug = env.Get("BUILDKITE_ORGANIZATION_SLUG") + c.SuiteSlug = env.Get("BUILDKITE_TEST_ENGINE_SUITE_SLUG") - buildId := os.Getenv("BUILDKITE_BUILD_ID") + buildId := env.Get("BUILDKITE_BUILD_ID") if buildId == "" { c.errs.appendFieldError("BUILDKITE_BUILD_ID", "must not be blank") } - stepId := os.Getenv("BUILDKITE_STEP_ID") + stepId := env.Get("BUILDKITE_STEP_ID") if stepId == "" { c.errs.appendFieldError("BUILDKITE_STEP_ID", "must not be blank") } c.Identifier = fmt.Sprintf("%s/%s", buildId, stepId) - c.ServerBaseUrl = getEnvWithDefault("BUILDKITE_TEST_ENGINE_BASE_URL", "https://api.buildkite.com") - c.TestCommand = os.Getenv("BUILDKITE_TEST_ENGINE_TEST_CMD") - c.TestFilePattern = os.Getenv("BUILDKITE_TEST_ENGINE_TEST_FILE_PATTERN") - c.TestFileExcludePattern = os.Getenv("BUILDKITE_TEST_ENGINE_TEST_FILE_EXCLUDE_PATTERN") - c.TestRunner = os.Getenv("BUILDKITE_TEST_ENGINE_TEST_RUNNER") - c.ResultPath = os.Getenv("BUILDKITE_TEST_ENGINE_RESULT_PATH") + c.ServerBaseUrl = getEnvWithDefault(env, "BUILDKITE_TEST_ENGINE_BASE_URL", "https://api.buildkite.com") + c.TestCommand = env.Get("BUILDKITE_TEST_ENGINE_TEST_CMD") + c.TestFilePattern = env.Get("BUILDKITE_TEST_ENGINE_TEST_FILE_PATTERN") + c.TestFileExcludePattern = env.Get("BUILDKITE_TEST_ENGINE_TEST_FILE_EXCLUDE_PATTERN") + c.TestRunner = env.Get("BUILDKITE_TEST_ENGINE_TEST_RUNNER") + c.ResultPath = env.Get("BUILDKITE_TEST_ENGINE_RESULT_PATH") - c.SplitByExample = strings.ToLower(os.Getenv("BUILDKITE_TEST_ENGINE_SPLIT_BY_EXAMPLE")) == "true" + c.SplitByExample = strings.ToLower(env.Get("BUILDKITE_TEST_ENGINE_SPLIT_BY_EXAMPLE")) == "true" // used by Buildkite only, for experimental plans - c.Branch = os.Getenv("BUILDKITE_BRANCH") + c.Branch = env.Get("BUILDKITE_BRANCH") - JobRetryCount, err := getIntEnvWithDefault("BUILDKITE_RETRY_COUNT", 0) + JobRetryCount, err := getIntEnvWithDefault(env, "BUILDKITE_RETRY_COUNT", 0) c.JobRetryCount = JobRetryCount if err != nil { - c.errs.appendFieldError("BUILDKITE_RETRY_COUNT", "was %q, must be a number", os.Getenv("BUILDKITE_RETRY_COUNT")) + c.errs.appendFieldError("BUILDKITE_RETRY_COUNT", "was %q, must be a number", env.Get("BUILDKITE_RETRY_COUNT")) } - MaxRetries, err := getIntEnvWithDefault("BUILDKITE_TEST_ENGINE_RETRY_COUNT", 0) + MaxRetries, err := getIntEnvWithDefault(env, "BUILDKITE_TEST_ENGINE_RETRY_COUNT", 0) c.MaxRetries = MaxRetries if err != nil { - c.errs.appendFieldError("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "was %q, must be a number", os.Getenv("BUILDKITE_TEST_ENGINE_RETRY_COUNT")) + c.errs.appendFieldError("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "was %q, must be a number", env.Get("BUILDKITE_TEST_ENGINE_RETRY_COUNT")) } - c.RetryCommand = os.Getenv("BUILDKITE_TEST_ENGINE_RETRY_CMD") + c.RetryCommand = env.Get("BUILDKITE_TEST_ENGINE_RETRY_CMD") - parallelism := os.Getenv("BUILDKITE_PARALLEL_JOB_COUNT") + parallelism := env.Get("BUILDKITE_PARALLEL_JOB_COUNT") parallelismInt, err := strconv.Atoi(parallelism) if err != nil { c.errs.appendFieldError("BUILDKITE_PARALLEL_JOB_COUNT", "was %q, must be a number", parallelism) } c.Parallelism = parallelismInt - nodeIndex := os.Getenv("BUILDKITE_PARALLEL_JOB") + nodeIndex := env.Get("BUILDKITE_PARALLEL_JOB") nodeIndexInt, err := strconv.Atoi(nodeIndex) if err != nil { c.errs.appendFieldError("BUILDKITE_PARALLEL_JOB", "was %q, must be a number", nodeIndex) diff --git a/internal/config/read_test.go b/internal/config/read_test.go index 4aa0c8e2..d574b71e 100644 --- a/internal/config/read_test.go +++ b/internal/config/read_test.go @@ -2,33 +2,32 @@ package config import ( "errors" - "os" "testing" + "github.com/buildkite/test-engine-client/internal/env" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" ) func TestConfigReadFromEnv(t *testing.T) { - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "10") - os.Setenv("BUILDKITE_PARALLEL_JOB", "0") - os.Setenv("BUILDKITE_TEST_ENGINE_BASE_URL", "https://buildkite.localhost") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_CMD", "bin/rspec {{testExamples}}") - os.Setenv("BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN", "my_token") - os.Setenv("BUILDKITE_ORGANIZATION_SLUG", "my_org") - os.Setenv("BUILDKITE_TEST_ENGINE_SUITE_SLUG", "my_suite") - os.Setenv("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "3") - os.Setenv("BUILDKITE_TEST_ENGINE_SPLIT_BY_EXAMPLE", "TRUE") - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_STEP_ID", "456") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_FILE_PATTERN", "spec/unit/**/*_spec.rb") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_FILE_EXCLUDE_PATTERN", "spec/feature/**/*_spec.rb") - os.Setenv("BUILDKITE_TEST_ENGINE_RESULT_PATH", "result.json") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_RUNNER", "rspec") - defer os.Clearenv() - c := Config{} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_PARALLEL_JOB_COUNT": "10", + "BUILDKITE_PARALLEL_JOB": "0", + "BUILDKITE_TEST_ENGINE_BASE_URL": "https://buildkite.localhost", + "BUILDKITE_TEST_ENGINE_TEST_CMD": "bin/rspec {{testExamples}}", + "BUILDKITE_TEST_ENGINE_API_ACCESS_TOKEN": "my_token", + "BUILDKITE_ORGANIZATION_SLUG": "my_org", + "BUILDKITE_TEST_ENGINE_SUITE_SLUG": "my_suite", + "BUILDKITE_TEST_ENGINE_RETRY_COUNT": "3", + "BUILDKITE_TEST_ENGINE_SPLIT_BY_EXAMPLE": "TRUE", + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_STEP_ID": "456", + "BUILDKITE_TEST_ENGINE_TEST_FILE_PATTERN": "spec/unit/**/*_spec.rb", + "BUILDKITE_TEST_ENGINE_TEST_FILE_EXCLUDE_PATTERN": "spec/feature/**/*_spec.rb", + "BUILDKITE_TEST_ENGINE_RESULT_PATH": "result.json", + "BUILDKITE_TEST_ENGINE_TEST_RUNNER": "rspec", + }) want := Config{ Parallelism: 10, @@ -58,16 +57,16 @@ func TestConfigReadFromEnv(t *testing.T) { } func TestConfigReadFromEnv_MissingConfigWithDefault(t *testing.T) { - os.Setenv("BUILDKITE_TEST_ENGINE_BASE_URL", "") - os.Setenv("BUILDKITE_TEST_ENGINE_MODE", "") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_CMD", "") - os.Setenv("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "") - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_STEP_ID", "456") - defer os.Clearenv() - c := Config{errs: InvalidConfigError{}} - c.readFromEnv() + c.readFromEnv(env.Map{ + "BUILDKITE_TEST_ENGINE_BASE_URL": "", + "BUILDKITE_TEST_ENGINE_MODE": "", + "BUILDKITE_TEST_ENGINE_TEST_CMD": "", + "BUILDKITE_TEST_ENGINE_RETRY_COUNT": "", + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_STEP_ID": "456", + }) + if c.ServerBaseUrl != "https://api.buildkite.com" { t.Errorf("ServerBaseUrl = %v, want %v", c.ServerBaseUrl, "https://api.buildkite.com") } @@ -82,17 +81,13 @@ func TestConfigReadFromEnv_MissingConfigWithDefault(t *testing.T) { } func TestConfigReadFromEnv_NotInteger(t *testing.T) { - os.Setenv("BUILDKITE_BUILD_ID", "abc") - os.Setenv("BUILDKITE_STEP_ID", "123") - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "foo") - os.Setenv("BUILDKITE_PARALLEL_JOB", "bar") - defer os.Unsetenv("BUILDKITE_BUILD_ID") - defer os.Unsetenv("BUILDKITE_STEP_ID") - defer os.Unsetenv("BUILDKITE_PARALLEL_JOB_COUNT") - defer os.Unsetenv("BUILDKITE_PARALLEL_JOB") - c := Config{errs: InvalidConfigError{}} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_BUILD_ID": "abc", + "BUILDKITE_STEP_ID": "123", + "BUILDKITE_PARALLEL_JOB_COUNT": "foo", + "BUILDKITE_PARALLEL_JOB": "bar", + }) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { @@ -106,17 +101,16 @@ func TestConfigReadFromEnv_NotInteger(t *testing.T) { } func TestConfigReadFromEnv_MissingBuildId(t *testing.T) { - os.Setenv("BUILDKITE_TEST_ENGINE_BASE_URL", "") - os.Setenv("BUILDKITE_TEST_ENGINE_MODE", "") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_CMD", "") - os.Setenv("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "") - os.Setenv("BUILDKITE_STEP_ID", "123") - os.Setenv("BUILDKITE_PARALLEL_JOB", "1") - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "10") - defer os.Clearenv() - c := Config{errs: InvalidConfigError{}} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_TEST_ENGINE_BASE_URL": "", + "BUILDKITE_TEST_ENGINE_MODE": "", + "BUILDKITE_TEST_ENGINE_TEST_CMD": "", + "BUILDKITE_TEST_ENGINE_RETRY_COUNT": "", + "BUILDKITE_STEP_ID": "123", + "BUILDKITE_PARALLEL_JOB": "1", + "BUILDKITE_PARALLEL_JOB_COUNT": "10", + }) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { @@ -131,17 +125,16 @@ func TestConfigReadFromEnv_MissingBuildId(t *testing.T) { } func TestConfigReadFromEnv_MissingStepId(t *testing.T) { - os.Setenv("BUILDKITE_TEST_ENGINE_BASE_URL", "") - os.Setenv("BUILDKITE_TEST_ENGINE_MODE", "") - os.Setenv("BUILDKITE_TEST_ENGINE_TEST_CMD", "") - os.Setenv("BUILDKITE_TEST_ENGINE_RETRY_COUNT", "") - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_PARALLEL_JOB", "1") - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "10") - defer os.Clearenv() - c := Config{errs: InvalidConfigError{}} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_TEST_ENGINE_BASE_URL": "", + "BUILDKITE_TEST_ENGINE_MODE": "", + "BUILDKITE_TEST_ENGINE_TEST_CMD": "", + "BUILDKITE_TEST_ENGINE_RETRY_COUNT": "", + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_PARALLEL_JOB": "1", + "BUILDKITE_PARALLEL_JOB_COUNT": "10", + }) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { @@ -158,14 +151,13 @@ func TestConfigReadFromEnv_MissingStepId(t *testing.T) { } func TestConfigReadFromEnv_InvalidParallelJob(t *testing.T) { - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_STEP_ID", "456") - os.Setenv("BUILDKITE_PARALLEL_JOB", "") - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "10") - defer os.Clearenv() - c := Config{errs: InvalidConfigError{}} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_STEP_ID": "456", + "BUILDKITE_PARALLEL_JOB": "", + "BUILDKITE_PARALLEL_JOB_COUNT": "10", + }) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { @@ -180,14 +172,13 @@ func TestConfigReadFromEnv_InvalidParallelJob(t *testing.T) { } func TestConfigReadFromEnv_InvalidParallelJobCount(t *testing.T) { - os.Setenv("BUILDKITE_BUILD_ID", "123") - os.Setenv("BUILDKITE_STEP_ID", "456") - os.Setenv("BUILDKITE_PARALLEL_JOB", "10") - os.Setenv("BUILDKITE_PARALLEL_JOB_COUNT", "") - defer os.Clearenv() - c := Config{errs: InvalidConfigError{}} - err := c.readFromEnv() + err := c.readFromEnv(env.Map{ + "BUILDKITE_BUILD_ID": "123", + "BUILDKITE_STEP_ID": "456", + "BUILDKITE_PARALLEL_JOB": "10", + "BUILDKITE_PARALLEL_JOB_COUNT": "", + }) var invConfigError InvalidConfigError if !errors.As(err, &invConfigError) { diff --git a/internal/env/env.go b/internal/env/env.go new file mode 100644 index 00000000..77aae33e --- /dev/null +++ b/internal/env/env.go @@ -0,0 +1,54 @@ +package env + +import "os" + +type Env interface { + Get(key string) string + Set(key string, value string) error + Delete(key string) error + Lookup(key string) (string, bool) +} + +// OS is an Env backed by real operating system environment +type OS struct{} + +func (OS) Get(key string) string { + return os.Getenv(key) +} + +func (OS) Set(key string, value string) error { + return os.Setenv(key, value) +} + +func (OS) Delete(key string) error { + return os.Unsetenv(key) +} + +func (OS) Lookup(key string) (string, bool) { + return os.LookupEnv(key) +} + +// Map is an Env backed by a map[string]string for testing etc +type Map map[string]string + +func (env Map) Get(key string) string { + return env[key] +} + +func (env Map) Set(key string, value string) error { + env[key] = value + return nil +} + +func (env Map) Delete(key string) error { + delete(env, key) + return nil +} + +func (env Map) Lookup(key string) (string, bool) { + if val, ok := env[key]; ok { + return val, true + } else { + return "", false + } +} diff --git a/internal/env/env_test.go b/internal/env/env_test.go new file mode 100644 index 00000000..a0f4ab6a --- /dev/null +++ b/internal/env/env_test.go @@ -0,0 +1,108 @@ +package env_test + +import ( + "os" + "testing" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/google/go-cmp/cmp" +) + +// Note: out of the two implementations of interface env: +// - I'm testing env.OS because it's used by real code, +// - I'm not testing env.Map because it's only used in tests. + +func TestOSGet(t *testing.T) { + defer setenvWithUnset("BKTEC_ENV_TEST_VALUE", "hello")() + + env := env.OS{} + + got, want := env.Get("BKTEC_ENV_TEST_VALUE"), "hello" + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("env.Get() diff (-got +want):\n%s", diff) + } +} + +func TestOSGetMissing(t *testing.T) { + os.Unsetenv("BKTEC_ENV_TEST_VALUE") // just in case + + env := env.OS{} + + got, want := env.Get("BKTEC_ENV_TEST_VALUE"), "" + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("env.Get() diff (-got +want):\n%s", diff) + } +} + +func TestOSLookup(t *testing.T) { + defer setenvWithUnset("BKTEC_ENV_TEST_VALUE", "hello")() + + env := env.OS{} + + got, ok := env.Lookup("BKTEC_ENV_TEST_VALUE") + want := "hello" + + if !ok { + t.Errorf("env.Lookup() ok value should be true: %v", ok) + } + + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("env.Lookup() diff (-got +want):\n%s", diff) + } +} + +func TestOSLookupMissing(t *testing.T) { + os.Unsetenv("BKTEC_ENV_TEST_VALUE") // just in case + + env := env.OS{} + + got, ok := env.Lookup("BKTEC_ENV_TEST_VALUE") + want := "" + + if ok { + t.Errorf("env.Lookup() ok value should be false: %v", ok) + } + + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("env.Lookup() diff (-got +want):\n%s", diff) + } +} + +func TestOSDelete(t *testing.T) { + defer setenvWithUnset("BKTEC_ENV_TEST_VALUE", "hello")() + + env := env.OS{} + + err := env.Delete("BKTEC_ENV_TEST_VALUE") + if err != nil { + t.Error(err) + } + + got, want := os.Getenv("BKTEC_ENV_TEST_VALUE"), "" + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("os.Getenv() diff (-got +want):\n%s", diff) + } +} + +func TestOSSet(t *testing.T) { + os.Unsetenv("BKTEC_ENV_TEST_VALUE") // ensure pre-condition + defer os.Unsetenv("BKTEC_ENV_TEST_VALUE") // ensure post-condition (cleanup) + + env := env.OS{} + + err := env.Set("BKTEC_ENV_TEST_VALUE", "Set()") + if err != nil { + t.Error(err) + } + + got, want := os.Getenv("BKTEC_ENV_TEST_VALUE"), "Set()" + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("os.Getenv() diff (-got +want):\n%s", diff) + } +} + +// intended to be called like: `defer setenvWithUnset(...)()` +func setenvWithUnset(key string, value string) func() { + os.Setenv(key, value) + return func() { os.Unsetenv(key) } +} diff --git a/internal/upload/upload.go b/internal/upload/upload.go new file mode 100644 index 00000000..21f0b2bf --- /dev/null +++ b/internal/upload/upload.go @@ -0,0 +1,245 @@ +package upload + +import ( + "bytes" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "log/slog" + "maps" + "mime/multipart" + "net/http" + "os" + "path/filepath" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/version" + "github.com/google/uuid" + "golang.org/x/net/context" +) + +type RunEnvMap map[string]string + +// Config is upload-specific configuration, but may also contain configuration +// that is redundant with config.Config, since package upload isn't really +// unified/integrated with the rest of bktec yet. +type Config struct { + // UploadUrl is the Test Engine upload API endpoint e.g. https://analytics-api.buildkite.com/v1/uploads + UploadUrl string + + // SuiteToken is the Test Engine upload API suite authentication token + SuiteToken string +} + +func ConfigFromEnv(env env.Env) (Config, error) { + url := env.Get("BUILDKITE_TEST_ENGINE_UPLOAD_URL") + if url == "" { + url = "https://analytics-api.buildkite.com/v1/uploads" + } + + token := env.Get("BUILDKITE_ANALYTICS_TOKEN") + if token == "" { + return Config{}, fmt.Errorf("BUILDKITE_ANALYTICS_TOKEN missing") + } + + return Config{ + UploadUrl: url, + SuiteToken: token, + }, nil +} + +// UploadCLI is a CLI entrypoint for uploading results to Test Engine. +func UploadCLI(flag *flag.FlagSet, env env.Env) error { + cfg, err := ConfigFromEnv(env) + if err != nil { + return fmt.Errorf("configuration error: %w", err) + } + + filename := flag.Arg(1) + if filename == "" { + return fmt.Errorf("expected path to JUnit XML or JSON file") + } + + info, err := os.Stat(filename) + if err != nil { + return fmt.Errorf("file does not exist: %s", filename) + } else if !info.Mode().IsRegular() { + return fmt.Errorf("not a regular file: %s", filename) + } + + var format string + switch filepath.Ext(filename) { + case ".xml": + format = "junit" + case ".json": + format = "json" + default: + return fmt.Errorf("could not infer format (JUnit / JSON) from filename") + } + + runEnv, err := RunEnvFromEnv(env) + if err != nil { + return fmt.Errorf("unable to derive runEnv: %w", err) + } + + slog.Info("Uploading", "key", runEnv["key"], "format", format, "filename", filename) + + ctx := context.Background() + respData, err := Upload(ctx, cfg, runEnv, format, filename) + if err != nil { + return err + } + + slog.Info("Upload successful", "url", respData["upload_url"]) + + return nil +} + +// Upload sends test result data to Test Engine. +func Upload(ctx context.Context, cfg Config, runEnv RunEnvMap, format string, filename string) (map[string]string, error) { + body, err := buildUploadData(runEnv, format, filename) + if err != nil { + return nil, fmt.Errorf("preparing upload data: %w", err) + } + + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + cfg.UploadUrl, + body.buf, + ) + if err != nil { + return nil, fmt.Errorf("creating HTTP request: %w", err) + } + + req.Header.Set("Content-Type", body.writer.FormDataContentType()) + req.Header.Set("Authorization", fmt.Sprintf(`Token token="%s"`, cfg.SuiteToken)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP error: %w", err) + } + defer resp.Body.Close() + + status := resp.Status + + // Currently this should get HTTP 202 Accepted, but let's be a bit permissive to future changes. + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf( + "expected HTTP %d or %d from Upload API, got %s", + http.StatusCreated, + http.StatusAccepted, + status, + ) + } + + // try to parse the response, but just warn if that fails + respData := make(map[string]string) + err = json.NewDecoder(resp.Body).Decode(&respData) + if err != nil && !errors.Is(err, io.EOF) { + slog.Warn("failed to parse response", "status", status, "error", err) + } + + return respData, nil +} + +func RunEnvFromEnv(env env.Env) (RunEnvMap, error) { + runEnv := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + } + + if _, ok := env.Lookup("BUILDKITE_BUILD_ID"); ok { + maps.Copy(runEnv, RunEnvMap{ + "CI": "buildkite", + "branch": env.Get("BUILDKITE_BRANCH"), + "commit_sha": env.Get("BUILDKITE_COMMIT"), + "job_id": env.Get("BUILDKITE_JOB_ID"), + "key": env.Get("BUILDKITE_BUILD_ID"), + "message": env.Get("BUILDKITE_MESSAGE"), + "number": env.Get("BUILDKITE_BUILD_NUMBER"), + "url": env.Get("BUILDKITE_BUILD_URL"), + }) + } else { + key, err := uuid.NewV7() + if err != nil { + return nil, fmt.Errorf("UUID generation failed; broken PRNG? %w", err) + } + maps.Copy(runEnv, RunEnvMap{ + "CI": "generic", + "key": key.String(), + }) + } + return runEnv, nil +} + +func buildUploadData(runEnv RunEnvMap, format string, filename string) (*MultipartBody, error) { + var err error + + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("opening %s for reading: %w", filename, err) + } + defer file.Close() + + body := NewMultipartBody() + + if err = body.WriteFormat(format); err != nil { + return nil, err + } + + if err = body.WriteRunEnv(runEnv); err != nil { + return nil, err + } + + if err = body.WriteDataFromFile(file); err != nil { + return nil, err + } + + if err = body.Close(); err != nil { + return nil, err + } + + return body, nil +} + +type MultipartBody struct { + writer multipart.Writer + buf *bytes.Buffer +} + +func NewMultipartBody() *MultipartBody { + buf := &bytes.Buffer{} + return &MultipartBody{ + writer: *multipart.NewWriter(buf), + buf: buf, + } +} + +func (b *MultipartBody) WriteFormat(format string) error { + return b.writer.WriteField("format", format) +} + +func (b *MultipartBody) WriteRunEnv(runEnv RunEnvMap) error { + for k, v := range runEnv { + if err := b.writer.WriteField("run_env["+k+"]", v); err != nil { + return err + } + } + return nil +} + +func (b *MultipartBody) WriteDataFromFile(file *os.File) error { + part, err := b.writer.CreateFormFile("data", file.Name()) + if err != nil { + return fmt.Errorf("MultipartBody: %w", err) + } + _, err = io.Copy(part, file) + return err +} + +func (b *MultipartBody) Close() error { + return b.writer.Close() +} diff --git a/internal/upload/upload_test.go b/internal/upload/upload_test.go new file mode 100644 index 00000000..c774cf88 --- /dev/null +++ b/internal/upload/upload_test.go @@ -0,0 +1,252 @@ +package upload + +import ( + "context" + "fmt" + "io" + "mime" + "mime/multipart" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/version" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" +) + +func TestConfigFromEnv(t *testing.T) { + cfg, err := ConfigFromEnv(env.Map{ + "BUILDKITE_ANALYTICS_TOKEN": "hunter2", + }) + if err != nil { + t.Errorf("ConfigFromEnv(): %v", err) + } + + want := Config{ + UploadUrl: "https://analytics-api.buildkite.com/v1/uploads", + SuiteToken: "hunter2", + } + + if diff := cmp.Diff(want, cfg); diff != "" { + t.Errorf("ConfigFromEnv() (-want +got)\n%s", diff) + } +} + +func TestConfigFromEnv_missingToken(t *testing.T) { + _, err := ConfigFromEnv(env.Map{}) + if err == nil { + t.Fatal("expected error from ConfigFromEnv with no token") + } + + want, got := "BUILDKITE_ANALYTICS_TOKEN missing", err.Error() + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("ConfigFromEnv() (-want +got):\n%s", diff) + } +} + +func TestConfigFromEnv_uploadURL(t *testing.T) { + cfg, _ := ConfigFromEnv(env.Map{ + "BUILDKITE_TEST_ENGINE_UPLOAD_URL": "http://localhost:1234/foo", + "BUILDKITE_ANALYTICS_TOKEN": "hello", + }) + + want := Config{ + UploadUrl: "http://localhost:1234/foo", + SuiteToken: "hello", + } + + if diff := cmp.Diff(want, cfg); diff != "" { + t.Errorf("ConfigFromEnv (-want +got)\n%s", diff) + } +} + +func TestBuildRunEnv(t *testing.T) { + runEnv, err := RunEnvFromEnv(env.Map{ + "BUILDKITE_BUILD_ID": "thebuild", + "BUILDKITE_BRANCH": "trunk", + "BUILDKITE_COMMIT": "cafe", + "BUILDKITE_JOB_ID": "thejob", + "BUILDKITE_MESSAGE": "hello world", + "BUILDKITE_BUILD_NUMBER": "42", + "BUILDKITE_BUILD_URL": "http://localhost/builds/42", + }) + if err != nil { + t.Errorf("buildRunEnv(): %v", err) + } + + want := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + "CI": "buildkite", + "branch": "trunk", + "commit_sha": "cafe", + "job_id": "thejob", + "key": "thebuild", + "message": "hello world", + "number": "42", + "url": "http://localhost/builds/42", + } + + if diff := cmp.Diff(want, runEnv); diff != "" { + t.Errorf("buildRunEnv() (-want +got):\n%s", diff) + } +} + +func TestBuildRunEnv_generic(t *testing.T) { + runEnv, err := RunEnvFromEnv(env.Map{}) + if err != nil { + t.Errorf("buildRunEnv(): %v", err) + } + + want := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + "CI": "generic", + "key": "00000000-0000-0000-0000-000000000000", // placeholder + } + + if diff := cmp.Diff(want, runEnv, cmpKeyValidUUID()); diff != "" { + t.Errorf("buildRunEnv() (-want +got):\n%s", diff) + } +} + +func TestUpload(t *testing.T) { + filename, xml := createTestXML(t) + defer os.Remove(filename) + + // receive request details from the HTTP handler + type requestInfo struct { + Method string + Path string + Authorization string + Data map[string]string + } + var gotRequestInfo requestInfo + + // fake API server + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data, err := multipartToMap(r) + if err != nil { + t.Errorf("parsing request: %v", err) + } + + gotRequestInfo = requestInfo{ + Method: r.Method, + Path: r.URL.Path, + Authorization: r.Header.Get("Authorization"), + Data: data, + } + + w.WriteHeader(http.StatusAccepted) + io.WriteString(w, `{"id":"theuuid","url":"http://localhost/path/theuuid"}`) + })) + defer srv.Close() + + // Upload! + cfg := Config{ + UploadUrl: srv.URL + "/path", + SuiteToken: "hunter2", + } + runEnv := RunEnvMap{ + "CI": "buildkite", + "key": "thekey", + } + format := "junit" + ctx := context.Background() + responseData, err := Upload(ctx, cfg, runEnv, format, filename) + if err != nil { + t.Fatalf("upload failed: %v", err) + } + + // verify the HTTP request details + wantRequestInfo := requestInfo{ + Method: "POST", + Path: "/path", + Authorization: `Token token="hunter2"`, + Data: map[string]string{ + "data": xml, + "format": "junit", + "run_env[CI]": "buildkite", + "run_env[key]": "thekey", + }, + } + if diff := cmp.Diff(wantRequestInfo, gotRequestInfo); diff != "" { + t.Errorf("HTTP request (-want +got):\n%s", diff) + } + + wantResponseData := map[string]string{ + "id": "theuuid", + "url": "http://localhost/path/theuuid", + } + if diff := cmp.Diff(wantResponseData, responseData); diff != "" { + t.Errorf("HTTP response data (-want +got):\n%s", diff) + } +} + +// cmpKeyValidUUID is an Option for cmp.Diff that validates the values of `key` +// in two maps being compared are both valid UUIDs. Note that Comparer +// functions must be symmetric; they're run as fn(a,b) and fn(b,a). +func cmpKeyValidUUID() cmp.Option { + return cmp.FilterPath(func(path cmp.Path) bool { + return path.Last().String() == `["key"]` + }, cmp.Comparer(func(a, b string) bool { + return uuid.Validate(a) == nil && uuid.Validate(b) == nil + })) +} + +func createTestXML(t *testing.T) (string, string) { + data := `` + f, err := os.CreateTemp("", "test.xml") + if err != nil { + t.Fatal(err) + } + _, err = f.WriteString(data) + if err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + return f.Name(), data +} + +func getMultipartBoundary(contentType string) (string, error) { + mt, params, err := mime.ParseMediaType(contentType) + if err != nil { + return "", err + } + if want := "multipart/form-data"; mt != want { + return "", fmt.Errorf("Content-Type: wanted %s, got %s", want, mt) + } + boundary := params["boundary"] + if boundary == "" { + return "", fmt.Errorf("missing multipart boundary") + } + return boundary, nil +} + +func multipartToMap(r *http.Request) (map[string]string, error) { + boundary, err := getMultipartBoundary(r.Header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("getMultipartBoundary: %w", err) + } + mr := multipart.NewReader(r.Body, boundary) + parsed := map[string]string{} + for { + p, err := mr.NextPart() + if err == io.EOF { + break + } else if err != nil { + return nil, fmt.Errorf("multipartToMap; NextPart: %w", err) + } + partData, err := io.ReadAll(p) + if err != nil { + return nil, fmt.Errorf("multipartToMap; ReadAll: %w", err) + } + parsed[p.FormName()] = string(partData) + } + return parsed, nil +} diff --git a/main.go b/main.go index 3c3c94a6..668be0c9 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "errors" "flag" "fmt" + "log" "os" "os/exec" "strconv" @@ -14,10 +15,13 @@ import ( "time" "github.com/buildkite/test-engine-client/internal/api" + "github.com/buildkite/test-engine-client/internal/bes" "github.com/buildkite/test-engine-client/internal/config" "github.com/buildkite/test-engine-client/internal/debug" + "github.com/buildkite/test-engine-client/internal/env" "github.com/buildkite/test-engine-client/internal/plan" "github.com/buildkite/test-engine-client/internal/runner" + "github.com/buildkite/test-engine-client/internal/upload" "github.com/buildkite/test-engine-client/internal/version" "github.com/olekukonko/tablewriter" "golang.org/x/sys/unix" @@ -46,7 +50,9 @@ type TestRunner interface { } func main() { - debug.SetDebug(os.Getenv("BUILDKITE_TEST_ENGINE_DEBUG_ENABLED") == "true") + env := env.OS{} + + debug.SetDebug(env.Get("BUILDKITE_TEST_ENGINE_DEBUG_ENABLED") == "true") versionFlag := flag.Bool("version", false, "print version information") @@ -59,8 +65,21 @@ func main() { printStartUpMessage() + // TODO: proper subcommands + if flag.Arg(0) == "upload" { + if err := upload.UploadCLI(flag.CommandLine, env); err != nil { + logErrorAndExit(16, "upload: %v", err) + } + os.Exit(0) + } else if flag.Arg(0) == "bazel" && flag.Arg(1) == "listen" { + if err := bes.ListenCLI(os.Args[3:], env); err != nil { + log.Fatal(err) + } + os.Exit(0) + } + // get config - cfg, err := config.New() + cfg, err := config.New(env) if err != nil { logErrorAndExit(16, "Invalid configuration...\n%v", err) } diff --git a/main_test.go b/main_test.go index c235849d..e5d49fce 100644 --- a/main_test.go +++ b/main_test.go @@ -15,6 +15,7 @@ import ( "github.com/buildkite/test-engine-client/internal/api" "github.com/buildkite/test-engine-client/internal/config" + "github.com/buildkite/test-engine-client/internal/env" "github.com/buildkite/test-engine-client/internal/plan" "github.com/buildkite/test-engine-client/internal/runner" "github.com/buildkite/test-engine-client/internal/version" @@ -346,6 +347,7 @@ func TestFetchOrCreateTestPlan(t *testing.T) { Parallelism: 10, Identifier: "identifier", ServerBaseUrl: svr.URL, + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -416,6 +418,7 @@ func TestFetchOrCreateTestPlan_CachedPlan(t *testing.T) { OrganizationSlug: "org", SuiteSlug: "suite", Branch: "tat-123/my-cool-feature", + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -463,6 +466,7 @@ func TestFetchOrCreateTestPlan_PlanError(t *testing.T) { Identifier: "identifier", Branch: "tat-123/my-cool-feature", ServerBaseUrl: svr.URL, + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -501,6 +505,7 @@ func TestFetchOrCreateTestPlan_InternalServerError(t *testing.T) { Identifier: "identifier", Branch: "tat-123/my-cool-feature", ServerBaseUrl: svr.URL, + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -536,6 +541,7 @@ func TestFetchOrCreateTestPlan_BadRequest(t *testing.T) { Identifier: "identifier", Branch: "", ServerBaseUrl: svr.URL, + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -571,6 +577,7 @@ func TestFetchOrCreateTestPlan_BillingError(t *testing.T) { Identifier: "identifier", Branch: "", ServerBaseUrl: svr.URL, + Env: env.Map{}, } apiClient := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -607,6 +614,7 @@ func TestCreateRequestParams(t *testing.T) { Parallelism: 7, Branch: "", TestRunner: "rspec", + Env: env.Map{}, } client := api.NewClient(api.ClientConfig{ @@ -700,6 +708,7 @@ func TestCreateRequestParams_NonRSpec(t *testing.T) { Parallelism: 7, Branch: "", TestRunner: r.Name(), + Env: env.Map{}, } client := api.NewClient(api.ClientConfig{ @@ -752,6 +761,7 @@ func TestCreateRequestParams_FilterTestsError(t *testing.T) { Parallelism: 7, Branch: "", SplitByExample: true, + Env: env.Map{}, } client := api.NewClient(api.ClientConfig{ @@ -790,6 +800,7 @@ func TestCreateRequestParams_NoFilteredFiles(t *testing.T) { Parallelism: 7, Branch: "", SplitByExample: true, + Env: env.Map{}, } client := api.NewClient(api.ClientConfig{ @@ -849,7 +860,7 @@ func TestSendMetadata(t *testing.T) { {Event: "test_end", Timestamp: "2024-06-20T04:49:09.609793Z"}, } - env := map[string]string{ + env := env.Map{ "BUILDKITE_BUILD_ID": "xyz", "BUILDKITE_JOB_ID": "abc", "BUILDKITE_STEP_ID": "pqr", @@ -864,10 +875,6 @@ func TestSendMetadata(t *testing.T) { "BUILDKITE_TEST_ENGINE_TEST_RUNNER": "rspec", "BUILDKITE_RETRY_COUNT": "0", } - for k, v := range env { - _ = os.Setenv(k, v) - } - defer os.Clearenv() svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { b, err := io.ReadAll(r.Body) @@ -927,6 +934,7 @@ func TestSendMetadata(t *testing.T) { SuiteSlug: "rspec", Identifier: "fruitsabc", ServerBaseUrl: svr.URL, + Env: env, } client := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl, @@ -950,6 +958,7 @@ func TestSendMetadata_Unauthorized(t *testing.T) { SuiteSlug: "my-suite", Identifier: "identifier", ServerBaseUrl: svr.URL, + Env: env.Map{}, } client := api.NewClient(api.ClientConfig{ ServerBaseUrl: cfg.ServerBaseUrl,