/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package xdsresource

import (
	"fmt"
	"math"
	"net"
	"strconv"

	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"
)

func unmarshalEndpointsResource(r *anypb.Any) (string, EndpointsUpdate, error) {
	r, err := UnwrapResource(r)
	if err != nil {
		return "", EndpointsUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
	}

	if !IsEndpointsResource(r.GetTypeUrl()) {
		return "", EndpointsUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
	}

	cla := &v3endpointpb.ClusterLoadAssignment{}
	if err := proto.Unmarshal(r.GetValue(), cla); err != nil {
		return "", EndpointsUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
	}

	u, err := parseEDSRespProto(cla)
	if err != nil {
		return cla.GetClusterName(), EndpointsUpdate{}, err
	}
	u.Raw = r
	return cla.GetClusterName(), u, nil
}

func parseAddress(socketAddress *v3corepb.SocketAddress) string {
	return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
}

func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
	percentage := dropPolicy.GetDropPercentage()
	var (
		numerator   = percentage.GetNumerator()
		denominator uint32
	)
	switch percentage.GetDenominator() {
	case v3typepb.FractionalPercent_HUNDRED:
		denominator = 100
	case v3typepb.FractionalPercent_TEN_THOUSAND:
		denominator = 10000
	case v3typepb.FractionalPercent_MILLION:
		denominator = 1000000
	}
	return OverloadDropConfig{
		Category:    dropPolicy.GetCategory(),
		Numerator:   numerator,
		Denominator: denominator,
	}
}

func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs map[string]bool) ([]Endpoint, error) {
	endpoints := make([]Endpoint, 0, len(lbEndpoints))
	for _, lbEndpoint := range lbEndpoints {
		// If the load_balancing_weight field is specified, it must be set to a
		// value of at least 1.  If unspecified, each host is presumed to have
		// equal weight in a locality.
		weight := uint32(1)
		if w := lbEndpoint.GetLoadBalancingWeight(); w != nil {
			if w.GetValue() == 0 {
				return nil, fmt.Errorf("EDS response contains an endpoint with zero weight: %+v", lbEndpoint)
			}
			weight = w.GetValue()
		}
		addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())
		if uniqueEndpointAddrs[addr] {
			return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
		}
		uniqueEndpointAddrs[addr] = true
		endpoints = append(endpoints, Endpoint{
			HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
			Address:      addr,
			Weight:       weight,
		})
	}
	return endpoints, nil
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
	ret := EndpointsUpdate{}
	for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
		ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
	}
	priorities := make(map[uint32]map[string]bool)
	sumOfWeights := make(map[uint32]uint64)
	uniqueEndpointAddrs := make(map[string]bool)
	for _, locality := range m.Endpoints {
		l := locality.GetLocality()
		if l == nil {
			return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
		}
		weight := locality.GetLoadBalancingWeight().GetValue()
		if weight == 0 {
			logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l))
			continue
		}
		priority := locality.GetPriority()
		sumOfWeights[priority] += uint64(weight)
		if sumOfWeights[priority] > math.MaxUint32 {
			return EndpointsUpdate{}, fmt.Errorf("sum of weights of localities at the same priority %d exceeded maximal value", priority)
		}
		localitiesWithPriority := priorities[priority]
		if localitiesWithPriority == nil {
			localitiesWithPriority = make(map[string]bool)
			priorities[priority] = localitiesWithPriority
		}
		lid := internal.LocalityID{
			Region:  l.Region,
			Zone:    l.Zone,
			SubZone: l.SubZone,
		}
		lidStr, _ := lid.ToString()

		// "Since an xDS configuration can place a given locality under multiple
		// priorities, it is possible to see locality weight attributes with
		// different values for the same locality." - A52
		//
		// This is handled in the client by emitting the locality weight
		// specified for the priority it is specified in. If the same locality
		// has a different weight in two priorities, each priority will specify
		// a locality with the locality weight specified for that priority, and
		// thus the subsequent tree of balancers linked to that priority will
		// use that locality weight as well.
		if localitiesWithPriority[lidStr] {
			return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority)
		}
		localitiesWithPriority[lidStr] = true
		endpoints, err := parseEndpoints(locality.GetLbEndpoints(), uniqueEndpointAddrs)
		if err != nil {
			return EndpointsUpdate{}, err
		}
		ret.Localities = append(ret.Localities, Locality{
			ID:        lid,
			Endpoints: endpoints,
			Weight:    weight,
			Priority:  priority,
		})
	}
	for i := 0; i < len(priorities); i++ {
		if _, ok := priorities[uint32(i)]; !ok {
			return EndpointsUpdate{}, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
		}
	}
	return ret, nil
}
