/*
Copyright 2020 Roblox Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package containerd

import (
	"context"
	"fmt"
	"os"
	"sync"
	"syscall"
	"time"

	v1 "github.com/containerd/cgroups/stats/v1"
	v2 "github.com/containerd/cgroups/v2/stats"
	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/typeurl"
	"github.com/hashicorp/go-hclog"
	uuid "github.com/hashicorp/go-uuid"
	"github.com/hashicorp/nomad/client/stats"
	hstats "github.com/hashicorp/nomad/helper/stats"
	"github.com/hashicorp/nomad/plugins/drivers"
)

// taskHandle stores all relevant runtime information for a task,
// such as the process ID for a local task, or other metadata if
// the driver deals with external APIs.
type taskHandle struct {
	// stateLock syncs access to all fields below
	stateLock sync.RWMutex

	logger         hclog.Logger
	taskConfig     *drivers.TaskConfig
	procState      drivers.TaskState
	startedAt      time.Time
	completedAt    time.Time
	exitResult     *drivers.ExitResult
	totalCpuStats  *stats.CpuStats
	userCpuStats   *stats.CpuStats
	systemCpuStats *stats.CpuStats
	containerName  string
	container      containerd.Container
	task           containerd.Task
}
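
// TaskStatus queries the live containerd task state and returns a
// point-in-time drivers.TaskStatus snapshot for the Nomad client.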
func (h *taskHandle) TaskStatus(ctxContainerd context.Context) *drivers.TaskStatus {
	h.stateLock.RLock()
	defer h.stateLock.RUnlock()

	// Compute the state into a local variable so we never write
	// h.procState while holding only the read lock.
	procState := drivers.TaskStateExited

	isRunning, err := h.IsRunning(ctxContainerd)
	if err != nil {
		procState = drivers.TaskStateUnknown
	} else if isRunning {
		procState = drivers.TaskStateRunning
	}

	return &drivers.TaskStatus{
		ID:          h.taskConfig.ID,
		Name:        h.taskConfig.Name,
		State:       procState,
		StartedAt:   h.startedAt,
		CompletedAt: h.completedAt,
		ExitResult:  h.exitResult,
		DriverAttributes: map[string]string{
			"containerName": h.containerName,
		},
	}
}
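
// IsRunning reports whether the containerd task is currently in the
// Running state.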
func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) {
	h.stateLock.RLock()
	defer h.stateLock.RUnlock()

	ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
	defer cancel()

	status, err := h.task.Status(ctxWithTimeout)
	if err != nil {
		return false, fmt.Errorf("error getting task status: %v", err)
	}

	return (status.Status == containerd.Running), nil
}
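
// run initializes stats collection and starts the containerd task.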
func (h *taskHandle) run(ctxContainerd context.Context) {
	h.stateLock.Lock()
	defer h.stateLock.Unlock()

	// Every executor runs this init at creation for stats.
	if err := hstats.Init(); err != nil {
		h.logger.Error("unable to initialize stats", "error", err)
	}

	// Sleep for 5 seconds to allow h.task.Wait() to kick in.
	// TODO: Use a goroutine and a channel to synchronize this, instead of sleep.
	time.Sleep(5 * time.Second)

	if err := h.task.Start(ctxContainerd); err != nil {
		h.logger.Error("unable to start task", "error", err)
	}
}

// exec launches a new process in a running container.
func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
	defer opts.Stdout.Close()
	defer opts.Stderr.Close()

	spec, err := h.container.Spec(ctxContainerd)
	if err != nil {
		return nil, err
	}

	pspec := spec.Process
	pspec.Terminal = opts.Tty
	pspec.Args = opts.Command

	execID, err := uuid.GenerateUUID()
	if err != nil {
		return nil, err
	}

	cioOpts := []cio.Opt{cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)}
	if opts.Tty {
		cioOpts = append(cioOpts, cio.WithTerminal)
	}
	ioCreator := cio.NewCreator(cioOpts...)

	process, err := h.task.Exec(ctxContainerd, execID[:8], pspec, ioCreator)
	if err != nil {
		return nil, err
	}

	// Propagate terminal resize events from the Nomad client until the
	// resize channel is closed.
	go func() {
		for s := range opts.ResizeCh {
			if err := h.task.Resize(ctxContainerd, uint32(s.Width), uint32(s.Height)); err != nil {
				h.logger.Error("Failed to resize terminal", "error", err)
				return
			}
		}
	}()

	defer process.Delete(ctxContainerd)

	// Wait must be set up before Start so that the exit event is not missed.
	statusC, err := process.Wait(ctxContainerd)
	if err != nil {
		return nil, err
	}

	if err := process.Start(ctxContainerd); err != nil {
		return nil, err
	}

	status := <-statusC
	code, _, err := status.Result()
	if err != nil {
		return nil, err
	}

	return &drivers.ExitResult{
		ExitCode: int(code),
	}, nil
}
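
// shutdown sends the given signal to the task and, if the task is still
// running after the timeout elapses, kills it with SIGKILL.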
func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
	ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
	defer cancel()

	if err := h.task.Kill(ctxWithTimeout, signal); err != nil {
		return err
	}

	// timeout = 5 seconds, passed by the Nomad client.
	// TODO: Make timeout configurable in task_config. This will allow users
	// to set a higher timeout if they need more time for their container to
	// shut down gracefully.
	time.Sleep(timeout)

	status, err := h.task.Status(ctxWithTimeout)
	if err != nil {
		return err
	}

	if status.Status != containerd.Running {
		h.logger.Info("Task is not running anymore, no need to SIGKILL")
		return nil
	}

	return h.task.Kill(ctxWithTimeout, syscall.SIGKILL)
}
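
// cleanup deletes the containerd task and container, including the
// container's snapshot.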
func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
	ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
	defer cancel()

	if _, err := h.task.Delete(ctxWithTimeout); err != nil {
		return err
	}
	if err := h.container.Delete(ctxWithTimeout, containerd.WithSnapshotCleanup); err != nil {
		return err
	}
	return nil
}
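
// stats returns a channel that emits task resource usage at the given
// interval until ctx is cancelled.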
func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	ch := make(chan *drivers.TaskResourceUsage)

	go h.handleStats(ch, ctx, ctxContainerd, interval)

	return ch, nil
}
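
// handleStats collects containerd task metrics on every tick, converts
// them to drivers.TaskResourceUsage, and publishes them on ch.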
func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
	defer close(ch)

	timer := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(interval)
		}

		// Get containerd task metrics.
		metric, err := h.task.Metrics(ctxContainerd)
		if err != nil {
			h.logger.Error("Failed to get task metrics", "error", err)
			return
		}

		anydata, err := typeurl.UnmarshalAny(metric.Data)
		if err != nil {
			h.logger.Error("Failed to unmarshal metric data", "error", err)
			return
		}

		var taskResourceUsage *drivers.TaskResourceUsage

		switch data := anydata.(type) {
		case *v1.Metrics:
			taskResourceUsage = h.getV1TaskResourceUsage(data)
		case *v2.Metrics:
			taskResourceUsage = h.getV2TaskResourceUsage(data)
		default:
			h.logger.Error("Cannot convert metric data to cgroups.Metrics")
			return
		}

		select {
		case <-ctx.Done():
			return
		case ch <- taskResourceUsage:
		}
	}
}

// Convert containerd v1 (cgroups v1) task metrics to TaskResourceUsage.
func (h *taskHandle) getV1TaskResourceUsage(metrics *v1.Metrics) *drivers.TaskResourceUsage {
	totalPercent := h.totalCpuStats.Percent(float64(metrics.CPU.Usage.Total))
	cs := &drivers.CpuStats{
		SystemMode: h.systemCpuStats.Percent(float64(metrics.CPU.Usage.Kernel)),
		UserMode:   h.userCpuStats.Percent(float64(metrics.CPU.Usage.User)),
		Percent:    totalPercent,
		TotalTicks: h.totalCpuStats.TicksConsumed(totalPercent),
		Measured:   []string{"Percent", "System Mode", "User Mode"},
	}

	ms := &drivers.MemoryStats{
		RSS:      metrics.Memory.RSS,
		Cache:    metrics.Memory.Cache,
		Swap:     metrics.Memory.Swap.Usage,
		Usage:    metrics.Memory.Usage.Usage,
		MaxUsage: metrics.Memory.Usage.Max,
		Measured: []string{"RSS", "Cache", "Swap", "Usage"},
	}

	ts := time.Now().UTC().UnixNano()
	return &drivers.TaskResourceUsage{
		ResourceUsage: &drivers.ResourceUsage{
			CpuStats:    cs,
			MemoryStats: ms,
		},
		Timestamp: ts,
	}
}

// Convert containerd v2 (cgroups v2) task metrics to TaskResourceUsage.
// Note: cgroups v2 reports CPU usage in microseconds.
func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskResourceUsage {
	totalPercent := h.totalCpuStats.Percent(float64(metrics.CPU.SystemUsec + metrics.CPU.UserUsec))
	cs := &drivers.CpuStats{
		SystemMode: h.systemCpuStats.Percent(float64(metrics.CPU.SystemUsec)),
		UserMode:   h.userCpuStats.Percent(float64(metrics.CPU.UserUsec)),
		Percent:    totalPercent,
		TotalTicks: h.totalCpuStats.TicksConsumed(totalPercent),
		Measured:   []string{"Percent", "System Mode", "User Mode"},
	}

	ms := &drivers.MemoryStats{
		Swap:     metrics.Memory.SwapUsage,
		Usage:    metrics.Memory.Usage,
		Measured: []string{"Swap", "Usage"},
	}

	ts := time.Now().UTC().UnixNano()
	return &drivers.TaskResourceUsage{
		ResourceUsage: &drivers.ResourceUsage{
			CpuStats:    cs,
			MemoryStats: ms,
		},
		Timestamp: ts,
	}
}
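
// signal forwards an os.Signal from Nomad to the containerd task.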
func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
	ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
	defer cancel()

	// Guard the type assertion so an unexpected signal type returns an
	// error instead of panicking.
	sysSig, ok := sig.(syscall.Signal)
	if !ok {
		return fmt.Errorf("invalid signal: %v", sig)
	}

	return h.task.Kill(ctxWithTimeout, sysSig)
}