diff --git a/go.mod b/go.mod index 45b9c5d02f9..24c3d74c00b 100644 --- a/go.mod +++ b/go.mod @@ -101,7 +101,7 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.555 + go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37 go.viam.com/test v1.2.4 go.viam.com/utils v0.6.1 goji.io v2.0.2+incompatible diff --git a/go.sum b/go.sum index 4a50ad7f762..25243919249 100644 --- a/go.sum +++ b/go.sum @@ -1158,8 +1158,8 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.555 h1:ncBlAwDpEk658aouQLoeq1RLDqdi3spWVFN0y6nsvXM= -go.viam.com/api v0.1.555/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ= +go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37 h1:Pbxumk5u7yRfEjcWG8S9/HljfpQNqDBnnsyiT/lN/QQ= +go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.6.1 h1:xJhq+S2ADMTDj9538blmhI0otZCRAZUonRBkb+zHIHo= diff --git a/robot/client/client.go b/robot/client/client.go index b87837975da..8b81cf5b031 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -24,6 +24,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + datasyncpb "go.viam.com/api/app/datasync/v1" commonpb "go.viam.com/api/common/v1" pb "go.viam.com/api/robot/v1" "go.viam.com/utils" @@ -1352,6 +1353,31 @@ func (rc *RobotClient) SendTraces(ctx context.Context, spans []*otlpv1.ResourceS return err } +// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager. +func (rc *RobotClient) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}) ( + robot.UploadDataFromPathResult, error, +) { + ext, err := protoutils.StructToStructPb(extra) + if err != nil { + return robot.UploadDataFromPathResult{}, err + } + resp, err := rc.client.UploadDataFromPath(ctx, &pb.UploadDataFromPathRequest{ + Path: path, + UploadMetadata: md, + Extra: ext, + }) + if err != nil { + return robot.UploadDataFromPathResult{}, err + } + return robot.UploadDataFromPathResult{ + FilesUploaded: resp.GetFilesUploaded(), + FilesFailed: resp.GetFilesFailed(), + BytesUploaded: resp.GetBytesUploaded(), + BytesTotal: resp.GetBytesTotal(), + IDs: resp.GetIds(), + }, nil +} + // Tunnel tunnels data to/from the read writer from/to the destination port on the server. This // function will close the connection passed in as part of cleanup. func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error { diff --git a/robot/client/client_test.go b/robot/client/client_test.go index 46dff348647..e0f88ff4d9e 100644 --- a/robot/client/client_test.go +++ b/robot/client/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/google/uuid" "github.com/jhump/protoreflect/grpcreflect" "go.uber.org/zap/zapcore" + datasyncpb "go.viam.com/api/app/datasync/v1" commonpb "go.viam.com/api/common/v1" armpb "go.viam.com/api/component/arm/v1" basepb "go.viam.com/api/component/base/v1" @@ -2363,3 +2364,61 @@ func TestListTunnels(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, ttes, test.ShouldResemble, expectedTTEs) } + +func TestUploadDataFromPath(t *testing.T) { + logger := logging.NewTestLogger(t) + listener, err := net.Listen("tcp", "localhost:0") + test.That(t, err, test.ShouldBeNil) + gServer := grpc.NewServer() + + var capturedPath string + var capturedMD *datasyncpb.UploadMetadata + var capturedExtra map[string]interface{} + injectRobot := &inject.Robot{ + ResourceNamesFunc: func() []resource.Name { return nil }, + ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil }, + MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) { + return robot.MachineStatus{State: robot.StateRunning}, nil + }, + UploadDataFromPathFunc: func( + ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}, + ) (robot.UploadDataFromPathResult, error) { + capturedPath = path + capturedMD = md + capturedExtra = extra + return robot.UploadDataFromPathResult{ + FilesUploaded: 2, + FilesFailed: 0, + BytesUploaded: 512, + BytesTotal: 512, + IDs: []string{"a", "b"}, + }, nil + }, + } + + pb.RegisterRobotServiceServer(gServer, server.New(injectRobot)) + + go gServer.Serve(listener) + defer gServer.Stop() + + client, err := New(context.Background(), listener.Addr().String(), logger) + test.That(t, err, test.ShouldBeNil) + defer func() { + test.That(t, client.Close(context.Background()), test.ShouldBeNil) + }() + + md := &datasyncpb.UploadMetadata{Tags: []string{"tag1"}} + extra := map[string]interface{}{"foo": "bar"} + res, err := client.UploadDataFromPath(context.Background(), "/data/foo", md, extra) + test.That(t, err, test.ShouldBeNil) + test.That(t, res.FilesUploaded, test.ShouldEqual, uint64(2)) + test.That(t, res.FilesFailed, test.ShouldEqual, uint64(0)) + test.That(t, res.BytesUploaded, test.ShouldEqual, uint64(512)) + test.That(t, res.BytesTotal, test.ShouldEqual, uint64(512)) + test.That(t, res.IDs, test.ShouldResemble, []string{"a", "b"}) + + // request fields actually crossed the wire + test.That(t, capturedPath, test.ShouldEqual, "/data/foo") + test.That(t, capturedMD.GetTags(), test.ShouldResemble, []string{"tag1"}) + test.That(t, capturedExtra, test.ShouldResemble, map[string]interface{}{"foo": "bar"}) +} diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index 131dbdb5162..08158a74e35 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -27,6 +27,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.37.0" otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1" "go.uber.org/multierr" + datasyncpb "go.viam.com/api/app/datasync/v1" packagespb "go.viam.com/api/app/packages/v1" goutils "go.viam.com/utils" "go.viam.com/utils/perf" @@ -58,6 +59,7 @@ import ( "go.viam.com/rdk/robot/packages" "go.viam.com/rdk/robot/web" weboptions "go.viam.com/rdk/robot/web/options" + "go.viam.com/rdk/services/datamanager" "go.viam.com/rdk/session" "go.viam.com/rdk/utils" ) @@ -171,6 +173,32 @@ func (r *localRobot) WriteTraceMessages(ctx context.Context, spans []*otlpv1.Res return err } +// dataFromPathUploader is the capability interface localRobot type-asserts the configured +// data manager service for when UploadDataFromPath is called. +type dataFromPathUploader interface { + UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{}) ( + robot.UploadDataFromPathResult, error) +} + +// UploadDataFromPath uploads a file or directory to the cloud via the configured data manager service. +func (r *localRobot) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}) ( + robot.UploadDataFromPathResult, error, +) { + names := datamanager.NamesFromRobot(r) + if len(names) == 0 { + return robot.UploadDataFromPathResult{}, errors.New("no data manager service configured") + } + svc, err := datamanager.FromProvider(r, names[0]) + if err != nil { + return robot.UploadDataFromPathResult{}, err + } + uploader, ok := svc.(dataFromPathUploader) + if !ok { + return robot.UploadDataFromPathResult{}, errors.New("data manager does not support UploadDataFromPath") + } + return uploader.UploadDataFromPath(ctx, path, md, extra) +} + // FindBySimpleNameAndAPI finds a resource by its simple name and API. This is queried // through the resourceGetterForAPI for _all_ incoming gRPC requests related to a // resource. A nil resource and an error is returned in the case of no resource found, or diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index d21ff01fc7f..9777aafdbb4 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -3251,6 +3251,36 @@ func TestCloudMetadata(t *testing.T) { }) } +func TestUploadDataFromPath(t *testing.T) { + logger := logging.NewTestLogger(t) + ctx := context.Background() + + t.Run("no data manager configured", func(t *testing.T) { + r := setupLocalRobot(t, ctx, &config.Config{}, logger) + _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil, nil) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager") + }) + + t.Run("data manager does not support upload", func(t *testing.T) { + cfg := &config.Config{ + Services: []resource.Config{ + { + Name: "dm", + API: datamanager.API, + Model: resource.DefaultServiceModel, + ConvertedAttributes: &builtin.Config{}, + DependsOn: []string{internalcloud.InternalServiceName.String()}, + }, + }, + } + r := setupLocalRobot(t, ctx, cfg, logger) + _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil, nil) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "does not support") + }) +} + func TestReconfigureOnModuleRename(t *testing.T) { ctx := context.Background() logger := logging.NewTestLogger(t) diff --git a/robot/robot.go b/robot/robot.go index ff26a5532d7..fac37e1b046 100644 --- a/robot/robot.go +++ b/robot/robot.go @@ -12,6 +12,7 @@ import ( "github.com/jhump/protoreflect/dynamic" "github.com/pkg/errors" otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1" + datasyncpb "go.viam.com/api/app/datasync/v1" "go.viam.com/rdk/cloud" "go.viam.com/rdk/config" @@ -43,6 +44,15 @@ func init() { } } +// UploadDataFromPathResult is the aggregated result of an UploadDataFromPath call. +type UploadDataFromPathResult struct { + FilesUploaded uint64 + FilesFailed uint64 + BytesUploaded uint64 + BytesTotal uint64 + IDs []string +} + // A Robot encompasses all functionality of some robot comprised // of parts, local and remote. // @@ -196,6 +206,10 @@ type LocalRobot interface { // WriteTraceMessages writes trace spans to any configured exporters. WriteTraceMessages(context.Context, []*otlpv1.ResourceSpans) error + + // UploadDataFromPath uploads a file or directory at path to the cloud via the data manager. + UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{}) ( + UploadDataFromPathResult, error) } // A RemoteRobot is a Robot that was created through a connection. diff --git a/robot/server/server.go b/robot/server/server.go index 3a16a16087c..001676be4d1 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -70,6 +70,23 @@ func (s *Server) SendTraces(ctx context.Context, req *pb.SendTracesRequest) (*pb return nil, s.robot.WriteTraceMessages(ctx, req.ResourceSpans) } +// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager. +func (s *Server) UploadDataFromPath(ctx context.Context, req *pb.UploadDataFromPathRequest) ( + *pb.UploadDataFromPathResponse, error, +) { + res, err := s.robot.UploadDataFromPath(ctx, req.GetPath(), req.GetUploadMetadata(), req.Extra.AsMap()) + if err != nil { + return nil, err + } + return &pb.UploadDataFromPathResponse{ + FilesUploaded: res.FilesUploaded, + FilesFailed: res.FilesFailed, + BytesUploaded: res.BytesUploaded, + BytesTotal: res.BytesTotal, + Ids: res.IDs, + }, nil +} + // Tunnel tunnels traffic to/from the client from/to a specified port on the server. func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { req, err := srv.Recv() diff --git a/robot/server/server_test.go b/robot/server/server_test.go index 74a2aa4de67..b7f5f5ffc7e 100644 --- a/robot/server/server_test.go +++ b/robot/server/server_test.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/jhump/protoreflect/grpcreflect" "go.uber.org/zap/zapcore" + datasyncpb "go.viam.com/api/app/datasync/v1" commonpb "go.viam.com/api/common/v1" armpb "go.viam.com/api/component/arm/v1" pb "go.viam.com/api/robot/v1" @@ -640,6 +641,52 @@ func TestServer(t *testing.T) { test.That(t, resp.GetVersion(), test.ShouldEqual, "dev-unknown") test.That(t, resp.GetApiVersion(), test.ShouldEqual, "?") }) + + t.Run("UploadDataFromPath", func(t *testing.T) { + injectRobot := &inject.Robot{} + server := server.New(injectRobot) + + ext, err := utilsproto.StructToStructPb(map[string]interface{}{"foo": "bar"}) + test.That(t, err, test.ShouldBeNil) + + // success: request fields (incl. extra) reach the robot, and counts/ids map back through. + injectRobot.UploadDataFromPathFunc = func( + ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}, + ) (robot.UploadDataFromPathResult, error) { + test.That(t, path, test.ShouldEqual, "/data/foo") + test.That(t, md.GetTags(), test.ShouldResemble, []string{"tag1"}) + test.That(t, extra, test.ShouldResemble, map[string]interface{}{"foo": "bar"}) + return robot.UploadDataFromPathResult{ + FilesUploaded: 2, + FilesFailed: 0, + BytesUploaded: 512, + BytesTotal: 512, + IDs: []string{"a", "b"}, + }, nil + } + + resp, err := server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{ + Path: "/data/foo", + UploadMetadata: &datasyncpb.UploadMetadata{Tags: []string{"tag1"}}, + Extra: ext, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp.GetFilesUploaded(), test.ShouldEqual, uint64(2)) + test.That(t, resp.GetFilesFailed(), test.ShouldEqual, uint64(0)) + test.That(t, resp.GetBytesUploaded(), test.ShouldEqual, uint64(512)) + test.That(t, resp.GetBytesTotal(), test.ShouldEqual, uint64(512)) + test.That(t, resp.GetIds(), test.ShouldResemble, []string{"a", "b"}) + + // error is surfaced to the caller. + injectRobot.UploadDataFromPathFunc = func( + ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}, + ) (robot.UploadDataFromPathResult, error) { + return robot.UploadDataFromPathResult{}, errors.New("no data manager service configured") + } + _, err = server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{Path: "/data/foo"}) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager") + }) } func TestModuleLogTimestamp(t *testing.T) { diff --git a/testutils/inject/robot.go b/testutils/inject/robot.go index 01548bea4b9..ca929b6155f 100644 --- a/testutils/inject/robot.go +++ b/testutils/inject/robot.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + datasyncpb "go.viam.com/api/app/datasync/v1" "go.viam.com/utils/pexec" "go.viam.com/rdk/cloud" @@ -59,6 +60,11 @@ type Robot struct { MachineStatusFunc func(ctx context.Context) (robot.MachineStatus, error) ShutdownFunc func(ctx context.Context) error ListTunnelsFunc func(ctx context.Context) ([]config.TrafficTunnelEndpoint, error) + UploadDataFromPathFunc func( + ctx context.Context, + path string, + uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{}, + ) (robot.UploadDataFromPathResult, error) ops *operation.Manager SessMgr session.Manager @@ -373,6 +379,19 @@ func (r *Robot) ListTunnels(ctx context.Context) ([]config.TrafficTunnelEndpoint return r.ListTunnelsFunc(ctx) } +// UploadDataFromPath calls the injected UploadDataFromPath or the real one. +func (r *Robot) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, + extra map[string]interface{}) ( + robot.UploadDataFromPathResult, error, +) { + r.Mu.RLock() + defer r.Mu.RUnlock() + if r.UploadDataFromPathFunc == nil { + return r.LocalRobot.UploadDataFromPath(ctx, path, uploadMetadata, extra) + } + return r.UploadDataFromPathFunc(ctx, path, uploadMetadata, extra) +} + type noopSessionManager struct{} func (m noopSessionManager) Start(ctx context.Context, ownerID string) (*session.Session, error) {