Merge pull request #110 from Roblox/stdout_stderr_issue
Fix: Stderr and stdout of existing processes are lost after a restart of nomad
This commit is contained in:
commit
5168f15971
@ -341,12 +341,7 @@ func (d *Driver) loadContainer(id string) (containerd.Container, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
|
func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
|
||||||
stdout, err := openFIFO(stdoutPath)
|
stdout, stderr, err := getStdoutStderrFifos(stdoutPath, stderrPath)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
stderr, err := openFIFO(stderrPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -357,9 +352,14 @@ func (d *Driver) createTask(container containerd.Container, stdoutPath, stderrPa
|
|||||||
return container.NewTask(ctxWithTimeout, cio.NewCreator(cio.WithStreams(nil, stdout, stderr)))
|
return container.NewTask(ctxWithTimeout, cio.NewCreator(cio.WithStreams(nil, stdout, stderr)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Driver) getTask(container containerd.Container) (containerd.Task, error) {
|
func (d *Driver) getTask(container containerd.Container, stdoutPath, stderrPath string) (containerd.Task, error) {
|
||||||
|
stdout, stderr, err := getStdoutStderrFifos(stdoutPath, stderrPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
|
ctxWithTimeout, cancel := context.WithTimeout(d.ctxContainerd, 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
return container.Task(ctxWithTimeout, cio.Load)
|
return container.Task(ctxWithTimeout, cio.NewAttach(cio.WithStreams(nil, stdout, stderr)))
|
||||||
}
|
}
|
||||||
|
@ -207,6 +207,8 @@ type TaskConfig struct {
|
|||||||
type TaskState struct {
|
type TaskState struct {
|
||||||
StartedAt time.Time
|
StartedAt time.Time
|
||||||
ContainerName string
|
ContainerName string
|
||||||
|
StdoutPath string
|
||||||
|
StderrPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
@ -496,6 +498,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
|
|||||||
driverState := TaskState{
|
driverState := TaskState{
|
||||||
StartedAt: h.startedAt,
|
StartedAt: h.startedAt,
|
||||||
ContainerName: containerName,
|
ContainerName: containerName,
|
||||||
|
StdoutPath: cfg.StdoutPath,
|
||||||
|
StderrPath: cfg.StderrPath,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := handle.SetDriverState(&driverState); err != nil {
|
if err := handle.SetDriverState(&driverState); err != nil {
|
||||||
@ -539,7 +543,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
|||||||
return fmt.Errorf("Error in recovering container: %v", err)
|
return fmt.Errorf("Error in recovering container: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
task, err := d.getTask(container)
|
task, err := d.getTask(container, taskState.StdoutPath, taskState.StderrPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error in recovering task: %v", err)
|
return fmt.Errorf("Error in recovering task: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,20 @@ func buildMountpoint(mountType, mountTarget, mountSource string, mountOptions []
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getStdoutStderrFifos return the container's stdout and stderr FIFO's.
|
||||||
|
func getStdoutStderrFifos(stdoutPath, stderrPath string) (*os.File, *os.File, error) {
|
||||||
|
stdout, err := openFIFO(stdoutPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stderr, err := openFIFO(stderrPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return stdout, stderr, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FIFOs are named pipes in Linux.
|
// FIFOs are named pipes in Linux.
|
||||||
// openFIFO() opens the nomad task stdout/stderr pipes and returns the opened *os.File.
|
// openFIFO() opens the nomad task stdout/stderr pipes and returns the opened *os.File.
|
||||||
func openFIFO(path string) (*os.File, error) {
|
func openFIFO(path string) (*os.File, error) {
|
||||||
|
@ -29,6 +29,7 @@ main() {
|
|||||||
setup
|
setup
|
||||||
echo "INFO: Checking if nomad-driver-containerd is up and running, and nomad is ready to accept jobs."
|
echo "INFO: Checking if nomad-driver-containerd is up and running, and nomad is ready to accept jobs."
|
||||||
is_containerd_driver_active
|
is_containerd_driver_active
|
||||||
|
is_nomad_ready
|
||||||
|
|
||||||
run_tests $@
|
run_tests $@
|
||||||
exit $PASS_STATUS
|
exit $PASS_STATUS
|
||||||
@ -202,4 +203,27 @@ is_containerd_driver_active() {
|
|||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# is_nomad_ready polls the local nomad HTTP API (/v1/nodes) until the first
# node in the response reports the status "ready".
#
# It makes up to 5 attempts, sleeping 4 seconds after each unsuccessful one.
# If the node is still not ready after the last attempt, it prints an error
# and exits the script with status 1.
is_nomad_ready() {
  i=0
  while [ "$i" -lt 5 ]
  do
    # errexit is switched off around the probe so a failing pipeline does not
    # abort the script; its exit status is captured and checked by hand.
    set +e
    status=$(curl -s http://127.0.0.1:4646/v1/nodes|jq '.[0] ."Status"')
    rc=$?
    set -e
    # jq prints the value as a JSON string, so the surrounding double quotes
    # are part of what is compared here.
    if [[ $rc -eq 0 && $status = \"ready\" ]]; then
      echo "INFO: nomad is ready to accept jobs."
      break
    fi
    echo "INFO: nomad is initializing, sleep for 4 seconds."
    sleep 4s
    # $(( ... )) replaces the deprecated $[ ... ] arithmetic expansion.
    i=$((i + 1))
  done

  # The loop only runs out of attempts (i reaches 5) when no probe succeeded.
  if [ "$i" -ge 5 ]; then
    echo "ERROR: nomad didn't come up. exit 1."
    exit 1
  fi
}
|
||||||
|
|
||||||
main "$@"
|
main "$@"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user