Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
go.viam.com/api v0.1.555
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37
go.viam.com/test v1.2.4
go.viam.com/utils v0.6.1
goji.io v2.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,8 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.555 h1:ncBlAwDpEk658aouQLoeq1RLDqdi3spWVFN0y6nsvXM=
go.viam.com/api v0.1.555/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ=
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37 h1:Pbxumk5u7yRfEjcWG8S9/HljfpQNqDBnnsyiT/lN/QQ=
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.6.1 h1:xJhq+S2ADMTDj9538blmhI0otZCRAZUonRBkb+zHIHo=
Expand Down
35 changes: 35 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
Expand Down Expand Up @@ -1352,6 +1353,40 @@ func (rc *RobotClient) SendTraces(ctx context.Context, spans []*otlpv1.ResourceS
return err
}

// UploadFromPathResult is the aggregated result of an UploadDataFromPath call.
type UploadFromPathResult struct {
FilesUploaded uint64
FilesFailed uint64
BytesUploaded uint64
BytesTotal uint64
IDs []string
}

// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager.
func (rc *RobotClient) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}) (
UploadFromPathResult, error,
) {
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return UploadFromPathResult{}, err
}
resp, err := rc.client.UploadDataFromPath(ctx, &pb.UploadDataFromPathRequest{
Path: path,
UploadMetadata: md,
Extra: ext,
})
if err != nil {
return UploadFromPathResult{}, err
}
return UploadFromPathResult{
FilesUploaded: resp.GetFilesUploaded(),
FilesFailed: resp.GetFilesFailed(),
BytesUploaded: resp.GetBytesUploaded(),
BytesTotal: resp.GetBytesTotal(),
IDs: resp.GetIds(),
}, nil
}

// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error {
Expand Down
53 changes: 53 additions & 0 deletions robot/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/google/uuid"
"github.com/jhump/protoreflect/grpcreflect"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
armpb "go.viam.com/api/component/arm/v1"
basepb "go.viam.com/api/component/base/v1"
Expand Down Expand Up @@ -2363,3 +2364,55 @@ func TestListTunnels(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
test.That(t, ttes, test.ShouldResemble, expectedTTEs)
}

func TestUploadDataFromPath(t *testing.T) {
logger := logging.NewTestLogger(t)
listener, err := net.Listen("tcp", "localhost:0")
test.That(t, err, test.ShouldBeNil)
gServer := grpc.NewServer()

var capturedPath string
var capturedMD *datasyncpb.UploadMetadata
var capturedExtra map[string]interface{}
injectRobot := &inject.Robot{
ResourceNamesFunc: func() []resource.Name { return nil },
ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil },
MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) {
return robot.MachineStatus{State: robot.StateRunning}, nil
},
UploadDataFromPathFunc: func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{},
) (uint64, uint64, uint64, uint64, []string, error) {
capturedPath = path
capturedMD = md
capturedExtra = extra
return 2, 0, 512, 512, []string{"a", "b"}, nil
},
}

pb.RegisterRobotServiceServer(gServer, server.New(injectRobot))

go gServer.Serve(listener)
defer gServer.Stop()

client, err := New(context.Background(), listener.Addr().String(), logger)
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
}()

md := &datasyncpb.UploadMetadata{Tags: []string{"tag1"}}
extra := map[string]interface{}{"foo": "bar"}
res, err := client.UploadDataFromPath(context.Background(), "/data/foo", md, extra)
test.That(t, err, test.ShouldBeNil)
test.That(t, res.FilesUploaded, test.ShouldEqual, uint64(2))
test.That(t, res.FilesFailed, test.ShouldEqual, uint64(0))
test.That(t, res.BytesUploaded, test.ShouldEqual, uint64(512))
test.That(t, res.BytesTotal, test.ShouldEqual, uint64(512))
test.That(t, res.IDs, test.ShouldResemble, []string{"a", "b"})

// request fields actually crossed the wire
test.That(t, capturedPath, test.ShouldEqual, "/data/foo")
test.That(t, capturedMD.GetTags(), test.ShouldResemble, []string{"tag1"})
test.That(t, capturedExtra, test.ShouldResemble, map[string]interface{}{"foo": "bar"})
}
28 changes: 28 additions & 0 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/multierr"
datasyncpb "go.viam.com/api/app/datasync/v1"
packagespb "go.viam.com/api/app/packages/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/perf"
Expand Down Expand Up @@ -58,6 +59,7 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/robot/web"
weboptions "go.viam.com/rdk/robot/web/options"
"go.viam.com/rdk/services/datamanager"
"go.viam.com/rdk/session"
"go.viam.com/rdk/utils"
)
Expand Down Expand Up @@ -171,6 +173,32 @@ func (r *localRobot) WriteTraceMessages(ctx context.Context, spans []*otlpv1.Res
return err
}

// dataFromPathUploader is the capability interface localRobot type-asserts the configured
// data manager service for when UploadDataFromPath is called.
type dataFromPathUploader interface {
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{}) (
uint64, uint64, uint64, uint64, []string, error)
}

// UploadDataFromPath uploads a file or directory to the cloud via the configured data manager service.
func (r *localRobot) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{}) (
uint64, uint64, uint64, uint64, []string, error,
) {
names := datamanager.NamesFromRobot(r)
if len(names) == 0 {
return 0, 0, 0, 0, nil, errors.New("no data manager service configured")
}
svc, err := datamanager.FromProvider(r, names[0])
if err != nil {
return 0, 0, 0, 0, nil, err
}
uploader, ok := svc.(dataFromPathUploader)
if !ok {
return 0, 0, 0, 0, nil, errors.New("data manager does not support UploadDataFromPath")
}
return uploader.UploadDataFromPath(ctx, path, md, extra)
}

// FindBySimpleNameAndAPI finds a resource by its simple name and API. This is queried
// through the resourceGetterForAPI for _all_ incoming gRPC requests related to a
// resource. A nil resource and an error is returned in the case of no resource found, or
Expand Down
30 changes: 30 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3251,6 +3251,36 @@ func TestCloudMetadata(t *testing.T) {
})
}

func TestUploadDataFromPath(t *testing.T) {
logger := logging.NewTestLogger(t)
ctx := context.Background()

t.Run("no data manager configured", func(t *testing.T) {
r := setupLocalRobot(t, ctx, &config.Config{}, logger)
_, _, _, _, _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil, nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager")
})

t.Run("data manager does not support upload", func(t *testing.T) {
cfg := &config.Config{
Services: []resource.Config{
{
Name: "dm",
API: datamanager.API,
Model: resource.DefaultServiceModel,
ConvertedAttributes: &builtin.Config{},
DependsOn: []string{internalcloud.InternalServiceName.String()},
},
},
}
r := setupLocalRobot(t, ctx, cfg, logger)
_, _, _, _, _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil, nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "does not support")
})
}

func TestReconfigureOnModuleRename(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)
Expand Down
5 changes: 5 additions & 0 deletions robot/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"

"go.viam.com/rdk/cloud"
"go.viam.com/rdk/config"
Expand Down Expand Up @@ -196,6 +197,10 @@ type LocalRobot interface {

// WriteTraceMessages writes trace spans to any configured exporters.
WriteTraceMessages(context.Context, []*otlpv1.ResourceSpans) error

// UploadDataFromPath uploads a file or directory at path to the cloud via the data manager.
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{}) (
filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, ids []string, err error)
}

// A RemoteRobot is a Robot that was created through a connection.
Expand Down
17 changes: 17 additions & 0 deletions robot/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ func (s *Server) SendTraces(ctx context.Context, req *pb.SendTracesRequest) (*pb
return nil, s.robot.WriteTraceMessages(ctx, req.ResourceSpans)
}

// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager.
func (s *Server) UploadDataFromPath(ctx context.Context, req *pb.UploadDataFromPathRequest) (
*pb.UploadDataFromPathResponse, error,
) {
fu, ff, bu, bt, ids, err := s.robot.UploadDataFromPath(ctx, req.GetPath(), req.GetUploadMetadata(), req.Extra.AsMap())
if err != nil {
return nil, err
}
return &pb.UploadDataFromPathResponse{
FilesUploaded: fu,
FilesFailed: ff,
BytesUploaded: bu,
BytesTotal: bt,
Ids: ids,
}, nil
}

// Tunnel tunnels traffic to/from the client from/to a specified port on the server.
func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
req, err := srv.Recv()
Expand Down
41 changes: 41 additions & 0 deletions robot/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/uuid"
"github.com/jhump/protoreflect/grpcreflect"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
armpb "go.viam.com/api/component/arm/v1"
pb "go.viam.com/api/robot/v1"
Expand Down Expand Up @@ -640,6 +641,46 @@ func TestServer(t *testing.T) {
test.That(t, resp.GetVersion(), test.ShouldEqual, "dev-unknown")
test.That(t, resp.GetApiVersion(), test.ShouldEqual, "?")
})

t.Run("UploadDataFromPath", func(t *testing.T) {
injectRobot := &inject.Robot{}
server := server.New(injectRobot)

ext, err := utilsproto.StructToStructPb(map[string]interface{}{"foo": "bar"})
test.That(t, err, test.ShouldBeNil)

// success: request fields (incl. extra) reach the robot, and counts/ids map back through.
injectRobot.UploadDataFromPathFunc = func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{},
) (uint64, uint64, uint64, uint64, []string, error) {
test.That(t, path, test.ShouldEqual, "/data/foo")
test.That(t, md.GetTags(), test.ShouldResemble, []string{"tag1"})
test.That(t, extra, test.ShouldResemble, map[string]interface{}{"foo": "bar"})
return 3, 1, 1024, 2048, []string{"id1", "id2", "id3"}, nil
}

resp, err := server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{
Path: "/data/foo",
UploadMetadata: &datasyncpb.UploadMetadata{Tags: []string{"tag1"}},
Extra: ext,
})
test.That(t, err, test.ShouldBeNil)
test.That(t, resp.GetFilesUploaded(), test.ShouldEqual, uint64(3))
test.That(t, resp.GetFilesFailed(), test.ShouldEqual, uint64(1))
test.That(t, resp.GetBytesUploaded(), test.ShouldEqual, uint64(1024))
test.That(t, resp.GetBytesTotal(), test.ShouldEqual, uint64(2048))
test.That(t, resp.GetIds(), test.ShouldResemble, []string{"id1", "id2", "id3"})

// error is surfaced to the caller.
injectRobot.UploadDataFromPathFunc = func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata, extra map[string]interface{},
) (uint64, uint64, uint64, uint64, []string, error) {
return 0, 0, 0, 0, nil, errors.New("no data manager service configured")
}
_, err = server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{Path: "/data/foo"})
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager")
})
}

func TestModuleLogTimestamp(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions testutils/inject/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/google/uuid"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/utils/pexec"

"go.viam.com/rdk/cloud"
Expand Down Expand Up @@ -59,6 +60,11 @@ type Robot struct {
MachineStatusFunc func(ctx context.Context) (robot.MachineStatus, error)
ShutdownFunc func(ctx context.Context) error
ListTunnelsFunc func(ctx context.Context) ([]config.TrafficTunnelEndpoint, error)
UploadDataFromPathFunc func(
ctx context.Context,
path string,
uploadMetadata *datasyncpb.UploadMetadata, extra map[string]interface{},
) (uint64, uint64, uint64, uint64, []string, error)

ops *operation.Manager
SessMgr session.Manager
Expand Down Expand Up @@ -373,6 +379,19 @@ func (r *Robot) ListTunnels(ctx context.Context) ([]config.TrafficTunnelEndpoint
return r.ListTunnelsFunc(ctx)
}

// UploadDataFromPath calls the injected UploadDataFromPath or the real one.
func (r *Robot) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata,
extra map[string]interface{}) (
uint64, uint64, uint64, uint64, []string, error,
) {
r.Mu.RLock()
defer r.Mu.RUnlock()
if r.UploadDataFromPathFunc == nil {
return r.LocalRobot.UploadDataFromPath(ctx, path, uploadMetadata, extra)
}
return r.UploadDataFromPathFunc(ctx, path, uploadMetadata, extra)
}

type noopSessionManager struct{}

func (m noopSessionManager) Start(ctx context.Context, ownerID string) (*session.Session, error) {
Expand Down
Loading