Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg
return cmd
}

// Attach wires IO relays to a process the caller has already resolved.
// Counterpart of [Command] / [CommandContext] for the destination side
// of live migration: caller obtains `p` via the host's restore path
// (e.g. gcs.Container.OpenProcessWithIO) and Attach binds the
// process's stdio to the supplied destination streams.
func Attach(ctx context.Context, p cow.Process, stdin io.Reader, stdout, stderr io.Writer) (*Cmd, error) {
cmd := &Cmd{
Process: p,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Log: log.G(ctx).WithField("pid", p.Pid()),
Context: ctx,
ExitState: &ExitState{},
allDoneCh: make(chan struct{}),
CopyAfterExitTimeout: time.Second,
}
if err := cmd.startRelay(); err != nil {
return nil, err
}
return cmd, nil
}

// Start starts a command. The caller must ensure that if Start succeeds,
// Wait is eventually called to clean up resources.
func (c *Cmd) Start() error {
Expand Down Expand Up @@ -209,7 +232,13 @@ func (c *Cmd) Start() error {
c.Log = c.Log.WithField("pid", p.Pid())
}

// Start relaying process IO.
return c.startRelay()
}

// startRelay wires the IO relay goroutines and the context-cancel
// killer to [Cmd.Process].
func (c *Cmd) startRelay() error {
p := c.Process
stdin, stdout, stderr := p.Stdio()
if c.Stdin != nil {
// Do not make stdin part of the error group because there is no way for
Expand Down
55 changes: 55 additions & 0 deletions internal/cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func (p *localProcess) Pid() int {
return p.p.Pid
}

// IOPorts always returns zeros: the test fake uses OS pipes, not vsock.
func (p *localProcess) IOPorts() (stdin, stdout, stderr uint32) {
return 0, 0, 0
}

func (p *localProcess) ResizeConsole(ctx context.Context, x, y uint16) error {
return errors.New("not supported")
}
Expand Down Expand Up @@ -280,3 +285,53 @@ func TestCmdStuckIo(t *testing.T) {
t.Fatalf("expected: %v; got: %v", errIOTimeOut, err)
}
}

// TestCmdAttach verifies that Attach binds a Cmd to a caller-supplied
// process and the resulting Cmd can be Wait'd to completion. Mirrors
// the migration restore path: caller obtains the process via the
// host's restore API (e.g. gcs.OpenProcessWithIO) and Attach wires IO.
func TestCmdAttach(t *testing.T) {
host := &localProcessHost{}
p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{
CommandLine: "cmd /c exit /b 0",
})
if err != nil {
t.Fatal(err)
}

cmd, err := Attach(context.Background(), p, nil, nil, nil)
if err != nil {
t.Fatalf("Attach: %v", err)
}
if cmd.Process != p {
t.Fatal("Cmd.Process does not match the supplied process")
}
if err := cmd.Wait(); err != nil {
t.Fatalf("Wait: %v", err)
}
}

// TestCmdAttachIO verifies that Attach's IO relays flow process output
// to caller-supplied destination streams.
func TestCmdAttachIO(t *testing.T) {
host := &localProcessHost{}
p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{
CommandLine: "cmd /c echo hello",
CreateStdOutPipe: true,
})
if err != nil {
t.Fatal(err)
}

var stdout bytes.Buffer
cmd, err := Attach(context.Background(), p, nil, &stdout, nil)
if err != nil {
t.Fatalf("Attach: %v", err)
}
if err := cmd.Wait(); err != nil {
t.Fatalf("Wait: %v", err)
}
if got := stdout.String(); got != "hello\r\n" {
t.Fatalf("stdout=%q, want %q", got, "hello\r\n")
}
}
16 changes: 16 additions & 0 deletions internal/controller/process/mocks/mock_cow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/cow/cow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type Process interface {
CloseStderr(ctx context.Context) error
// Pid returns the process ID.
Pid() int
// IOPorts returns the host-side vsock ports allocated for this
// process's stdio relay, or zeros for hosts that don't use vsock
// (WCOW HCS, job containers). Used by the live-migration save path.
IOPorts() (stdin, stdout, stderr uint32)
// Stdio returns the stdio streams for a process. These may be nil if a stream
// was not requested during CreateProcess.
Stdio() (_ io.Writer, _ io.Reader, _ io.Reader)
Expand Down
67 changes: 64 additions & 3 deletions internal/gcs/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gcs
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -67,9 +68,9 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
return c, nil
}

// CloneContainer just creates the wrappers and sets up notification requests for a
// container that is already running inside the UVM (after cloning).
func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) {
// OpenContainer attaches a host-side wrapper to a container already
// running inside the UVM.
func (gc *GuestConnection) OpenContainer(_ context.Context, cid string) (_ *Container, err error) {
c := &Container{
gc: gc,
id: cid,
Expand Down Expand Up @@ -118,6 +119,66 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co
return c.gc.exec(ctx, c.id, config)
}

// OpenProcessWithIO is the live-migration restore counterpart of
// [Container.CreateProcess]: it attaches to a process already running
// in this container and re-listens on the supplied vsock ports.
func (c *Container) OpenProcessWithIO(ctx context.Context, pid uint32, stdinPort, stdoutPort, stderrPort uint32) (_ *Process, err error) {
ctx, span := oc.StartSpan(ctx, "gcs::Container::OpenProcessWithIO", oc.WithClientSpanKind)
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()
span.AddAttributes(
trace.StringAttribute("cid", c.id),
trace.Int64Attribute("pid", int64(pid)))

p := &Process{
gc: c.gc,
cid: c.id,
id: pid,
stdinPort: stdinPort,
stdoutPort: stdoutPort,
stderrPort: stderrPort,
}
defer func() {
if err != nil {
p.Close()
}
}()

listen := func(port uint32) (*ioChannel, error) {
if port == 0 {
return nil, nil
}
l, err := c.gc.ioListenFn(port)
if err != nil {
return nil, fmt.Errorf("listen vsock port %d: %w", port, err)
}
return newIoChannel(l), nil
}
if p.stdin, err = listen(stdinPort); err != nil {
return nil, err
}
if p.stdout, err = listen(stdoutPort); err != nil {
return nil, err
}
if p.stderr, err = listen(stderrPort); err != nil {
return nil, err
}

// Subscribe to the process exit notification.
waitReq := prot.ContainerWaitForProcess{
RequestBase: makeRequest(ctx, c.id),
ProcessID: p.id,
TimeoutInMs: 0xffffffff,
}
p.waitCall, err = c.gc.brdg.AsyncRPC(ctx, prot.RPCWaitForProcess, &waitReq, &p.waitResp)
if err != nil {
return nil, fmt.Errorf("failed to wait on existing process pid %d in container %s: %w", pid, c.id, err)
}
go p.waitBackground()
log.G(ctx).WithField("pid", p.id).Debug("opened existing process with IO")
return p, nil
}

// ID returns the container's ID.
func (c *Container) ID() string {
return c.id
Expand Down
21 changes: 21 additions & 0 deletions internal/gcs/guestconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,27 @@ func (gc *GuestConnection) newIoChannel() (*ioChannel, uint32, error) {
return newIoChannel(l), port, nil
}

// SetNextPort raises the new-process IO port allocator floor. Called
// by the live-migration restore path after [Connect] to skip past
// vsock ports already in use by restored processes. Never goes
// backwards.
func (gc *GuestConnection) SetNextPort(p uint32) {
gc.mu.Lock()
defer gc.mu.Unlock()
if p > gc.nextPort {
gc.nextPort = p
}
}

// NextPort returns the current allocator floor. Used by the
// live-migration save path to record what [SetNextPort] should be
// seeded with on the destination.
func (gc *GuestConnection) NextPort() uint32 {
gc.mu.Lock()
defer gc.mu.Unlock()
return gc.nextPort
}

func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error {
gc.mu.Lock()
defer gc.mu.Unlock()
Expand Down
31 changes: 25 additions & 6 deletions internal/gcs/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Process struct {
stdin, stdout, stderr *ioChannel
stdinCloseWriteOnce sync.Once
stdinCloseWriteErr error
// stdinPort, stdoutPort, stderrPort record the vsock ports that
// gc.exec allocated for this process's stdio relay.
stdinPort, stdoutPort, stderrPort uint32
}

var _ cow.Process = &Process{}
Expand Down Expand Up @@ -100,6 +103,9 @@ func (gc *GuestConnection) exec(ctx context.Context, cid string, params interfac
g := winio.VsockServiceID(vsockSettings.StdErr)
hvsockSettings.StdErr = &g
}
// Snapshot the per-stream vsock ports so the live-migration snapshot
// can re-establish the same host-side listeners on the destination.
p.stdinPort, p.stdoutPort, p.stderrPort = vsockSettings.StdIn, vsockSettings.StdOut, vsockSettings.StdErr

var resp prot.ContainerExecuteProcessResponse
err = gc.brdg.RPC(ctx, prot.RPCExecuteProcess, &req, &resp, false)
Expand Down Expand Up @@ -131,14 +137,20 @@ func (p *Process) Close() error {
trace.StringAttribute("cid", p.cid),
trace.Int64Attribute("pid", int64(p.id)))

if err := p.stdin.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stdin failed")
if p.stdin != nil {
if err := p.stdin.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stdin failed")
}
}
if err := p.stdout.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stdout failed")
if p.stdout != nil {
if err := p.stdout.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stdout failed")
}
}
if err := p.stderr.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stderr failed")
if p.stderr != nil {
if err := p.stderr.Close(); err != nil {
log.G(ctx).WithError(err).Warn("close stderr failed")
}
}
return nil
}
Expand Down Expand Up @@ -211,6 +223,13 @@ func (p *Process) Pid() int {
return int(p.id)
}

// IOPorts returns the host-side vsock ports allocated for this process's
// stdin/stdout/stderr relay (zero if the corresponding stream was not
// opened).
func (p *Process) IOPorts() (stdin, stdout, stderr uint32) {
return p.stdinPort, p.stdoutPort, p.stderrPort
}

// ResizeConsole requests that the pty associated with the process resize its
// window.
func (p *Process) ResizeConsole(ctx context.Context, width, height uint16) (err error) {
Expand Down
6 changes: 6 additions & 0 deletions internal/hcs/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func (process *Process) Pid() int {
return process.processID
}

// IOPorts always returns zeros: HCS processes route stdio over named
// pipes, not vsock. Implemented to satisfy [cow.Process].
func (process *Process) IOPorts() (stdin, stdout, stderr uint32) {
return 0, 0, 0
}

// SystemID returns the ID of the process's compute system.
func (process *Process) SystemID() string {
return process.system.ID()
Expand Down
6 changes: 6 additions & 0 deletions internal/jobcontainers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (p *JobProcess) Pid() int {
return p.cmd.Pid()
}

// IOPorts always returns zeros: job-container processes route stdio
// over OS pipes, not vsock. Implemented to satisfy [cow.Process].
func (p *JobProcess) IOPorts() (stdin, stdout, stderr uint32) {
return 0, 0, 0
}

// Close cleans up any state associated with the process but does not kill it.
func (p *JobProcess) Close() error {
p.stdioLock.Lock()
Expand Down
27 changes: 27 additions & 0 deletions internal/vm/guestmanager/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,30 @@ func (gm *Guest) CloseConnection() error {

return err
}

// NextPort returns the active GCS connection's IO port allocator
// floor, or 0 if no connection is active. Used by the live-migration
// save path.
func (gm *Guest) NextPort() uint32 {
gm.mu.RLock()
defer gm.mu.RUnlock()

if gm.gc == nil {
return 0
}
return gm.gc.NextPort()
}

// SetNextPort raises the active GCS connection's IO port allocator
// floor. No-op if no connection is active. Used by the live-migration
// restore path to skip past vsock ports already in use by restored
// processes.
func (gm *Guest) SetNextPort(p uint32) {
gm.mu.Lock()
defer gm.mu.Unlock()

if gm.gc == nil {
return
}
gm.gc.SetNextPort(p)
}
Loading
Loading