Fix issue #1: Add timeout to containerd context.

This commit is contained in:
Shishir Mahajan 2020-12-17 14:03:31 -08:00
parent 6ccc13a3cb
commit a23d84064e
3 changed files with 76 additions and 23 deletions

View File

@ -18,9 +18,11 @@ limitations under the License.
package containerd package containerd
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"syscall" "syscall"
"time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
@ -43,15 +45,24 @@ type ContainerConfig struct {
} }
func (d *Driver) isContainerdRunning() (bool, error) { func (d *Driver) isContainerdRunning() (bool, error) {
return d.client.IsServing(d.ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second)
defer cancel()
return d.client.IsServing(ctxWithTimeout)
} }
func (d *Driver) getContainerdVersion() (containerd.Version, error) { func (d *Driver) getContainerdVersion() (containerd.Version, error) {
return d.client.Version(d.ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second)
defer cancel()
return d.client.Version(ctxWithTimeout)
} }
func (d *Driver) pullImage(imageName string) (containerd.Image, error) { func (d *Driver) pullImage(imageName string) (containerd.Image, error) {
return d.client.Pull(d.ctxContainerd, imageName, containerd.WithPullUnpack) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 90*time.Second)
defer cancel()
return d.client.Pull(ctxWithTimeout, imageName, containerd.WithPullUnpack)
} }
func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskConfig) (containerd.Container, error) { func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskConfig) (containerd.Container, error) {
@ -185,8 +196,11 @@ func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskC
opts = append(opts, oci.WithLinuxNamespace(specs.LinuxNamespace{Type: specs.NetworkNamespace, Path: containerConfig.NetworkNamespacePath})) opts = append(opts, oci.WithLinuxNamespace(specs.LinuxNamespace{Type: specs.NetworkNamespace, Path: containerConfig.NetworkNamespacePath}))
} }
ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second)
defer cancel()
return d.client.NewContainer( return d.client.NewContainer(
d.ctxContainerd, ctxWithTimeout,
containerConfig.ContainerName, containerConfig.ContainerName,
containerd.WithRuntime(d.config.ContainerdRuntime, nil), containerd.WithRuntime(d.config.ContainerdRuntime, nil),
containerd.WithNewSnapshot(containerConfig.ContainerSnapshotName, containerConfig.Image), containerd.WithNewSnapshot(containerConfig.ContainerSnapshotName, containerConfig.Image),
@ -205,7 +219,10 @@ func buildMountpoint(mountType, mountTarget, mountSource string, mountOptions []
} }
func (d *Driver) loadContainer(id string) (containerd.Container, error) { func (d *Driver) loadContainer(id string) (containerd.Container, error) {
return d.client.LoadContainer(d.ctxContainerd, id) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second)
defer cancel()
return d.client.LoadContainer(ctxWithTimeout, id)
} }
func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) { func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
@ -219,7 +236,10 @@ func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPa
return nil, err return nil, err
} }
return container.NewTask(d.ctxContainerd, cio.NewCreator(cio.WithStreams(nil, stdout, stderr))) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
defer cancel()
return container.NewTask(ctxWithTimeout, cio.NewCreator(cio.WithStreams(nil, stdout, stderr)))
} }
// FIFO's are named pipes in linux. // FIFO's are named pipes in linux.
@ -229,5 +249,8 @@ func openFIFO(path string) (*os.File, error) {
} }
func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) { func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) {
return container.Task(d.ctxContainerd, cio.Load) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 15*time.Second)
defer cancel()
return container.Task(ctxWithTimeout, cio.Load)
} }

View File

@ -429,6 +429,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
} }
d.tasks.Set(cfg.ID, h) d.tasks.Set(cfg.ID, h)
go h.run(d.ctxContainerd) go h.run(d.ctxContainerd)
return handle, nil, nil return handle, nil, nil
} }
@ -474,7 +475,10 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("Error in recovering task: %v", err) return fmt.Errorf("Error in recovering task: %v", err)
} }
status, err := task.Status(d.ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
defer cancel()
status, err := task.Status(ctxWithTimeout)
if err != nil { if err != nil {
return fmt.Errorf("Error in recovering task status: %v", err) return fmt.Errorf("Error in recovering task status: %v", err)
} }
@ -519,7 +523,10 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
defer close(ch) defer close(ch)
var result *drivers.ExitResult var result *drivers.ExitResult
exitStatusCh, err := handle.task.Wait(d.ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
defer cancel()
exitStatusCh, err := handle.task.Wait(ctxWithTimeout)
if err != nil { if err != nil {
result = &drivers.ExitResult{ result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err), Err: fmt.Errorf("executor: error waiting on process: %v", err),

View File

@ -88,7 +88,10 @@ func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) {
h.stateLock.RLock() h.stateLock.RLock()
defer h.stateLock.RUnlock() defer h.stateLock.RUnlock()
status, err := h.task.Status(ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 15*time.Second)
defer cancel()
status, err := h.task.Status(ctxWithTimeout)
if err != nil { if err != nil {
return false, fmt.Errorf("Error in getting task status: %v", err) return false, fmt.Errorf("Error in getting task status: %v", err)
} }
@ -109,7 +112,10 @@ func (h *taskHandle) run(ctxContainerd context.Context) {
// TODO: Use goroutine and a channel to synchronize this, instead of sleep. // TODO: Use goroutine and a channel to synchronize this, instead of sleep.
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
h.task.Start(ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
h.task.Start(ctxWithTimeout)
} }
// exec launches a new process in a running container. // exec launches a new process in a running container.
@ -117,7 +123,10 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt
defer opts.Stdout.Close() defer opts.Stdout.Close()
defer opts.Stderr.Close() defer opts.Stderr.Close()
spec, err := h.container.Spec(ctxContainerd) ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
spec, err := h.container.Spec(ctxWithTimeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -136,19 +145,19 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt
} }
ioCreator := cio.NewCreator(cioOpts...) ioCreator := cio.NewCreator(cioOpts...)
process, err := h.task.Exec(ctxContainerd, execID[:8], pspec, ioCreator) process, err := h.task.Exec(ctxWithTimeout, execID[:8], pspec, ioCreator)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer process.Delete(ctxContainerd) defer process.Delete(ctxWithTimeout)
statusC, err := process.Wait(ctxContainerd) statusC, err := process.Wait(ctxWithTimeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := process.Start(ctxContainerd); err != nil { if err := process.Start(ctxWithTimeout); err != nil {
return nil, err return nil, err
} }
@ -175,7 +184,10 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opt
} }
func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error { func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
if err := h.task.Kill(ctxContainerd, signal); err != nil { ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
if err := h.task.Kill(ctxWithTimeout, signal); err != nil {
return err return err
} }
@ -184,7 +196,7 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Durati
// if they need more time for their container to shutdown gracefully. // if they need more time for their container to shutdown gracefully.
time.Sleep(timeout) time.Sleep(timeout)
status, err := h.task.Status(ctxContainerd) status, err := h.task.Status(ctxWithTimeout)
if err != nil { if err != nil {
return err return err
} }
@ -194,14 +206,17 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Durati
return nil return nil
} }
return h.task.Kill(ctxContainerd, syscall.SIGKILL) return h.task.Kill(ctxWithTimeout, syscall.SIGKILL)
} }
func (h *taskHandle) cleanup(ctxContainerd context.Context) error { func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
if _, err := h.task.Delete(ctxContainerd); err != nil { ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
if _, err := h.task.Delete(ctxWithTimeout); err != nil {
return err return err
} }
if err := h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup); err != nil { if err := h.container.Delete(ctxWithTimeout, containerd.WithSnapshotCleanup); err != nil {
return err return err
} }
return nil return nil
@ -209,6 +224,7 @@ func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
ch := make(chan *drivers.TaskResourceUsage) ch := make(chan *drivers.TaskResourceUsage)
go h.handleStats(ch, ctx, ctxContainerd, interval) go h.handleStats(ch, ctx, ctxContainerd, interval)
return ch, nil return ch, nil
@ -216,6 +232,10 @@ func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Dur
func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) { func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
defer close(ch) defer close(ch)
ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
timer := time.NewTimer(0) timer := time.NewTimer(0)
for { for {
select { select {
@ -226,7 +246,7 @@ func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxCon
} }
// Get containerd task metric // Get containerd task metric
metric, err := h.task.Metrics(ctxContainerd) metric, err := h.task.Metrics(ctxWithTimeout)
if err != nil { if err != nil {
h.logger.Error("Failed to get task metric:", "error", err) h.logger.Error("Failed to get task metric:", "error", err)
return return
@ -315,5 +335,8 @@ func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskRe
} }
} }
func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error { func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
return h.task.Kill(ctxContainerd, sig.(syscall.Signal)) ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
defer cancel()
return h.task.Kill(ctxWithTimeout, sig.(syscall.Signal))
} }