package networkdb

import (
	"errors"
	"time"

	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
)

const broadcastTimeout = 5 * time.Second

type networkEventMessage struct {
	id   string
	node string
	msg  []byte
}

func (m *networkEventMessage) Invalidates(other memberlist.Broadcast) bool {
	otherm := other.(*networkEventMessage)
	return m.id == otherm.id && m.node == otherm.node
}

func (m *networkEventMessage) Message() []byte {
	return m.msg
}

func (m *networkEventMessage) Finished() {
}

func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltime serf.LamportTime) error {
	nEvent := NetworkEvent{
		Type:      event,
		LTime:     ltime,
		NodeName:  nDB.config.NodeID,
		NetworkID: nid,
	}

	raw, err := encodeMessage(MessageTypeNetworkEvent, &nEvent)
	if err != nil {
		return err
	}

	nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
		msg:  raw,
		id:   nid,
		node: nDB.config.NodeID,
	})
	return nil
}

type nodeEventMessage struct {
	msg    []byte
	notify chan<- struct{}
}

func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
	return false
}

func (m *nodeEventMessage) Message() []byte {
	return m.msg
}

func (m *nodeEventMessage) Finished() {
	if m.notify != nil {
		close(m.notify)
	}
}

func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
	nEvent := NodeEvent{
		Type:     event,
		LTime:    nDB.networkClock.Increment(),
		NodeName: nDB.config.NodeID,
	}

	raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
	if err != nil {
		return err
	}

	notifyCh := make(chan struct{})
	nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
		msg:    raw,
		notify: notifyCh,
	})

	nDB.RLock()
	noPeers := len(nDB.nodes) <= 1
	nDB.RUnlock()

	// Message enqueued, do not wait for a send if no peer is present
	if noPeers {
		return nil
	}

	// Wait for the broadcast
	select {
	case <-notifyCh:
	case <-time.After(broadcastTimeout):
		return errors.New("timed out broadcasting node event")
	}

	return nil
}

type tableEventMessage struct {
	id    string
	tname string
	key   string
	msg   []byte
}

func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
	otherm := other.(*tableEventMessage)
	return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
}

func (m *tableEventMessage) Message() []byte {
	return m.msg
}

func (m *tableEventMessage) Finished() {
}

func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname string, key string, entry *entry) error {
	tEvent := TableEvent{
		Type:      event,
		LTime:     entry.ltime,
		NodeName:  nDB.config.NodeID,
		NetworkID: nid,
		TableName: tname,
		Key:       key,
		Value:     entry.value,
		// The duration in second is a float that below would be truncated
		ResidualReapTime: int32(entry.reapTime.Seconds()),
	}

	raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
	if err != nil {
		return err
	}

	var broadcastQ *memberlist.TransmitLimitedQueue
	nDB.RLock()
	thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
	if ok {
		// The network may have been removed
		network, networkOk := thisNodeNetworks[nid]
		if !networkOk {
			nDB.RUnlock()
			return nil
		}

		broadcastQ = network.tableBroadcasts
	}
	nDB.RUnlock()

	// The network may have been removed
	if broadcastQ == nil {
		return nil
	}

	broadcastQ.QueueBroadcast(&tableEventMessage{
		msg:   raw,
		id:    nid,
		tname: tname,
		key:   key,
	})
	return nil
}
