Shishir Mahajan 68af55ea5c TaskStats API.
Signed-off-by: Shishir Mahajan <smahajan@roblox.com>
2020-06-11 14:53:43 -07:00

261 lines
6.2 KiB
Go

package containerd
import (
"context"
"fmt"
"os"
"sync"
"syscall"
"time"
v1 "github.com/containerd/cgroups/stats/v1"
v2 "github.com/containerd/cgroups/v2/stats"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/cio"
"github.com/containerd/typeurl"
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/nomad/plugins/drivers"
)
// taskHandle should store all relevant runtime information
// such as process ID if this is a local task or other meta
// data if this driver deals with external APIs
type taskHandle struct {
	// stateLock syncs access to all fields below
	stateLock sync.RWMutex

	logger hclog.Logger

	// taskConfig is the Nomad task configuration this handle was created for.
	taskConfig *drivers.TaskConfig
	// procState is the driver-level lifecycle state (running/exited/unknown),
	// refreshed on each TaskStatus call.
	procState drivers.TaskState
	// startedAt/completedAt bracket the task's lifetime for status reporting.
	startedAt   time.Time
	completedAt time.Time
	// exitResult holds the exit code/signal once the task has finished.
	exitResult *drivers.ExitResult

	// containerName is the containerd container ID; surfaced to Nomad via
	// DriverAttributes in TaskStatus.
	containerName string
	// container and task are live handles into containerd for this workload.
	container containerd.Container
	task      containerd.Task
}
// TaskStatus builds a drivers.TaskStatus snapshot for this task, refreshing
// h.procState from the live containerd task state. If the state cannot be
// queried the state is reported as unknown.
func (h *taskHandle) TaskStatus(ctxContainerd context.Context) *drivers.TaskStatus {
	// Query liveness BEFORE taking the lock: IsRunning acquires its own read
	// lock, and nesting it inside a lock held here can deadlock (RWMutex read
	// locks are not reentrant once a writer is queued).
	isRunning, err := h.IsRunning(ctxContainerd)

	// h.procState is mutated below, so a full write lock is required; the
	// original read lock made this a data race.
	h.stateLock.Lock()
	defer h.stateLock.Unlock()

	switch {
	case err != nil:
		h.procState = drivers.TaskStateUnknown
	case isRunning:
		h.procState = drivers.TaskStateRunning
	default:
		h.procState = drivers.TaskStateExited
	}

	return &drivers.TaskStatus{
		ID:          h.taskConfig.ID,
		Name:        h.taskConfig.Name,
		State:       h.procState,
		StartedAt:   h.startedAt,
		CompletedAt: h.completedAt,
		ExitResult:  h.exitResult,
		DriverAttributes: map[string]string{
			"containerName": h.containerName,
		},
	}
}
// IsRunning reports whether the containerd task is currently in the Running
// state. An error is returned if the task status cannot be queried.
func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) {
	h.stateLock.RLock()
	defer h.stateLock.RUnlock()

	status, err := h.task.Status(ctxContainerd)
	if err != nil {
		// Error strings are lowercase per Go convention; %w keeps the
		// underlying containerd error inspectable with errors.Is/As.
		return false, fmt.Errorf("getting task status: %w", err)
	}
	return status.Status == containerd.Running, nil
}
// run starts the containerd task. It is expected to be invoked once, shortly
// after the task and its Wait() watcher have been set up.
func (h *taskHandle) run(ctxContainerd context.Context) {
	h.stateLock.Lock()
	defer h.stateLock.Unlock()

	// Sleep for 5 seconds to allow h.task.Wait() to kick in.
	// TODO: Use goroutine and a channel to synchronize this, instead of sleep.
	time.Sleep(5 * time.Second)

	// The original code discarded Start's error; at minimum it must be
	// surfaced in the logs so a task that never started is diagnosable.
	if err := h.task.Start(ctxContainerd); err != nil {
		h.logger.Error("Failed to start task", "error", err)
	}
}
// exec launches a new process in a running container.
//
// The process shares the container's spec (terminal and argv overridden from
// opts), is wired to the caller's stdio streams, and is waited on until it
// exits or ctx is cancelled. The returned ExitResult carries the exit code.
func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
	defer opts.Stdout.Close()
	defer opts.Stderr.Close()

	spec, err := h.container.Spec(ctxContainerd)
	if err != nil {
		return nil, err
	}

	// Reuse the container's process spec, overriding terminal and argv.
	pspec := spec.Process
	pspec.Terminal = opts.Tty
	pspec.Args = opts.Command

	execID, err := uuid.GenerateUUID()
	if err != nil {
		return nil, err
	}

	cioOpts := []cio.Opt{cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)}
	if opts.Tty {
		cioOpts = append(cioOpts, cio.WithTerminal)
	}
	ioCreator := cio.NewCreator(cioOpts...)

	// containerd exec IDs are limited in length; use a short unique prefix.
	process, err := h.task.Exec(ctxContainerd, execID[:8], pspec, ioCreator)
	if err != nil {
		return nil, err
	}
	defer process.Delete(ctxContainerd)

	// Wait must be registered before Start so the exit event is not missed.
	statusC, err := process.Wait(ctxContainerd)
	if err != nil {
		return nil, err
	}

	if err := process.Start(ctxContainerd); err != nil {
		return nil, err
	}

	// The original spawned a goroutine that merely blocked on ctx.Done() and
	// returned — a pure goroutine leak. Instead, honor cancellation here
	// while waiting for the process to exit.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case status := <-statusC:
		code, _, err := status.Result()
		if err != nil {
			return nil, err
		}
		return &drivers.ExitResult{
			ExitCode: int(code),
		}, nil
	}
}
// shutdown delivers the graceful-shutdown signal to the task, waits out the
// grace period, and escalates to SIGKILL if the task is still running.
func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
	// Ask nicely first.
	if err := h.task.Kill(ctxContainerd, signal); err != nil {
		return err
	}

	// timeout = 5 seconds, passed by nomad client
	// TODO: Make timeout configurable in task_config. This will allow users to set a higher timeout
	// if they need more time for their container to shutdown gracefully.
	time.Sleep(timeout)

	status, err := h.task.Status(ctxContainerd)
	if err != nil {
		return err
	}

	// Still alive after the grace period: escalate.
	if status.Status == containerd.Running {
		return h.task.Kill(ctxContainerd, syscall.SIGKILL)
	}

	h.logger.Info("Task is not running anymore, no need to SIGKILL")
	return nil
}
// cleanup tears down the containerd resources for this task: the task object
// itself, then the container together with its snapshot.
func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
	// The task must be deleted before its container can be removed.
	if _, err := h.task.Delete(ctxContainerd); err != nil {
		return err
	}
	// WithSnapshotCleanup also releases the container's snapshot.
	return h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup)
}
// stats returns a channel on which task resource-usage samples are published
// every interval until ctx is cancelled. The channel is closed by the
// publishing goroutine.
func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	usageCh := make(chan *drivers.TaskResourceUsage)
	go h.handleStats(usageCh, ctx, ctxContainerd, interval)
	return usageCh, nil
}
// handleStats periodically collects containerd task metrics and publishes
// them on ch until ctx is cancelled. ch is closed on return.
//
// Fixes over the original: the caller-supplied interval is honored instead of
// being overwritten by a hard-coded 30s value; "HELLO" debug logs are removed;
// and an unmarshal failure now skips the sample instead of falling through to
// a type switch on nil data.
func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
	defer close(ch)

	// Fire immediately, then every interval.
	timer := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(interval)
		}

		// Get containerd task metric
		metric, err := h.task.Metrics(ctxContainerd)
		if err != nil {
			h.logger.Error("Failed to get task metric:", "error", err)
			return
		}

		anydata, err := typeurl.UnmarshalAny(metric.Data)
		if err != nil {
			h.logger.Error("Failed to unmarshal metric data:", "error", err)
			// Skip this sample; type-switching on nil would hit default and
			// kill the stats loop for a transient decode failure.
			continue
		}

		// Accept both cgroups v1 and v2 metric payloads; anything else means
		// we cannot interpret this runtime's metrics at all.
		switch anydata.(type) {
		case *v1.Metrics, *v2.Metrics:
		default:
			h.logger.Error("Cannot convert metric data to cgroups.Metrics")
			return
		}

		select {
		case <-ctx.Done():
			return
		case ch <- h.getTaskResourceUsage(metric):
		}
	}
}
// Convert containerd task metric to TaskResourceUsage.
//
// NOTE(review): this is an unimplemented stub — it always returns nil, so
// consumers of the stats channel currently receive nil samples. TODO: map the
// v1/v2 cgroup metrics in `metric` onto drivers.TaskResourceUsage
// (CPU/memory stats and a timestamp).
func (h *taskHandle) getTaskResourceUsage(metric *types.Metric) *drivers.TaskResourceUsage {
	return nil
}
// signal forwards sig to the containerd task.
//
// os.Signal is an interface; containerd needs the concrete syscall.Signal.
// The original used an unchecked assertion, which panics for any other
// os.Signal implementation — return an error instead.
func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
	sysSig, ok := sig.(syscall.Signal)
	if !ok {
		return fmt.Errorf("unsupported signal type: %v", sig)
	}
	return h.task.Kill(ctxContainerd, sysSig)
}