651 lines
20 KiB
Go
651 lines
20 KiB
Go
/*
|
|
Copyright 2020 Roblox Corporation
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/hashicorp/consul-template/signals"
|
|
"github.com/hashicorp/go-hclog"
|
|
log "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/client/stats"
|
|
"github.com/hashicorp/nomad/drivers/shared/eventer"
|
|
"github.com/hashicorp/nomad/plugins/base"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
|
"github.com/hashicorp/nomad/plugins/shared/structs"
|
|
"github.com/moby/moby/pkg/namesgenerator"
|
|
)
|
|
|
|
const (
|
|
// pluginName is the name of the plugin
|
|
// this is used for logging and (along with the version) for uniquely
|
|
// identifying plugin binaries fingerprinted by the client
|
|
pluginName = "containerd-driver"
|
|
|
|
// pluginVersion allows the client to identify and use newer versions of
|
|
// an installed plugin
|
|
pluginVersion = "v0.1.0"
|
|
|
|
// fingerprintPeriod is the interval at which the plugin will send
|
|
// fingerprint responses
|
|
fingerprintPeriod = 30 * time.Second
|
|
|
|
// taskHandleVersion is the version of task handle which this plugin sets
|
|
// and understands how to decode
|
|
// this is used to allow modification and migration of the task schema
|
|
// used by the plugin
|
|
taskHandleVersion = 1
|
|
)
|
|
|
|
var (
|
|
// pluginInfo describes the plugin
|
|
pluginInfo = &base.PluginInfoResponse{
|
|
Type: base.PluginTypeDriver,
|
|
PluginApiVersions: []string{drivers.ApiVersion010},
|
|
PluginVersion: pluginVersion,
|
|
Name: pluginName,
|
|
}
|
|
|
|
// configSpec is the specification of the plugin's configuration
|
|
// this is used to validate the configuration specified for the plugin
|
|
// on the client.
|
|
// this is not global, but can be specified on a per-client basis.
|
|
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
|
"enabled": hclspec.NewDefault(
|
|
hclspec.NewAttr("enabled", "bool", false),
|
|
hclspec.NewLiteral("true"),
|
|
),
|
|
"containerd_runtime": hclspec.NewAttr("containerd_runtime", "string", true),
|
|
"stats_interval": hclspec.NewAttr("stats_interval", "string", false),
|
|
})
|
|
|
|
// taskConfigSpec is the specification of the plugin's configuration for
|
|
// a task
|
|
// this is used to validate the configuration specified for the plugin
|
|
// when a job is submitted.
|
|
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
|
|
"image": hclspec.NewAttr("image", "string", true),
|
|
"command": hclspec.NewAttr("command", "string", false),
|
|
"args": hclspec.NewAttr("args", "list(string)", false),
|
|
"cap_add": hclspec.NewAttr("cap_add", "list(string)", false),
|
|
"cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false),
|
|
"devices": hclspec.NewAttr("devices", "list(string)", false),
|
|
"privileged": hclspec.NewAttr("privileged", "bool", false),
|
|
"seccomp": hclspec.NewAttr("seccomp", "bool", false),
|
|
"seccomp_profile": hclspec.NewAttr("seccomp_profile", "string", false),
|
|
"readonly_rootfs": hclspec.NewAttr("readonly_rootfs", "bool", false),
|
|
"host_network": hclspec.NewAttr("host_network", "bool", false),
|
|
"mounts": hclspec.NewBlockList("mounts", hclspec.NewObject(map[string]*hclspec.Spec{
|
|
"type": hclspec.NewDefault(
|
|
hclspec.NewAttr("type", "string", false),
|
|
hclspec.NewLiteral("\"volume\""),
|
|
),
|
|
"target": hclspec.NewAttr("target", "string", true),
|
|
"source": hclspec.NewAttr("source", "string", false),
|
|
"options": hclspec.NewAttr("options", "list(string)", false),
|
|
})),
|
|
})
|
|
|
|
// capabilities indicates what optional features this driver supports
|
|
// this should be set according to the target run time.
|
|
// https://godoc.org/github.com/hashicorp/nomad/plugins/drivers#Capabilities
|
|
capabilities = &drivers.Capabilities{
|
|
SendSignals: true,
|
|
Exec: true,
|
|
FSIsolation: drivers.FSIsolationNone,
|
|
NetIsolationModes: []drivers.NetIsolationMode{drivers.NetIsolationModeGroup, drivers.NetIsolationModeTask},
|
|
}
|
|
)
|
|
|
|
// Config contains configuration information for the plugin
|
|
type Config struct {
|
|
Enabled bool `codec:"enabled"`
|
|
ContainerdRuntime string `codec:"containerd_runtime"`
|
|
StatsInterval string `codec:"stats_interval"`
|
|
}
|
|
|
|
// Volume, bind, and tmpfs type mounts are supported.
|
|
// Mount contains configuration information about a mountpoint.
|
|
type Mount struct {
|
|
Type string `codec:"type"`
|
|
Target string `codec:"target"`
|
|
Source string `codec:"source"`
|
|
Options []string `codec:"options"`
|
|
}
|
|
|
|
// TaskConfig contains configuration information for a task that runs with
|
|
// this plugin
|
|
type TaskConfig struct {
|
|
Image string `codec:"image"`
|
|
Command string `codec:"command"`
|
|
Args []string `codec:"args"`
|
|
CapAdd []string `codec:"cap_add"`
|
|
CapDrop []string `codec:"cap_drop"`
|
|
Devices []string `codec:"devices"`
|
|
Seccomp bool `codec:"seccomp"`
|
|
SeccompProfile string `codec:"seccomp_profile"`
|
|
Privileged bool `codec:"privileged"`
|
|
ReadOnlyRootfs bool `codec:"readonly_rootfs"`
|
|
HostNetwork bool `codec:"host_network"`
|
|
Mounts []Mount `codec:"mounts"`
|
|
}
|
|
|
|
// TaskState is the runtime state which is encoded in the handle returned to
|
|
// Nomad client.
|
|
// This information is needed to rebuild the task state and handler during
|
|
// recovery.
|
|
type TaskState struct {
|
|
StartedAt time.Time
|
|
ContainerName string
|
|
}
|
|
|
|
type Driver struct {
|
|
// eventer is used to handle multiplexing of TaskEvents calls such that an
|
|
// event can be broadcast to all callers
|
|
eventer *eventer.Eventer
|
|
|
|
// config is the plugin configuration set by the SetConfig RPC
|
|
config *Config
|
|
|
|
// nomadConfig is the client config from Nomad
|
|
nomadConfig *base.ClientDriverConfig
|
|
|
|
// tasks is the in memory datastore mapping taskIDs to driver handles
|
|
tasks *taskStore
|
|
|
|
// ctx is the context for the driver. It is passed to other subsystems to
|
|
// coordinate shutdown
|
|
ctx context.Context
|
|
|
|
// signalShutdown is called when the driver is shutting down and cancels
|
|
// the ctx passed to any subsystems
|
|
signalShutdown context.CancelFunc
|
|
|
|
// logger will log to the Nomad agent
|
|
logger log.Logger
|
|
|
|
// context for containerd
|
|
ctxContainerd context.Context
|
|
|
|
// containerd client
|
|
client *containerd.Client
|
|
}
|
|
|
|
// NewPlugin returns a new containerd driver plugin
|
|
func NewPlugin(logger log.Logger) drivers.DriverPlugin {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
logger = logger.Named(pluginName)
|
|
|
|
// This will create a new containerd client which will talk to
|
|
// default containerd socket path.
|
|
client, err := containerd.New("/run/containerd/containerd.sock")
|
|
if err != nil {
|
|
logger.Error("Error in creating containerd client", "err", err)
|
|
return nil
|
|
}
|
|
|
|
// Calls to containerd API are namespaced.
|
|
// "nomad" is the namespace that will be used for all nomad-driver-containerd
|
|
// related containerd API calls.
|
|
ctxContainerd := namespaces.WithNamespace(context.Background(), "nomad")
|
|
|
|
return &Driver{
|
|
eventer: eventer.NewEventer(ctx, logger),
|
|
config: &Config{},
|
|
tasks: newTaskStore(),
|
|
ctx: ctx,
|
|
ctxContainerd: ctxContainerd,
|
|
client: client,
|
|
signalShutdown: cancel,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// PluginInfo returns information describing the plugin.
|
|
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
|
|
return pluginInfo, nil
|
|
}
|
|
|
|
// ConfigSchema returns the plugin configuration schema.
|
|
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
|
|
return configSpec, nil
|
|
}
|
|
|
|
// SetConfig is called by the client to pass the configuration for the plugin.
|
|
func (d *Driver) SetConfig(cfg *base.Config) error {
|
|
var config Config
|
|
if len(cfg.PluginConfig) != 0 {
|
|
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Save the configuration to the plugin
|
|
d.config = &config
|
|
|
|
// Save the Nomad agent configuration
|
|
if cfg.AgentConfig != nil {
|
|
d.nomadConfig = cfg.AgentConfig.Driver
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TaskConfigSchema returns the HCL schema for the configuration of a task.
|
|
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
|
|
return taskConfigSpec, nil
|
|
}
|
|
|
|
// Capabilities returns the features supported by the driver.
|
|
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
|
|
return capabilities, nil
|
|
}
|
|
|
|
// Fingerprint returns a channel that will be used to send health information
|
|
// and other driver specific node attributes.
|
|
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
|
|
ch := make(chan *drivers.Fingerprint)
|
|
go d.handleFingerprint(ctx, ch)
|
|
return ch, nil
|
|
}
|
|
|
|
// handleFingerprint manages the channel and the flow of fingerprint data.
|
|
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
|
|
defer close(ch)
|
|
|
|
// Nomad expects the initial fingerprint to be sent immediately
|
|
ticker := time.NewTimer(0)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// after the initial fingerprint we can set the proper fingerprint
|
|
// period
|
|
ticker.Reset(fingerprintPeriod)
|
|
ch <- d.buildFingerprint()
|
|
}
|
|
}
|
|
}
|
|
|
|
// buildFingerprint returns the driver's fingerprint data
|
|
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
|
|
fp := &drivers.Fingerprint{
|
|
Attributes: map[string]*structs.Attribute{},
|
|
Health: drivers.HealthStateHealthy,
|
|
HealthDescription: drivers.DriverHealthy,
|
|
}
|
|
|
|
isRunning, err := d.isContainerdRunning()
|
|
if err != nil {
|
|
d.logger.Error("Error in buildFingerprint(): failed to get containerd status: %v", err)
|
|
fp.Health = drivers.HealthStateUndetected
|
|
fp.HealthDescription = "Undetected"
|
|
return fp
|
|
}
|
|
|
|
if !isRunning {
|
|
fp.Health = drivers.HealthStateUnhealthy
|
|
fp.HealthDescription = "Unhealthy"
|
|
return fp
|
|
}
|
|
|
|
// Get containerd version
|
|
version, err := d.getContainerdVersion()
|
|
if err != nil {
|
|
d.logger.Warn("Error in buildFingerprint(): failed to get containerd version: %v", err)
|
|
return fp
|
|
}
|
|
|
|
fp.Attributes["driver.containerd.containerd_version"] = structs.NewStringAttribute(version.Version)
|
|
fp.Attributes["driver.containerd.containerd_revision"] = structs.NewStringAttribute(version.Revision)
|
|
return fp
|
|
}
|
|
|
|
// StartTask returns a task handle and a driver network if necessary.
|
|
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
|
|
if _, ok := d.tasks.Get(cfg.ID); ok {
|
|
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
|
|
}
|
|
|
|
var driverConfig TaskConfig
|
|
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
|
|
}
|
|
|
|
if driverConfig.HostNetwork && cfg.NetworkIsolation != nil {
|
|
return nil, nil, fmt.Errorf("host_network and bridge network mode are mutually exclusive, and only one of them should be set")
|
|
}
|
|
|
|
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
|
|
handle := drivers.NewTaskHandle(taskHandleVersion)
|
|
handle.Config = cfg
|
|
|
|
// Generate a random container name using docker namesgenerator package.
|
|
// https://github.com/moby/moby/blob/master/pkg/namesgenerator/names-generator.go
|
|
containerName := cfg.AllocID[:8] + "_" + namesgenerator.GetRandomName(1)
|
|
|
|
image, err := d.pullImage(driverConfig.Image)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Error in pulling image: %v", err)
|
|
}
|
|
|
|
d.logger.Info(fmt.Sprintf("Successfully pulled %s image\n", image.Name()))
|
|
|
|
// Setup environment variables.
|
|
var env []string
|
|
var secretsDir, taskDir, allocDir string
|
|
for key, val := range cfg.Env {
|
|
if skipOverride(key) {
|
|
continue
|
|
}
|
|
if key == "NOMAD_SECRETS_DIR" {
|
|
secretsDir = val
|
|
}
|
|
if key == "NOMAD_TASK_DIR" {
|
|
taskDir = val
|
|
}
|
|
if key == "NOMAD_ALLOC_DIR" {
|
|
allocDir = val
|
|
}
|
|
env = append(env, fmt.Sprintf("%s=%s", key, val))
|
|
}
|
|
|
|
containerSnapshotName := fmt.Sprintf("%s-snapshot", containerName)
|
|
var netnsPath string
|
|
if cfg.NetworkIsolation != nil && cfg.NetworkIsolation.Path != "" {
|
|
netnsPath = cfg.NetworkIsolation.Path
|
|
}
|
|
|
|
// memory and cpu are coming from the resources stanza of the nomad job.
|
|
// https://www.nomadproject.io/docs/job-specification/resources
|
|
memoryLimit := cfg.Resources.LinuxResources.MemoryLimitBytes
|
|
cpuShares := cfg.Resources.LinuxResources.CPUShares
|
|
|
|
container, err := d.createContainer(image, containerName, containerSnapshotName, d.config.ContainerdRuntime, netnsPath, secretsDir, taskDir, allocDir, env, memoryLimit, cpuShares, &driverConfig)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Error in creating container: %v", err)
|
|
}
|
|
|
|
d.logger.Info(fmt.Sprintf("Successfully created container with name: %s", containerName))
|
|
task, err := d.createTask(container, cfg.StdoutPath, cfg.StderrPath)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("Error in creating task: %v", err)
|
|
}
|
|
|
|
d.logger.Info(fmt.Sprintf("Successfully created task with ID: %s", task.ID()))
|
|
|
|
h := &taskHandle{
|
|
taskConfig: cfg,
|
|
procState: drivers.TaskStateRunning,
|
|
startedAt: time.Now().Round(time.Millisecond),
|
|
logger: d.logger,
|
|
totalCpuStats: stats.NewCpuStats(),
|
|
userCpuStats: stats.NewCpuStats(),
|
|
systemCpuStats: stats.NewCpuStats(),
|
|
container: container,
|
|
containerName: containerName,
|
|
task: task,
|
|
}
|
|
|
|
driverState := TaskState{
|
|
StartedAt: h.startedAt,
|
|
ContainerName: containerName,
|
|
}
|
|
|
|
if err := handle.SetDriverState(&driverState); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
|
|
}
|
|
|
|
d.tasks.Set(cfg.ID, h)
|
|
go h.run(d.ctxContainerd)
|
|
return handle, nil, nil
|
|
}
|
|
|
|
// skipOverride determines whether the environment variable (key) needs an override or not.
|
|
func skipOverride(key string) bool {
|
|
skipOverrideList := []string{"PATH"}
|
|
for _, k := range skipOverrideList {
|
|
if key == k {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// RecoverTask recreates the in-memory state of a task from a TaskHandle.
|
|
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
|
if handle == nil {
|
|
return fmt.Errorf("error: handle cannot be nil")
|
|
}
|
|
|
|
if _, ok := d.tasks.Get(handle.Config.ID); ok {
|
|
return nil
|
|
}
|
|
|
|
var taskState TaskState
|
|
if err := handle.GetDriverState(&taskState); err != nil {
|
|
return fmt.Errorf("failed to decode task state from handle: %v", err)
|
|
}
|
|
|
|
var driverConfig TaskConfig
|
|
if err := handle.Config.DecodeDriverConfig(&driverConfig); err != nil {
|
|
return fmt.Errorf("failed to decode driver config: %v", err)
|
|
}
|
|
|
|
container, err := d.loadContainer(taskState.ContainerName)
|
|
if err != nil {
|
|
return fmt.Errorf("Error in recovering container: %v", err)
|
|
}
|
|
|
|
task, err := d.getTask(container)
|
|
if err != nil {
|
|
return fmt.Errorf("Error in recovering task: %v", err)
|
|
}
|
|
|
|
status, err := task.Status(d.ctxContainerd)
|
|
if err != nil {
|
|
return fmt.Errorf("Error in recovering task status: %v", err)
|
|
}
|
|
|
|
h := &taskHandle{
|
|
taskConfig: handle.Config,
|
|
procState: drivers.TaskStateRunning,
|
|
startedAt: taskState.StartedAt,
|
|
exitResult: &drivers.ExitResult{},
|
|
logger: d.logger,
|
|
totalCpuStats: stats.NewCpuStats(),
|
|
userCpuStats: stats.NewCpuStats(),
|
|
systemCpuStats: stats.NewCpuStats(),
|
|
container: container,
|
|
containerName: taskState.ContainerName,
|
|
task: task,
|
|
}
|
|
|
|
d.tasks.Set(handle.Config.ID, h)
|
|
|
|
if status.Status == containerd.Stopped {
|
|
go h.run(d.ctxContainerd)
|
|
}
|
|
|
|
d.logger.Info(fmt.Sprintf("Task with ID: %s recovered successfully.", handle.Config.ID))
|
|
return nil
|
|
}
|
|
|
|
// WaitTask returns a channel used to notify Nomad when a task exits.
|
|
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
ch := make(chan *drivers.ExitResult)
|
|
go d.handleWait(ctx, handle, ch)
|
|
return ch, nil
|
|
}
|
|
|
|
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
|
|
defer close(ch)
|
|
var result *drivers.ExitResult
|
|
|
|
exitStatusCh, err := handle.task.Wait(d.ctxContainerd)
|
|
if err != nil {
|
|
result = &drivers.ExitResult{
|
|
Err: fmt.Errorf("executor: error waiting on process: %v", err),
|
|
}
|
|
} else {
|
|
status := <-exitStatusCh
|
|
code, _, err := status.Result()
|
|
if err != nil {
|
|
d.logger.Error(err.Error())
|
|
return
|
|
}
|
|
result = &drivers.ExitResult{
|
|
ExitCode: int(code),
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.ctx.Done():
|
|
return
|
|
case ch <- result:
|
|
}
|
|
}
|
|
}
|
|
|
|
// StopTask stops a running task with the given signal and within the timeout window.
|
|
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return drivers.ErrTaskNotFound
|
|
}
|
|
|
|
if err := handle.shutdown(d.ctxContainerd, timeout, syscall.SIGTERM); err != nil {
|
|
return fmt.Errorf("Shutdown failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DestroyTask cleans up and removes a task that has terminated.
|
|
func (d *Driver) DestroyTask(taskID string, force bool) error {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return drivers.ErrTaskNotFound
|
|
}
|
|
|
|
isRunning, err := handle.IsRunning(d.ctxContainerd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if isRunning && !force {
|
|
return fmt.Errorf("cannot destroy running task")
|
|
}
|
|
|
|
if err := handle.cleanup(d.ctxContainerd); err != nil {
|
|
return err
|
|
}
|
|
|
|
d.tasks.Delete(taskID)
|
|
return nil
|
|
}
|
|
|
|
// InspectTask returns detailed status information for the referenced taskID.
|
|
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
return handle.TaskStatus(d.ctxContainerd), nil
|
|
}
|
|
|
|
// TaskStats returns a channel which the driver should send stats to at the given interval.
|
|
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
if d.config.StatsInterval != "" {
|
|
statsInterval, err := time.ParseDuration(d.config.StatsInterval)
|
|
if err != nil {
|
|
d.logger.Warn("Error parsing driver stats interval, fallback on default interval")
|
|
} else {
|
|
msg := fmt.Sprintf("Overriding client stats interval: %v with driver stats interval: %v", interval, d.config.StatsInterval)
|
|
d.logger.Debug(msg)
|
|
interval = statsInterval
|
|
}
|
|
}
|
|
|
|
return handle.stats(ctx, d.ctxContainerd, interval)
|
|
}
|
|
|
|
// TaskEvents returns a channel that the plugin can use to emit task related events.
|
|
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
|
|
return d.eventer.TaskEvents(ctx)
|
|
}
|
|
|
|
// SignalTask forwards a signal to a task.
|
|
// This is an optional capability.
|
|
func (d *Driver) SignalTask(taskID string, signal string) error {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return drivers.ErrTaskNotFound
|
|
}
|
|
|
|
// The given signal will be forwarded to the target taskID.
|
|
// Please checkout https://github.com/hashicorp/consul-template/blob/master/signals/signals_unix.go
|
|
// for a list of supported signals.
|
|
sig, ok := signals.SignalLookup[signal]
|
|
if !ok {
|
|
return fmt.Errorf("Invalid signal: %s", signal)
|
|
}
|
|
|
|
return handle.signal(d.ctxContainerd, sig)
|
|
}
|
|
|
|
// ExecTaskStreaming returns the result of executing the given command inside a task.
|
|
func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
|
|
handle, ok := d.tasks.Get(taskID)
|
|
if !ok {
|
|
return nil, drivers.ErrTaskNotFound
|
|
}
|
|
|
|
return handle.exec(ctx, d.ctxContainerd, taskID, opts)
|
|
}
|
|
|
|
// ExecTask returns the result of executing the given command inside a task.
|
|
// This is an optional capability.
|
|
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
|
|
// TODO: implement driver specific logic to execute commands in a task.
|
|
return nil, fmt.Errorf("This driver does not support exec")
|
|
}
|