package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/containerd/log"
	"github.com/distribution/reference"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/registry"
	"github.com/docker/docker/api/types/swarm"
	timetypes "github.com/docker/docker/api/types/time"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	gogotypes "github.com/gogo/protobuf/types"
	swarmapi "github.com/moby/swarmkit/v2/api"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

// GetServices lists the services of a managed swarm cluster that match
// options.Filters. When options.Status is set, every returned service is also
// annotated with its task counts, which are fetched with a second request.
func (c *Cluster) GetServices(options swarm.ServiceListOptions) ([]swarm.Service, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	// The "mode" filter is evaluated by the daemon (in the loop below) and
	// not by SwarmKit, so the set of accepted filters is validated here,
	// next to the code that processes it.
	if err := options.Filters.Validate(map[string]bool{
		"name":    true,
		"id":      true,
		"label":   true,
		"mode":    true,
		"runtime": true,
	}); err != nil {
		return nil, err
	}

	// With no explicit runtime filter, fall back to the container runtime.
	if len(options.Filters.Get("runtime")) == 0 {
		options.Filters.Add("runtime", string(swarm.RuntimeContainer))
	}

	listReq := &swarmapi.ListServicesRequest{
		Filters: &swarmapi.ListServicesRequest_Filters{
			NamePrefixes: options.Filters.Get("name"),
			IDPrefixes:   options.Filters.Get("id"),
			Labels:       convertKVStringsToMap(options.Filters.Get("label")),
			Runtimes:     options.Filters.Get("runtime"),
		},
	}

	ctx, cancel := context.WithTimeout(context.TODO(), swarmRequestTimeout)
	defer cancel()

	listed, err := state.controlClient.ListServices(ctx, listReq, grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse))
	if err != nil {
		return nil, err
	}

	// modeName maps a service's mode to the string used by the "mode"
	// filter; a mode that is not recognised yields the empty string.
	modeName := func(svc *swarmapi.Service) string {
		switch svc.Spec.GetMode().(type) {
		case *swarmapi.ServiceSpec_Global:
			return "global"
		case *swarmapi.ServiceSpec_Replicated:
			return "replicated"
		case *swarmapi.ServiceSpec_ReplicatedJob:
			return "replicated-job"
		case *swarmapi.ServiceSpec_GlobalJob:
			return "global-job"
		}
		return ""
	}
	filterByMode := options.Filters.Contains("mode")

	services := make([]swarm.Service, 0, len(listed.Services))

	// IDs of the services that survive filtering; only collected when the
	// caller asked for statuses.
	var serviceIDs []string
	if options.Status {
		serviceIDs = make([]string, 0, len(listed.Services))
	}

	for _, grpcSvc := range listed.Services {
		if filterByMode && !options.Filters.ExactMatch("mode", modeName(grpcSvc)) {
			continue
		}
		if options.Status {
			serviceIDs = append(serviceIDs, grpcSvc.ID)
		}
		converted, err := convert.ServiceFromGRPC(*grpcSvc)
		if err != nil {
			return nil, err
		}
		services = append(services, converted)
	}

	if !options.Status {
		return services, nil
	}

	// Statuses are fetched with a dedicated call: they are a UI concern and
	// do not belong in swarm's Service object, yet asking the manager for
	// them is far cheaper than listing every task and deriving the numbers
	// from marshalled JSON on the client side.
	statusResp, err := state.controlClient.ListServiceStatuses(
		ctx,
		&swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
		grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
	)
	if err != nil {
		return nil, err
	}

	// Index the statuses by service ID, converting them to the engine API
	// type on the way, so that matching them to the services is roughly
	// linear instead of quadratic.
	statusByID := make(map[string]*swarm.ServiceStatus, len(statusResp.Statuses))
	for _, st := range statusResp.Statuses {
		statusByID[st.ServiceID] = &swarm.ServiceStatus{
			RunningTasks:   st.RunningTasks,
			DesiredTasks:   st.DesiredTasks,
			CompletedTasks: st.CompletedTasks,
		}
	}

	// services holds values, not pointers, so write through the index. The
	// response is expected to contain an entry for every requested ID; if
	// one is missing the status simply stays nil, as it was before.
	for i := range services {
		services[i].ServiceStatus = statusByID[services[i].ID]
	}

	return services, nil
}

// GetService looks up a single service by ID or name and returns it in the
// engine API representation. insertDefaults is forwarded to the lookup.
func (c *Cluster) GetService(input string, insertDefaults bool) (swarm.Service, error) {
	var found *swarmapi.Service

	lookup := func(ctx context.Context, state nodeState) error {
		grpcSvc, err := getService(ctx, state.controlClient, input, insertDefaults)
		if err == nil {
			found = grpcSvc
		}
		return err
	}
	if err := c.lockedManagerAction(lookup); err != nil {
		return swarm.Service{}, err
	}

	converted, err := convert.ServiceFromGRPC(*found)
	if err != nil {
		return swarm.Service{}, err
	}
	return converted, nil
}

// CreateService creates a new service in a managed swarm cluster from s.
//
// encodedAuth, when non-empty, is stored in the container spec's pull options
// and also decoded to authenticate the registry lookup. When queryRegistry is
// set (and DOCKER_SERVICE_PREFER_OFFLINE_IMAGE is not "1"), the image of a
// container service is pinned to its digest on a best-effort basis: a failure
// is reported as a warning in the response, not as an error.
func (c *Cluster) CreateService(s swarm.ServiceSpec, encodedAuth string, queryRegistry bool) (*swarm.ServiceCreateResponse, error) {
	var resp *swarm.ServiceCreateResponse

	create := func(ctx context.Context, state nodeState) error {
		if err := c.populateNetworkID(ctx, state.controlClient, &s); err != nil {
			return err
		}

		grpcSpec, err := convert.ServiceSpecToGRPC(s)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		resp = &swarm.ServiceCreateResponse{}

		switch grpcSpec.Task.Runtime.(type) {
		case *swarmapi.TaskSpec_Attachment:
			return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)

		case *swarmapi.TaskSpec_Generic:
			// The plugin runtime is the only generic runtime handled here.
			kind := grpcSpec.Task.GetGeneric().Kind
			if kind != string(swarm.RuntimePlugin) {
				return fmt.Errorf("unsupported runtime type: %q", kind)
			}
			if !c.config.Backend.HasExperimental() {
				return fmt.Errorf("runtime type %q only supported in experimental", swarm.RuntimePlugin)
			}
			if s.TaskTemplate.PluginSpec == nil {
				return errors.New("plugin spec must be set")
			}

		case *swarmapi.TaskSpec_Container:
			containerSpec := grpcSpec.Task.GetContainer()
			if containerSpec == nil {
				return errors.New("service does not use container tasks")
			}

			// Record the supplied auth on the spec and decode it for the
			// registry query below. A malformed value is only logged; the
			// lookup then proceeds with an empty auth config.
			authConfig := &registry.AuthConfig{}
			if encodedAuth != "" {
				containerSpec.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}

				decoder := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth)))
				if err := decoder.Decode(authConfig); err != nil {
					log.G(ctx).Warnf("invalid authconfig: %v", err)
				}
			}

			// Pin the image by digest when the caller requested a registry
			// query.
			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
			// should be removed in the future. Since integration tests only use the
			// latest API version, so this is no longer required.
			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
				pinned, err := c.imageWithDigestString(ctx, containerSpec.Image, authConfig)
				switch {
				case err != nil:
					log.G(ctx).Warnf("unable to pin image %s to digest: %s", containerSpec.Image, err.Error())
					// keep the warning sent to the client concise
					resp.Warnings = append(resp.Warnings, digestWarning(containerSpec.Image))
				case containerSpec.Image != pinned:
					log.G(ctx).Debugf("pinning image %s by digest: %s", containerSpec.Image, pinned)
					containerSpec.Image = pinned
				default:
					log.G(ctx).Debugf("creating service using supplied digest reference %s", containerSpec.Image)
				}

				// The registry round-trip may have used up the deadline of
				// "ctx". Continue with a fresh timeout so that a slow or
				// unresponsive registry cannot make it impossible to create
				// the service.
				var cancel func()
				ctx = context.WithoutCancel(ctx)
				ctx, cancel = context.WithTimeout(ctx, swarmRequestTimeout)
				defer cancel()
			}

		default:
			// No recognised runtime: nothing is created and the response
			// is returned without an ID.
			return nil
		}

		created, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &grpcSpec})
		if err != nil {
			return err
		}
		resp.ID = created.Service.ID
		return nil
	}

	err := c.lockedManagerAction(create)
	return resp, err
}

// UpdateService updates the service identified by serviceIDOrName so that it
// matches spec.
//
// version is sent to the manager as the service version index. flags supplies
// the registry auth (or names the spec to inherit it from) and the rollback
// mode. When queryRegistry is set (and DOCKER_SERVICE_PREFER_OFFLINE_IMAGE is
// not "1"), the image of a container service is pinned to its digest on a
// best-effort basis: a failure is reported as a warning in the response, not
// as an error.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec swarm.ServiceSpec, flags swarm.ServiceUpdateOptions, queryRegistry bool) (*swarm.ServiceUpdateResponse, error) {
	var resp *swarm.ServiceUpdateResponse

	update := func(ctx context.Context, state nodeState) error {
		if err := c.populateNetworkID(ctx, state.controlClient, &spec); err != nil {
			return err
		}

		grpcSpec, err := convert.ServiceSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		current, err := getService(ctx, state.controlClient, serviceIDOrName, false)
		if err != nil {
			return err
		}

		resp = &swarm.ServiceUpdateResponse{}

		switch grpcSpec.Task.Runtime.(type) {
		case *swarmapi.TaskSpec_Attachment:
			return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)

		case *swarmapi.TaskSpec_Generic:
			if grpcSpec.Task.GetGeneric().Kind == string(swarm.RuntimePlugin) && spec.TaskTemplate.PluginSpec == nil {
				return errors.New("plugin spec must be set")
			}

		case *swarmapi.TaskSpec_Container:
			updated := grpcSpec.Task.GetContainer()
			if updated == nil {
				return errors.New("service does not use container tasks")
			}

			encodedAuth := flags.EncodedRegistryAuth
			if encodedAuth == "" {
				// No new auth was supplied: carry over the pull options of
				// the spec selected by RegistryAuthFrom so they are not lost
				// by the update.
				var source *swarmapi.ContainerSpec
				switch flags.RegistryAuthFrom {
				case swarm.RegistryAuthFromSpec, "":
					source = current.Spec.Task.GetContainer()
				case swarm.RegistryAuthFromPreviousSpec:
					if current.PreviousSpec == nil {
						return errors.New("service does not have a previous spec")
					}
					source = current.PreviousSpec.Task.GetContainer()
				default:
					return errors.New("unsupported registryAuthFrom value")
				}
				if source == nil {
					return errors.New("service does not use container tasks")
				}
				updated.PullOptions = source.PullOptions
				// reuse the inherited auth for the digest lookup below
				if source.PullOptions != nil {
					encodedAuth = source.PullOptions.RegistryAuth
				}
			} else {
				updated.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
			}

			// Decode the auth for the registry query. A malformed value is
			// only logged; the lookup then proceeds with an empty config.
			authConfig := &registry.AuthConfig{}
			if encodedAuth != "" {
				decoder := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth)))
				if err := decoder.Decode(authConfig); err != nil {
					log.G(ctx).Warnf("invalid authconfig: %v", err)
				}
			}

			// Pin the image by digest when the caller requested a registry
			// query.
			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
			// should be removed in the future. Since integration tests only use the
			// latest API version, so this is no longer required.
			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
				pinned, err := c.imageWithDigestString(ctx, updated.Image, authConfig)
				switch {
				case err != nil:
					log.G(ctx).Warnf("unable to pin image %s to digest: %s", updated.Image, err.Error())
					// keep the warning sent to the client concise
					resp.Warnings = append(resp.Warnings, digestWarning(updated.Image))
				case updated.Image != pinned:
					log.G(ctx).Debugf("pinning image %s by digest: %s", updated.Image, pinned)
					updated.Image = pinned
				default:
					log.G(ctx).Debugf("updating service using supplied digest reference %s", updated.Image)
				}

				// The registry round-trip may have used up the deadline of
				// "ctx". Continue with a fresh timeout so that a slow or
				// unresponsive registry cannot make it impossible to update
				// the service.
				var cancel func()
				ctx = context.WithoutCancel(ctx)
				ctx, cancel = context.WithTimeout(ctx, swarmRequestTimeout)
				defer cancel()
			}
		}

		// Translate the rollback flag; an empty value means no rollback.
		rollback := swarmapi.UpdateServiceRequest_NONE
		switch flags.Rollback {
		case "", "none":
			// keep UpdateServiceRequest_NONE
		case "previous":
			rollback = swarmapi.UpdateServiceRequest_PREVIOUS
		default:
			return fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
		}

		req := &swarmapi.UpdateServiceRequest{
			ServiceID:      current.ID,
			Spec:           &grpcSpec,
			ServiceVersion: &swarmapi.Version{Index: version},
			Rollback:       rollback,
		}
		_, err = state.controlClient.UpdateService(ctx, req)
		return err
	}

	err := c.lockedManagerAction(update)
	return resp, err
}

// RemoveService looks up the service named by input and removes it from a
// managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	remove := func(ctx context.Context, state nodeState) error {
		target, err := getService(ctx, state.controlClient, input, false)
		if err != nil {
			return err
		}

		req := &swarmapi.RemoveServiceRequest{ServiceID: target.ID}
		if _, err := state.controlClient.RemoveService(ctx, req); err != nil {
			return err
		}
		return nil
	}
	return c.lockedManagerAction(remove)
}

// ServiceLogs subscribes to the logs of the services and tasks named by
// selector and returns a channel on which the log messages are delivered.
//
// config selects the output streams, the number of trailing lines ("all" or
// empty for everything), the start time and whether to follow the logs. The
// returned channel is closed when the stream reaches EOF, after a stream error
// has been forwarded as a message with Err set, or when ctx is done.
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *container.LogsOptions) (<-chan *backend.LogMessage, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
	if err != nil {
		return nil, errors.Wrap(err, "error making log selector")
	}

	// Output streams requested by the caller.
	streams := make([]swarmapi.LogStream, 0, 2)
	if config.ShowStdout {
		streams = append(streams, swarmapi.LogStreamStdout)
	}
	if config.ShowStderr {
		streams = append(streams, swarmapi.LogStreamStderr)
	}

	// Number of previous log lines to return. Unlike ContainerLogs, which
	// treats any non-integer value as "all", only "all" and the empty string
	// are accepted as such here; anything else must parse as an integer.
	var tail int64
	switch config.Tail {
	case "", "all":
		// swarmkit takes a tail of 0 to mean "send all logs"
	default:
		lines, err := strconv.Atoi(config.Tail)
		if err != nil {
			return nil, errors.New(`tail value must be a positive integer or "all"`)
		}
		if lines < 0 {
			return nil, errors.New("negative tail values not supported")
		}
		// swarmkit uses negative tail values to count messages backwards
		// from the end, with -1 meaning "no logs". For compatibility with
		// docker container logs, add one and flip the sign. See the logs
		// protobuf for more information.
		tail = int64(-(lines + 1))
	}

	// Point in the past from which logs are wanted, if any.
	var since *gogotypes.Timestamp
	if config.Since != "" {
		sec, nsec, err := timetypes.ParseTimestamps(config.Since, 0)
		if err != nil {
			return nil, errors.Wrap(err, "could not parse since timestamp")
		}
		since, err = gogotypes.TimestampProto(time.Unix(sec, nsec))
		if err != nil {
			return nil, errors.Wrap(err, "could not parse timestamp to proto")
		}
	}

	subscription := &swarmapi.SubscribeLogsRequest{
		Selector: swarmSelector,
		Options: &swarmapi.LogSubscriptionOptions{
			Follow:  config.Follow,
			Streams: streams,
			Tail:    tail,
			Since:   since,
		},
	}
	stream, err := state.logsClient.SubscribeLogs(ctx, subscription)
	if err != nil {
		return nil, err
	}

	out := make(chan *backend.LogMessage, 1)
	go func() {
		defer close(out)

		// Attribute keys under which the message's context is reported.
		nodeKey := contextPrefix + ".node.id"
		serviceKey := contextPrefix + ".service.id"
		taskKey := contextPrefix + ".task.id"

		for {
			// Stop before receiving again once the context is done.
			select {
			case <-ctx.Done():
				return
			default:
			}

			batch, err := stream.Recv()
			if err != nil {
				if err == io.EOF {
					return
				}
				// Any other error is handed to the reader, unless the
				// context is done first; either way the stream is over.
				select {
				case <-ctx.Done():
				case out <- &backend.LogMessage{Err: err}:
				}
				return
			}

			for _, msg := range batch.Messages {
				m := &backend.LogMessage{
					Line:  msg.Data,
					Attrs: make([]backend.LogAttr, 0, len(msg.Attrs)+3),
				}

				// A timestamp that fails to convert is reported on the
				// message itself instead of aborting the stream.
				ts, tsErr := gogotypes.TimestampFromProto(msg.Timestamp)
				m.Timestamp = ts
				if tsErr != nil {
					m.Err = tsErr
				}

				// Copy the details, dropping any that collide with the
				// context keys: the context appended below has the final
				// say, even if a user added a detail under the same key.
				for _, attr := range msg.Attrs {
					if attr.Key == nodeKey || attr.Key == serviceKey || attr.Key == taskKey {
						continue
					}
					m.Attrs = append(m.Attrs, backend.LogAttr{Key: attr.Key, Value: attr.Value})
				}
				m.Attrs = append(m.Attrs,
					backend.LogAttr{Key: nodeKey, Value: msg.Context.NodeID},
					backend.LogAttr{Key: serviceKey, Value: msg.Context.ServiceID},
					backend.LogAttr{Key: taskKey, Value: msg.Context.TaskID},
				)

				switch msg.Stream {
				case swarmapi.LogStreamStdout:
					m.Source = "stdout"
				case swarmapi.LogStreamStderr:
					m.Source = "stderr"
				default:
					// TODO(thaJeztah): make switch exhaustive; add swarmapi.LogStreamUnknown
				}

				// The reader may stop accepting messages while the context
				// gets canceled; watch for that so the send cannot block
				// forever.
				select {
				case <-ctx.Done():
					return
				case out <- m:
				}
			}
		}
	}()
	return out, nil
}

// convertSelector resolves the raw service and task names of a
// backend.LogSelector, which may or may not be valid, into an api.LogSelector
// proto holding their IDs. It fails if a name cannot be resolved or if a
// service or task does not use a container spec.
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
	const notContainerMsg = "logs only supported on container tasks"

	// IDs are resolved here rather than relying on swarmkit to do it.
	var serviceIDs, taskIDs []string

	for _, name := range selector.Services {
		svc, err := getService(ctx, cc, name, false)
		if err != nil {
			return nil, err
		}
		if svc.Spec.Task.GetContainer() == nil {
			return nil, errors.New(notContainerMsg)
		}
		serviceIDs = append(serviceIDs, svc.ID)
	}

	for _, name := range selector.Tasks {
		tsk, err := getTask(ctx, cc, name)
		if err != nil {
			return nil, err
		}
		if tsk.Spec.GetContainer() == nil {
			return nil, errors.New(notContainerMsg)
		}
		taskIDs = append(taskIDs, tsk.ID)
	}

	return &swarmapi.LogSelector{ServiceIDs: serviceIDs, TaskIDs: taskIDs}, nil
}

// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234.
//
// A reference that already carries a digest is returned without contacting a
// registry. Otherwise the tag (the default tag when none is given) is looked
// up in the repositories returned by the image backend, in order, and the
// digest from the first repository that answers successfully is used. A
// lookup error is returned only when no repository succeeds, in which case it
// is the error from the last repository tried.
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *registry.AuthConfig) (string, error) {
	ref, err := reference.ParseAnyReference(image)
	if err != nil {
		return "", err
	}
	namedRef, ok := ref.(reference.Named)
	if !ok {
		// A bare digest has no name to resolve; hand it back unchanged.
		if _, ok := ref.(reference.Digested); ok {
			return image, nil
		}
		return "", errors.Errorf("unknown image reference format: %s", image)
	}
	// only query registry if not a canonical reference (i.e. with digest)
	if _, ok := namedRef.(reference.Canonical); !ok {
		namedRef = reference.TagNameOnly(namedRef)

		taggedRef, ok := namedRef.(reference.NamedTagged)
		if !ok {
			return "", errors.Errorf("image reference not tagged: %s", image)
		}

		// Fetch the image manifest's digest; if a mirror is configured, try the
		// mirror first, but continue with upstream on failure.
		repos, err := c.config.ImageBackend.GetRepositories(ctx, taggedRef, authConfig)
		if err != nil {
			return "", err
		}

		var (
			imgDigest digest.Digest
			lastErr   error
		)
		for _, repo := range repos {
			dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
			if err != nil {
				lastErr = err
				continue
			}
			// The first repository that resolves the tag wins. Clear any
			// error left behind by an earlier repository (e.g. a failing
			// mirror) and stop, so that neither an earlier nor a later
			// failure can turn this success into an error.
			imgDigest = dscrptr.Digest
			lastErr = nil
			break
		}
		if lastErr != nil {
			return "", lastErr
		}

		namedDigestedRef, err := reference.WithDigest(taggedRef, imgDigest)
		if err != nil {
			return "", err
		}
		// return familiar form until interface updated to return type
		return reference.FamiliarString(namedDigestedRef), nil
	}
	// reference already contains a digest, so just return it
	return reference.FamiliarString(ref), nil
}

// digestWarning builds the warning returned to the client when the given
// image could not be pinned by digest. The wording is hardcoded, but could be
// made smarter in the future.
func digestWarning(image string) string {
	const format = "image %s could not be accessed on a registry to record\n" +
		"its digest. Each node will access %s independently,\n" +
		"possibly leading to different nodes running different\n" +
		"versions of the image.\n"
	return fmt.Sprintf(format, image, image)
}
