//go:build linux && cgo && !static_build && journald

package journald // import "github.com/docker/docker/daemon/logger/journald"

import (
	"context"
	"runtime"
	"strconv"
	"time"

	"github.com/containerd/log"
	"github.com/coreos/go-systemd/v22/journal"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
)

const (
	closedDrainTimeout = 5 * time.Second
	waitInterval       = 250 * time.Millisecond
)

var _ logger.LogReader = (*journald)(nil)

// Fields which we know are not user-provided attribute fields.
var wellKnownFields = map[string]bool{
	"MESSAGE":             true,
	"MESSAGE_ID":          true,
	"PRIORITY":            true,
	"CODE_FILE":           true,
	"CODE_LINE":           true,
	"CODE_FUNC":           true,
	"ERRNO":               true,
	"SYSLOG_FACILITY":     true,
	fieldSyslogIdentifier: true,
	"SYSLOG_PID":          true,
	fieldSyslogTimestamp:  true,
	fieldContainerName:    true,
	fieldContainerID:      true,
	fieldContainerIDFull:  true,
	fieldContainerTag:     true,
	fieldImageName:        true,
	fieldPLogID:           true,
	fieldPLogOrdinal:      true,
	fieldPLogLast:         true,
	fieldPartialMessage:   true,
	fieldLogEpoch:         true,
	fieldLogOrdinal:       true,
}

type reader struct {
	s             *journald
	j             *sdjournal.Journal
	logWatcher    *logger.LogWatcher
	config        logger.ReadConfig
	maxOrdinal    uint64
	ready         chan struct{}
	drainDeadline time.Time
}

func getMessage(d map[string]string) (line []byte, ok bool) {
	m, ok := d["MESSAGE"]
	if ok {
		line = []byte(m)
		if d[fieldPartialMessage] != "true" {
			line = append(line, "\n"...)
		}
	}
	return line, ok
}

func getPriority(d map[string]string) (journal.Priority, bool) {
	if pri, ok := d["PRIORITY"]; ok {
		i, err := strconv.Atoi(pri)
		return journal.Priority(i), err == nil
	}
	return -1, false
}

// getSource recovers the stream name from the entry data by mapping from the
// journal priority field back to the stream that we would have assigned that
// value.
func getSource(d map[string]string) string {
	priority, ok := getPriority(d)
	if ok {
		switch priority {
		case journal.PriErr:
			return "stderr"
		case journal.PriInfo:
			return "stdout"
		default:
			return ""
		}
	}
	return ""
}

func getAttrs(d map[string]string) []backend.LogAttr {
	var attrs []backend.LogAttr
	for k, v := range d {
		if k[0] != '_' && !wellKnownFields[k] {
			attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
		}
	}
	return attrs
}

// The SeekXYZ() methods all move the journal read pointer to a "conceptual"
// position which does not correspond to any journal entry. A subsequent call to
// Next(), Previous() or similar is necessary to resolve the read pointer to a
// discrete entry.
// https://github.com/systemd/systemd/pull/5930#issuecomment-300878104
// But that's not all! If there is no discrete entry to resolve the position to,
// the call to Next() or Previous() will just leave the read pointer in a
// conceptual position, or do something even more bizarre.
// https://github.com/systemd/systemd/issues/9934

// initialSeekHead positions the journal read pointer at the earliest journal
// entry with a timestamp of at least r.config.Since. It returns true if there
// is an entry to read at the read pointer.
func (r *reader) initialSeekHead() (bool, error) {
	var err error
	if r.config.Since.IsZero() {
		err = r.j.SeekHead()
	} else {
		err = r.j.SeekRealtime(r.config.Since)
	}
	if err != nil {
		return false, err
	}
	return r.j.Next()
}

// initialSeekTail positions the journal read pointer at a journal entry
// relative to the tail of the journal at the time of the call based on the
// specification in r.config. It returns true if there is an entry to read at
// the read pointer. Otherwise the read pointer is set to a conceptual position
// which will be resolved to the desired entry (once written) by advancing
// forward with r.j.Next() or similar.
func (r *reader) initialSeekTail() (bool, error) {
	var err error
	if r.config.Until.IsZero() {
		err = r.j.SeekTail()
	} else {
		err = r.j.SeekRealtime(r.config.Until)
	}
	if err != nil {
		return false, err
	}

	var ok bool
	if r.config.Tail == 0 {
		ok, err = r.j.Previous()
	} else {
		var n int
		n, err = r.j.PreviousSkip(uint(r.config.Tail))
		ok = n > 0
	}
	if err != nil {
		return ok, err
	}
	if !ok {
		// The (filtered) journal has no entries. The tail is the head: all new
		// entries which get written into the journal from this point forward
		// should be read from the journal. However the read pointer is
		// positioned at a conceptual position which is not conducive to reading
		// those entries. The tail of the journal is resolved to the last entry
		// in the journal _at the time of the first successful Previous() call_,
		// which means that an arbitrary number of journal entries added in the
		// interim may be skipped: race condition. While the realtime conceptual
		// position is not so racy, it is also unhelpful: it is the timestamp
		// past where reading should stop, so all logs that should be followed
		// would be skipped over.
		// Reset the read pointer position to avoid these problems.
		return r.initialSeekHead()
	} else if r.config.Tail == 0 {
		// The journal read pointer is positioned at the discrete position of
		// the journal entry _before_ the entry to send.
		return r.j.Next()
	}

	// Check if the PreviousSkip went too far back.
	timestamp, err := r.j.Realtime()
	if err != nil {
		return false, err
	}
	if timestamp.Before(r.config.Since) {
		if err := r.j.SeekRealtime(r.config.Since); err != nil {
			return false, err
		}
		return r.j.Next()
	}
	return true, nil
}

// wait blocks until the journal has new data to read, the reader's drain
// deadline is exceeded, or the log reading consumer is gone.
func (r *reader) wait(ctx context.Context) (bool, error) {
	deadline := r.drainDeadline
	if d, ok := ctx.Deadline(); ok && d.Before(deadline) {
		deadline = d
	}

	for {
		if err := ctx.Err(); err != nil {
			return false, err
		}

		dur := waitInterval
		if !deadline.IsZero() {
			dur = time.Until(deadline)
			if dur < 0 {
				// Container is gone but we haven't found the end of the
				// logs before the deadline. Maybe it was dropped by
				// journald, e.g. due to rate-limiting.
				return false, nil
			} else if dur > waitInterval {
				dur = waitInterval
			}
		}
		status, err := r.j.Wait(dur)
		if err != nil {
			return false, err
		} else if status != sdjournal.StatusNOP {
			return true, nil
		}
		select {
		case <-r.logWatcher.WatchConsumerGone():
			return false, nil
		case <-ctx.Done():
			return false, ctx.Err()
		case <-r.s.closed:
			// Container is gone; don't wait indefinitely for journal entries that will never arrive.
			if r.maxOrdinal >= r.s.ordinal.Load() {
				return false, nil
			}
			if r.drainDeadline.IsZero() {
				r.drainDeadline = time.Now().Add(closedDrainTimeout)
			}
		default:
		}
	}
}

// nextWait blocks until there is a new journal entry to read, and advances the
// journal read pointer to it.
func (r *reader) nextWait(ctx context.Context) (bool, error) {
	for {
		if err := ctx.Err(); err != nil {
			return false, err
		}
		if ok, err := r.j.Next(); err != nil || ok {
			return ok, err
		}
		if ok, err := r.wait(ctx); err != nil || !ok {
			return false, err
		}
	}
}

// drainJournal reads and sends log messages from the journal, starting from the
// current read pointer, until the end of the journal or a terminal stopping
// condition is reached.
//
// It returns false when a terminal stopping condition has been reached:
//   - the watch consumer is gone, or
//   - (if until is nonzero) a log entry is read which has a timestamp after
//     until
func (r *reader) drainJournal(ctx context.Context) (bool, error) {
	for i := 0; ; i++ {
		// Read the entry's timestamp.
		timestamp, err := r.j.Realtime()
		if err != nil {
			return true, err
		}
		if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
			return false, nil
		}

		// Read and send the logged message, if there is one to read.
		data, err := r.j.Data()
		if err != nil {
			return true, err
		}

		if data[fieldLogEpoch] == r.s.epoch {
			seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64)
			if err == nil && seq > r.maxOrdinal {
				r.maxOrdinal = seq
			}
		}

		if line, ok := getMessage(data); ok {
			// Send the log message, unless the consumer is gone
			msg := &logger.Message{
				Line:      line,
				Source:    getSource(data),
				Timestamp: timestamp.In(time.UTC),
				Attrs:     getAttrs(data),
			}
			// The daemon timestamp will differ from the "trusted"
			// timestamp of when the event was received by journald.
			// We can efficiently seek around the journal by the
			// event timestamp, and the event timestamp is what
			// journalctl displays. The daemon timestamp is just an
			// application-supplied field with no special
			// significance; libsystemd won't help us seek to the
			// entry with the closest timestamp.
			/*
				if sts := data["SYSLOG_TIMESTAMP"]; sts != "" {
					if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil {
						msg.Timestamp = tv
					}
				}
			*/
			select {
			case <-r.logWatcher.WatchConsumerGone():
				return false, nil
			case <-ctx.Done():
				return false, ctx.Err()
			case r.logWatcher.Msg <- msg:
			}
		}

		// Call sd_journal_process() periodically during the processing loop
		// to close any opened file descriptors for rotated (deleted) journal files.
		if i != 0 && i%1024 == 0 {
			if _, err := r.j.Process(); err != nil {
				// log a warning but ignore it for now
				log.G(ctx).WithField("container", r.s.vars[fieldContainerIDFull]).
					WithField("error", err).
					Warn("journald: error processing journal")
			}
		}

		if err := ctx.Err(); err != nil {
			return false, err
		}

		if ok, err := r.j.Next(); err != nil || !ok {
			return true, err
		}
	}
}

func (r *reader) readJournal(ctx context.Context) error {
	caughtUp := r.s.ordinal.Load()
	if more, err := r.drainJournal(ctx); err != nil || !more {
		return err
	}

	if !r.config.Follow {
		if r.s.readSyncTimeout == 0 {
			return nil
		}
		r.drainDeadline = time.Now().Add(r.s.readSyncTimeout)
	}

	for {
		select {
		case <-r.s.closed:
			// container is gone, drain journal
			lastSeq := r.s.ordinal.Load()
			if r.maxOrdinal >= lastSeq {
				// All caught up with the logger!
				return nil
			}
		default:
		}

		if more, err := r.nextWait(ctx); err != nil || !more {
			return err
		}
		if more, err := r.drainJournal(ctx); err != nil || !more {
			return err
		}
		if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
			return nil
		}
	}
}

func (r *reader) readLogs(ctx context.Context) {
	defer close(r.logWatcher.Msg)

	// Make sure the ready channel is closed in the event of an early
	// return.
	defer r.signalReady()

	// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
	//     Functions that operate on sd_journal objects are thread
	//     agnostic — given sd_journal pointer may only be used from one
	//     specific thread at all times (and it has to be the very same one
	//     during the entire lifetime of the object), but multiple,
	//     independent threads may use multiple, independent objects safely.
	//
	// sdjournal.Open returns a value which wraps an sd_journal pointer so
	// we need to abide by those rules.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	// Get a handle to the journal.
	var err error
	if r.s.journalReadDir != "" {
		r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0)
	} else {
		r.j, err = sdjournal.Open(0)
	}
	if err != nil {
		r.logWatcher.Err <- err
		return
	}
	defer r.j.Close()

	if r.config.Follow {
		// Initialize library inotify watches early
		if err := r.j.InitializeInotify(); err != nil {
			r.logWatcher.Err <- err
			return
		}
	}

	// Remove limits on the size of data items that we'll retrieve.
	if err := r.j.SetDataThreshold(0); err != nil {
		r.logWatcher.Err <- err
		return
	}
	// Add a match to have the library do the searching for us.
	if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil {
		r.logWatcher.Err <- err
		return
	}

	var ok bool
	if r.config.Tail >= 0 {
		ok, err = r.initialSeekTail()
	} else {
		ok, err = r.initialSeekHead()
	}
	if err != nil {
		r.logWatcher.Err <- err
		return
	}
	r.signalReady()
	if !ok {
		if !r.config.Follow {
			return
		}
		// Either the read pointer is positioned at a discrete journal entry, in
		// which case the position will be unaffected by subsequent logging, or
		// the read pointer is in the conceptual position corresponding to the
		// first journal entry to send once it is logged in the future.
		if more, err := r.nextWait(ctx); err != nil || !more {
			if err != nil {
				r.logWatcher.Err <- err
			}
			return
		}
	}

	if err := r.readJournal(ctx); err != nil {
		r.logWatcher.Err <- err
		return
	}
}

func (r *reader) signalReady() {
	select {
	case <-r.ready:
	default:
		close(r.ready)
	}
}

func (s *journald) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
	r := &reader{
		s:          s,
		logWatcher: logger.NewLogWatcher(),
		config:     config,
		ready:      make(chan struct{}),
	}
	go r.readLogs(ctx)
	// Block until the reader is in position to read from the current config
	// location to prevent race conditions in tests.
	<-r.ready
	return r.logWatcher
}

func waitUntilFlushedImpl(s *journald) error {
	if s.readSyncTimeout == 0 {
		return nil
	}

	ordinal := s.ordinal.Load()
	if ordinal == 0 {
		return nil // No logs were sent; nothing to wait for.
	}

	flushed := make(chan error)
	go func() {
		defer close(flushed)
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()

		var (
			j   *sdjournal.Journal
			err error
		)
		if s.journalReadDir != "" {
			j, err = sdjournal.OpenDir(s.journalReadDir, 0)
		} else {
			j, err = sdjournal.Open(0)
		}
		if err != nil {
			flushed <- err
			return
		}
		defer j.Close()

		if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
			flushed <- err
			return
		}
		if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil {
			flushed <- err
			return
		}
		if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil {
			flushed <- err
			return
		}

		deadline := time.Now().Add(s.readSyncTimeout)
		for time.Now().Before(deadline) {
			if ok, err := j.Next(); ok {
				// Found it!
				return
			} else if err != nil {
				flushed <- err
				return
			}
			if _, err := j.Wait(100 * time.Millisecond); err != nil {
				flushed <- err
				return
			}
		}
		log.G(context.TODO()).WithField("container", s.vars[fieldContainerIDFull]).
			Warn("journald: deadline exceeded waiting for logs to be committed to journal")
	}()
	return <-flushed
}

func init() {
	waitUntilFlushed = waitUntilFlushedImpl
}
