More driver and handle updates.

commit 589b30429b (parent 883b5f0397)
Author: Shishir Mahajan
Date:   2020-05-11 14:40:01 -07:00
3 changed files with 60 additions and 173 deletions

containerd/driver.go

@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"os"
-	"path/filepath"
 	"time"
 
 	"github.com/containerd/containerd"
@@ -13,11 +12,11 @@ import (
 	"github.com/hashicorp/go-hclog"
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/drivers/shared/eventer"
-	"github.com/hashicorp/nomad/drivers/shared/executor"
 	"github.com/hashicorp/nomad/plugins/base"
 	"github.com/hashicorp/nomad/plugins/drivers"
 	"github.com/hashicorp/nomad/plugins/shared/hclspec"
 	"github.com/hashicorp/nomad/plugins/shared/structs"
+	"github.com/moby/moby/pkg/namesgenerator"
 )
 
 const (
@@ -95,19 +94,8 @@ type TaskConfig struct {
 // This information is needed to rebuild the task state and handler during
 // recovery.
 type TaskState struct {
-	ReattachConfig *structs.ReattachConfig
-	TaskConfig     *drivers.TaskConfig
-	StartedAt      time.Time
-
-	// TODO: add any extra important values that must be persisted in order
-	// to restore a task.
-	//
-	// The plugin keeps track of its running tasks in a in-memory data
-	// structure. If the plugin crashes, this data will be lost, so Nomad
-	// will respawn a new instance of the plugin and try to restore its
-	// in-memory representation of the running tasks using the RecoverTask()
-	// method below.
-	Pid int
+	StartedAt     time.Time
+	ContainerName string
 }
 
 type Driver struct {
@@ -300,61 +288,21 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
 	handle := drivers.NewTaskHandle(taskHandleVersion)
 	handle.Config = cfg
 
-	// TODO: implement driver specific mechanism to start the task.
-	//
-	// Once the task is started you will need to store any relevant runtime
-	// information in a taskHandle and TaskState. The taskHandle will be
-	// stored in-memory in the plugin and will be used to interact with the
-	// task.
-	//
-	// The TaskState will be returned to the Nomad client inside a
-	// drivers.TaskHandle instance. This TaskHandle will be sent back to plugin
-	// if the task ever needs to be recovered, so the TaskState should contain
-	// enough information to handle that.
-	//
-	// In the example below we use an executor to fork a process to run our
-	// greeter. The executor is then stored in the handle so we can access it
-	// later and the the plugin.Client is used to generate a reattach
-	// configuration that can be used to recover communication with the task.
-	executorConfig := &executor.ExecutorConfig{
-		LogFile:  filepath.Join(cfg.TaskDir().Dir, "executor.out"),
-		LogLevel: "debug",
-	}
-
-	exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to create executor: %v", err)
-	}
-
-	echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Image)
-	execCmd := &executor.ExecCommand{
-		Cmd:        "/bin/bash",
-		Args:       []string{"-c", echoCmd},
-		StdoutPath: cfg.StdoutPath,
-		StderrPath: cfg.StderrPath,
-	}
-
-	ps, err := exec.Launch(execCmd)
-	if err != nil {
-		pluginClient.Kill()
-		return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
-	}
+	// Generate a random container name using docker namesgenerator package.
+	// https://github.com/moby/moby/blob/master/pkg/namesgenerator/names-generator.go
+	containerName := namesgenerator.GetRandomName(1)
 
 	h := &taskHandle{
-		exec:         exec,
-		pid:          ps.Pid,
-		pluginClient: pluginClient,
-		taskConfig:   cfg,
-		procState:    drivers.TaskStateRunning,
-		startedAt:    time.Now().Round(time.Millisecond),
-		logger:       d.logger,
+		containerName: containerName,
+		taskConfig:    cfg,
+		procState:     drivers.TaskStateRunning,
+		startedAt:     time.Now().Round(time.Millisecond),
+		logger:        d.logger,
 	}
 
 	driverState := TaskState{
-		ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
-		Pid:            ps.Pid,
-		TaskConfig:     cfg,
-		StartedAt:      h.startedAt,
+		StartedAt:     h.startedAt,
+		ContainerName: containerName,
 	}
 
 	if err := handle.SetDriverState(&driverState); err != nil {
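For reference, namesgenerator.GetRandomName(retry int) returns a docker-style name built from an adjective and a surname; with retry > 0 the generator appends a random digit to reduce the chance of a collision. A minimal standalone sketch (not part of this commit):

    package main

    import (
    	"fmt"

    	"github.com/moby/moby/pkg/namesgenerator"
    )

    func main() {
    	// Prints something like "focused_turing3"; with retry > 0 the
    	// generator suffixes a random digit to the base name.
    	fmt.Println(namesgenerator.GetRandomName(1))
    }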
@@ -382,38 +330,20 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
 	}
 
 	var driverConfig TaskConfig
-	if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
+	if err := handle.Config.DecodeDriverConfig(&driverConfig); err != nil {
 		return fmt.Errorf("failed to decode driver config: %v", err)
 	}
 
-	// TODO: implement driver specific logic to recover a task.
-	//
-	// Recovering a task involves recreating and storing a taskHandle as if the
-	// task was just started.
-	//
-	// In the example below we use the executor to re-attach to the process
-	// that was created when the task first started.
-	plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
-	if err != nil {
-		return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
-	}
-
-	execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger)
-	if err != nil {
-		return fmt.Errorf("failed to reattach to executor: %v", err)
-	}
-
 	h := &taskHandle{
-		exec:         execImpl,
-		pid:          taskState.Pid,
-		pluginClient: pluginClient,
-		taskConfig:   taskState.TaskConfig,
-		procState:    drivers.TaskStateRunning,
-		startedAt:    taskState.StartedAt,
-		exitResult:   &drivers.ExitResult{},
+		taskConfig:    handle.Config,
+		procState:     drivers.TaskStateRunning,
+		startedAt:     taskState.StartedAt,
+		exitResult:    &drivers.ExitResult{},
+		logger:        d.logger,
+		containerName: taskState.ContainerName,
 	}
 
-	d.tasks.Set(taskState.TaskConfig.ID, h)
+	d.tasks.Set(handle.Config.ID, h)
 
 	go h.run()
 	return nil
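The taskState read above is decoded from the handle a few lines earlier in RecoverTask, outside this hunk. In the Nomad skeleton driver this code builds on, that step looks roughly like the following; it is shown here only to make the round trip with SetDriverState in StartTask explicit:

    var taskState TaskState
    if err := handle.GetDriverState(&taskState); err != nil {
    	return fmt.Errorf("failed to decode task state from handle: %v", err)
    }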
@@ -435,28 +365,6 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
 	defer close(ch)
 	var result *drivers.ExitResult
 
-	// TODO: implement driver specific logic to notify Nomad the task has been
-	// completed and what was the exit result.
-	//
-	// When a result is sent in the result channel Nomad will stop the task and
-	// emit an event that an operator can use to get an insight on why the task
-	// stopped.
-	//
-	// In the example below we block and wait until the executor finishes
-	// running, at which point we send the exit code and signal in the result
-	// channel.
-	ps, err := handle.exec.Wait(ctx)
-	if err != nil {
-		result = &drivers.ExitResult{
-			Err: fmt.Errorf("executor: error waiting on process: %v", err),
-		}
-	} else {
-		result = &drivers.ExitResult{
-			ExitCode: ps.ExitCode,
-			Signal:   ps.Signal,
-		}
-	}
-
 	for {
 		select {
 		case <-ctx.Done():
@@ -475,20 +383,8 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
 		return drivers.ErrTaskNotFound
 	}
 
-	// TODO: implement driver specific logic to stop a task.
-	//
-	// The StopTask function is expected to stop a running task by sending the
-	// given signal to it. If the task does not stop during the given timeout,
-	// the driver must forcefully kill the task.
-	//
-	// In the example below we let the executor handle the task shutdown
-	// process for us, but you might need to customize this for your own
-	// implementation.
-	if err := handle.exec.Shutdown(signal, timeout); err != nil {
-		if handle.pluginClient.Exited() {
-			return nil
-		}
-		return fmt.Errorf("executor Shutdown failed: %v", err)
+	if err := handle.shutdown(timeout, signal); err != nil {
+		return fmt.Errorf("Shutdown failed: %v", err)
 	}
 
 	return nil
@@ -505,20 +401,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
 		return fmt.Errorf("cannot destroy running task")
 	}
 
-	// TODO: implement driver specific logic to destroy a complete task.
-	//
-	// Destroying a task includes removing any resources used by task and any
-	// local references in the plugin. If force is set to true the task should
-	// be destroyed even if it's currently running.
-	//
-	// In the example below we use the executor to force shutdown the task
-	// (timeout equals 0).
-	if !handle.pluginClient.Exited() {
-		if err := handle.exec.Shutdown("", 0); err != nil {
-			handle.logger.Error("destroying executor failed", "err", err)
-		}
-
-		handle.pluginClient.Kill()
+	if err := handle.cleanup(); err != nil {
+		return err
 	}
 
 	d.tasks.Delete(taskID)
@@ -542,15 +426,7 @@ func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Dur
 		return nil, drivers.ErrTaskNotFound
 	}
 
-	// TODO: implement driver specific logic to send task stats.
-	//
-	// This function returns a channel that Nomad will use to listen for task
-	// stats (e.g., CPU and memory usage) in a given interval. It should send
-	// stats until the context is canceled or the task stops running.
-	//
-	// In the example below we use the Stats function provided by the executor,
-	// but you can build a set of functions similar to the fingerprint process.
-	return handle.exec.Stats(ctx, interval)
+	return handle.stats(ctx, interval)
 }
 
 // TaskEvents returns a channel that the plugin can use to emit task related events.
@@ -566,8 +442,6 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
 		return drivers.ErrTaskNotFound
 	}
 
-	// TODO: implement driver specific signal handling logic.
-	//
 	// The given signal must be forwarded to the target taskID. If this plugin
 	// doesn't support receiving signals (capability SendSignals is set to
 	// false) you can just return nil.
@@ -578,7 +452,7 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
 		d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID)
 	}
 
-	return handle.exec.Signal(sig)
+	return handle.signal(sig)
 }
 
 // ExecTask returns the result of executing the given command inside a task.

containerd/handle.go

@@ -2,13 +2,12 @@ package containerd
 
 import (
 	"context"
-	"strconv"
+	"fmt"
+	"os"
 	"sync"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/go-plugin"
-	"github.com/hashicorp/nomad/drivers/shared/executor"
 	"github.com/hashicorp/nomad/plugins/drivers"
 )
@@ -19,17 +18,13 @@ type taskHandle struct {
 	// stateLock syncs access to all fields below
 	stateLock sync.RWMutex
 	logger    hclog.Logger
-	exec         executor.Executor
-	pluginClient *plugin.Client
-	taskConfig   *drivers.TaskConfig
-	procState    drivers.TaskState
-	startedAt    time.Time
-	completedAt  time.Time
-	exitResult   *drivers.ExitResult
-
-	// TODO: add any extra relevant information about the task.
-	pid int
+	taskConfig    *drivers.TaskConfig
+	procState     drivers.TaskState
+	startedAt     time.Time
+	completedAt   time.Time
+	exitResult    *drivers.ExitResult
+	containerName string
 }
func (h *taskHandle) TaskStatus() *drivers.TaskStatus { func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
@ -44,7 +39,7 @@ func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
CompletedAt: h.completedAt, CompletedAt: h.completedAt,
ExitResult: h.exitResult, ExitResult: h.exitResult,
DriverAttributes: map[string]string{ DriverAttributes: map[string]string{
"pid": strconv.Itoa(h.pid), "containerName": h.containerName,
}, },
} }
} }
@@ -63,10 +58,12 @@ func (h *taskHandle) run() {
 	h.stateLock.Unlock()
 
 	// TODO: wait for your task to complete and upate its state.
-	ps, err := h.exec.Wait(context.Background())
+	//ps, err := h.exec.Wait(context.Background())
 	h.stateLock.Lock()
 	defer h.stateLock.Unlock()
 
+	err := fmt.Errorf("Hello test error")
 	if err != nil {
 		h.exitResult.Err = err
 		h.procState = drivers.TaskStateUnknown
@@ -74,7 +71,23 @@ func (h *taskHandle) run() {
 		return
 	}
 
 	h.procState = drivers.TaskStateExited
-	h.exitResult.ExitCode = ps.ExitCode
-	h.exitResult.Signal = ps.Signal
-	h.completedAt = ps.Time
+	//h.exitResult.ExitCode = ps.ExitCode
+	//h.exitResult.Signal = ps.Signal
+	//h.completedAt = ps.Time
+}
+
+func (h *taskHandle) shutdown(timeout time.Duration, signal string) error {
+	return nil
+}
+
+func (h *taskHandle) cleanup() error {
+	return nil
+}
+
+func (h *taskHandle) stats(ctx context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
+	return nil, nil
+}
+
+func (h *taskHandle) signal(sig os.Signal) error {
+	return nil
 }
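All four new taskHandle methods are stubs that the driver-side calls above now compile against. As a hypothetical sketch of where shutdown could go once the handle holds a containerd task (the containerd.Task argument and the SIGTERM-then-SIGKILL escalation are assumptions, not part of this commit; Wait and Kill are real containerd client calls):

    package containerd

    import (
    	"context"
    	"syscall"
    	"time"

    	"github.com/containerd/containerd"
    )

    // stopTask signals a containerd task, then force-kills it if it has
    // not exited when the timeout elapses. Sketch only; the shutdown
    // stub in this commit currently just returns nil.
    func stopTask(ctx context.Context, task containerd.Task, timeout time.Duration) error {
    	exitCh, err := task.Wait(ctx)
    	if err != nil {
    		return err
    	}
    	if err := task.Kill(ctx, syscall.SIGTERM); err != nil {
    		return err
    	}
    	select {
    	case <-exitCh:
    		return nil
    	case <-time.After(timeout):
    		// Graceful stop timed out; escalate to SIGKILL.
    		return task.Kill(ctx, syscall.SIGKILL)
    	}
    }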

go.mod (2 changes)

@@ -34,7 +34,7 @@ require (
 	github.com/hashicorp/nomad v0.10.1
 	github.com/hashicorp/nomad/api v0.0.0-20191203164002-b31573ae7206 // indirect
 	github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b // indirect
-	github.com/moby/moby v1.13.1 // indirect
+	github.com/moby/moby v1.13.1
 	github.com/mrunalp/fileutils v0.0.0-20171103030105-7d4729fb3618 // indirect
 	github.com/opencontainers/runc v1.0.0-rc8.0.20190611121236-6cc515888830 // indirect
 	github.com/opencontainers/selinux v1.3.1 // indirect