RecoverTask: Fix issues.

This commit is contained in:
Shishir Mahajan 2020-05-15 13:46:29 -07:00
parent 6e3436679d
commit c449b1a1d1
No known key found for this signature in database
GPG Key ID: D41782E7688DEC4A
2 changed files with 22 additions and 29 deletions

View File

@ -28,6 +28,14 @@ func (d *Driver) createContainer(image containerd.Image, containerName, containe
) )
} }
func (d *Driver) loadContainer(id string) (containerd.Container, error) {
return d.client.LoadContainer(d.ctxContainerd, id)
}
func (d *Driver) createTask(container containerd.Container) (containerd.Task, error) { func (d *Driver) createTask(container containerd.Container) (containerd.Task, error) {
return container.NewTask(d.ctxContainerd, cio.NewCreator(cio.WithStdio)) return container.NewTask(d.ctxContainerd, cio.NewCreator(cio.WithStdio))
} }
func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) {
return container.Task(d.ctxContainerd, cio.Load)
}

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"strings"
"syscall" "syscall"
"time" "time"
@ -99,9 +98,7 @@ type TaskConfig struct {
// recovery. // recovery.
type TaskState struct { type TaskState struct {
StartedAt time.Time StartedAt time.Time
Container containerd.Container
ContainerName string ContainerName string
Task containerd.Task
} }
type Driver struct { type Driver struct {
@ -301,14 +298,14 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("Error in creating container: %v", err) return nil, nil, fmt.Errorf("Error in creating container: %v", err)
} }
d.logger.Info(fmt.Sprintf("Successfully created container with name: %s, ID: %s and snapshot with ID: %s", containerName, container.ID(), containerSnapshotName)) d.logger.Info(fmt.Sprintf("Successfully created container with name: %s", containerName))
task, err := d.createTask(container) task, err := d.createTask(container)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("Error in creating task: %v", err) return nil, nil, fmt.Errorf("Error in creating task: %v", err)
} }
d.logger.Info(fmt.Sprintf("Task with %s ID created successfully.", task.ID())) d.logger.Info(fmt.Sprintf("Successfully created task with ID: %s", task.ID()))
h := &taskHandle{ h := &taskHandle{
taskConfig: cfg, taskConfig: cfg,
@ -322,9 +319,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
driverState := TaskState{ driverState := TaskState{
StartedAt: h.startedAt, StartedAt: h.startedAt,
Container: container,
ContainerName: containerName, ContainerName: containerName,
Task: task,
} }
if err := handle.SetDriverState(&driverState); err != nil { if err := handle.SetDriverState(&driverState); err != nil {
@ -356,32 +351,19 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to decode driver config: %v", err) return fmt.Errorf("failed to decode driver config: %v", err)
} }
image, err := d.pullImage(driverConfig.Image) container, err := d.loadContainer(taskState.ContainerName)
if err != nil { if err != nil {
return fmt.Errorf("Error in recovering image: %v", err) return fmt.Errorf("Error in recovering container: %v", err)
} }
var container containerd.Container task, err := d.getTask(container)
containerSnapshotName := fmt.Sprintf("%s-snapshot", taskState.ContainerName)
container, err = d.createContainer(image, taskState.ContainerName, containerSnapshotName, d.config.ContainerdRuntime)
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "already exists") { return fmt.Errorf("Error in recovering task: %v", err)
return fmt.Errorf("Error in recovering container: %v", err)
}
container = taskState.Container
} }
status, err := taskState.Task.Status(d.ctxContainerd) status, err := task.Status(d.ctxContainerd)
if err != nil { if err != nil {
return err return fmt.Errorf("Error in recovering task status: %v", err)
}
if status.Status != containerd.Running {
taskState.Task, err = d.createTask(container)
if err != nil {
return fmt.Errorf("Error in recovering task: %v", err)
}
} }
h := &taskHandle{ h := &taskHandle{
@ -390,15 +372,18 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
startedAt: taskState.StartedAt, startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{}, exitResult: &drivers.ExitResult{},
logger: d.logger, logger: d.logger,
container: taskState.Container, container: container,
containerName: taskState.ContainerName, containerName: taskState.ContainerName,
task: taskState.Task, task: task,
} }
d.tasks.Set(handle.Config.ID, h) d.tasks.Set(handle.Config.ID, h)
if status.Status != containerd.Running {
if status.Status == containerd.Stopped {
go h.run(d.ctxContainerd) go h.run(d.ctxContainerd)
} }
d.logger.Info(fmt.Sprintf("Task with ID: %s recovered successfully.", handle.Config.ID))
return nil return nil
} }