From a23d84064e23571a837aba1fe040cdf4d4d17b7e Mon Sep 17 00:00:00 2001 From: Shishir Mahajan Date: Thu, 17 Dec 2020 14:03:31 -0800 Subject: [PATCH] Fix issue #1: Add timeout to containerd context. --- containerd/containerd.go | 37 +++++++++++++++++++++++------ containerd/driver.go | 11 +++++++-- containerd/handle.go | 51 +++++++++++++++++++++++++++++----------- 3 files changed, 76 insertions(+), 23 deletions(-) diff --git a/containerd/containerd.go b/containerd/containerd.go index 0735e90..19b1586 100644 --- a/containerd/containerd.go +++ b/containerd/containerd.go @@ -18,9 +18,11 @@ limitations under the License. package containerd import ( + "context" "fmt" "os" "syscall" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/cio" @@ -43,15 +45,24 @@ type ContainerConfig struct { } func (d *Driver) isContainerdRunning() (bool, error) { - return d.client.IsServing(d.ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second) + defer cancel() + + return d.client.IsServing(ctxWithTimeout) } func (d *Driver) getContainerdVersion() (containerd.Version, error) { - return d.client.Version(d.ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second) + defer cancel() + + return d.client.Version(ctxWithTimeout) } func (d *Driver) pullImage(imageName string) (containerd.Image, error) { - return d.client.Pull(d.ctxContainerd, imageName, containerd.WithPullUnpack) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 90*time.Second) + defer cancel() + + return d.client.Pull(ctxWithTimeout, imageName, containerd.WithPullUnpack) } func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskConfig) (containerd.Container, error) { @@ -185,8 +196,11 @@ func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskC opts = append(opts, oci.WithLinuxNamespace(specs.LinuxNamespace{Type: specs.NetworkNamespace, Path: containerConfig.NetworkNamespacePath})) } + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second) + defer cancel() + return d.client.NewContainer( - d.ctxContainerd, + ctxWithTimeout, containerConfig.ContainerName, containerd.WithRuntime(d.config.ContainerdRuntime, nil), containerd.WithNewSnapshot(containerConfig.ContainerSnapshotName, containerConfig.Image), @@ -205,7 +219,10 @@ func buildMountpoint(mountType, mountTarget, mountSource string, mountOptions [] } func (d *Driver) loadContainer(id string) (containerd.Container, error) { - return d.client.LoadContainer(d.ctxContainerd, id) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second) + defer cancel() + + return d.client.LoadContainer(ctxWithTimeout, id) } func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) { @@ -219,7 +236,10 @@ func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPa return nil, err } - return container.NewTask(d.ctxContainerd, cio.NewCreator(cio.WithStreams(nil, stdout, stderr))) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second) + defer cancel() + + return container.NewTask(ctxWithTimeout, cio.NewCreator(cio.WithStreams(nil, stdout, stderr))) } // FIFO's are named pipes in linux. @@ -229,5 +249,8 @@ func openFIFO(path string) (*os.File, error) { } func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) { - return container.Task(d.ctxContainerd, cio.Load) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second) + defer cancel() + + return container.Task(ctxWithTimeout, cio.Load) } diff --git a/containerd/driver.go b/containerd/driver.go index 0a9dfc4..b251706 100644 --- a/containerd/driver.go +++ b/containerd/driver.go @@ -429,6 +429,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.tasks.Set(cfg.ID, h) + go h.run(d.ctxContainerd) return handle, nil, nil } @@ -474,7 +475,10 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("Error in recovering task: %v", err) } - status, err := task.Status(d.ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second) + defer cancel() + + status, err := task.Status(ctxWithTimeout) if err != nil { return fmt.Errorf("Error in recovering task status: %v", err) } @@ -519,7 +523,10 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr defer close(ch) var result *drivers.ExitResult - exitStatusCh, err := handle.task.Wait(d.ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second) + defer cancel() + + exitStatusCh, err := handle.task.Wait(ctxWithTimeout) if err != nil { result = &drivers.ExitResult{ Err: fmt.Errorf("executor: error waiting on process: %v", err), diff --git a/containerd/handle.go b/containerd/handle.go index 9fde3aa..9940a6a 100644 --- a/containerd/handle.go +++ b/containerd/handle.go @@ -88,7 +88,10 @@ func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) { h.stateLock.RLock() defer h.stateLock.RUnlock() - status, err := h.task.Status(ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 15*time.Second) + defer cancel() + + status, err := h.task.Status(ctxWithTimeout) if err != nil { return false, fmt.Errorf("Error in getting task status: %v", err) } @@ -109,7 +112,10 @@ func (h *taskHandle) run(ctxContainerd context.Context) { // TODO: Use goroutine and a channel to synchronize this, instead of sleep. time.Sleep(5 * time.Second) - h.task.Start(ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + + h.task.Start(ctxWithTimeout) } // exec launches a new process in a running container. @@ -117,7 +123,10 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt defer opts.Stdout.Close() defer opts.Stderr.Close() - spec, err := h.container.Spec(ctxContainerd) + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + + spec, err := h.container.Spec(ctxWithTimeout) if err != nil { return nil, err } @@ -136,19 +145,19 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt } ioCreator := cio.NewCreator(cioOpts...) - process, err := h.task.Exec(ctxContainerd, execID[:8], pspec, ioCreator) + process, err := h.task.Exec(ctxWithTimeout, execID[:8], pspec, ioCreator) if err != nil { return nil, err } - defer process.Delete(ctxContainerd) + defer process.Delete(ctxWithTimeout) - statusC, err := process.Wait(ctxContainerd) + statusC, err := process.Wait(ctxWithTimeout) if err != nil { return nil, err } - if err := process.Start(ctxContainerd); err != nil { + if err := process.Start(ctxWithTimeout); err != nil { return nil, err } @@ -175,7 +184,10 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt } func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error { - if err := h.task.Kill(ctxContainerd, signal); err != nil { + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + + if err := h.task.Kill(ctxWithTimeout, signal); err != nil { return err } @@ -184,7 +196,7 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Durati // if they need more time for their container to shutdown gracefully. time.Sleep(timeout) - status, err := h.task.Status(ctxContainerd) + status, err := h.task.Status(ctxWithTimeout) if err != nil { return err } @@ -194,14 +206,17 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Durati return nil } - return h.task.Kill(ctxContainerd, syscall.SIGKILL) + return h.task.Kill(ctxWithTimeout, syscall.SIGKILL) } func (h *taskHandle) cleanup(ctxContainerd context.Context) error { - if _, err := h.task.Delete(ctxContainerd); err != nil { + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + + if _, err := h.task.Delete(ctxWithTimeout); err != nil { return err } - if err := h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup); err != nil { + if err := h.container.Delete(ctxWithTimeout, containerd.WithSnapshotCleanup); err != nil { return err } return nil @@ -209,6 +224,7 @@ func (h *taskHandle) cleanup(ctxContainerd context.Context) error { func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { ch := make(chan *drivers.TaskResourceUsage) + go h.handleStats(ch, ctx, ctxContainerd, interval) return ch, nil @@ -216,6 +232,10 @@ func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Dur func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) { defer close(ch) + + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + timer := time.NewTimer(0) for { select { @@ -226,7 +246,7 @@ func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxCon } // Get containerd task metric - metric, err := h.task.Metrics(ctxContainerd) + metric, err := h.task.Metrics(ctxWithTimeout) if err != nil { h.logger.Error("Failed to get task metric:", "error", err) return @@ -315,5 +335,8 @@ func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskRe } } func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error { - return h.task.Kill(ctxContainerd, sig.(syscall.Signal)) + ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second) + defer cancel() + + return h.task.Kill(ctxWithTimeout, sig.(syscall.Signal)) }