From 589b30429bd8130d482164955d0485b9f04dfcc5 Mon Sep 17 00:00:00 2001 From: Shishir Mahajan Date: Mon, 11 May 2020 14:40:01 -0700 Subject: [PATCH] More driver and handle updates. --- containerd/driver.go | 180 +++++++------------------------------------ containerd/handle.go | 51 +++++++----- go.mod | 2 +- 3 files changed, 60 insertions(+), 173 deletions(-) diff --git a/containerd/driver.go b/containerd/driver.go index 4ce5010..8e04ab8 100644 --- a/containerd/driver.go +++ b/containerd/driver.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "path/filepath" "time" "github.com/containerd/containerd" @@ -13,11 +12,11 @@ import ( "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/drivers/shared/eventer" - "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/hclspec" "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/moby/moby/pkg/namesgenerator" ) const ( @@ -95,19 +94,8 @@ type TaskConfig struct { // This information is needed to rebuild the task state and handler during // recovery. type TaskState struct { - ReattachConfig *structs.ReattachConfig - TaskConfig *drivers.TaskConfig - StartedAt time.Time - - // TODO: add any extra important values that must be persisted in order - // to restore a task. - // - // The plugin keeps track of its running tasks in a in-memory data - // structure. If the plugin crashes, this data will be lost, so Nomad - // will respawn a new instance of the plugin and try to restore its - // in-memory representation of the running tasks using the RecoverTask() - // method below. - Pid int + StartedAt time.Time + ContainerName string } type Driver struct { @@ -300,61 +288,21 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - // TODO: implement driver specific mechanism to start the task. - // - // Once the task is started you will need to store any relevant runtime - // information in a taskHandle and TaskState. The taskHandle will be - // stored in-memory in the plugin and will be used to interact with the - // task. - // - // The TaskState will be returned to the Nomad client inside a - // drivers.TaskHandle instance. This TaskHandle will be sent back to plugin - // if the task ever needs to be recovered, so the TaskState should contain - // enough information to handle that. - // - // In the example below we use an executor to fork a process to run our - // greeter. The executor is then stored in the handle so we can access it - // later and the the plugin.Client is used to generate a reattach - // configuration that can be used to recover communication with the task. - executorConfig := &executor.ExecutorConfig{ - LogFile: filepath.Join(cfg.TaskDir().Dir, "executor.out"), - LogLevel: "debug", - } - - exec, pluginClient, err := executor.CreateExecutor(d.logger, d.nomadConfig, executorConfig) - if err != nil { - return nil, nil, fmt.Errorf("failed to create executor: %v", err) - } - - echoCmd := fmt.Sprintf(`echo "%s"`, driverConfig.Image) - execCmd := &executor.ExecCommand{ - Cmd: "/bin/bash", - Args: []string{"-c", echoCmd}, - StdoutPath: cfg.StdoutPath, - StderrPath: cfg.StderrPath, - } - - ps, err := exec.Launch(execCmd) - if err != nil { - pluginClient.Kill() - return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err) - } + // Generate a random container name using docker namesgenerator package. + // https://github.com/moby/moby/blob/master/pkg/namesgenerator/names-generator.go + containerName := namesgenerator.GetRandomName(1) h := &taskHandle{ - exec: exec, - pid: ps.Pid, - pluginClient: pluginClient, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger, + containerName: containerName, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger, } driverState := TaskState{ - ReattachConfig: structs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), - Pid: ps.Pid, - TaskConfig: cfg, - StartedAt: h.startedAt, + StartedAt: h.startedAt, + ContainerName: containerName, } if err := handle.SetDriverState(&driverState); err != nil { @@ -382,38 +330,20 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { } var driverConfig TaskConfig - if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil { + if err := handle.Config.DecodeDriverConfig(&driverConfig); err != nil { return fmt.Errorf("failed to decode driver config: %v", err) } - // TODO: implement driver specific logic to recover a task. - // - // Recovering a task involves recreating and storing a taskHandle as if the - // task was just started. - // - // In the example below we use the executor to re-attach to the process - // that was created when the task first started. - plugRC, err := structs.ReattachConfigToGoPlugin(taskState.ReattachConfig) - if err != nil { - return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) - } - - execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, d.logger) - if err != nil { - return fmt.Errorf("failed to reattach to executor: %v", err) - } - h := &taskHandle{ - exec: execImpl, - pid: taskState.Pid, - pluginClient: pluginClient, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateRunning, - startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, + taskConfig: handle.Config, + procState: drivers.TaskStateRunning, + startedAt: taskState.StartedAt, + exitResult: &drivers.ExitResult{}, + logger: d.logger, + containerName: taskState.ContainerName, } - d.tasks.Set(taskState.TaskConfig.ID, h) + d.tasks.Set(handle.Config.ID, h) go h.run() return nil @@ -435,28 +365,6 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr defer close(ch) var result *drivers.ExitResult - // TODO: implement driver specific logic to notify Nomad the task has been - // completed and what was the exit result. - // - // When a result is sent in the result channel Nomad will stop the task and - // emit an event that an operator can use to get an insight on why the task - // stopped. - // - // In the example below we block and wait until the executor finishes - // running, at which point we send the exit code and signal in the result - // channel. - ps, err := handle.exec.Wait(ctx) - if err != nil { - result = &drivers.ExitResult{ - Err: fmt.Errorf("executor: error waiting on process: %v", err), - } - } else { - result = &drivers.ExitResult{ - ExitCode: ps.ExitCode, - Signal: ps.Signal, - } - } - for { select { case <-ctx.Done(): @@ -475,20 +383,8 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e return drivers.ErrTaskNotFound } - // TODO: implement driver specific logic to stop a task. - // - // The StopTask function is expected to stop a running task by sending the - // given signal to it. If the task does not stop during the given timeout, - // the driver must forcefully kill the task. - // - // In the example below we let the executor handle the task shutdown - // process for us, but you might need to customize this for your own - // implementation. - if err := handle.exec.Shutdown(signal, timeout); err != nil { - if handle.pluginClient.Exited() { - return nil - } - return fmt.Errorf("executor Shutdown failed: %v", err) + if err := handle.shutdown(timeout, signal); err != nil { + return fmt.Errorf("Shutdown failed: %v", err) } return nil @@ -505,20 +401,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { return fmt.Errorf("cannot destroy running task") } - // TODO: implement driver specific logic to destroy a complete task. - // - // Destroying a task includes removing any resources used by task and any - // local references in the plugin. If force is set to true the task should - // be destroyed even if it's currently running. - // - // In the example below we use the executor to force shutdown the task - // (timeout equals 0). - if !handle.pluginClient.Exited() { - if err := handle.exec.Shutdown("", 0); err != nil { - handle.logger.Error("destroying executor failed", "err", err) - } - - handle.pluginClient.Kill() + if err := handle.cleanup(); err != nil { + return err } d.tasks.Delete(taskID) @@ -542,15 +426,7 @@ func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Dur return nil, drivers.ErrTaskNotFound } - // TODO: implement driver specific logic to send task stats. - // - // This function returns a channel that Nomad will use to listen for task - // stats (e.g., CPU and memory usage) in a given interval. It should send - // stats until the context is canceled or the task stops running. - // - // In the example below we use the Stats function provided by the executor, - // but you can build a set of functions similar to the fingerprint process. - return handle.exec.Stats(ctx, interval) + return handle.stats(ctx, interval) } // TaskEvents returns a channel that the plugin can use to emit task related events. @@ -566,8 +442,6 @@ func (d *Driver) SignalTask(taskID string, signal string) error { return drivers.ErrTaskNotFound } - // TODO: implement driver specific signal handling logic. - // // The given signal must be forwarded to the target taskID. If this plugin // doesn't support receiving signals (capability SendSignals is set to // false) you can just return nil. @@ -578,7 +452,7 @@ func (d *Driver) SignalTask(taskID string, signal string) error { d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID) } - return handle.exec.Signal(sig) + return handle.signal(sig) } // ExecTask returns the result of executing the given command inside a task. diff --git a/containerd/handle.go b/containerd/handle.go index 4406d36..1f1e6c6 100644 --- a/containerd/handle.go +++ b/containerd/handle.go @@ -2,13 +2,12 @@ package containerd import ( "context" - "strconv" + "fmt" + "os" "sync" "time" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -19,17 +18,13 @@ type taskHandle struct { // stateLock syncs access to all fields below stateLock sync.RWMutex - logger hclog.Logger - exec executor.Executor - pluginClient *plugin.Client - taskConfig *drivers.TaskConfig - procState drivers.TaskState - startedAt time.Time - completedAt time.Time - exitResult *drivers.ExitResult - - // TODO: add any extra relevant information about the task. - pid int + logger hclog.Logger + taskConfig *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult + containerName string } func (h *taskHandle) TaskStatus() *drivers.TaskStatus { @@ -44,7 +39,7 @@ func (h *taskHandle) TaskStatus() *drivers.TaskStatus { CompletedAt: h.completedAt, ExitResult: h.exitResult, DriverAttributes: map[string]string{ - "pid": strconv.Itoa(h.pid), + "containerName": h.containerName, }, } } @@ -63,10 +58,12 @@ func (h *taskHandle) run() { h.stateLock.Unlock() // TODO: wait for your task to complete and upate its state. - ps, err := h.exec.Wait(context.Background()) + //ps, err := h.exec.Wait(context.Background()) h.stateLock.Lock() defer h.stateLock.Unlock() + err := fmt.Errorf("Hello test error") + if err != nil { h.exitResult.Err = err h.procState = drivers.TaskStateUnknown @@ -74,7 +71,23 @@ func (h *taskHandle) run() { return } h.procState = drivers.TaskStateExited - h.exitResult.ExitCode = ps.ExitCode - h.exitResult.Signal = ps.Signal - h.completedAt = ps.Time + //h.exitResult.ExitCode = ps.ExitCode + //h.exitResult.Signal = ps.Signal + //h.completedAt = ps.Time +} + +func (h *taskHandle) shutdown(timeout time.Duration, signal string) error { + return nil +} + +func (h *taskHandle) cleanup() error { + return nil +} + +func (h *taskHandle) stats(ctx context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { + return nil, nil +} + +func (h *taskHandle) signal(sig os.Signal) error { + return nil } diff --git a/go.mod b/go.mod index 3b3f1ff..ca13f1f 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/hashicorp/nomad v0.10.1 github.com/hashicorp/nomad/api v0.0.0-20191203164002-b31573ae7206 // indirect github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b // indirect - github.com/moby/moby v1.13.1 // indirect + github.com/moby/moby v1.13.1 github.com/mrunalp/fileutils v0.0.0-20171103030105-7d4729fb3618 // indirect github.com/opencontainers/runc v1.0.0-rc8.0.20190611121236-6cc515888830 // indirect github.com/opencontainers/selinux v1.3.1 // indirect