package remote // import "github.com/docker/docker/libcontainerd/remote"

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/containerd"
	apievents "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types"
	"github.com/containerd/containerd/archive"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/protobuf"
	v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
	cerrdefs "github.com/containerd/errdefs"
	"github.com/containerd/log"
	"github.com/containerd/typeurl/v2"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/queue"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/hashicorp/go-multierror"
	"github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"go.opentelemetry.io/otel"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)

// DockerContainerBundlePath is the label key pointing to the container's bundle path.
//
// NewTask, task.Exec and container.Delete read this label from the containerd
// container's metadata to locate the bundle directory (presumably it is set by
// WithBundle when the container is created — confirm).
const DockerContainerBundlePath = "com.docker/engine.bundle.path"

// client implements libcontainerdtypes.Client on top of a containerd client.
// Instances are created by NewClient.
type client struct {
	// client is the underlying containerd API client.
	client *containerd.Client
	// stateDir is the parent directory of per-container bundle directories
	// (see bundleDir).
	stateDir string
	// logger is tagged with the "module" and "namespace" fields.
	logger *log.Entry
	// ns is the containerd namespace this client was created for.
	ns string

	// backend receives task events forwarded by processEvent.
	backend libcontainerdtypes.Backend
	// eventQ receives event callbacks keyed by container ID (see processEvent).
	eventQ queue.Queue
}

// container wraps a containerd container together with the client that
// created or loaded it.
type container struct {
	client *client
	c8dCtr containerd.Container

	// v2runcoptions holds the runtime options if the container was created by
	// NewContainer with a *v2runcoptions.Options; it is nil otherwise, including
	// for containers obtained through LoadContainer.
	v2runcoptions *v2runcoptions.Options
}

// task wraps a containerd task and links back to the container it belongs to.
type task struct {
	containerd.Task
	ctr *container
}

// process wraps a containerd process (as returned by task.Exec) so that its
// Kill, Delete and Status errors are passed through wrapError.
type process struct {
	containerd.Process
}

// NewClient creates a new libcontainerd client from a containerd client.
//
// The returned client stores bundle directories under stateDir and scopes its
// event subscription to namespace ns. A goroutine consuming that event stream
// is started before returning; events are delivered to backend b. The error
// result is always nil.
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	entry := log.G(ctx).WithField("module", "libcontainerd").WithField("namespace", ns)

	lc := &client{
		ns:       ns,
		client:   cli,
		backend:  b,
		logger:   entry,
		stateDir: stateDir,
	}

	go lc.processEventStream(ctx, ns)
	return lc, nil
}

// Version returns the version reported by the underlying containerd client.
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	ver, err := c.client.Version(ctx)
	return ver, err
}

// newTask wraps the containerd task t together with a back-reference to c.
func (c *container) newTask(t containerd.Task) *task {
	wrapped := &task{ctr: c}
	wrapped.Task = t
	return wrapped
}

// AttachTask loads the container's existing containerd task and reattaches its
// stdio. containerd passes the task's FIFO set to the attach callback, a
// DirectIO is opened on it, and attachStdio is invoked with that DirectIO.
//
// If an error occurs after the DirectIO has been opened, the DirectIO is
// cancelled and closed before returning.
func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
	var dio *cio.DirectIO
	// This cleanup inspects the named result err, which is also assigned by
	// the attachIO closure below, so failures inside the callback are seen.
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
	}()

	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the previously defined dio for the defer above
		// to handle cleanup
		dio, err = c.client.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	t, err := c.c8dCtr.Task(ctx, attachIO)
	if err != nil {
		return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}
	return c.newTask(t), nil
}

// NewContainer creates a containerd container with the given id and OCI spec,
// using shim as the runtime with runtimeOptions.
//
// Caller-supplied opts are placed first, followed by the spec, runtime and
// WithBundle options (the bundle directory is bundleDir(id)). If
// runtimeOptions is a *v2runcoptions.Options, it is remembered on the returned
// container for later task creation and checkpointing.
//
// Returns an errdefs.Conflict error if a container with the same id already
// exists; other containerd errors are passed through wrapError.
func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
	bdir := c.bundleDir(id)
	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

	// Build a fresh slice instead of appending to opts: appending to a variadic
	// parameter can write into the caller's backing array when it has spare
	// capacity.
	allOpts := make([]containerd.NewContainerOpts, 0, len(opts)+3)
	allOpts = append(allOpts, opts...)
	allOpts = append(allOpts,
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(shim, runtimeOptions),
		WithBundle(bdir, ociSpec),
	)

	ctr, err := c.client.NewContainer(ctx, id, allOpts...)
	if err != nil {
		if cerrdefs.IsAlreadyExists(err) {
			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return nil, wrapError(err)
	}

	created := &container{
		client: c,
		c8dCtr: ctr,
	}
	if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
		created.v2runcoptions = x
	}
	return created, nil
}

// NewTask creates a task for the specified containerd id.
//
// If checkpointDir is non-empty, its contents are archived (archive.Diff) and
// written to the containerd content store, and the resulting descriptor is set
// as the task's checkpoint. That content-store entry is deleted again when
// NewTask returns.
//
// The task's IO is built by createIO from the FIFO set that newFIFOSet derives
// from the bundle directory (the DockerContainerBundlePath label), the task
// id, withStdin and the spec's Process.Terminal; attachStdio receives that IO.
// If task creation fails, that IO is cancelled and closed.
//
// stdinCloseSync is the handshake with createIO's stdin closer: on success the
// created task is sent on it; on failure it is closed.
func (c *container) NewTask(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
	var (
		checkpoint     *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	ctx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.NewTask")
	defer span.End()

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		var err error
		checkpoint, err = c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if checkpoint != nil {
				err := c.client.client.ContentStore().Delete(ctx, digest.Digest(checkpoint.Digest))
				if err != nil {
					c.client.logger.WithError(err).WithFields(log.Fields{
						"ref":    checkpointDir,
						"digest": checkpoint.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		// Close the tar stream before inspecting the writeContent error; a
		// close failure is reported in preference to an upload failure.
		if err := tar.Close(); err != nil {
			return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}

	// Optimization: assume the relevant metadata has not changed in the
	// moment since the container was created. Elide redundant RPC requests
	// to refresh the metadata separately for spec and labels.
	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, errors.Wrap(err, "failed to retrieve metadata")
	}
	bundle := md.Labels[DockerContainerBundlePath]

	var spec specs.Spec
	if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
		return nil, errors.Wrap(err, "failed to retrieve spec")
	}
	uid, gid := getSpecUser(&spec)

	taskOpts := []containerd.NewTaskOpts{
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = checkpoint
			return nil
		},
	}

	if runtime.GOOS != "windows" {
		// Use a clone of the container's runc v2 options (if any) with
		// IoUid/IoGid taken from the spec's process user; the clone leaves the
		// options stored on the container untouched.
		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			if c.v2runcoptions != nil {
				opts := proto.Clone(c.v2runcoptions).(*v2runcoptions.Options)
				opts.IoUid = uint32(uid)
				opts.IoGid = uint32(gid)
				info.Options = opts
			}
			return nil
		})
	} else {
		taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
	}

	// The IO callback assigns the outer rio and err so the failure path below
	// can clean up IO that was created before task creation failed.
	t, err = c.c8dCtr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, id, withStdin, spec.Process.Terminal)

			rio, err = c.createIO(fifos, stdinCloseSync, attachStdio)
			return rio, err
		},
		taskOpts...,
	)
	if err != nil {
		// Tell createIO's stdin closer that no process will be delivered.
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return nil, errors.Wrap(wrapError(err), "failed to create task for container")
	}

	// Signal c.createIO that it can call CloseIO
	stdinCloseSync <- t

	return c.newTask(t), nil
}

// Start starts the task inside a "libcontainerd.remote.task.Start" tracing
// span, passing any error through wrapError.
func (t *task) Start(ctx context.Context) error {
	spanCtx, span := otel.Tracer("").Start(ctx, "libcontainerd.remote.task.Start")
	defer span.End()

	err := t.Task.Start(spanCtx)
	return wrapError(err)
}

// Exec creates exec process.
//
// The containerd client calls Exec to register the exec config in the shim side.
// When the client calls Start, the shim will create stdin fifo if needs. But
// for the container main process, the stdin fifo will be created in Create not
// the Start call. stdinCloseSync channel should be closed after Start exec
// process.
//
// The exec's FIFO set is derived from the container's bundle directory (the
// DockerContainerBundlePath label) and processID. An errdefs.Conflict error is
// returned if processID is already in use. If Start fails, the process is
// deleted using a fresh context with a 45s timeout, and its IO is cancelled
// and closed.
func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
	var (
		p              containerd.Process
		rio            cio.IO
		stdinCloseSync = make(chan containerd.Process, 1)
	)

	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return nil, wrapError(err)
	}

	fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)

	// IO cleanup on failure. This relies on err being assigned (never
	// redeclared) by the Exec call, its IO callback and p.Start below.
	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()

	p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = t.ctr.createIO(fifos, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		// Tell createIO's stdin closer that no process will be delivered.
		close(stdinCloseSync)
		if cerrdefs.IsAlreadyExists(err) {
			return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return nil, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of exec process will be created after p.Start in containerd
	//
	// Registered after the cleanup defer above, so (defers being LIFO) it runs
	// before that cleanup, on both the success and the Start-failure paths.
	defer func() { stdinCloseSync <- p }()

	if err = p.Start(ctx); err != nil {
		// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
		// we are not waiting forever if containerd is unresponsive or to work around fifo cancelling issues in
		// older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		p.Delete(ctx)
		return nil, wrapError(err)
	}
	return process{p}, nil
}

// Kill sends signal to the task through containerd, passing any error through
// wrapError.
func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
	if err := t.Task.Kill(ctx, signal); err != nil {
		return wrapError(err)
	}
	return nil
}

// Kill sends signal to the process through containerd, passing any error
// through wrapError.
func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
	if err := p.Process.Kill(ctx, signal); err != nil {
		return wrapError(err)
	}
	return nil
}

// Pause pauses the task, passing any error through wrapError.
func (t *task) Pause(ctx context.Context) error {
	if err := t.Task.Pause(ctx); err != nil {
		return wrapError(err)
	}
	return nil
}

// Resume resumes a paused task, passing any error through wrapError.
func (t *task) Resume(ctx context.Context) error {
	if err := t.Task.Resume(ctx); err != nil {
		return wrapError(err)
	}
	return nil
}

// Stats fetches the task's metrics from containerd, decodes the payload and
// converts it (with the metrics timestamp) via InterfaceToStats. Errors are
// returned as-is.
func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
	metrics, err := t.Metrics(ctx)
	if err != nil {
		return nil, err
	}

	payload, err := typeurl.UnmarshalAny(metrics.Data)
	if err != nil {
		return nil, err
	}

	readAt := protobuf.FromTimestamp(metrics.Timestamp)
	return libcontainerdtypes.InterfaceToStats(readAt, payload), nil
}

// Summary lists the task's processes, decoding each process's info payload
// into a libcontainerdtypes.Summary. The result is nil when there are no
// processes.
func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
	pids, err := t.Pids(ctx)
	if err != nil {
		return nil, err
	}

	var summaries []libcontainerdtypes.Summary
	for i := range pids {
		details, err := typeurl.UnmarshalAny(pids[i].Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}

		entry, err := summaryFromInterface(details)
		if err != nil {
			return nil, err
		}
		summaries = append(summaries, *entry)
	}
	return summaries, nil
}

// Delete deletes the task and returns its exit status, passing any error
// through wrapError.
func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	status, err := t.Task.Delete(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}

// Delete deletes the process and returns its exit status, passing any error
// through wrapError.
func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
	status, err := p.Process.Delete(ctx)
	if err != nil {
		return status, wrapError(err)
	}
	return status, nil
}

// Delete removes the container from containerd and then removes its bundle
// directory (the DockerContainerBundlePath label). Bundle removal is skipped
// when the LIBCONTAINERD_NOCLEAN environment variable is "1", and a failure
// to remove it is logged rather than returned.
func (c *container) Delete(ctx context.Context) error {
	// Optimization: assume the DockerContainerBundlePath label has not been
	// updated since the container metadata was last loaded/refreshed.
	md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
	if err != nil {
		return err
	}
	bundleDir := md.Labels[DockerContainerBundlePath]

	if err := c.c8dCtr.Delete(ctx); err != nil {
		return wrapError(err)
	}

	if os.Getenv("LIBCONTAINERD_NOCLEAN") == "1" {
		return nil
	}
	if err := os.RemoveAll(bundleDir); err != nil {
		c.client.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
			"container": c.c8dCtr.ID(),
			"bundle":    bundleDir,
		}).Error("failed to remove state dir")
	}
	return nil
}

// ForceDelete deletes the task with containerd.WithProcessKill, discarding the
// exit status and passing any error through wrapError.
func (t *task) ForceDelete(ctx context.Context) error {
	if _, err := t.Task.Delete(ctx, containerd.WithProcessKill); err != nil {
		return wrapError(err)
	}
	return nil
}

// Status returns the task's containerd status, passing any error through
// wrapError.
func (t *task) Status(ctx context.Context) (containerd.Status, error) {
	st, err := t.Task.Status(ctx)
	if err != nil {
		return st, wrapError(err)
	}
	return st, nil
}

// Status returns the process's containerd status, passing any error through
// wrapError.
func (p process) Status(ctx context.Context) (containerd.Status, error) {
	st, err := p.Process.Status(ctx)
	if err != nil {
		return st, wrapError(err)
	}
	return st, nil
}

// getCheckpointOptions returns a checkpoint option that sets Exit on runc v2
// checkpoint options. If no options were set yet and the container has runc
// v2 runtime options, empty v2 checkpoint options are created first; options
// of any other type are left untouched.
func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
	return func(info *containerd.CheckpointTaskInfo) error {
		if info.Options == nil && c.v2runcoptions != nil {
			info.Options = &v2runcoptions.CheckpointOptions{}
		}

		if cpOpts, ok := info.Options.(*v2runcoptions.CheckpointOptions); ok {
			cpOpts.Exit = exit
		}
		return nil
	}
}

// CreateCheckpoint checkpoints the task and extracts the checkpoint archive
// into checkpointDir. exit is forwarded as the Exit flag of runc v2 checkpoint
// options (see getCheckpointOptions).
//
// The checkpoint image created by containerd is always deleted before
// returning; a failure to delete it is only logged. Failures after the
// checkpoint itself was taken are returned as errdefs.System errors.
func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
	img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
		if err != nil {
			t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()

	store := t.ctr.client.client.ContentStore()

	b, err := content.ReadBlob(ctx, store, img.Target())
	if err != nil {
		return errdefs.System(errors.Wrap(err, "failed to retrieve checkpoint data"))
	}
	var index ocispec.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrap(err, "failed to decode checkpoint data"))
	}

	// Point directly into the manifests slice instead of taking the address
	// of a per-iteration copy.
	var cpDesc *ocispec.Descriptor
	for i := range index.Manifests {
		if index.Manifests[i].MediaType == images.MediaTypeContainerd1Checkpoint {
			cpDesc = &index.Manifests[i]
			break
		}
	}
	if cpDesc == nil {
		// There is no underlying error here, and wrapping a nil error yields
		// nil, so construct an explicit error; otherwise the caller would see
		// success without any checkpoint having been written.
		return errdefs.System(errors.Errorf("invalid checkpoint: no %s manifest in checkpoint index", images.MediaTypeContainerd1Checkpoint))
	}

	rat, err := store.ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrap(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()

	if _, err := archive.Apply(ctx, checkpointDir, content.NewReader(rat)); err != nil {
		return errdefs.System(errors.Wrap(err, "failed to read checkpoint reader"))
	}
	return nil
}

// LoadContainer loads the containerd container.
//
// A containerd not-found error becomes an errdefs.NotFound "no such container"
// error; other errors are passed through wrapError. The returned container
// carries no runc v2 options.
func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
	ctr, err := c.client.LoadContainer(ctx, id)
	switch {
	case err == nil:
		return &container{client: c, c8dCtr: ctr}, nil
	case cerrdefs.IsNotFound(err):
		return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
	default:
		return nil, wrapError(err)
	}
}

// Task loads the container's existing containerd task without attaching any
// stdio (a nil cio.Attach is passed), passing errors through wrapError.
func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
	c8dTask, err := c.c8dCtr.Task(ctx, nil)
	if err == nil {
		return c.newTask(c8dTask), nil
	}
	return nil, wrapError(err)
}

// createIO creates the io to be used by a process: it opens a DirectIO on
// fifos, hands it to attachStdio and returns attachStdio's result. If
// attachStdio fails, the DirectIO is cancelled and closed.
//
// When the process has a stdin pipe, its Close is wrapped (run at most once)
// so that closing stdin also calls closeStdin on the process. The process is
// not known yet when createIO runs, so the caller delivers it later through
// stdinCloseSync, after the process has been created (and, for exec, started);
// the caller closes the channel instead if creation failed, meaning there is
// nothing to close. If stdin is closed before the process has been delivered,
// closeStdin runs in a goroutine once it arrives, and its error is logged
// rather than returned.
func (c *container) createIO(fifos *cio.FIFOSet, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		io  *cio.DirectIO
		err error
	)
	io, err = c.client.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}

	if io.Stdin != nil {
		var (
			closeErr  error
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				closeErr = pipe.Close()

				select {
				case p, ok := <-stdinCloseSync:
					// Closed channel: process creation failed, nothing to close.
					if !ok {
						return
					}
					if err := closeStdin(context.Background(), p); err != nil {
						if closeErr != nil {
							closeErr = multierror.Append(closeErr, err)
						} else {
							// Avoid wrapping a single error in a multierror.
							closeErr = err
						}
					}
				default:
					// The process wasn't ready. Close its stdin asynchronously.
					go func() {
						p, ok := <-stdinCloseSync
						if !ok {
							return
						}
						if err := closeStdin(context.Background(), p); err != nil {
							c.client.logger.WithError(err).
								WithField("container", c.c8dCtr.ID()).
								Error("failed to close container stdin")
						}
					}()
				}
			})
			return closeErr
		})
	}

	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}

// closeStdin asks containerd to close the stdin of process p. An error whose
// message contains "transport is closing" is treated as success.
func closeStdin(ctx context.Context, p containerd.Process) error {
	err := p.CloseIO(ctx, containerd.WithStdinCloser)
	if err == nil || strings.Contains(err.Error(), "transport is closing") {
		return nil
	}
	return err
}

// processEvent appends delivery of the event to the backend onto the event
// queue under the event's container ID. Backend errors are logged, not
// returned.
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	id := ei.ContainerID
	deliver := func() {
		if err := c.backend.ProcessEvent(id, et, ei); err != nil {
			c.logger.WithContext(ctx).WithError(err).WithFields(log.Fields{
				"container":  id,
				"event":      et,
				"event-info": ei,
			}).Error("failed to process event")
		}
	}
	c.eventQ.Append(id, deliver)
}

// waitServe polls containerd until it reports that it is serving and returns
// true. It returns false once ctx is done, or when IsServing fails with a
// context cancellation or deadline error. Other errors are logged and the
// check is retried after a short delay.
func (c *client) waitServe(ctx context.Context) bool {
	const retryInterval = 100 * time.Millisecond

	// Begin with a stopped (and, if needed, drained) timer so that the Reset
	// inside the loop starts from a clean state.
	retry := time.NewTimer(retryInterval)
	if !retry.Stop() {
		<-retry.C
	}
	defer retry.Stop()

	for {
		// IsServing blocks until the service is ready, but it can return
		// early, so loop with a delay to cover that case.
		ready, err := c.client.IsServing(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return false
			}
			log.G(ctx).WithError(err).Warn("Error while testing if containerd API is ready")
		}
		if ready {
			return true
		}

		retry.Reset(retryInterval)
		select {
		case <-retry.C:
		case <-ctx.Done():
			return false
		}
	}
}

// processEventStream subscribes to containerd events in namespace ns whose
// topic matches ^/tasks/, and forwards recognised task events (create, start,
// exit, OOM, exec added/started, paused, resumed) to the backend via
// processEvent. TaskDelete and all other event types are logged and ignored.
//
// If the subscription reports an error other than a gRPC Canceled status, it
// waits for containerd to serve again (waitServe) and, if that succeeds,
// restarts itself in a new goroutine with the original ctx. In all other
// cases it returns.
func (c *client) processEventStream(ctx context.Context, ns string) {
	// Create a new context specifically for this subscription.
	// The context must be cancelled to cancel the subscription.
	// In cases where we have to restart event stream processing,
	//   we'll need the original context b/c this one will be cancelled
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")

	c.logger.Debug("processing event stream")

	for {
		select {
		case err := <-errC:
			// A nil value (e.g. from a closed error channel) just ends processing.
			if err != nil {
				errStatus, ok := status.FromError(err)
				// A gRPC Canceled status is treated as a deliberate shutdown;
				// anything else triggers a restart attempt.
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("Failed to get event")
					c.logger.Info("Waiting for containerd to be ready to restart event processing")
					if c.waitServe(ctx) {
						// Restart with the parent ctx: subCtx is cancelled
						// when this invocation returns.
						go c.processEventStream(ctx, ns)
						return
					}
				}
				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
			}
			return
		case ev := <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}

			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}

			c.logger.WithField("topic", ev.Topic).Debug("event")

			// Map each containerd task event to the corresponding
			// libcontainerd event type. For task-level events the container
			// ID doubles as the process ID.
			switch t := v.(type) {
			case *apievents.TaskCreate:
				c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				})
			case *apievents.TaskStart:
				c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				})
			case *apievents.TaskExit:
				c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    protobuf.FromTimestamp(t.ExitedAt),
				})
			case *apievents.TaskOOM:
				c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskExecAdded:
				c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
				})
			case *apievents.TaskExecStarted:
				c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
					Pid:         t.Pid,
				})
			case *apievents.TaskPaused:
				c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskResumed:
				c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				})
			case *apievents.TaskDelete:
				c.logger.WithFields(log.Fields{
					"topic":     ev.Topic,
					"type":      reflect.TypeOf(t),
					"container": t.ContainerID,
				}).Info("ignoring event")
			default:
				c.logger.WithFields(log.Fields{
					"topic": ev.Topic,
					"type":  reflect.TypeOf(t),
				}).Info("ignoring event")
			}
		}
	}
}

// writeContent copies r into the containerd content store under ref and
// commits it with a "containerd.io/gc.root" label (the containerd GC root
// label), stamped with the current UTC time. It returns a descriptor with the
// given mediaType and the written digest and size.
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	w, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer w.Close()

	written, err := io.Copy(w, r)
	if err != nil {
		return nil, err
	}

	gcRoot := content.WithLabels(map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	})
	if err := w.Commit(ctx, 0, "", gcRoot); err != nil {
		return nil, err
	}

	desc := &types.Descriptor{
		MediaType: mediaType,
		Digest:    w.Digest().String(),
		Size:      written,
	}
	return desc, nil
}

// bundleDir returns the bundle directory path for container id: <stateDir>/<id>.
func (c *client) bundleDir(id string) string {
	dir := filepath.Join(c.stateDir, id)
	return dir
}

// wrapError converts containerd errors that mean "not found" into
// errdefs.NotFound errors: either those classified as not-found by cerrdefs,
// or those whose message contains "container does not exist", "not found" or
// "no such container". Any other error, including nil, is returned unchanged.
func wrapError(err error) error {
	if err == nil {
		return nil
	}
	if cerrdefs.IsNotFound(err) {
		return errdefs.NotFound(err)
	}

	msg := err.Error()
	if strings.Contains(msg, "container does not exist") ||
		strings.Contains(msg, "not found") ||
		strings.Contains(msg, "no such container") {
		return errdefs.NotFound(err)
	}
	return err
}
