Merge pull request #49 from Roblox/fix_issues
Fix issues in containerd-driver
commit 0c1a1bb51d
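Review summary (the merge carries no description; condensed from the hunks below): every containerd client call now runs under a `context.WithTimeout` child of the driver's long-lived context (30 seconds for control-plane calls, 90 seconds for image pulls) so a wedged containerd daemon cannot hang the driver indefinitely; `buildFingerprint` folds the error and not-serving cases into a single unhealthy branch and logs through hclog key/value pairs instead of Printf verbs; and `handleWait` reports a failed task wait as an `ExitResult` with exit code 255 rather than logging and returning without ever delivering a result.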
@@ -18,9 +18,11 @@ limitations under the License.
 package containerd
 
 import (
+    "context"
     "fmt"
     "os"
     "syscall"
+    "time"
 
     "github.com/containerd/containerd"
    "github.com/containerd/containerd/cio"
@@ -43,15 +45,24 @@ type ContainerConfig struct {
 }
 
 func (d *Driver) isContainerdRunning() (bool, error) {
-    return d.client.IsServing(d.ctxContainerd)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return d.client.IsServing(ctxWithTimeout)
 }
 
 func (d *Driver) getContainerdVersion() (containerd.Version, error) {
-    return d.client.Version(d.ctxContainerd)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return d.client.Version(ctxWithTimeout)
 }
 
 func (d *Driver) pullImage(imageName string) (containerd.Image, error) {
-    return d.client.Pull(d.ctxContainerd, imageName, containerd.WithPullUnpack)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 90*time.Second)
+    defer cancel()
+
+    return d.client.Pull(ctxWithTimeout, imageName, containerd.WithPullUnpack)
 }
 
 func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskConfig) (containerd.Container, error) {
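Review note: the recurring pattern in this diff is the same four lines. Derive a child context with a deadline from the long-lived `d.ctxContainerd`, defer its `cancel`, and hand the child to the containerd client, so a wedged daemon returns `context.DeadlineExceeded` instead of blocking forever. A minimal, self-contained sketch of the pattern (the `slowDaemonCall` helper is hypothetical, standing in for calls like `client.IsServing` or `client.Pull`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowDaemonCall is a hypothetical stand-in for a containerd client call
// (e.g. client.IsServing): it honors ctx cancellation the way a gRPC call does.
func slowDaemonCall(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // simulate a hung daemon
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the timeout fires
	}
}

func main() {
	parent := context.Background() // plays the role of d.ctxContainerd

	// The four-line pattern added throughout this commit.
	ctxWithTimeout, cancel := context.WithTimeout(parent, 1*time.Second)
	defer cancel() // releases the timer even if the call returns early

	if err := slowDaemonCall(ctxWithTimeout); err != nil {
		fmt.Println("call aborted:", err) // prints "context deadline exceeded"
	}
}
```

Note the one deliberate deviation: `pullImage` gets a 90-second budget where everything else gets 30, since image pulls do real network transfer.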
@@ -185,8 +196,11 @@ func (d *Driver) createContainer(containerConfig *ContainerConfig, config *TaskConfig) (containerd.Container, error) {
         opts = append(opts, oci.WithLinuxNamespace(specs.LinuxNamespace{Type: specs.NetworkNamespace, Path: containerConfig.NetworkNamespacePath}))
     }
 
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
     return d.client.NewContainer(
-        d.ctxContainerd,
+        ctxWithTimeout,
         containerConfig.ContainerName,
         containerd.WithRuntime(d.config.ContainerdRuntime, nil),
         containerd.WithNewSnapshot(containerConfig.ContainerSnapshotName, containerConfig.Image),
@@ -205,7 +219,10 @@ func buildMountpoint(mountType, mountTarget, mountSource string, mountOptions []string) specs.Mount {
 }
 
 func (d *Driver) loadContainer(id string) (containerd.Container, error) {
-    return d.client.LoadContainer(d.ctxContainerd, id)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return d.client.LoadContainer(ctxWithTimeout, id)
 }
 
 func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
@@ -219,7 +236,10 @@ func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
         return nil, err
     }
 
-    return container.NewTask(d.ctxContainerd, cio.NewCreator(cio.WithStreams(nil, stdout, stderr)))
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return container.NewTask(ctxWithTimeout, cio.NewCreator(cio.WithStreams(nil, stdout, stderr)))
 }
 
 // FIFO's are named pipes in linux.
@@ -229,5 +249,8 @@ func openFIFO(path string) (*os.File, error) {
 }
 
 func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) {
-    return container.Task(d.ctxContainerd, cio.Load)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return container.Task(ctxWithTimeout, cio.Load)
 }
@@ -307,14 +307,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
     }
 
     isRunning, err := d.isContainerdRunning()
-    if err != nil {
-        d.logger.Error("Error in buildFingerprint(): failed to get containerd status: %v", err)
-        fp.Health = drivers.HealthStateUndetected
-        fp.HealthDescription = "Undetected"
-        return fp
-    }
-
-    if !isRunning {
+    if err != nil || !isRunning {
+        if err != nil {
+            d.logger.Error("Error in buildFingerprint(): failed to get containerd status", "error", err)
+        }
         fp.Health = drivers.HealthStateUnhealthy
         fp.HealthDescription = "Unhealthy"
         return fp
@@ -323,7 +319,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
     // Get containerd version
     version, err := d.getContainerdVersion()
     if err != nil {
-        d.logger.Warn("Error in buildFingerprint(): failed to get containerd version: %v", err)
+        d.logger.Warn("Error in buildFingerprint(): failed to get containerd version:", "error", err)
         return fp
     }
 
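Review note on the two logger fixes above: Nomad driver plugins log through hashicorp/go-hclog, whose methods take a message plus alternating key/value pairs, not Printf verbs, so the old `"...: %v", err` form emits a literal `%v` with err left as an unpaired trailing argument. A small sketch of the difference (assuming only the go-hclog package):

```go
package main

import (
	"errors"

	hclog "github.com/hashicorp/go-hclog"
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{Name: "containerd-driver"})
	err := errors.New("connection refused")

	// Old form: hclog is not Printf-style, so "%v" is printed verbatim
	// and err dangles as an unpaired trailing argument.
	logger.Error("failed to get containerd status: %v", err)

	// New form, as in this commit: a proper "error" => err pair.
	logger.Error("failed to get containerd status", "error", err)
}
```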
@@ -398,13 +394,13 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
         return nil, nil, fmt.Errorf("Error in creating container: %v", err)
     }
 
-    d.logger.Info(fmt.Sprintf("Successfully created container with name: %s", containerName))
+    d.logger.Info(fmt.Sprintf("Successfully created container with name: %s\n", containerName))
     task, err := d.createTask(container, cfg.StdoutPath, cfg.StderrPath)
     if err != nil {
         return nil, nil, fmt.Errorf("Error in creating task: %v", err)
     }
 
-    d.logger.Info(fmt.Sprintf("Successfully created task with ID: %s", task.ID()))
+    d.logger.Info(fmt.Sprintf("Successfully created task with ID: %s\n", task.ID()))
 
     h := &taskHandle{
         taskConfig: cfg,
@@ -429,6 +425,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
     }
 
     d.tasks.Set(cfg.ID, h)
+
     go h.run(d.ctxContainerd)
     return handle, nil, nil
 }
@@ -474,7 +471,10 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         return fmt.Errorf("Error in recovering task: %v", err)
     }
 
-    status, err := task.Status(d.ctxContainerd)
+    ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    status, err := task.Status(ctxWithTimeout)
     if err != nil {
         return fmt.Errorf("Error in recovering task status: %v", err)
     }
@@ -499,7 +499,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         go h.run(d.ctxContainerd)
     }
 
-    d.logger.Info(fmt.Sprintf("Task with ID: %s recovered successfully.", handle.Config.ID))
+    d.logger.Info(fmt.Sprintf("Task with ID: %s recovered successfully.\n", handle.Config.ID))
     return nil
 }
 
@@ -522,17 +522,15 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
     exitStatusCh, err := handle.task.Wait(d.ctxContainerd)
     if err != nil {
         result = &drivers.ExitResult{
-            Err: fmt.Errorf("executor: error waiting on process: %v", err),
+            ExitCode: 255,
+            Err:      fmt.Errorf("executor: error waiting on process: %v", err),
         }
     } else {
         status := <-exitStatusCh
         code, _, err := status.Result()
-        if err != nil {
-            d.logger.Error(err.Error())
-            return
-        }
         result = &drivers.ExitResult{
             ExitCode: int(code),
             Err:      err,
         }
     }
+
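Review note: the handleWait rewrite fixes a dropped-error path. Previously a failure from `status.Result()` was logged and the goroutine returned without ever sending a result on the channel; now both failure paths produce an `ExitResult`, with 255 as the conventional "unknown failure" code. A reduced sketch of the shape (`ExitResult` is stubbed locally for illustration; the real type is `drivers.ExitResult`):

```go
package main

import (
	"errors"
	"fmt"
)

// ExitResult is a local stub of drivers.ExitResult, for illustration only.
type ExitResult struct {
	ExitCode int
	Err      error
}

// wait mirrors the fixed control flow: every path yields a result,
// so the consumer of the channel is never left waiting forever.
func wait(exit <-chan error) *ExitResult {
	if err := <-exit; err != nil {
		return &ExitResult{ExitCode: 255, Err: err} // propagate, don't just log
	}
	return &ExitResult{ExitCode: 0}
}

func main() {
	ch := make(chan error, 1)
	ch <- errors.New("task wait failed")
	fmt.Printf("%+v\n", wait(ch))
}
```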
@@ -607,7 +605,7 @@ func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
     if err != nil {
         d.logger.Warn("Error parsing driver stats interval, fallback on default interval")
     } else {
-        msg := fmt.Sprintf("Overriding client stats interval: %v with driver stats interval: %v", interval, d.config.StatsInterval)
+        msg := fmt.Sprintf("Overriding client stats interval: %v with driver stats interval: %v\n", interval, d.config.StatsInterval)
         d.logger.Debug(msg)
         interval = statsInterval
     }
@@ -88,7 +88,10 @@ func (h *taskHandle) IsRunning(ctxContainerd context.Context) (bool, error) {
     h.stateLock.RLock()
     defer h.stateLock.RUnlock()
 
-    status, err := h.task.Status(ctxContainerd)
+    ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    status, err := h.task.Status(ctxWithTimeout)
     if err != nil {
         return false, fmt.Errorf("Error in getting task status: %v", err)
     }
@@ -175,7 +178,10 @@ func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
 }
 
 func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
-    if err := h.task.Kill(ctxContainerd, signal); err != nil {
+    ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    if err := h.task.Kill(ctxWithTimeout, signal); err != nil {
         return err
     }
 
@@ -184,7 +190,7 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
     // if they need more time for their container to shutdown gracefully.
     time.Sleep(timeout)
 
-    status, err := h.task.Status(ctxContainerd)
+    status, err := h.task.Status(ctxWithTimeout)
     if err != nil {
         return err
     }
@@ -194,14 +200,17 @@ func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
         return nil
     }
 
-    return h.task.Kill(ctxContainerd, syscall.SIGKILL)
+    return h.task.Kill(ctxWithTimeout, syscall.SIGKILL)
 }
 
 func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
-    if _, err := h.task.Delete(ctxContainerd); err != nil {
+    ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    if _, err := h.task.Delete(ctxWithTimeout); err != nil {
         return err
     }
-    if err := h.container.Delete(ctxContainerd, containerd.WithSnapshotCleanup); err != nil {
+    if err := h.container.Delete(ctxWithTimeout, containerd.WithSnapshotCleanup); err != nil {
         return err
     }
     return nil
@@ -209,6 +218,7 @@ func (h *taskHandle) cleanup(ctxContainerd context.Context) error {
 
 func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
     ch := make(chan *drivers.TaskResourceUsage)
+
     go h.handleStats(ch, ctx, ctxContainerd, interval)
 
     return ch, nil
@@ -216,6 +226,7 @@ func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
 
 func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
     defer close(ch)
 
+    timer := time.NewTimer(0)
     for {
         select {
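Review note: `time.NewTimer(0)` fires immediately, so the stats loop emits its first sample without waiting a full interval and can then re-arm the same timer on each pass. The loop body isn't shown in this hunk; a sketch of the presumed shape, with a print standing in for the driver's task metrics collection:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func handleStats(ctx context.Context, interval time.Duration) {
	timer := time.NewTimer(0) // zero duration: first tick is immediate
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return // driver shutting down
		case <-timer.C:
			fmt.Println("collect stats") // stand-in for metrics collection
			timer.Reset(interval)        // re-arm for the next sample
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2500*time.Millisecond)
	defer cancel()
	handleStats(ctx, 1*time.Second) // prints roughly three samples, then exits
}
```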
@@ -315,5 +326,8 @@ func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskResourceUsage {
     }
 }
 func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
-    return h.task.Kill(ctxContainerd, sig.(syscall.Signal))
+    ctxWithTimeout, cancel := context.WithTimeout(ctxContainerd, 30*time.Second)
+    defer cancel()
+
+    return h.task.Kill(ctxWithTimeout, sig.(syscall.Signal))
 }