diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index cb7c6f4bee..82f859d5b0 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -136,6 +136,29 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg return cmd } +// Attach wires IO relays to a process the caller has already resolved. +// Counterpart of [Command] / [CommandContext] for the destination side +// of live migration: caller obtains `p` via the host's restore path +// (e.g. gcs.Container.OpenProcessWithIO) and Attach binds the +// process's stdio to the supplied destination streams. +func Attach(ctx context.Context, p cow.Process, stdin io.Reader, stdout, stderr io.Writer) (*Cmd, error) { + cmd := &Cmd{ + Process: p, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Log: log.G(ctx).WithField("pid", p.Pid()), + Context: ctx, + ExitState: &ExitState{}, + allDoneCh: make(chan struct{}), + CopyAfterExitTimeout: time.Second, + } + if err := cmd.startRelay(); err != nil { + return nil, err + } + return cmd, nil +} + // Start starts a command. The caller must ensure that if Start succeeds, // Wait is eventually called to clean up resources. func (c *Cmd) Start() error { @@ -209,7 +232,13 @@ func (c *Cmd) Start() error { c.Log = c.Log.WithField("pid", p.Pid()) } - // Start relaying process IO. + return c.startRelay() +} + +// startRelay wires the IO relay goroutines and the context-cancel +// killer to [Cmd.Process]. +func (c *Cmd) startRelay() error { + p := c.Process stdin, stdout, stderr := p.Stdio() if c.Stdin != nil { // Do not make stdin part of the error group because there is no way for diff --git a/internal/cmd/cmd_test.go b/internal/cmd/cmd_test.go index d576d32778..c47deb4f85 100644 --- a/internal/cmd/cmd_test.go +++ b/internal/cmd/cmd_test.go @@ -137,6 +137,12 @@ func (p *localProcess) Pid() int { return p.p.Pid } +// MigrationState returns the zero value: the test fake doesn't use vsock +// or a GCS bridge. +func (p *localProcess) MigrationState() cow.MigrationState { + return cow.MigrationState{} +} + func (p *localProcess) ResizeConsole(ctx context.Context, x, y uint16) error { return errors.New("not supported") } @@ -280,3 +286,53 @@ func TestCmdStuckIo(t *testing.T) { t.Fatalf("expected: %v; got: %v", errIOTimeOut, err) } } + +// TestCmdAttach verifies that Attach binds a Cmd to a caller-supplied +// process and the resulting Cmd can be Wait'd to completion. Mirrors +// the migration restore path: caller obtains the process via the +// host's restore API (e.g. gcs.OpenProcessWithIO) and Attach wires IO. +func TestCmdAttach(t *testing.T) { + host := &localProcessHost{} + p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{ + CommandLine: "cmd /c exit /b 0", + }) + if err != nil { + t.Fatal(err) + } + + cmd, err := Attach(context.Background(), p, nil, nil, nil) + if err != nil { + t.Fatalf("Attach: %v", err) + } + if cmd.Process != p { + t.Fatal("Cmd.Process does not match the supplied process") + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait: %v", err) + } +} + +// TestCmdAttachIO verifies that Attach's IO relays flow process output +// to caller-supplied destination streams. +func TestCmdAttachIO(t *testing.T) { + host := &localProcessHost{} + p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{ + CommandLine: "cmd /c echo hello", + CreateStdOutPipe: true, + }) + if err != nil { + t.Fatal(err) + } + + var stdout bytes.Buffer + cmd, err := Attach(context.Background(), p, nil, &stdout, nil) + if err != nil { + t.Fatalf("Attach: %v", err) + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait: %v", err) + } + if got := stdout.String(); got != "hello\r\n" { + t.Fatalf("stdout=%q, want %q", got, "hello\r\n") + } +} diff --git a/internal/controller/process/mocks/mock_cow.go b/internal/controller/process/mocks/mock_cow.go index be3c0c63bd..199d1d70ea 100644 --- a/internal/controller/process/mocks/mock_cow.go +++ b/internal/controller/process/mocks/mock_cow.go @@ -130,6 +130,20 @@ func (mr *MockProcessMockRecorder) Kill(ctx any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Kill", reflect.TypeOf((*MockProcess)(nil).Kill), ctx) } +// MigrationState mocks base method. +func (m *MockProcess) MigrationState() cow.MigrationState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MigrationState") + ret0, _ := ret[0].(cow.MigrationState) + return ret0 +} + +// MigrationState indicates an expected call of MigrationState. +func (mr *MockProcessMockRecorder) MigrationState() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MigrationState", reflect.TypeOf((*MockProcess)(nil).MigrationState)) +} + // Pid mocks base method. func (m *MockProcess) Pid() int { m.ctrl.T.Helper() diff --git a/internal/cow/cow.go b/internal/cow/cow.go index b60cd383b6..147958195f 100644 --- a/internal/cow/cow.go +++ b/internal/cow/cow.go @@ -10,6 +10,14 @@ import ( hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" ) +// MigrationState captures the host-side identifiers needed to rebind a +// process during live migration. Zero fields mean the host doesn't use the +// corresponding facility (e.g. vsock or a GCS bridge). +type MigrationState struct { + StdinPort, StdoutPort, StderrPort uint32 + WaitCallID int64 +} + // Process is the interface for an OS process running in a container or utility VM. type Process interface { // Close releases resources associated with the process and closes the @@ -27,6 +35,10 @@ type Process interface { CloseStderr(ctx context.Context) error // Pid returns the process ID. Pid() int + // MigrationState returns the host-side identifiers (vsock stdio ports + // and GCS bridge wait-call id) needed by the live-migration save path. + // Zero fields indicate the host doesn't use those facilities. + MigrationState() MigrationState // Stdio returns the stdio streams for a process. These may be nil if a stream // was not requested during CreateProcess. Stdio() (_ io.Writer, _ io.Reader, _ io.Reader) diff --git a/internal/gcs/bridge.go b/internal/gcs/bridge.go index 5b611f262e..072f694b46 100644 --- a/internal/gcs/bridge.go +++ b/internal/gcs/bridge.go @@ -451,3 +451,44 @@ func (brdg *bridge) sendRPC(buf *bytes.Buffer, enc *json.Encoder, call *rpc) err } return nil } + +// NextID returns the bridge's next request id. Used by the migration save +// path so the destination can seed its counter above all source ids. +func (brdg *bridge) NextID() int64 { + brdg.mu.Lock() + defer brdg.mu.Unlock() + return brdg.nextID +} + +// SeedNextID raises the request id allocator to at least next. No-op if +// already higher. +func (brdg *bridge) SeedNextID(next int64) { + brdg.mu.Lock() + defer brdg.mu.Unlock() + if next > brdg.nextID { + brdg.nextID = next + } +} + +// PreregisterRPC inserts a stub rpc into the response table without sending +// anything. Used on the migration destination to adopt requests (today: +// WaitForProcess) the source had outstanding in the guest, so their +// eventual responses route normally. +func (brdg *bridge) PreregisterRPC(id int64, proc prot.RPCProc, resp responseMessage) (*rpc, error) { + call := &rpc{ + ch: make(chan struct{}), + id: id, + proc: proc, + resp: resp, + } + brdg.mu.Lock() + defer brdg.mu.Unlock() + if brdg.rpcs == nil { + return nil, ErrBridgeClosed + } + if _, dup := brdg.rpcs[id]; dup { + return nil, fmt.Errorf("preregister rpc: id %d already in use", id) + } + brdg.rpcs[id] = call + return call, nil +} diff --git a/internal/gcs/container.go b/internal/gcs/container.go index 549abd35a2..c1152893fb 100644 --- a/internal/gcs/container.go +++ b/internal/gcs/container.go @@ -5,6 +5,7 @@ package gcs import ( "context" "errors" + "fmt" "sync" "time" @@ -67,9 +68,9 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf return c, nil } -// CloneContainer just creates the wrappers and sets up notification requests for a -// container that is already running inside the UVM (after cloning). -func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) { +// OpenContainer attaches a host-side wrapper to a container already +// running inside the UVM. +func (gc *GuestConnection) OpenContainer(_ context.Context, cid string) (_ *Container, err error) { c := &Container{ gc: gc, id: cid, @@ -118,6 +119,67 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co return c.gc.exec(ctx, c.id, config) } +// OpenProcessWithIO is the live-migration restore counterpart of +// [Container.CreateProcess]: it attaches to a process already running +// in this container, re-listens on the supplied vsock ports, and +// pre-registers the source bridge's WaitForProcess id so the guest's +// still-outstanding response is routed without arming a duplicate wait. +func (c *Container) OpenProcessWithIO(ctx context.Context, pid uint32, stdinPort, stdoutPort, stderrPort uint32, waitCallID int64) (_ *Process, err error) { + ctx, span := oc.StartSpan(ctx, "gcs::Container::OpenProcessWithIO", oc.WithClientSpanKind) + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", c.id), + trace.Int64Attribute("pid", int64(pid)), + trace.Int64Attribute("waitCallID", waitCallID)) + + if waitCallID == 0 { + return nil, fmt.Errorf("open process pid %d in container %s: waitCallID is required", pid, c.id) + } + + p := &Process{ + gc: c.gc, + cid: c.id, + id: pid, + stdinPort: stdinPort, + stdoutPort: stdoutPort, + stderrPort: stderrPort, + } + defer func() { + if err != nil { + p.Close() + } + }() + + listen := func(port uint32) (*ioChannel, error) { + if port == 0 { + return nil, nil + } + l, err := c.gc.ioListenFn(port) + if err != nil { + return nil, fmt.Errorf("listen vsock port %d: %w", port, err) + } + return newIoChannel(l), nil + } + if p.stdin, err = listen(stdinPort); err != nil { + return nil, err + } + if p.stdout, err = listen(stdoutPort); err != nil { + return nil, err + } + if p.stderr, err = listen(stderrPort); err != nil { + return nil, err + } + + p.waitCall, err = c.gc.brdg.PreregisterRPC(waitCallID, prot.RPCWaitForProcess, &p.waitResp) + if err != nil { + return nil, fmt.Errorf("preregister wait for pid %d in container %s (id %d): %w", pid, c.id, waitCallID, err) + } + go p.waitBackground() + log.G(ctx).WithField("pid", p.id).Debug("opened existing process with IO") + return p, nil +} + // ID returns the container's ID. func (c *Container) ID() string { return c.id diff --git a/internal/gcs/guestconnection.go b/internal/gcs/guestconnection.go index 35e6709d15..7f24a892a9 100644 --- a/internal/gcs/guestconnection.go +++ b/internal/gcs/guestconnection.go @@ -265,6 +265,45 @@ func (gc *GuestConnection) newIoChannel() (*ioChannel, uint32, error) { return newIoChannel(l), port, nil } +// SetNextPort raises the new-process IO port allocator floor. Called +// by the live-migration restore path after [Connect] to skip past +// vsock ports already in use by restored processes. Never goes +// backwards. +func (gc *GuestConnection) SetNextPort(p uint32) { + gc.mu.Lock() + defer gc.mu.Unlock() + if p > gc.nextPort { + gc.nextPort = p + } +} + +// NextPort returns the current allocator floor. Used by the +// live-migration save path to record what [SetNextPort] should be +// seeded with on the destination. +func (gc *GuestConnection) NextPort() uint32 { + gc.mu.Lock() + defer gc.mu.Unlock() + return gc.nextPort +} + +// BridgeNextID returns the bridge's next request id allocator value. +func (gc *GuestConnection) BridgeNextID() int64 { + return gc.brdg.NextID() +} + +// SeedBridgeNextID raises the bridge's request id allocator floor. +func (gc *GuestConnection) SeedBridgeNextID(next int64) { + gc.brdg.SeedNextID(next) +} + +// PreregisterWaitForProcess registers a stub WaitForProcess rpc against the +// source-issued id so a later [Container.OpenProcessWithIO] (or the bridge +// itself) routes the guest's response without issuing a duplicate wait. +func (gc *GuestConnection) PreregisterWaitForProcess(id int64, resp *prot.ContainerWaitForProcessResponse) error { + _, err := gc.brdg.PreregisterRPC(id, prot.RPCWaitForProcess, resp) + return err +} + func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error { gc.mu.Lock() defer gc.mu.Unlock() diff --git a/internal/gcs/process.go b/internal/gcs/process.go index 4c2428a657..2c4ea89190 100644 --- a/internal/gcs/process.go +++ b/internal/gcs/process.go @@ -34,6 +34,9 @@ type Process struct { stdin, stdout, stderr *ioChannel stdinCloseWriteOnce sync.Once stdinCloseWriteErr error + // stdinPort, stdoutPort, stderrPort are the vsock ports gc.exec + // allocated for this process's stdio relay. + stdinPort, stdoutPort, stderrPort uint32 } var _ cow.Process = &Process{} @@ -100,6 +103,9 @@ func (gc *GuestConnection) exec(ctx context.Context, cid string, params interfac g := winio.VsockServiceID(vsockSettings.StdErr) hvsockSettings.StdErr = &g } + // Snapshot the per-stream vsock ports so the live-migration snapshot + // can re-establish the same host-side listeners on the destination. + p.stdinPort, p.stdoutPort, p.stderrPort = vsockSettings.StdIn, vsockSettings.StdOut, vsockSettings.StdErr var resp prot.ContainerExecuteProcessResponse err = gc.brdg.RPC(ctx, prot.RPCExecuteProcess, &req, &resp, false) @@ -131,14 +137,20 @@ func (p *Process) Close() error { trace.StringAttribute("cid", p.cid), trace.Int64Attribute("pid", int64(p.id))) - if err := p.stdin.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stdin failed") + if p.stdin != nil { + if err := p.stdin.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stdin failed") + } } - if err := p.stdout.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stdout failed") + if p.stdout != nil { + if err := p.stdout.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stdout failed") + } } - if err := p.stderr.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stderr failed") + if p.stderr != nil { + if err := p.stderr.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stderr failed") + } } return nil } @@ -187,7 +199,10 @@ func (p *Process) ExitCode() (_ int, err error) { return -1, errors.New("process not exited") } if err := p.waitCall.Err(); err != nil { - return -1, err + var rerr *rpcError + if !errors.As(err, &rerr) || uint32(rerr.result) != hrNotFound { + return -1, err + } } return int(p.waitResp.ExitCode), nil } @@ -211,6 +226,20 @@ func (p *Process) Pid() int { return int(p.id) } +// MigrationState returns the vsock stdio ports and outstanding +// WaitForProcess bridge request id needed to rebind this process. +func (p *Process) MigrationState() cow.MigrationState { + s := cow.MigrationState{ + StdinPort: p.stdinPort, + StdoutPort: p.stdoutPort, + StderrPort: p.stderrPort, + } + if p.waitCall != nil { + s.WaitCallID = p.waitCall.id + } + return s +} + // ResizeConsole requests that the pty associated with the process resize its // window. func (p *Process) ResizeConsole(ctx context.Context, width, height uint16) (err error) { @@ -274,7 +303,8 @@ func (p *Process) Stdio() (stdin io.Writer, stdout, stderr io.Reader) { // Wait waits for the process (or guest connection) to terminate. func (p *Process) Wait() error { p.waitCall.Wait() - return p.waitCall.Err() + _, err := p.ExitCode() + return err } func (p *Process) waitBackground() { diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go index d8d57e786d..648d6e4f8a 100644 --- a/internal/guest/bridge/bridge.go +++ b/internal/guest/bridge/bridge.go @@ -289,6 +289,10 @@ func (b *Bridge) AssignHandlers(mux *Mux, host *hcsv2.Host) { // messages and dispatches the appropriate handlers to handle each // event in an asynchronous manner. func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error { + // Each new connection starts unnegotiated, so the next NegotiateProtocol + // request dispatches to the PvInvalid handler registered in AssignHandlers. + b.protVer = prot.PvInvalid + requestChan := make(chan *Request) requestErrChan := make(chan error) b.responseChan = make(chan bridgeResponse) diff --git a/internal/guest/bridge/bridge_reconnect_test.go b/internal/guest/bridge/bridge_reconnect_test.go index 2a8c8b702a..3042bc5ff5 100644 --- a/internal/guest/bridge/bridge_reconnect_test.go +++ b/internal/guest/bridge/bridge_reconnect_test.go @@ -3,11 +3,14 @@ package bridge import ( + "encoding/json" "io" "testing" "time" + "github.com/Microsoft/hcsshim/internal/bridgeutils/gcserr" "github.com/Microsoft/hcsshim/internal/guest/prot" + "github.com/sirupsen/logrus" ) func TestBridge_NotificationQueuedWhenDisconnected(t *testing.T) { @@ -163,3 +166,78 @@ func TestBridge_FullReconnectCycle(t *testing.T) { t.Fatal("timed out waiting for drained notification") } } + +// TestBridge_ListenAndServeResetsProtocolVersion verifies that ListenAndServe +// resets protVer to PvInvalid on entry, so a fresh NegotiateProtocol after a +// reconnect dispatches to the PvInvalid-registered handler instead of falling +// through to UnknownMessageHandler. +func TestBridge_ListenAndServeResetsProtocolVersion(t *testing.T) { + logrus.SetOutput(io.Discard) + + lc := newLoopbackConnection() + defer lc.close() + + // Mirror AssignHandlers: negotiate handler is registered only at PvInvalid. + mux := NewBridgeMux() + var dispatchedVer prot.ProtocolVersion + mux.HandleFunc(prot.ComputeSystemNegotiateProtocolV1, prot.PvInvalid, + func(r *Request) (RequestResponse, error) { + dispatchedVer = r.Version + return &prot.NegotiateProtocolResponse{ + MessageResponseBase: prot.MessageResponseBase{ActivityID: r.ActivityID}, + Version: uint32(prot.PvV4), + }, nil + }) + + // Simulate a Bridge whose protVer survived the previous connection. + b := &Bridge{ + Handler: mux, + protVer: prot.PvV4, + } + + serveErr := make(chan error, 1) + go func() { + serveErr <- b.ListenAndServe(lc.SRead(), lc.SWrite()) + }() + defer func() { + // Fire-and-forget quit; ListenAndServe's teardown handles the reader. + b.quitChan <- true + select { + case err := <-serveErr: + if err != nil { + t.Errorf("ListenAndServe returned: %v", err) + } + default: + } + }() + + req := &prot.NegotiateProtocol{ + MessageBase: prot.MessageBase{ActivityID: "00000000-0000-0000-0000-000000000002"}, + MinimumVersion: uint32(prot.PvV4), + MaximumVersion: uint32(prot.PvV4), + } + if err := serverSend(lc.CWrite(), prot.ComputeSystemNegotiateProtocolV1, prot.SequenceID(1), req); err != nil { + t.Fatalf("send NegotiateProtocol: %v", err) + } + + header, body, err := serverRead(lc.CRead()) + if err != nil { + t.Fatalf("read NegotiateProtocol response: %v", err) + } + if header.Type != prot.ComputeSystemResponseNegotiateProtocolV1 { + t.Fatalf("unexpected response header type: %v", header.Type) + } + // Receiving the response guarantees the handler ran. + if dispatchedVer != prot.PvInvalid { + t.Fatalf("expected dispatch at PvInvalid, got %v", dispatchedVer) + } + + // Must not be UnknownMessageHandler's HrNotImpl. + resp := &prot.MessageResponseBase{} + if err := json.Unmarshal(body, resp); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if resp.Result == int32(gcserr.HrNotImpl) { + t.Fatalf("unexpected HrNotImpl response; body=%s", string(body)) + } +} diff --git a/internal/guest/bridge/bridge_unit_test.go b/internal/guest/bridge/bridge_unit_test.go index 613eaf326b..e969825ecf 100644 --- a/internal/guest/bridge/bridge_unit_test.go +++ b/internal/guest/bridge/bridge_unit_test.go @@ -546,10 +546,10 @@ func Test_Bridge_ListenAndServe_CorrectHandler_Success(t *testing.T) { ActivityID: rBody.ActivityID, }, nil } - mux.HandleFunc(prot.ComputeSystemResizeConsoleV1, prot.PvV4, resizeFn) + // ListenAndServe resets protVer to PvInvalid on entry. + mux.HandleFunc(prot.ComputeSystemResizeConsoleV1, prot.PvInvalid, resizeFn) b := &Bridge{ Handler: mux, - protVer: prot.PvV4, } go func() { @@ -615,12 +615,12 @@ func Test_Bridge_ListenAndServe_HandlersAreAsync_Success(t *testing.T) { Result: 10, }, nil } - mux.HandleFunc(prot.ComputeSystemResizeConsoleV1, prot.PvV4, firstFn) - mux.HandleFunc(prot.ComputeSystemModifySettingsV1, prot.PvV4, secondFn) + // ListenAndServe resets protVer to PvInvalid on entry. + mux.HandleFunc(prot.ComputeSystemResizeConsoleV1, prot.PvInvalid, firstFn) + mux.HandleFunc(prot.ComputeSystemModifySettingsV1, prot.PvInvalid, secondFn) b := &Bridge{ Handler: mux, - protVer: prot.PvV4, } go func() { diff --git a/internal/hcs/process.go b/internal/hcs/process.go index fef2bf546c..41a25026e6 100644 --- a/internal/hcs/process.go +++ b/internal/hcs/process.go @@ -57,6 +57,12 @@ func (process *Process) Pid() int { return process.processID } +// MigrationState returns the zero value: HCS processes route stdio over +// named pipes and don't use a GCS bridge. +func (process *Process) MigrationState() cow.MigrationState { + return cow.MigrationState{} +} + // SystemID returns the ID of the process's compute system. func (process *Process) SystemID() string { return process.system.ID() diff --git a/internal/jobcontainers/process.go b/internal/jobcontainers/process.go index cf3318f3b7..f2ac6d16f8 100644 --- a/internal/jobcontainers/process.go +++ b/internal/jobcontainers/process.go @@ -197,6 +197,12 @@ func (p *JobProcess) Pid() int { return p.cmd.Pid() } +// MigrationState returns the zero value: job-container processes route +// stdio over OS pipes and don't use a GCS bridge. +func (p *JobProcess) MigrationState() cow.MigrationState { + return cow.MigrationState{} +} + // Close cleans up any state associated with the process but does not kill it. func (p *JobProcess) Close() error { p.stdioLock.Lock() diff --git a/internal/vm/guestmanager/guest.go b/internal/vm/guestmanager/guest.go index a95e5b9265..76cc8d6324 100644 --- a/internal/vm/guestmanager/guest.go +++ b/internal/vm/guestmanager/guest.go @@ -161,3 +161,30 @@ func (gm *Guest) CloseConnection() error { } return err } + +// NextPort returns the active GCS connection's IO port allocator +// floor, or 0 if no connection is active. Used by the live-migration +// save path. +func (gm *Guest) NextPort() uint32 { + gm.mu.RLock() + defer gm.mu.RUnlock() + + if gm.gc == nil { + return 0 + } + return gm.gc.NextPort() +} + +// SetNextPort raises the active GCS connection's IO port allocator +// floor. No-op if no connection is active. Used by the live-migration +// restore path to skip past vsock ports already in use by restored +// processes. +func (gm *Guest) SetNextPort(p uint32) { + gm.mu.Lock() + defer gm.mu.Unlock() + + if gm.gc == nil { + return + } + gm.gc.SetNextPort(p) +} diff --git a/internal/vm/guestmanager/manager.go b/internal/vm/guestmanager/manager.go index 573a641399..8d942ed7ba 100644 --- a/internal/vm/guestmanager/manager.go +++ b/internal/vm/guestmanager/manager.go @@ -31,6 +31,21 @@ func (gm *Guest) CreateContainer(ctx context.Context, cid string, config interfa return c, nil } +// OpenContainer attaches a host-side wrapper to a container already +// running inside the UVM. Counterpart of [CreateContainer] for the +// live-migration restore path. +func (gm *Guest) OpenContainer(ctx context.Context, cid string) (*gcs.Container, error) { + gm.mu.Lock() + defer gm.mu.Unlock() + + c, err := gm.gc.OpenContainer(ctx, cid) + if err != nil { + return nil, fmt.Errorf("failed to open container %s: %w", cid, err) + } + + return c, nil +} + // DumpStacks requests a stack dump from the guest and returns it as a string. func (gm *Guest) DumpStacks(ctx context.Context) (string, error) { gm.mu.Lock() diff --git a/internal/vm/vmmanager/lifetime.go b/internal/vm/vmmanager/lifetime.go index f09459d5cf..03743939c4 100644 --- a/internal/vm/vmmanager/lifetime.go +++ b/internal/vm/vmmanager/lifetime.go @@ -87,6 +87,17 @@ func (uvm *UtilityVM) PropertiesV2(ctx context.Context, types ...hcsschema.Prope return props, nil } +// PropertiesV3 returns the properties of the utility VM from HCS using the V2 +// property query schema. +func (uvm *UtilityVM) PropertiesV3(ctx context.Context, query *hcsschema.PropertyQuery) (*hcsschema.Properties, error) { + props, err := uvm.cs.PropertiesV3(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to get properties from HCS: %w", err) + } + + return props, nil +} + // StartedTime returns the time when the utility VM entered the running state. func (uvm *UtilityVM) StartedTime() time.Time { return uvm.cs.StartedTime() diff --git a/internal/vm/vmmanager/migration.go b/internal/vm/vmmanager/migration.go new file mode 100644 index 0000000000..c2cfa18360 --- /dev/null +++ b/internal/vm/vmmanager/migration.go @@ -0,0 +1,65 @@ +//go:build windows && (lcow || wcow) + +package vmmanager + +import ( + "context" + "fmt" + + "github.com/Microsoft/hcsshim/internal/hcs" + hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" +) + +// StartWithMigrationOptions starts the utility VM as a live migration destination +// using the provided migration configuration. +func (uvm *UtilityVM) StartWithMigrationOptions(ctx context.Context, config *hcs.MigrationConfig) error { + if err := uvm.cs.StartWithMigrationOptions(ctx, config); err != nil { + return fmt.Errorf("failed to start utility VM with migration options: %w", err) + } + return nil +} + +// InitializeLiveMigrationOnSource initializes a live migration on the source side +// of the utility VM with the provided options. +func (uvm *UtilityVM) InitializeLiveMigrationOnSource(ctx context.Context, options *hcsschema.MigrationInitializeOptions) error { + if err := uvm.cs.InitializeLiveMigrationOnSource(ctx, options); err != nil { + return fmt.Errorf("failed to initialize live migration on source: %w", err) + } + return nil +} + +// StartLiveMigrationOnSource starts the live migration on the source side using +// the provided transport socket and session ID. +func (uvm *UtilityVM) StartLiveMigrationOnSource(ctx context.Context, config *hcs.MigrationConfig) error { + if err := uvm.cs.StartLiveMigrationOnSource(ctx, config); err != nil { + return fmt.Errorf("failed to start live migration on source: %w", err) + } + return nil +} + +// StartLiveMigrationTransfer starts the memory transfer phase of a live migration. +func (uvm *UtilityVM) StartLiveMigrationTransfer(ctx context.Context, options *hcsschema.MigrationTransferOptions) error { + if err := uvm.cs.StartLiveMigrationTransfer(ctx, options); err != nil { + return fmt.Errorf("failed to start live migration transfer: %w", err) + } + return nil +} + +// FinalizeLiveMigration completes the live migration workflow. If resume is true +// the utility VM is resumed; otherwise it is stopped. +func (uvm *UtilityVM) FinalizeLiveMigration(ctx context.Context, resume bool) error { + if err := uvm.cs.FinalizeLiveMigration(ctx, resume); err != nil { + return fmt.Errorf("failed to finalize live migration: %w", err) + } + return nil +} + +// MigrationNotifications returns a read-only channel that receives live migration +// event payloads for the utility VM. +func (uvm *UtilityVM) MigrationNotifications() (<-chan hcsschema.OperationSystemMigrationNotificationInfo, error) { + ch, err := uvm.cs.MigrationNotifications() + if err != nil { + return nil, fmt.Errorf("failed to get migration notifications channel: %w", err) + } + return ch, nil +} diff --git a/pkg/migration/parse.go b/pkg/migration/parse.go new file mode 100644 index 0000000000..7e4ff0900c --- /dev/null +++ b/pkg/migration/parse.go @@ -0,0 +1,65 @@ +//go:build windows + +package migration + +import ( + hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" +) + +// InitializeOptionsFromProto converts a protobuf [InitializeOptions] to the +// HCS schema [hcsschema.MigrationInitializeOptions]. +func InitializeOptionsFromProto(p *InitializeOptions) *hcsschema.MigrationInitializeOptions { + if p == nil { + return nil + } + return &hcsschema.MigrationInitializeOptions{ + MemoryTransport: memoryTransportFromProto(p.MemoryTransport), + MemoryTransferThrottleParams: throttleParamsFromProto(p.MemoryTransferThrottleParams), + CompressionSettings: compressionSettingsFromProto(p.CompressionSettings), + ChecksumVerification: p.ChecksumVerification, + PerfTracingEnabled: p.PerfTracingEnabled, + CancelIfBlackoutThresholdExceeds: p.CancelIfBlackoutThresholdExceeds, + PrepareMemoryTransferMode: p.PrepareMemoryTransferMode, + } +} + +// memoryTransportFromProto converts a protobuf [MemoryTransport] enum value to its HCS [hcsschema.MigrationMemoryTransport] equivalent. +func memoryTransportFromProto(t MemoryTransport) hcsschema.MigrationMemoryTransport { + switch t { + case MemoryTransport_MEMORY_TRANSPORT_TCP: + return hcsschema.MigrationMemoryTransportTCP + default: + return "" + } +} + +// throttleParamsFromProto converts a protobuf [MemoryTransferThrottleParams] to its HCS [hcsschema.MemoryMigrationTransferThrottleParams] equivalent. +func throttleParamsFromProto(p *MemoryTransferThrottleParams) *hcsschema.MemoryMigrationTransferThrottleParams { + if p == nil { + return nil + } + s := &hcsschema.MemoryMigrationTransferThrottleParams{ + SkipThrottling: p.SkipThrottling, + ThrottlingScale: p.ThrottlingScale, + TargetNumberOfBrownoutTransferPasses: p.TargetNumberOfBrownoutTransferPasses, + StartingBrownoutPassNumberForThrottling: p.StartingBrownoutPassNumberForThrottling, + MaximumNumberOfBrownoutTransferPasses: p.MaximumNumberOfBrownoutTransferPasses, + TargetBlackoutTransferTime: p.TargetBlackoutTransferTime, + BlackoutTimeThresholdForCancellingMigration: p.BlackoutTimeThresholdForCancellingMigration, + } + if p.MinimumThrottlePercentage != nil { + v := uint8(*p.MinimumThrottlePercentage) + s.MinimumThrottlePercentage = &v + } + return s +} + +// compressionSettingsFromProto converts a protobuf [CompressionSettings] to its HCS [hcsschema.MigrationCompressionSettings] equivalent. +func compressionSettingsFromProto(p *CompressionSettings) *hcsschema.MigrationCompressionSettings { + if p == nil { + return nil + } + return &hcsschema.MigrationCompressionSettings{ + ThrottleWorkerCount: p.ThrottleWorkerCount, + } +}