Code restructuring.

This commit is contained in:
Shishir Mahajan 2020-06-08 12:06:15 -07:00
parent 993c76e590
commit 93a430f920
No known key found for this signature in database
GPG Key ID: D41782E7688DEC4A
2 changed files with 60 additions and 56 deletions

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/namespaces"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-hclog"
@ -517,66 +516,12 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
}
func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
defer opts.Stdout.Close()
defer opts.Stderr.Close()
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
spec, err := handle.container.Spec(d.ctxContainerd)
if err != nil {
return nil, err
}
pspec := spec.Process
pspec.Terminal = opts.Tty
pspec.Args = opts.Command
execID := getRandomID(8)
cioOpts := []cio.Opt{cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)}
if opts.Tty {
cioOpts = append(cioOpts, cio.WithTerminal)
}
ioCreator := cio.NewCreator(cioOpts...)
process, err := handle.task.Exec(d.ctxContainerd, execID, pspec, ioCreator)
if err != nil {
return nil, err
}
defer process.Delete(d.ctxContainerd)
statusC, err := process.Wait(d.ctxContainerd)
if err != nil {
return nil, err
}
if err := process.Start(d.ctxContainerd); err != nil {
return nil, err
}
var code uint32
status := <-statusC
code, _, err = status.Result()
if err != nil {
return nil, err
}
go func() {
for {
select {
case <-ctx.Done():
return
}
}
}()
return &drivers.ExitResult{
ExitCode: int(code),
}, nil
return handle.exec(ctx, d.ctxContainerd, taskID, opts)
}
// ExecTask returns the result of executing the given command inside a task.

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/plugins/drivers"
)
@ -80,6 +81,64 @@ func (h *taskHandle) run(ctxContainerd context.Context) {
h.task.Start(ctxContainerd)
}
func (h *taskHandle) exec(ctx, ctxContainerd context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
defer opts.Stdout.Close()
defer opts.Stderr.Close()
spec, err := h.container.Spec(ctxContainerd)
if err != nil {
return nil, err
}
pspec := spec.Process
pspec.Terminal = opts.Tty
pspec.Args = opts.Command
execID := getRandomID(8)
cioOpts := []cio.Opt{cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)}
if opts.Tty {
cioOpts = append(cioOpts, cio.WithTerminal)
}
ioCreator := cio.NewCreator(cioOpts...)
process, err := h.task.Exec(ctxContainerd, execID, pspec, ioCreator)
if err != nil {
return nil, err
}
defer process.Delete(ctxContainerd)
statusC, err := process.Wait(ctxContainerd)
if err != nil {
return nil, err
}
if err := process.Start(ctxContainerd); err != nil {
return nil, err
}
var code uint32
status := <-statusC
code, _, err = status.Result()
if err != nil {
return nil, err
}
go func() {
for {
select {
case <-ctx.Done():
return
}
}
}()
return &drivers.ExitResult{
ExitCode: int(code),
}, nil
}
func (h *taskHandle) shutdown(ctxContainerd context.Context, timeout time.Duration, signal syscall.Signal) error {
if err := h.task.Kill(ctxContainerd, signal); err != nil {
return err