Driver and handle updates.

This commit is contained in:
Shishir Mahajan 2020-05-11 18:02:03 -07:00
parent a8bab44acf
commit 433b9b30fb
No known key found for this signature in database
GPG Key ID: D41782E7688DEC4A
2 changed files with 47 additions and 31 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"syscall"
"time"
"github.com/containerd/containerd"
@ -312,6 +313,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
container: container,
task: task,
}
driverState := TaskState{
@ -324,7 +327,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}
d.tasks.Set(cfg.ID, h)
go h.run()
go h.run(d.ctxContainerd)
return handle, nil, nil
}
@ -359,7 +362,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
d.tasks.Set(handle.Config.ID, h)
go h.run()
go h.run(d.ctxContainerd)
return nil
}
@ -379,6 +382,23 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
defer close(ch)
var result *drivers.ExitResult
exitStatusCh, err := handle.task.Wait(d.ctxContainerd)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
}
} else {
status := <-exitStatusCh
code, _, err := status.Result()
if err != nil {
d.logger.Error(err.Error())
return
}
result = &drivers.ExitResult{
ExitCode: int(code),
}
}
for {
select {
case <-ctx.Done():
@ -397,7 +417,10 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
return drivers.ErrTaskNotFound
}
if err := handle.shutdown(timeout, signal); err != nil {
d.logger.Info("StopTask signal: %s", signal)
d.logger.Info("StopTask timeout: %v", timeout)
if err := handle.shutdown(d.ctxContainerd, syscall.SIGTERM); err != nil {
return fmt.Errorf("Shutdown failed: %v", err)
}
@ -415,7 +438,7 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
return fmt.Errorf("cannot destroy running task")
}
if err := handle.cleanup(); err != nil {
if err := handle.cleanup(d.ctxContainerd); err != nil {
return err
}

View File

@ -2,11 +2,12 @@ package containerd
import (
"context"
"fmt"
"os"
"sync"
"syscall"
"time"
"github.com/containerd/containerd"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/plugins/drivers"
)
@ -25,6 +26,8 @@ type taskHandle struct {
completedAt time.Time
exitResult *drivers.ExitResult
containerName string
container containerd.Container
task containerd.Task
}
func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
@ -50,37 +53,27 @@ func (h *taskHandle) IsRunning() bool {
return h.procState == drivers.TaskStateRunning
}
func (h *taskHandle) run() {
h.stateLock.Lock()
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
}
h.stateLock.Unlock()
// TODO: wait for your task to complete and upate its state.
//ps, err := h.exec.Wait(context.Background())
func (h *taskHandle) run(ctxContainerd context.Context) {
h.stateLock.Lock()
defer h.stateLock.Unlock()
err := fmt.Errorf("Hello test error")
// Sleep for 5 seconds to allow h.task.Wait() to kick in.
time.Sleep(5 * time.Second)
if err != nil {
h.exitResult.Err = err
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
h.task.Start(ctxContainerd)
}
func (h *taskHandle) shutdown(ctxContainerd context.Context, signal syscall.Signal) error {
return h.task.Kill(ctxContainerd, signal)
}
func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
if err := h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup); err != nil {
return err
}
if _, err := h.task.Delete(ctxContainerd); err != nil {
return err
}
h.procState = drivers.TaskStateExited
//h.exitResult.ExitCode = ps.ExitCode
//h.exitResult.Signal = ps.Signal
//h.completedAt = ps.Time
}
func (h *taskHandle) shutdown(timeout time.Duration, signal string) error {
return nil
}
func (h *taskHandle) cleanup() error {
return nil
}