Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
go.viam.com/api v0.1.555
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37
go.viam.com/test v1.2.4
go.viam.com/utils v0.6.1
goji.io v2.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,8 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.555 h1:ncBlAwDpEk658aouQLoeq1RLDqdi3spWVFN0y6nsvXM=
go.viam.com/api v0.1.555/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ=
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37 h1:Pbxumk5u7yRfEjcWG8S9/HljfpQNqDBnnsyiT/lN/QQ=
go.viam.com/api v0.1.559-0.20260609195308-d8b5e1f67e37/go.mod h1:nVe4WXrtc8aupJ8OWXSYx6KhCiOkr3VCbkwxD4D41xQ=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.6.1 h1:xJhq+S2ADMTDj9538blmhI0otZCRAZUonRBkb+zHIHo=
Expand Down
20 changes: 20 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
Expand Down Expand Up @@ -1352,6 +1353,25 @@ func (rc *RobotClient) SendTraces(ctx context.Context, spans []*otlpv1.ResourceS
return err
}

// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager.
func (rc *RobotClient) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata) (
Comment thread
angelapredolac marked this conversation as resolved.
Outdated
uint64, uint64, uint64, uint64, []string, error,
Comment thread
angelapredolac marked this conversation as resolved.
Outdated
) {
resp, err := rc.client.UploadDataFromPath(ctx, &pb.UploadDataFromPathRequest{
Path: path,
UploadMetadata: md,
})
if err != nil {
return 0, 0, 0, 0, nil, err
}
return resp.GetFilesUploaded(),
resp.GetFilesFailed(),
resp.GetBytesUploaded(),
resp.GetBytesTotal(),
resp.GetIds(),
nil
}

// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error {
Expand Down
49 changes: 49 additions & 0 deletions robot/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/google/uuid"
"github.com/jhump/protoreflect/grpcreflect"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
armpb "go.viam.com/api/component/arm/v1"
basepb "go.viam.com/api/component/base/v1"
Expand Down Expand Up @@ -2363,3 +2364,51 @@ func TestListTunnels(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
test.That(t, ttes, test.ShouldResemble, expectedTTEs)
}

func TestUploadDataFromPath(t *testing.T) {
logger := logging.NewTestLogger(t)
listener, err := net.Listen("tcp", "localhost:0")
test.That(t, err, test.ShouldBeNil)
gServer := grpc.NewServer()

var capturedPath string
var capturedMD *datasyncpb.UploadMetadata
injectRobot := &inject.Robot{
ResourceNamesFunc: func() []resource.Name { return nil },
ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil },
MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) {
return robot.MachineStatus{State: robot.StateRunning}, nil
},
UploadDataFromPathFunc: func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata,
) (uint64, uint64, uint64, uint64, []string, error) {
capturedPath = path
capturedMD = md
return 2, 0, 512, 512, []string{"a", "b"}, nil
},
}

pb.RegisterRobotServiceServer(gServer, server.New(injectRobot))

go gServer.Serve(listener)
defer gServer.Stop()

client, err := New(context.Background(), listener.Addr().String(), logger)
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
}()

md := &datasyncpb.UploadMetadata{Tags: []string{"tag1"}}
fu, ff, bu, bt, ids, err := client.UploadDataFromPath(context.Background(), "/data/foo", md)
test.That(t, err, test.ShouldBeNil)
test.That(t, fu, test.ShouldEqual, uint64(2))
test.That(t, ff, test.ShouldEqual, uint64(0))
test.That(t, bu, test.ShouldEqual, uint64(512))
test.That(t, bt, test.ShouldEqual, uint64(512))
test.That(t, ids, test.ShouldResemble, []string{"a", "b"})

// request fields actually crossed the wire
test.That(t, capturedPath, test.ShouldEqual, "/data/foo")
test.That(t, capturedMD.GetTags(), test.ShouldResemble, []string{"tag1"})
}
27 changes: 27 additions & 0 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/multierr"
datasyncpb "go.viam.com/api/app/datasync/v1"
packagespb "go.viam.com/api/app/packages/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/perf"
Expand Down Expand Up @@ -58,6 +59,7 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/robot/web"
weboptions "go.viam.com/rdk/robot/web/options"
"go.viam.com/rdk/services/datamanager"
"go.viam.com/rdk/session"
"go.viam.com/rdk/utils"
)
Expand Down Expand Up @@ -171,6 +173,31 @@ func (r *localRobot) WriteTraceMessages(ctx context.Context, spans []*otlpv1.Res
return err
}

// dataFromPathUploader is the subset of the data manager service used by UploadDataFromPath.
Comment thread
angelapredolac marked this conversation as resolved.
Outdated
type dataFromPathUploader interface {
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata) (
uint64, uint64, uint64, uint64, []string, error)
}

// UploadDataFromPath uploads a file or directory to the cloud via the configured data manager service.
func (r *localRobot) UploadDataFromPath(ctx context.Context, path string, md *datasyncpb.UploadMetadata) (
uint64, uint64, uint64, uint64, []string, error,
) {
names := datamanager.NamesFromRobot(r)
if len(names) == 0 {
return 0, 0, 0, 0, nil, errors.New("no data manager service configured")
}
svc, err := datamanager.FromProvider(r, names[0])
if err != nil {
return 0, 0, 0, 0, nil, err
}
uploader, ok := svc.(dataFromPathUploader)
if !ok {
return 0, 0, 0, 0, nil, errors.New("data manager does not support UploadDataFromPath")
}
return uploader.UploadDataFromPath(ctx, path, md)
}

// FindBySimpleNameAndAPI finds a resource by its simple name and API. This is queried
// through the resourceGetterForAPI for _all_ incoming gRPC requests related to a
// resource. A nil resource and an error is returned in the case of no resource found, or
Expand Down
30 changes: 30 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3251,6 +3251,36 @@ func TestCloudMetadata(t *testing.T) {
})
}

func TestUploadDataFromPath(t *testing.T) {
logger := logging.NewTestLogger(t)
ctx := context.Background()

t.Run("no data manager configured", func(t *testing.T) {
r := setupLocalRobot(t, ctx, &config.Config{}, logger)
_, _, _, _, _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager")
})

t.Run("data manager does not support upload", func(t *testing.T) {
cfg := &config.Config{
Services: []resource.Config{
{
Name: "dm",
API: datamanager.API,
Model: resource.DefaultServiceModel,
ConvertedAttributes: &builtin.Config{},
DependsOn: []string{internalcloud.InternalServiceName.String()},
},
},
}
r := setupLocalRobot(t, ctx, cfg, logger)
_, _, _, _, _, err := r.UploadDataFromPath(ctx, "/tmp/whatever", nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "does not support")
})
}

func TestReconfigureOnModuleRename(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)
Expand Down
5 changes: 5 additions & 0 deletions robot/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"

"go.viam.com/rdk/cloud"
"go.viam.com/rdk/config"
Expand Down Expand Up @@ -196,6 +197,10 @@ type LocalRobot interface {

// WriteTraceMessages writes trace spans to any configured exporters.
WriteTraceMessages(context.Context, []*otlpv1.ResourceSpans) error

// UploadDataFromPath uploads a file or directory at path to the cloud via the data manager.
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata) (
filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, ids []string, err error)
}

// A RemoteRobot is a Robot that was created through a connection.
Expand Down
17 changes: 17 additions & 0 deletions robot/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ func (s *Server) SendTraces(ctx context.Context, req *pb.SendTracesRequest) (*pb
return nil, s.robot.WriteTraceMessages(ctx, req.ResourceSpans)
}

// UploadDataFromPath uploads a file or directory from the robot to the cloud via the data manager.
func (s *Server) UploadDataFromPath(ctx context.Context, req *pb.UploadDataFromPathRequest) (
*pb.UploadDataFromPathResponse, error,
) {
fu, ff, bu, bt, ids, err := s.robot.UploadDataFromPath(ctx, req.GetPath(), req.GetUploadMetadata())
if err != nil {
return nil, err
}
return &pb.UploadDataFromPathResponse{
FilesUploaded: fu,
FilesFailed: ff,
BytesUploaded: bu,
BytesTotal: bt,
Ids: ids,
}, nil
}

// Tunnel tunnels traffic to/from the client from/to a specified port on the server.
func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
req, err := srv.Recv()
Expand Down
36 changes: 36 additions & 0 deletions robot/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/google/uuid"
"github.com/jhump/protoreflect/grpcreflect"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
armpb "go.viam.com/api/component/arm/v1"
pb "go.viam.com/api/robot/v1"
Expand Down Expand Up @@ -640,6 +641,41 @@ func TestServer(t *testing.T) {
test.That(t, resp.GetVersion(), test.ShouldEqual, "dev-unknown")
test.That(t, resp.GetApiVersion(), test.ShouldEqual, "?")
})

t.Run("UploadDataFromPath", func(t *testing.T) {
injectRobot := &inject.Robot{}
server := server.New(injectRobot)

// success: request fields reach the robot, and counts/ids map back through.
injectRobot.UploadDataFromPathFunc = func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata,
) (uint64, uint64, uint64, uint64, []string, error) {
test.That(t, path, test.ShouldEqual, "/data/foo")
test.That(t, md.GetTags(), test.ShouldResemble, []string{"tag1"})
return 3, 1, 1024, 2048, []string{"id1", "id2", "id3"}, nil
}

resp, err := server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{
Path: "/data/foo",
UploadMetadata: &datasyncpb.UploadMetadata{Tags: []string{"tag1"}},
})
test.That(t, err, test.ShouldBeNil)
test.That(t, resp.GetFilesUploaded(), test.ShouldEqual, uint64(3))
test.That(t, resp.GetFilesFailed(), test.ShouldEqual, uint64(1))
test.That(t, resp.GetBytesUploaded(), test.ShouldEqual, uint64(1024))
test.That(t, resp.GetBytesTotal(), test.ShouldEqual, uint64(2048))
test.That(t, resp.GetIds(), test.ShouldResemble, []string{"id1", "id2", "id3"})

// error is surfaced to the caller.
injectRobot.UploadDataFromPathFunc = func(
ctx context.Context, path string, md *datasyncpb.UploadMetadata,
) (uint64, uint64, uint64, uint64, []string, error) {
return 0, 0, 0, 0, nil, errors.New("no data manager service configured")
}
_, err = server.UploadDataFromPath(context.Background(), &pb.UploadDataFromPathRequest{Path: "/data/foo"})
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "no data manager")
})
}

func TestModuleLogTimestamp(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions testutils/inject/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/google/uuid"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/utils/pexec"

"go.viam.com/rdk/cloud"
Expand Down Expand Up @@ -59,6 +60,11 @@ type Robot struct {
MachineStatusFunc func(ctx context.Context) (robot.MachineStatus, error)
ShutdownFunc func(ctx context.Context) error
ListTunnelsFunc func(ctx context.Context) ([]config.TrafficTunnelEndpoint, error)
UploadDataFromPathFunc func(
ctx context.Context,
path string,
uploadMetadata *datasyncpb.UploadMetadata,
) (uint64, uint64, uint64, uint64, []string, error)

ops *operation.Manager
SessMgr session.Manager
Expand Down Expand Up @@ -373,6 +379,18 @@ func (r *Robot) ListTunnels(ctx context.Context) ([]config.TrafficTunnelEndpoint
return r.ListTunnelsFunc(ctx)
}

// UploadDataFromPath calls the injected UploadDataFromPath or the real one.
func (r *Robot) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata) (
uint64, uint64, uint64, uint64, []string, error,
) {
r.Mu.RLock()
defer r.Mu.RUnlock()
if r.UploadDataFromPathFunc == nil {
return r.LocalRobot.UploadDataFromPath(ctx, path, uploadMetadata)
}
return r.UploadDataFromPathFunc(ctx, path, uploadMetadata)
}

type noopSessionManager struct{}

func (m noopSessionManager) Start(ctx context.Context, ownerID string) (*session.Session, error) {
Expand Down
Loading