package libnetwork

//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"sort"
	"sync"

	"github.com/containerd/log"
	"github.com/docker/docker/libnetwork/cluster"
	"github.com/docker/docker/libnetwork/discoverapi"
	"github.com/docker/docker/libnetwork/driverapi"
	"github.com/docker/docker/libnetwork/networkdb"
	"github.com/docker/docker/libnetwork/scope"
	"github.com/docker/docker/libnetwork/types"
	"github.com/docker/go-events"
	"github.com/gogo/protobuf/proto"
)

// subsysGossip and subsysIPSec are the encryption key subsystems, matched
// against EncryptionKey.Subsystem: gossip keys are installed in NetworkDB,
// while IPSec keys are handed to the drivers as datapath encryption keys.
//
// NOTE(review): keyringSize is not referenced in this part of the file;
// presumably it is the number of keys kept per subsystem — confirm.
const (
	subsysGossip = "networking:gossip"
	subsysIPSec  = "networking:ipsec"
	keyringSize  = 3
)

// ByTime orders a slice of encryption keys by ascending LamportTime. It
// satisfies sort.Interface.
type ByTime []*types.EncryptionKey

// Len reports how many keys the slice holds.
func (b ByTime) Len() int {
	return len(b)
}

// Swap exchanges the keys stored at positions i and j.
func (b ByTime) Swap(i, j int) {
	first, second := b[i], b[j]
	b[i], b[j] = second, first
}

// Less reports whether the key at position i has a smaller LamportTime than
// the key at position j.
func (b ByTime) Less(i, j int) bool {
	left, right := b[i], b[j]
	return left.LamportTime < right.LamportTime
}

// nwAgent holds the state of the cluster agent of a controller.
//
//   - networkDB is the NetworkDB instance created by agentInit.
//   - bindAddr is the IP resolved from the bind address or interface name
//     passed to agentInit.
//   - advertiseAddr and dataPathAddr are stored as passed to agentInit; see
//     dataPathAddress for how they are combined.
//   - coreCancelFuncs cancel the endpoint-table and node-table watches
//     registered by agentInit.
//   - driverCancelFuncs holds the cancel functions of the driver table
//     watches, keyed by network ID.
//   - mu is held while driverCancelFuncs is modified and while the cancel
//     functions and addresses are read.
type nwAgent struct {
	networkDB         *networkdb.NetworkDB
	bindAddr          net.IP
	advertiseAddr     string
	dataPathAddr      string
	coreCancelFuncs   []func()
	driverCancelFuncs map[string][]func()
	mu                sync.Mutex
}

// dataPathAddress returns the address to use for data-path traffic: the
// explicitly configured data-path address when there is one, the advertise
// address otherwise.
func (a *nwAgent) dataPathAddress() string {
	a.mu.Lock()
	defer a.mu.Unlock()

	addr := a.advertiseAddr
	if a.dataPathAddr != "" {
		addr = a.dataPathAddr
	}
	return addr
}

// libnetworkEPTable is the name of the NetworkDB table that stores the
// protobuf-encoded EndpointRecord of each endpoint, keyed by endpoint ID.
const libnetworkEPTable = "endpoint_table"

// getBindAddr returns the first address assigned to the network interface
// named ifaceName that is not a link-local unicast address.
//
// An error is returned when the interface cannot be found, when its addresses
// cannot be listed, or when none of its addresses is suitable. Errors coming
// from the net package are wrapped, so callers can inspect the underlying
// cause with errors.Is and errors.As.
func getBindAddr(ifaceName string) (net.IP, error) {
	iface, err := net.InterfaceByName(ifaceName)
	if err != nil {
		return nil, fmt.Errorf("failed to find interface %s: %w", ifaceName, err)
	}

	addrs, err := iface.Addrs()
	if err != nil {
		return nil, fmt.Errorf("failed to get interface addresses: %w", err)
	}

	for _, a := range addrs {
		// Only IP networks carry an address we can bind to, and link-local
		// addresses are skipped.
		ipNet, ok := a.(*net.IPNet)
		if !ok || ipNet.IP.IsLinkLocalUnicast() {
			continue
		}
		return ipNet.IP, nil
	}

	return nil, fmt.Errorf("failed to get bind address")
}

// resolveAddr turns addrOrInterface into an IP address. The input is
// interpreted as the first of the following that succeeds:
//
//  1. a well-formed IP address
//  2. a hostname, resolved with net.ResolveIPAddr
//  3. the name of a network interface, resolved with getBindAddr
func resolveAddr(addrOrInterface string) (net.IP, error) {
	if parsed := net.ParseIP(addrOrInterface); parsed != nil {
		return parsed, nil
	}

	resolved, err := net.ResolveIPAddr("ip", addrOrInterface)
	if err == nil {
		return resolved.IP, nil
	}

	// Neither an IP address nor a resolvable hostname: treat the value as an
	// interface name.
	return getBindAddr(addrOrInterface)
}

// handleKeyChange reconciles the controller's key ring with keys and
// propagates the resulting changes to NetworkDB (gossip keys) and to the
// drivers (IPSec keys).
//
// Keys are identified by their LamportTime. Keys held by the controller that
// do not appear in keys are dropped from the key ring; keys that appear in
// keys but are unknown to the controller are added to it. The call is a no-op
// when no agent is running. An error is returned when no primary key can be
// determined for the gossip or the IPSec subsystem after the update.
func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
	agent := c.getAgent()
	if agent == nil {
		log.G(context.TODO()).Debug("Skipping key change as agent is nil")
		return nil
	}

	var (
		drvEnc  discoverapi.DriverEncryptionUpdate
		added   []byte
		deleted []byte
	)

	// hasKey reports whether ring holds a key with the given Lamport time.
	hasKey := func(ring []*types.EncryptionKey, lamportTime uint64) bool {
		for _, k := range ring {
			if k.LamportTime == lamportTime {
				return true
			}
		}
		return false
	}

	// Find the deleted key. If the deleted key was the primary key,
	// a new primary key should be set before removing it from the keyring.
	c.mu.Lock()
	live := len(c.keys)
	for i := 0; i < live; {
		cKey := c.keys[i]
		if hasKey(keys, cKey.LamportTime) {
			i++
			continue
		}

		switch cKey.Subsystem {
		case subsysGossip:
			deleted = cKey.Key
		case subsysIPSec:
			drvEnc.Prune = cKey.Key
			drvEnc.PruneTag = cKey.LamportTime
		}

		// Remove the key by moving the last live key into its slot. The index
		// is deliberately NOT advanced: the key that was just moved into slot
		// i has not been examined yet and must be checked on the next pass.
		live--
		c.keys[i] = c.keys[live]
		c.keys[live] = nil
	}
	c.keys = c.keys[:live]

	// Find the new key and add it to the key ring
	for _, key := range keys {
		if hasKey(c.keys, key.LamportTime) {
			continue
		}
		c.keys = append(c.keys, key)

		switch key.Subsystem {
		case subsysGossip:
			added = key.Key
		case subsysIPSec:
			drvEnc.Key = key.Key
			drvEnc.Tag = key.LamportTime
		}
	}
	// The lock must be released here: getPrimaryKeyTag and getKeys below
	// acquire it themselves.
	c.mu.Unlock()

	if len(added) > 0 {
		agent.networkDB.SetKey(added)
	}

	gossipKey, _, err := c.getPrimaryKeyTag(subsysGossip)
	if err != nil {
		return err
	}
	agent.networkDB.SetPrimaryKey(gossipKey)

	ipsecKey, ipsecTag, err := c.getPrimaryKeyTag(subsysIPSec)
	if err != nil {
		return err
	}
	drvEnc.Primary = ipsecKey
	drvEnc.PrimaryTag = ipsecTag

	// The old gossip key is removed only after the new primary key is set.
	if len(deleted) > 0 {
		agent.networkDB.RemoveKey(deleted)
	}

	c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
		dr, ok := driver.(discoverapi.Discover)
		if !ok {
			return false
		}
		if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
			log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
			// Attempt to reconfigure keys in case of a update failure
			// which can arise due to a mismatch of keys
			// if worker nodes get temporarily disconnected
			log.G(context.TODO()).Warnf("Reconfiguring datapath keys for  %s", name)
			drvCfgEnc := discoverapi.DriverEncryptionConfig{}
			drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
			err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
			if err != nil {
				log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
			}
		}
		return false
	})

	return nil
}

// agentSetup initializes the cluster agent from the addresses reported by
// clusterProvider, unless an agent already exists. When an advertise address
// is available the agent is created and every driver with global connectivity
// scope that implements discoverapi.Discover is notified. When remote
// addresses are available the agent then joins the gossip cluster; a join
// failure is only logged.
func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
	if c.getAgent() != nil {
		// Nothing to do: the agent has been initialized already.
		return nil
	}

	bindAddr := clusterProvider.GetLocalAddress()
	advAddr := clusterProvider.GetAdvertiseAddress()
	dataAddr := clusterProvider.GetDataPathAddress()

	// Keep only the host part of each remote address.
	remotes := clusterProvider.GetRemoteAddressList()
	remoteAddrList := make([]string, len(remotes))
	for i, remote := range remotes {
		remoteAddrList[i], _, _ = net.SplitHostPort(remote)
	}

	listenAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())

	fields := log.Fields{
		"listen-addr":               listenAddr,
		"local-addr":                bindAddr,
		"advertise-addr":            advAddr,
		"data-path-addr":            dataAddr,
		"remote-addr-list":          remoteAddrList,
		"network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
	}
	log.G(context.TODO()).WithFields(fields).Info("Initializing Libnetwork Agent")

	if advAddr != "" {
		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
			log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
			return err
		}
		c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
			if capability.ConnectivityScope != scope.Global {
				return false
			}
			if d, ok := driver.(discoverapi.Discover); ok {
				c.agentDriverNotify(d)
			}
			return false
		})
	}

	if len(remoteAddrList) == 0 {
		return nil
	}
	if err := c.agentJoin(remoteAddrList); err != nil {
		log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
	}
	return nil
}

// getKeys sorts the controller's key ring by Lamport time and returns the key
// material of the given subsystem together with the matching Lamport times,
// which can be used as unique tags for the keys. When at least two keys
// match, the first two entries of both returned slices are swapped.
func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	sort.Sort(ByTime(c.keys))

	total := len(c.keys)
	keys, tags = make([][]byte, 0, total), make([]uint64, 0, total)
	for _, k := range c.keys {
		if k.Subsystem != subsystem {
			continue
		}
		keys = append(keys, k.Key)
		tags = append(tags, k.LamportTime)
	}

	if len(keys) >= 2 {
		// TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc
		keys[1], keys[0] = keys[0], keys[1]
		tags[1], tags[0] = tags[0], tags[1]
	}
	return keys, tags
}

// getPrimaryKeyTag sorts the controller's key ring by Lamport time and returns
// the primary key of the given subsystem along with its Lamport time. The
// primary key is the second key of the subsystem in sorted order; an error is
// returned when the subsystem has fewer than two keys.
func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	sort.Sort(ByTime(c.keys))

	var (
		matches int
		primary *types.EncryptionKey
	)
	for _, k := range c.keys {
		if k.Subsystem != subsystem {
			continue
		}
		matches++
		if matches == 2 {
			primary = k
		}
	}

	if matches < 2 {
		return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, matches)
	}
	return primary.Key, primary.LamportTime, nil
}

// agentInit creates the NetworkDB instance and the agent of the controller.
//
// It resolves bindAddrOrInterface to an IP address, configures NetworkDB with
// listenAddr, advertiseAddr and the gossip keys, registers the diagnostic
// handlers, starts watching the endpoint and node tables, pushes the IPSec
// keys to every driver implementing discoverapi.Discover and finally walks
// the networks with joinCluster.
func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
	bindAddr, err := resolveAddr(bindAddrOrInterface)
	if err != nil {
		return err
	}

	gossipKeys, _ := c.getKeys(subsysGossip)

	netDBConf := networkdb.DefaultConfig()
	netDBConf.BindAddr = listenAddr
	netDBConf.AdvertiseAddr = advertiseAddr
	netDBConf.Keys = gossipKeys
	if mtu := c.Config().NetworkControlPlaneMTU; mtu != 0 {
		// Leave room for the IP header (IPv4 or IPv6) and the TCP/UDP header;
		// 100 bytes is a deliberately generous margin.
		netDBConf.PacketBufferSize = mtu - 100
		log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
			mtu, netDBConf.PacketBufferSize)
	}

	nDB, err := networkdb.New(netDBConf)
	if err != nil {
		return err
	}

	// Register the diagnostic handlers
	nDB.RegisterDiagnosticHandlers(c.diagnosticServer)

	epCh, cancelEpWatch := nDB.Watch(libnetworkEPTable, "")
	nodeCh, cancelNodeWatch := nDB.Watch(networkdb.NodeTable, "")

	newAgent := &nwAgent{
		networkDB:         nDB,
		bindAddr:          bindAddr,
		advertiseAddr:     advertiseAddr,
		dataPathAddr:      dataPathAddr,
		coreCancelFuncs:   []func(){cancelEpWatch, cancelNodeWatch},
		driverCancelFuncs: make(map[string][]func()),
	}
	c.mu.Lock()
	c.agent = newAgent
	c.mu.Unlock()

	go c.handleTableEvents(epCh, c.handleEpTableEvent)
	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)

	ipsecKeys, ipsecTags := c.getKeys(subsysIPSec)
	drvCfgEnc := discoverapi.DriverEncryptionConfig{
		Keys: ipsecKeys,
		Tags: ipsecTags,
	}
	c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
		dr, ok := driver.(discoverapi.Discover)
		if !ok {
			return false
		}
		if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc); err != nil {
			log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
		}
		return false
	})

	c.WalkNetworks(joinCluster)

	return nil
}

// agentJoin makes the agent's NetworkDB join the cluster through the given
// remote addresses. It does nothing when no agent is running.
func (c *Controller) agentJoin(remoteAddrList []string) error {
	if agent := c.getAgent(); agent != nil {
		return agent.networkDB.Join(remoteAddrList)
	}
	return nil
}

// agentDriverNotify tells the driver d about the local node (data-path and
// bind addresses) and hands it the current IPSec keys. Failures are logged
// and otherwise ignored. It does nothing when no agent is running.
func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
	agent := c.getAgent()
	if agent == nil {
		return
	}

	self := discoverapi.NodeDiscoveryData{
		Address:     agent.dataPathAddress(),
		BindAddress: agent.bindAddr.String(),
		Self:        true,
	}
	if err := d.DiscoverNew(discoverapi.NodeDiscovery, self); err != nil {
		log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
	}

	var encCfg discoverapi.DriverEncryptionConfig
	encCfg.Keys, encCfg.Tags = c.getKeys(subsysIPSec)
	if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, encCfg); err != nil {
		log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
	}
}

// agentClose detaches the agent from the controller, clears the cluster
// provider, cancels every table watch registered on the agent and closes its
// NetworkDB.
func (c *Controller) agentClose() {
	// Take ownership of the current agent and reset the controller's pointer
	// before running any of the closing functions.
	c.mu.Lock()
	agent := c.agent
	c.agent = nil
	c.mu.Unlock()

	// The cluster provider is cleaned up whenever the agent is closed, even
	// if there was no agent to begin with.
	c.SetClusterProvider(nil)

	if agent == nil {
		return
	}

	// Snapshot the cancel functions under the agent lock: driver watches
	// first, then the NetworkDB core watches.
	var pending []func()
	agent.mu.Lock()
	for nid := range agent.driverCancelFuncs {
		pending = append(pending, agent.driverCancelFuncs[nid]...)
	}
	pending = append(pending, agent.coreCancelFuncs...)
	agent.mu.Unlock()

	for i := range pending {
		pending[i]()
	}

	agent.networkDB.Close()
}

// Task has the backend container details, as reported by Network.Services:
// Name, EndpointID and EndpointIP come from the endpoint's entry in the
// endpoint table (EndpointID being the entry's key), and Info is the
// driver-specific information decoded from the driver's tables for that
// endpoint, if any.
type Task struct {
	Name       string
	EndpointID string
	EndpointIP string
	Info       map[string]string
}

// ServiceInfo has service specific details along with the list of backend tasks.
// VIP is the service's virtual IP and LocalLBIndex the load-balancer index
// the controller reports for the service on the network. Ports lists the
// ingress ports, one "Target: <port>, Publish: <port>" string per port.
type ServiceInfo struct {
	VIP          string
	LocalLBIndex int
	Tasks        []Task
	Ports        []string
}

// epRecord is the per-endpoint working record assembled by Network.Services:
// ep is the decoded endpoint table entry, info is the driver-specific
// information decoded from the driver's tables, and lbIndex is the
// load-balancer index looked up on the controller for the endpoint's service.
type epRecord struct {
	ep      EndpointRecord
	info    map[string]string
	lbIndex int
}

// Services returns the services running on the network, keyed by service
// name, each with the details of the tasks that belong to it. It returns nil
// when the network has no cluster agent or when its driver cannot be
// resolved. Applicable only in swarm mode.
func (n *Network) Services() map[string]ServiceInfo {
	agent, ok := n.clusterAgent()
	if !ok {
		return nil
	}
	nwID := n.ID()
	d, err := n.driver(true)
	if err != nil {
		log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
		return nil
	}

	// Step 1: collect the driver-agnostic endpoint information from
	// libnetworkEPTable.
	eps := make(map[string]epRecord)
	c := n.getController()
	for eid, entry := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) {
		var rec EndpointRecord
		if err := proto.Unmarshal(entry.Value, &rec); err != nil {
			log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
			continue
		}
		eps[eid] = epRecord{
			ep:      rec,
			lbIndex: c.getLBIndex(rec.ServiceID, nwID, rec.IngressPorts),
		}
	}

	// Step 2: let the driver decode the entries of its endpoint tables. Each
	// entry yields the tuple {endpoint ID, info}, where info conveys the
	// driver's view of the endpoint.
	for _, table := range n.driverTables {
		if table.objType != driverapi.EndpointObject {
			continue
		}
		for key, entry := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
			epID, info := d.DecodeTableEntry(table.name, key, entry.Value)
			rec, known := eps[epID]
			if !known {
				log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
				continue
			}
			rec.info = info
			eps[epID] = rec
		}
	}

	// Step 3: group the endpoints by service name.
	sinfo := make(map[string]ServiceInfo)
	for eid, epr := range eps {
		svcName := epr.ep.ServiceName
		svc, seen := sinfo[svcName]
		if !seen {
			svc = ServiceInfo{
				VIP:          epr.ep.VirtualIP,
				LocalLBIndex: epr.lbIndex,
			}
		}
		if svc.Ports == nil {
			svc.Ports = make([]string, 0, len(epr.ep.IngressPorts))
			for _, port := range epr.ep.IngressPorts {
				svc.Ports = append(svc.Ports, fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort))
			}
		}
		svc.Tasks = append(svc.Tasks, Task{
			Name:       epr.ep.Name,
			EndpointID: eid,
			EndpointIP: epr.ep.EndpointIP,
			Info:       epr.info,
		})
		sinfo[svcName] = svc
	}
	return sinfo
}

// clusterAgent returns the controller's cluster agent when the network is a
// swarm-scoped, multi-host network. ok is false when the network does not
// qualify or when no agent is running.
func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
	if n.scope == scope.Swarm && n.driverIsMultihost() {
		agent = n.getController().getAgent()
		return agent, agent != nil
	}
	return nil, false
}

// joinCluster makes NetworkDB join the network. It does nothing for networks
// without a cluster agent.
func (n *Network) joinCluster() error {
	if agent, ok := n.clusterAgent(); ok {
		return agent.networkDB.JoinNetwork(n.ID())
	}
	return nil
}

// leaveCluster makes NetworkDB leave the network. It does nothing for
// networks without a cluster agent.
func (n *Network) leaveCluster() error {
	if agent, ok := n.clusterAgent(); ok {
		return agent.networkDB.LeaveNetwork(n.ID())
	}
	return nil
}

// addDriverInfoToCluster creates a NetworkDB entry for each driver table
// entry recorded in the endpoint's join info. It stops at, and returns, the
// first error. Nothing is done when there are no entries or when the network
// has no cluster agent.
func (ep *Endpoint) addDriverInfoToCluster() error {
	if ep.joinInfo == nil {
		return nil
	}
	entries := ep.joinInfo.driverTableEntries
	if len(entries) == 0 {
		return nil
	}

	n := ep.getNetwork()
	agent, ok := n.clusterAgent()
	if !ok {
		return nil
	}

	nwID := n.ID()
	for i := range entries {
		te := entries[i]
		err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value)
		if err != nil {
			return err
		}
	}
	return nil
}

// deleteDriverInfoFromCluster deletes the NetworkDB entry of each driver
// table entry recorded in the endpoint's join info. It stops at, and returns,
// the first error. Nothing is done when there are no entries or when the
// network has no cluster agent.
func (ep *Endpoint) deleteDriverInfoFromCluster() error {
	if ep.joinInfo == nil {
		return nil
	}
	entries := ep.joinInfo.driverTableEntries
	if len(entries) == 0 {
		return nil
	}

	n := ep.getNetwork()
	agent, ok := n.clusterAgent()
	if !ok {
		return nil
	}

	nwID := n.ID()
	for i := range entries {
		te := entries[i]
		err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key)
		if err != nil {
			return err
		}
	}
	return nil
}

// addServiceInfoToCluster registers the endpoint for service discovery: it
// adds the local service binding (for a task of a service) or the container
// name resolution (for a plain container), then publishes the endpoint's
// record in the NetworkDB endpoint table.
//
// Nothing is done when the endpoint has no DNS names or no interface address,
// when the network has no cluster agent, or when the endpoint is no longer
// part of the sandbox sb.
func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
	if len(ep.dnsNames) == 0 {
		return nil
	}
	if ep.Iface() == nil || ep.Iface().Address() == nil {
		return nil
	}

	n := ep.getNetwork()
	agent, ok := n.clusterAgent()
	if !ok {
		return nil
	}

	sb.service.Lock()
	defer sb.service.Unlock()
	log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

	// Make sure the endpoint is still attached to the sandbox before exposing
	// it to service discovery. EnableService can race with sbLeave: it may
	// fetch the list of endpoints and only get here after the endpoint has
	// been removed from the sandbox, which would let the delete run before
	// the add. Doing the check under the sandbox's service lock rules that
	// out:
	//   - if the add runs first, it may or may not find the endpoint and
	//     proceeds or exits accordingly; the delete follows and cleans up
	//     whatever is needed;
	//   - if the delete runs first, the endpoint is already gone from the
	//     list, so the delete finds nothing to clean up and the add bails
	//     out here.
	if sb.GetEndpoint(ep.ID()) == nil {
		log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
		return nil
	}

	dnsNames := ep.getDNSNames()
	primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:]

	var ingressPorts []*PortConfig
	if ep.svcID == "" {
		// A container simply attached to an attachable network.
		err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster")
		if err != nil {
			return err
		}
	} else {
		// A task that is part of a service. Ingress ports are gossiped only
		// on the ingress network.
		if n.ingress {
			ingressPorts = ep.ingressPorts
		}
		err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster")
		if err != nil {
			return err
		}
	}

	record := EndpointRecord{
		Name:            primaryDNSName,
		ServiceName:     ep.svcName,
		ServiceID:       ep.svcID,
		VirtualIP:       ep.virtualIP.String(),
		IngressPorts:    ingressPorts,
		Aliases:         ep.svcAliases,
		TaskAliases:     dnsAliases,
		EndpointIP:      ep.Iface().Address().IP.String(),
		ServiceDisabled: false,
	}
	buf, err := proto.Marshal(&record)
	if err != nil {
		return err
	}

	err = agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf)
	if err != nil {
		log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
		return err
	}

	log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())

	return nil
}

// deleteServiceInfoFromCluster withdraws the endpoint from service discovery.
// With fullRemove the endpoint's record is deleted from the NetworkDB
// endpoint table, otherwise the record is only marked as disabled. The local
// service binding or container name resolution is removed afterwards,
// provided the endpoint has an interface address. method names the caller and
// is used for logging only.
//
// Nothing is done when the endpoint has no DNS names, when the network has no
// cluster agent, or when the endpoint is no longer part of the sandbox sb.
func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
	if len(ep.dnsNames) == 0 {
		return nil
	}

	n := ep.getNetwork()
	agent, ok := n.clusterAgent()
	if !ok {
		return nil
	}

	sb.service.Lock()
	defer sb.service.Unlock()
	log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

	// Guard against a race with a container that aborts preemptively. The
	// situation would also be caught in disableServiceInNetworkDB; checking
	// here makes the nature of the condition explicit. See the comment in
	// addServiceInfoToCluster().
	if sb.GetEndpoint(ep.ID()) == nil {
		log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
		return nil
	}

	dnsNames := ep.getDNSNames()
	primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:]

	// NetworkDB is updated first, the local state afterwards.
	if !fullRemove {
		disableServiceInNetworkDB(agent, n, ep)
	} else if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
		log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
	}

	hasAddress := ep.Iface() != nil && ep.Iface().Address() != nil
	if hasAddress {
		if ep.svcID == "" {
			// A container simply attached to an attachable network.
			err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster")
			if err != nil {
				return err
			}
		} else {
			// A task that is part of a service.
			var ingressPorts []*PortConfig
			if n.ingress {
				ingressPorts = ep.ingressPorts
			}
			err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove)
			if err != nil {
				return err
			}
		}
	}

	log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())

	return nil
}

// disableServiceInNetworkDB flags the endpoint's record in the NetworkDB
// endpoint table as disabled: the record is fetched, ServiceDisabled is set
// and the record is written back. Every failure is logged and ends the
// operation; nothing is returned to the caller.
func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
	log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())

	// Fetch the record currently stored for the endpoint.
	current, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
	if err != nil {
		log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	// Decoding a stored record should never fail.
	var record EndpointRecord
	if err = proto.Unmarshal(current, &record); err != nil {
		log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	record.ServiceDisabled = true

	// Encoding the record back should never fail either.
	updated, err := proto.Marshal(&record)
	if err != nil {
		log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
		return
	}

	// Propagate the change to the whole cluster.
	err = a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), updated)
	if err != nil {
		log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
	}
}

// addDriverWatches starts a NetworkDB watch for every driver table of the
// network. The cancel function of each watch is recorded on the agent under
// the network ID, and the events of each watch are dispatched to
// handleDriverTableEvent from a dedicated goroutine. Nothing is done when the
// network has no driver tables or no cluster agent.
func (n *Network) addDriverWatches() {
	if len(n.driverTables) == 0 {
		return
	}
	agent, ok := n.clusterAgent()
	if !ok {
		return
	}

	c := n.getController()
	nid := n.ID()
	tables := n.driverTables
	for i := range tables {
		ch, cancel := agent.networkDB.Watch(tables[i].name, nid)

		agent.mu.Lock()
		agent.driverCancelFuncs[nid] = append(agent.driverCancelFuncs[nid], cancel)
		agent.mu.Unlock()

		go c.handleTableEvents(ch, n.handleDriverTableEvent)
	}
}

// cancelDriverWatches cancels the driver table watches recorded on the agent
// for the network and forgets them. Nothing is done when the network has no
// cluster agent.
func (n *Network) cancelDriverWatches() {
	agent, ok := n.clusterAgent()
	if !ok {
		return
	}

	nid := n.ID()

	// Detach the cancel functions under the lock, run them after releasing it.
	agent.mu.Lock()
	pending := agent.driverCancelFuncs[nid]
	delete(agent.driverCancelFuncs, nid)
	agent.mu.Unlock()

	for i := range pending {
		pending[i]()
	}
}

// handleTableEvents passes every event received on ch to fn, and returns once
// ch reports that it is done.
func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
	for {
		select {
		case <-ch.Done():
			return
		case event := <-ch.C:
			fn(event)
		}
	}
}

// handleDriverTableEvent forwards an event received on one of the network's
// driver tables to the network driver. Create, update and delete events are
// reported to the driver as driverapi.Create, driverapi.Update and
// driverapi.Delete respectively, along with the table name, key and value of
// the entry. Events of any other type are logged and dropped.
func (n *Network) handleDriverTableEvent(ev events.Event) {
	d, err := n.driver(false)
	if err != nil {
		log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
		return
	}

	var (
		etype driverapi.EventType
		tname string
		key   string
		value []byte
	)

	switch event := ev.(type) {
	case networkdb.CreateEvent:
		tname = event.Table
		key = event.Key
		value = event.Value
		etype = driverapi.Create
	case networkdb.DeleteEvent:
		tname = event.Table
		key = event.Key
		value = event.Value
		etype = driverapi.Delete
	case networkdb.UpdateEvent:
		tname = event.Table
		key = event.Key
		value = event.Value
		// An update must be reported as such; reporting it as a delete would
		// make the driver drop an entry that still exists.
		etype = driverapi.Update
	default:
		// Do not notify the driver with a zero event type and empty fields.
		log.G(context.TODO()).Errorf("Unexpected driver table event = %#v", event)
		return
	}

	d.EventNotify(etype, n.ID(), tname, key, value)
}

// handleNodeTableEvent processes an event received on the NetworkDB node
// table. The value of the event is decoded as a JSON networkdb.NodeAddr and
// passed to processNodeDiscovery, as an addition for create events and as a
// removal for delete events. Update events are not expected on this table:
// they are logged and ignored.
func (c *Controller) handleNodeTableEvent(ev events.Event) {
	var (
		value    []byte
		isAdd    bool
		nodeAddr networkdb.NodeAddr
	)
	switch event := ev.(type) {
	case networkdb.CreateEvent:
		value = event.Value
		isAdd = true
	case networkdb.DeleteEvent:
		value = event.Value
	case networkdb.UpdateEvent:
		log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
		// Stop here: no value was extracted, so decoding below could only
		// fail and log a second, misleading unmarshalling error.
		return
	}

	err := json.Unmarshal(value, &nodeAddr)
	if err != nil {
		log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
		return
	}
	c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
}

// handleEpTableEvent applies an event received on the NetworkDB endpoint
// table to the local service discovery state.
//
// The value of the event is decoded as an EndpointRecord. Create and update
// events add the service binding of the endpoint (or remove it, without a
// full removal, when the record is flagged as disabled); for endpoints that
// do not belong to a service they add the container name resolution. Delete
// events fully remove the service binding or the container name resolution.
// Records without a name or a valid endpoint IP are rejected. Failures are
// logged.
func (c *Controller) handleEpTableEvent(ev events.Event) {
	var (
		nid, eid string
		value    []byte
		isDelete bool
	)

	switch event := ev.(type) {
	case networkdb.CreateEvent:
		nid, eid, value = event.NetworkID, event.Key, event.Value
	case networkdb.UpdateEvent:
		nid, eid, value = event.NetworkID, event.Key, event.Value
	case networkdb.DeleteEvent:
		nid, eid, value = event.NetworkID, event.Key, event.Value
		isDelete = true
	default:
		log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
		return
	}

	var epRec EndpointRecord
	if err := proto.Unmarshal(value, &epRec); err != nil {
		log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
		return
	}

	vip := net.ParseIP(epRec.VirtualIP)
	ip := net.ParseIP(epRec.EndpointIP)

	logger := log.G(context.TODO()).WithFields(log.Fields{
		"nid": nid,
		"eid": eid,
		"T":   fmt.Sprintf("%T", ev),
		"R":   epRec,
	})

	if epRec.Name == "" || ip == nil {
		logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
		return
	}

	logger.Debug("handleEpTableEvent")

	const method = "handleEpTableEvent"
	isServiceTask := epRec.ServiceID != ""

	switch {
	case isDelete && isServiceTask:
		// A remote task of a service went away.
		if err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, epRec.Name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, method, true, true); err != nil {
			logger.WithError(err).Error("failed removing service binding")
		}

	case isDelete:
		// A remote container attached to an attachable network went away.
		if err := c.delContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases, ip, method); err != nil {
			logger.WithError(err).Errorf("failed removing container name resolution")
		}

	case !isServiceTask:
		// A remote container attached to an attachable network was created
		// or updated.
		if err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases, ip, method); err != nil {
			logger.WithError(err).Errorf("failed adding container name resolution")
		}

	case epRec.ServiceDisabled:
		// A remote task of a service was disabled.
		if err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, epRec.Name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, method, true, false); err != nil {
			logger.WithError(err).Error("failed disabling service binding")
		}

	default:
		// A remote task of a service was created or updated.
		if err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid, epRec.Name, vip, epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases, ip, method); err != nil {
			logger.WithError(err).Error("failed adding service binding")
		}
	}
}
