containerd/pkg/process/io.go

440 lines
9.6 KiB
Go

//go:build !windows
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/hashicorp/go-multierror"
exec "golang.org/x/sys/execabs"
)
const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
var bufPool = sync.Pool{
New: func() interface{} {
// setting to 4096 to align with PIPE_BUF
// http://man7.org/linux/man-pages/man7/pipe.7.html
buffer := make([]byte, 4096)
return &buffer
},
}
type processIO struct {
io runc.IO
uri *url.URL
copy bool
stdio stdio.Stdio
}
func (p *processIO) Close() error {
if p.io != nil {
return p.io.Close()
}
return nil
}
func (p *processIO) IO() runc.IO {
return p.io
}
func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error {
if !p.copy {
return nil
}
var cwg sync.WaitGroup
if err := copyPipes(ctx, p.IO(), p.stdio.Stdin, p.stdio.Stdout, p.stdio.Stderr, wg, &cwg); err != nil {
return fmt.Errorf("unable to copy pipes: %w", err)
}
cwg.Wait()
return nil
}
func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdio) (*processIO, error) {
pio := &processIO{
stdio: stdio,
}
if stdio.IsNull() {
i, err := runc.NewNullIO()
if err != nil {
return nil, err
}
pio.io = i
return pio, nil
}
u, err := url.Parse(stdio.Stdout)
if err != nil {
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
}
if u.Scheme == "" {
u.Scheme = "fifo"
}
pio.uri = u
switch u.Scheme {
case "fifo":
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "binary":
pio.io, err = NewBinaryIO(ctx, id, u)
case "file":
filePath := u.Path
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
return nil, err
}
var f *os.File
f, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
f.Close()
pio.stdio.Stdout = filePath
pio.stdio.Stderr = filePath
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
default:
return nil, fmt.Errorf("unknown STDIO scheme %s", u.Scheme)
}
if err != nil {
return nil, err
}
return pio, nil
}
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
var sameFile *countingWriteCloser
for _, i := range []struct {
name string
dest func(wc io.WriteCloser, rc io.Closer)
}{
{
name: stdout,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
}, {
name: stderr,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
log.G(ctx).Warn("error copying stderr")
}
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
},
} {
ok, err := fifo.IsFifo(i.name)
if err != nil {
return err
}
var (
fw io.WriteCloser
fr io.Closer
)
if ok {
if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening w/o fifo %q failed: %w", i.name, err)
}
if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening r/o fifo %q failed: %w", i.name, err)
}
} else {
if sameFile != nil {
sameFile.count++
i.dest(sameFile, nil)
continue
}
if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil {
return fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err)
}
if stdout == stderr {
sameFile = &countingWriteCloser{
WriteCloser: fw,
count: 1,
}
}
}
i.dest(fw, fr)
}
if stdin == "" {
return nil
}
f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(rio.Stdin(), f, *p)
rio.Stdin().Close()
f.Close()
}()
return nil
}
// countingWriteCloser masks io.Closer() until close has been invoked a certain number of times.
type countingWriteCloser struct {
io.WriteCloser
count int64
}
func (c *countingWriteCloser) Close() error {
if atomic.AddInt64(&c.count, -1) > 0 {
return nil
}
return c.WriteCloser.Close()
}
// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var closers []func() error
defer func() {
if err == nil {
return
}
result := multierror.Append(err)
for _, fn := range closers {
result = multierror.Append(result, fn())
}
err = multierror.Flatten(result)
}()
out, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
closers = append(closers, out.Close)
serr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
closers = append(closers, serr.Close)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)
cmd := NewBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
closers = append(closers, func() error { return cmd.Process.Kill() })
// close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
return &binaryIO{
cmd: cmd,
out: out,
err: serr,
}, nil
}
type binaryIO struct {
cmd *exec.Cmd
out, err *pipe
}
func (b *binaryIO) CloseAfterStart() error {
var (
result *multierror.Error
)
for _, v := range []*pipe{b.out, b.err} {
if v != nil {
if err := v.r.Close(); err != nil {
result = multierror.Append(result, err)
}
}
}
return result.ErrorOrNil()
}
func (b *binaryIO) Close() error {
var (
result *multierror.Error
)
for _, v := range []*pipe{b.out, b.err} {
if v != nil {
if err := v.Close(); err != nil {
result = multierror.Append(result, err)
}
}
}
if err := b.cancel(); err != nil {
result = multierror.Append(result, err)
}
return result.ErrorOrNil()
}
func (b *binaryIO) cancel() error {
if b.cmd == nil || b.cmd.Process == nil {
return nil
}
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := b.cmd.Process.Signal(syscall.SIGTERM); err != nil {
result := multierror.Append(fmt.Errorf("failed to send SIGTERM: %w", err))
log.L.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
if err := b.cmd.Process.Kill(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to kill process after faulty SIGTERM: %w", err))
}
return result.ErrorOrNil()
}
done := make(chan error, 1)
go func() {
done <- b.cmd.Wait()
}()
select {
case err := <-done:
return err
case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing")
err := b.cmd.Process.Kill()
if err != nil {
return fmt.Errorf("failed to kill shim logger process: %w", err)
}
return nil
}
}
func (b *binaryIO) Stdin() io.WriteCloser {
return nil
}
func (b *binaryIO) Stdout() io.ReadCloser {
return nil
}
func (b *binaryIO) Stderr() io.ReadCloser {
return nil
}
func (b *binaryIO) Set(cmd *exec.Cmd) {
if b.out != nil {
cmd.Stdout = b.out.w
}
if b.err != nil {
cmd.Stderr = b.err.w
}
}
func newPipe() (*pipe, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &pipe{
r: r,
w: w,
}, nil
}
type pipe struct {
r *os.File
w *os.File
}
func (p *pipe) Close() error {
var result *multierror.Error
if err := p.w.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close write pipe: %w", err))
}
if err := p.r.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close read pipe: %w", err))
}
return multierror.Prefix(result.ErrorOrNil(), "pipe:")
}