/*
Copyright 2020 Roblox Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package containerd

import (
	"context"
	"fmt"
	"os"
	"sync"
	"syscall"
	"time"

	v1 "github.com/containerd/cgroups/stats/v1"
	v2 "github.com/containerd/cgroups/v2/stats"
	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/typeurl"
	"github.com/hashicorp/go-hclog"
	uuid "github.com/hashicorp/go-uuid"
	"github.com/hashicorp/nomad/client/stats"
	hstats "github.com/hashicorp/nomad/helper/stats"
	"github.com/hashicorp/nomad/plugins/drivers"
)

// taskHandle stores all relevant runtime information,
// such as the process ID if this is a local task, or other
// metadata if this driver deals with external APIs.
type taskHandle struct {
	// stateLock syncs access to all fields below
	stateLock sync.RWMutex

	logger         hclog.Logger
	taskConfig     *drivers.TaskConfig
	procState      drivers.TaskState
	startedAt      time.Time
	completedAt    time.Time
	exitResult     *drivers.ExitResult
	totalCpuStats  *stats.CpuStats
	userCpuStats   *stats.CpuStats
	systemCpuStats *stats.CpuStats
	containerName  string
	container      containerd.Container
	task           containerd.Task
}

func (h *taskHandle) TaskStatus(ctxContainerd context.Context) *drivers.TaskStatus {
	h.stateLock.RLock()
	defer h.stateLock.RUnlock()

	// Compute the state into a local variable so we don't write to
	// shared state while holding only the read lock.
	procState := drivers.TaskStateExited

	isRunning, err := h.IsRunning(ctxContainerd)
	if err != nil {
		procState = drivers.TaskStateUnknown
	} else if isRunning {
		procState = drivers.TaskStateRunning
	}

	return &drivers.TaskStatus{
		ID:          h.taskConfig.ID,
		Name:        h.taskConfig.Name,
		State:       procState,
		StartedAt:   h.startedAt,
		CompletedAt: h.completedAt,
		ExitResult:  h.exitResult,
		DriverAttributes: map[string]string{
			"containerName": h.containerName,
		},
	}
}

func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) {
	h.stateLock.RLock()
	defer h.stateLock.RUnlock()

	status, err := h.task.Status(ctxContainerd)
	if err != nil {
		return false, fmt.Errorf("error getting task status: %v", err)
	}

	return (status.Status == containerd.Running), nil
}

func (h *taskHandle) run(ctxContainerd context.Context) {
	h.stateLock.Lock()
	defer h.stateLock.Unlock()

	// Every executor runs this init at creation for stats.
	if err := hstats.Init(); err != nil {
		h.logger.Error("unable to initialize stats", "error", err)
	}

	// Sleep for 5 seconds to allow h.task.Wait() to kick in.
	// TODO: Use a goroutine and a channel to synchronize this, instead of sleeping.
	time.Sleep(5 * time.Second)

	if err := h.task.Start(ctxContainerd); err != nil {
		h.logger.Error("failed to start task", "error", err)
	}
}
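// runSynchronized is a sketch of the TODO above and is NOT part of this
// driver: it assumes a hypothetical waitReady channel, closed by the caller
// once h.task.Wait() has been registered, which replaces the fixed
// 5-second sleep with explicit synchronization.
func (h *taskHandle) runSynchronized(ctxContainerd context.Context, waitReady <-chan struct{}) {
	h.stateLock.Lock()
	defer h.stateLock.Unlock()

	if err := hstats.Init(); err != nil {
		h.logger.Error("unable to initialize stats", "error", err)
	}

	// Block until the caller signals that h.task.Wait() is in place,
	// instead of sleeping for an arbitrary interval.
	<-waitReady

	if err := h.task.Start(ctxContainerd); err != nil {
		h.logger.Error("failed to start task", "error", err)
	}
}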
// exec launches a new process in a running container.
func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
	defer opts.Stdout.Close()
	defer opts.Stderr.Close()

	spec, err := h.container.Spec(ctxContainerd)
	if err != nil {
		return nil, err
	}

	pspec := spec.Process
	pspec.Terminal = opts.Tty
	pspec.Args = opts.Command

	execID, err := uuid.GenerateUUID()
	if err != nil {
		return nil, err
	}

	cioOpts := []cio.Opt{cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)}
	if opts.Tty {
		cioOpts = append(cioOpts, cio.WithTerminal)
	}
	ioCreator := cio.NewCreator(cioOpts...)

	process, err := h.task.Exec(ctxContainerd, execID[:8], pspec, ioCreator)
	if err != nil {
		return nil, err
	}
	defer process.Delete(ctxContainerd)

	statusC, err := process.Wait(ctxContainerd)
	if err != nil {
		return nil, err
	}

	if err := process.Start(ctxContainerd); err != nil {
		return nil, err
	}

	status := <-statusC
	code, _, err := status.Result()
	if err != nil {
		return nil, err
	}

	return &drivers.ExitResult{
		ExitCode: int(code),
	}, nil
}

func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
	if err := h.task.Kill(ctxContainerd, signal); err != nil {
		return err
	}

	// timeout = 5 seconds, passed in by the Nomad client.
	// TODO: Make timeout configurable in task_config. This will allow users
	// to set a higher timeout if they need more time for their container
	// to shut down gracefully.
	time.Sleep(timeout)

	status, err := h.task.Status(ctxContainerd)
	if err != nil {
		return err
	}

	if status.Status != containerd.Running {
		h.logger.Info("Task is not running anymore, no need to SIGKILL")
		return nil
	}

	return h.task.Kill(ctxContainerd, syscall.SIGKILL)
}

func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
	if _, err := h.task.Delete(ctxContainerd); err != nil {
		return err
	}
	if err := h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup); err != nil {
		return err
	}
	return nil
}

func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	ch := make(chan *drivers.TaskResourceUsage)
	go h.handleStats(ch, ctx, ctxContainerd, interval)
	return ch, nil
}

func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
	defer close(ch)

	timer := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(interval)
		}

		// Fetch the current task metrics from containerd.
		metric, err := h.task.Metrics(ctxContainerd)
		if err != nil {
			h.logger.Error("failed to get task metrics", "error", err)
			return
		}

		anydata, err := typeurl.UnmarshalAny(metric.Data)
		if err != nil {
			h.logger.Error("failed to unmarshal metric data", "error", err)
			return
		}

		// Dispatch on the concrete metrics type: cgroups v1 and v2
		// report different counters.
		var taskResourceUsage *drivers.TaskResourceUsage
		switch data := anydata.(type) {
		case *v1.Metrics:
			taskResourceUsage = h.getV1TaskResourceUsage(data)
		case *v2.Metrics:
			taskResourceUsage = h.getV2TaskResourceUsage(data)
		default:
			h.logger.Error("cannot convert metric data to cgroups.Metrics")
			return
		}

		select {
		case <-ctx.Done():
			return
		case ch <- taskResourceUsage:
		}
	}
}
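// consumeStats is a hypothetical illustration, NOT part of this driver, of
// how the channel returned by stats() is consumed: one TaskResourceUsage
// arrives per interval until the stats context is cancelled and handleStats
// closes the channel, which ends the range loop.
func consumeStats(ch <-chan *drivers.TaskResourceUsage, logger hclog.Logger) {
	for usage := range ch {
		logger.Debug("task stats",
			"cpu_percent", usage.ResourceUsage.CpuStats.Percent,
			"memory_usage", usage.ResourceUsage.MemoryStats.Usage)
	}
}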
// getV1TaskResourceUsage converts containerd v1 (cgroups v1) task metrics
// to TaskResourceUsage.
func (h *taskHandle) getV1TaskResourceUsage(metrics *v1.Metrics) *drivers.TaskResourceUsage {
	totalPercent := h.totalCpuStats.Percent(float64(metrics.CPU.Usage.Total))
	cs := &drivers.CpuStats{
		SystemMode: h.systemCpuStats.Percent(float64(metrics.CPU.Usage.Kernel)),
		UserMode:   h.userCpuStats.Percent(float64(metrics.CPU.Usage.User)),
		Percent:    totalPercent,
		TotalTicks: h.totalCpuStats.TicksConsumed(totalPercent),
		Measured:   []string{"Percent", "System Mode", "User Mode"},
	}

	ms := &drivers.MemoryStats{
		RSS:      metrics.Memory.RSS,
		Cache:    metrics.Memory.Cache,
		Swap:     metrics.Memory.Swap.Usage,
		Usage:    metrics.Memory.Usage.Usage,
		MaxUsage: metrics.Memory.Usage.Max,
		Measured: []string{"RSS", "Cache", "Swap", "Usage"},
	}

	ts := time.Now().UTC().UnixNano()
	return &drivers.TaskResourceUsage{
		ResourceUsage: &drivers.ResourceUsage{
			CpuStats:    cs,
			MemoryStats: ms,
		},
		Timestamp: ts,
	}
}

// getV2TaskResourceUsage converts containerd v2 (cgroups v2) task metrics
// to TaskResourceUsage.
func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskResourceUsage {
	totalPercent := h.totalCpuStats.Percent(float64(metrics.CPU.SystemUsec + metrics.CPU.UserUsec))
	cs := &drivers.CpuStats{
		SystemMode: h.systemCpuStats.Percent(float64(metrics.CPU.SystemUsec)),
		UserMode:   h.userCpuStats.Percent(float64(metrics.CPU.UserUsec)),
		Percent:    totalPercent,
		TotalTicks: h.totalCpuStats.TicksConsumed(totalPercent),
		Measured:   []string{"Percent", "System Mode", "User Mode"},
	}

	ms := &drivers.MemoryStats{
		Swap:     metrics.Memory.SwapUsage,
		Usage:    metrics.Memory.Usage,
		Measured: []string{"Swap", "Usage"},
	}

	ts := time.Now().UTC().UnixNano()
	return &drivers.TaskResourceUsage{
		ResourceUsage: &drivers.ResourceUsage{
			CpuStats:    cs,
			MemoryStats: ms,
		},
		Timestamp: ts,
	}
}

func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
	// Guard the type assertion so an unexpected os.Signal implementation
	// returns an error instead of panicking.
	sysSig, ok := sig.(syscall.Signal)
	if !ok {
		return fmt.Errorf("invalid signal type: %v", sig)
	}
	return h.task.Kill(ctxContainerd, sysSig)
}