アットランタイム

Docker のリングバッファとか ECS Shim Logger とか

はじめに

Docker のログドライバーの実装や、Containerd の shim logger 実装の一つである shim-loggers-for-containerd の実装をまとめる。

shim-loggers-for-containerd

ログドライバーは、コンテナの stdout/err をキャプチャし、対応した送信先にログを転送する。 転送先は、サイドカーやデーモンとして動作する fluentd や CloudWatch の実装である awslogs、ローカルに JSON 形式で保存する json などがある。 ecs shim logger は awslogs や、fluentd など Docker のログドライバーの一部に対応している。

ecs shim logger では、non-blocking mode に対応した NewBufferedLogger とブロッキングされる可能性のある NewLogger がある。

インターフェースとメソッド

 type LogDriver
    func NewBufferedLogger(l LogDriver, bufferReadSize int, maxBufferSize int, containerID string) LogDriver
    func NewLogger(options ...Opt) (LogDriver, error)

type LogDriver interface {
	// Start functions starts sending container logs to destination.
	Start(context.Context, *time.Duration, func() error) error
	// GetPipes gets pipes of container that exposed by containerd.
	GetPipes() (map[string]io.Reader, error)
	// Log sends logs to destination.
	Log(*dockerlogger.Message) error
	// Read reads a single log message from container pipe and sends it to
	// destination or saves it to ring buffer, depending on the mode of log
	// driver.
	Read(context.Context, io.Reader, string, int, sendLogToDestFunc) error
}

type Logger
    func (l *Logger) GetPipes() (map[string]io.Reader, error)
    func (l *Logger) Log(message *dockerlogger.Message) error
    func (l *Logger) Read(ctx context.Context, pipe io.Reader, source string, bufferSizeInBytes int, ...) error
    func (l *Logger) Start(ctx context.Context, cleanupTime *time.Duration, ready func() error) error

コンテナの stdout/err

コンテナの stdout(fd 1)と stderr(fd 2)は コンテナの作成時に shim が作成したパイプの fd に置き換わっている。 コンテナが stdout/err に書き込むと、それは上記パイプにつながっている。 そのため、ランタイムがパイプから定期的に入力を読み出し適切な処理を行うことでログドライバーを実現している。 なお、containerd ではパイプの繋がる先として binary、fifo と file を提供している。 ecs の実装は binary である。

binary は具体的に、

  1. shim が stdout/err 用のパイプ(pip2 システムコール)を作成する。
  2. shim が ecs shim logger のバイナリを実行する。実行時に ecs shim logger の fd 3 と 4 に 1. のパイプの reader を渡す
  3. shim は runc を通じてコンテナの fd 1 と 2 にパイプの writer を渡す
  4. ecs shim logger は fd 3 と 4 からコンテナのログを定期的に読み込み処理をする

shim 側実装

// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
(snip)
	out, err := newPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
	}
	closers = append(closers, out.Close)

	serr, err := newPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
	}
	closers = append(closers, serr.Close)

	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	closers = append(closers, r.Close, w.Close)

	cmd := NewBinaryCmd(uri, id, ns)
	cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
	// don't need to register this with the reaper or wait when
	// running inside a shim
	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start binary process: %w", err)
	}

ecs shim logger 側の実装(shim のライブラリを使用)

func Run(fn LoggerFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	config := &Config{
		ID:        os.Getenv("CONTAINER_ID"),
		Namespace: os.Getenv("CONTAINER_NAMESPACE"),
		Stdout:    os.NewFile(3, "CONTAINER_STDOUT"),
		Stderr:    os.NewFile(4, "CONTAINER_STDERR"),
	}
	var (
		sigCh = make(chan os.Signal, 32)
		errCh = make(chan error, 1)
		wait  = os.NewFile(5, "CONTAINER_WAIT")
	)
	signal.Notify(sigCh, unix.SIGTERM)

	go func() {
		errCh <- fn(ctx, config, wait.Close)
	}()
    (snip)

ecs shim logger 側の処理

func runFluentdDriver(globalArgs *logger.GlobalArgs) {
	args := getFluentdArgs()
	loggerArgs := fluentd.InitLogger(globalArgs, args)
	logging.Run(loggerArgs.RunLogDriver)
}

blocking mode と non-blocking mode

前述の通り、ecs shim logger は stdout/err とつながっているパイプから定期的にログを読み出し、適切な処理を行うことでログドライバーを実現している。 パイプにはバッファサイズがある。多くの Linux 環境では、64 KB がデフォルトだと思われる。設定可能な最大サイズは /proc/sys/fs/pipe-max-size で確認できる。

このパイプはノンブロッキングではないため、バッファが一杯の時の stdout/err への書き込みはブロックされる。 ブロックの発生はコンテナのアプリケーションの処理が停止を意味する。 特にシングルスレッドの同期 I/O のアプリケーションであれば、アプリケーションの処理が完全に停止してしまう。

上記パイプへの書き込みのブロックは、コンテナによる stdout/err への書き込みに対して、ランタイムによるパイプからのデータの読み出しが追いつかない時に発生する。 特にアプリケーションのログ出力のスループットが高く、ログドライバーがインターネット越しの API を実行する場合、ログの出力に対して API 実行のレイテンシーが大きいので発生する可能性がある。

上記問題に対して、ecs shim logger では non-blocking mode を使用することで、ログの欠損の可能性がありつつもアプリケーションのブロックを回避できる可能性が高くなる。 具体的に ecs shim logger は内部でリングバッファを用意し、パイプからリングバッファへコピーする go routine と リングバッファからデータを読み出しログドライバーの処理をする go routine を実行する。 簡単に言えば、内部実装としてキュー(リングバッファ)を介したキューワーカーアーキテクチャと言える。 留意点としてリングバッファの性質上、パイプからリングバッファへの書き込みに対して、リングバッファからの読み込みが追いつかない場合、ログが欠損する可能性がある。 なお、リングバッファのサイズはユーザが指定可能である。

blocking mode の実装

サイズを決定する変数として、パイプからの一度で読み込み可能な最大サイズと、パイプから読み出したログを保持するバッファのサイズがある。 デフォルトでは、前者が defaultMaxReadBytes として 2KiB、後者が DefaultBufSizeInBytes として 16 KiB である。 これはログドライバーの実装よって上書きされる可能性がある。

初期化

NewLogger は LogDriver インターフェースを返す。デフォルト値を上書きする場合、Opt で指定する。

// NewLogger creates a LogDriver with the provided LoggerOpt.
func NewLogger(options ...Opt) (LogDriver, error) {
	l := &Logger{
		Info:              &dockerlogger.Info{},
		bufferSizeInBytes: DefaultBufSizeInBytes,
		maxReadBytes:      defaultMaxReadBytes,
	}
	for _, opt := range options {
		opt(l)
	}
	return l, nil
}

開始

Start はパイプからの読み出しとログドライバーの処理を開始する。 具体的に stdout/err のパイプ(io.Reader)に対応した go routine をそれぞれ実行する。 Start は全ての go routine の終了を待つが、それはパイプが全てクローズ(コンテナが終了)した場合か何かしらの問題が発生した場合である。

func (l *Logger) Start(
	ctx context.Context,
	cleanupTime *time.Duration,
	ready func() error,
) error {
	pipeNameToPipe, err := l.GetPipes()

(snip)
	errGroup, ctx := errgroup.WithContext(ctx)
	for pn, p := range pipeNameToPipe {
		// Copy pn and p to new variables source and pipe, accordingly.
		source := pn
		pipe := p

		errGroup.Go(func() error {
			logErr := l.sendLogs(ctx, pipe, source, cleanupTime)
			if logErr != nil {
				err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
				debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
				return err
			}
			return nil
		})
	}

	// Signal that the container is ready to be started
	if err := ready(); err != nil {
		return fmt.Errorf("failed to check container ready status: %w", err)
	}

	// Wait() will return the first error it receives.
	return errGroup.Wait()

送信

sendLogsStart とさほど変わらない。 Read はパイプから読み出せる限り読み出す。Read が終了するのはパイプが close した場合か、何か問題が生じた場合である。 終了した場合、最後のログが十分に転送できるよう待機する。

// sendLogs sends logs to destination.
func (l *Logger) sendLogs(
	ctx context.Context,
	f io.Reader,
	source string,
	cleanupTime *time.Duration,
) error {
	if err := l.Read(ctx, f, source, l.bufferSizeInBytes, l.sendLogMsgToDest); err != nil {
		err := fmt.Errorf("failed to read logs from %s pipe: %w", source, err)
		debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
		return err
	}

	// Sleep sometime to let shim logger clean up, for example, to allow enough time for the last
	// few log messages be flushed to destination like CloudWatch.
	debug.SendEventsToLog(DaemonName,
		fmt.Sprintf("Pipe %s is closed. Sleeping %s for cleanning up.", source, cleanupTime.String()),
		debug.INFO,
		0)
	time.Sleep(*cleanupTime)
	return nil
}

buf はパイプから読み出したログを保持する変数である。デフォルトは DefaultBufSizeInBytes の 16 KiB のバッファを確保する。 partial 系 は、一つのログのサイズが大きすぎる場合に分割する際に、分割したことを表すフラグである。 bytesInBufferbuf 内で、どこまでが未処理のログであるかを示すカーソルとなる。

readFromContainerPipe がパイプからログを読み出し buf に格納する役割を担う。

// Read gets container logs, saves them to our own buffer. Then we will read logs line by line
// and send them to destination. In non-blocking mode, the destination is the ring buffer. More
// log messages will be sent in verbose mode for debugging.
func (l *Logger) Read(
	ctx context.Context,
	pipe io.Reader,
	source string,
	bufferSizeInBytes int,
	sendLogMsgToDest sendLogToDestFunc,
) error {
	var (
		msgTimestamp  time.Time
		bytesInBuffer int
		err           error
		eof           bool
	)
	// Initiate an in-memory buffer to hold bytes read from container pipe.
	buf := make([]byte, bufferSizeInBytes)
	// isFirstPartial indicates if current message saved in buffer is not a complete line,
	// and is the first partial of the whole log message. Initialize to true.
	isFirstPartial := true
	// isPartialMsg indicates if current message is a partial log message. Initialize to false.
	isPartialMsg := false
	// isLastPartial indicates if this message completes a partial message
	isLastPartial := false
	// partialID is a random ID given to each split message
	partialID := ""
	// partialOrdinal orders the split messages and count up from 1
	partialOrdinal := 1

	for {
		select {
		case <-ctx.Done():
(snip)
		default:
			eof, bytesInBuffer, err = readFromContainerPipe(pipe, buf, bytesInBuffer, l.maxReadBytes)
			if err != nil {
				return err
			}
(snip)

readFromContainerPipe が読み込むバイト数は、デフォルト 2KiB もしくは bytesInBuffer - デフォルト 16KiB の小さい方となる。 言い換えると、最大で 2KiB であり、もし buf の残りサイズが 2KiB にも満たなければ buf の残りの容量のサイズとなる。 読み込んだバイト数を bytesInBuffer に加算して終える。 EOF はパイプが close した時に返される。もし、パイプにデータがない場合、pipe.Read はブロックされる。

// readFromContainerPipe reads bytes from container pipe, upto max read size in bytes of 2048.
func readFromContainerPipe(pipe io.Reader, buf []byte, bytesInBuffer, maxReadBytes int) (bool, int, error) {
	// eof indicates if we have already met EOF error.
	eof := false
	// Decide how many bytes we can read from container pipe for this iteration. It's either
	// the current bytes in buffer plus 2048 bytes or the available spaces left, whichever is
	// smaller.
	readBytesUpto := int(math.Min(float64(bytesInBuffer+maxReadBytes), float64(cap(buf))))
	// Read logs from container pipe if there are available spaces for new log messages.
	if readBytesUpto > bytesInBuffer {
		readBytesFromPipe, err := pipe.Read(buf[bytesInBuffer:readBytesUpto])
		if err != nil {
			if err != io.EOF {
				return false, bytesInBuffer, fmt.Errorf("failed to read log stream from container pipe: %w", err)
			}
			// Pipe is closed, set flag to true.
			eof = true
		}
		atomic.AddUint64(&bytesReadFromSrc, uint64(readBytesFromPipe))
		bytesInBuffer += readBytesFromPipe
	}

	return eof, bytesInBuffer, nil
}

headbuf 内の未処理のログの先頭を示すカーソルなので、buf[head:bytesInBuffer] が未処理のログを表す。 未処理のログのうち、改行単位でログを転送するため bytes.IndexByte で改行の位置(lenOfLine)を見つけ、buf[head:head+lenOfLine] のログを転送する。 ログを転送したら head の位置を head + lenOfLine + 1 までずらし、buf[head:bytesInBuffer] に改行が含まれている限り同じ処理をする。 バッファに複数の改行を含むログがある場合、改行がなくなるまでパイプからは読み出されない。

			// If container pipe is closed and no bytes left in our buffer, directly return.
			if eof && bytesInBuffer == 0 {g
				return nil
			}

			// Iteratively scan the unread part in our own buffer and read logs line by line.
			// Then send it to destination.
			head := 0
			// This function returns -1 if '\n' in not present in buffer.
			lenOfLine := bytes.IndexByte(buf[head:bytesInBuffer], newline)
			for lenOfLine >= 0 {
				// If this is the end of a partial message
				// use the existing timestamp, so that all
				// partials split from the same message have the same timestamp
				// If not, new timestamp.
				if isPartialMsg {
					isLastPartial = true
				} else {
					msgTimestamp = time.Now().UTC()
				}
				curLine := buf[head : head+lenOfLine]
				err = sendLogMsgToDest(
					curLine,
					source,
					isPartialMsg,
					isLastPartial,
					partialID,
					partialOrdinal,
					msgTimestamp,
				)
				if err != nil {
					return err
				}

				atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
				atomic.AddUint64(&numberOfNewLineChars, 1)
				// Since we have found a newline symbol, it means this line has ended.
				// Reset flags.
				isFirstPartial = true
				isPartialMsg = false
				isLastPartial = false
				partialID = ""
				partialOrdinal = 1

				// Update the index of head of next line message.
				head += lenOfLine + 1
				lenOfLine = bytes.IndexByte(buf[head:bytesInBuffer], newline)
			}

Read に戻ると未処理のログについて、改行単位のログの転送が終了したら、パイプが close している(EOF)か、バッファが一杯であるか確認する。 バッファが一杯とは head が 0 かつ bytesInBuffer == len(buf) が成り立つ時、つまり buf のデータが全て改行を含まない未処理のログである。

改行を含まないため、isPartialMsg フラグを有効にしてバッファのログを転送する。 なお、partial フラグが有効化され、次の読み込みで改行が含まれる場合、次のログは isLastPartial を有効化し転送する。 転送すると headbytesInBuffer を 0 にクリアする。

			// If the pipe is closed and the last line does not end with a newline symbol, send whatever left
			// in the buffer to destination as a single log message. Or if our buffer is full but there is
			// no newline symbol yet, record it as a partial log message and send it as a single log message
			// to destination.
			if eof || bufferIsFull(buf, head, bytesInBuffer) {
				// Still bytes left in the buffer after we identified all newline symbols.
				if head < bytesInBuffer {
					curLine := buf[head:bytesInBuffer]

					// Record as a partial message.
					isPartialMsg = true
					if isFirstPartial {
						msgTimestamp = time.Now().UTC()
						partialID, err = generateRandomID()
					}
					if err != nil {
						return err
					}

					err = sendLogMsgToDest(
						curLine,
						source,
						isPartialMsg,
						isLastPartial,
						partialID,
						partialOrdinal,
						msgTimestamp,
					)
					if err != nil {
						return err
					}

					atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
					// reset head and bytesInBuffer
					head = 0
					bytesInBuffer = 0
					// increment partial flags
					partialOrdinal++
					if isFirstPartial {
						// if this was the first partial message
						// the next one is not the first if it is also partial
						isFirstPartial = false
					}
				}


				// If pipe is closed after we send all bytes left in buffer, then directly return.
				if eof {
					return nil
				}
			}

Read の最後について、改行のログを処理し EOF やバッファが一杯ではない場合、未処理のログをバッファの先頭にコピーする。 そのため、次のパイプからの読みでは常に head = 0 から bytesInBuffer までが未処理のログとなる。

			// If there are any bytes left in the buffer, move them to the head and handle them in the
			// next round.
			if head > 0 {
				copy(buf[0:], buf[head:bytesInBuffer])
				bytesInBuffer -= head
			}
		}
	}
}

sendLogMsgToDest はパイプから読み出したログを転送する関数であり、[]byte のログを Docker の Message に変換して Log メソッドを実行する。 Log メソッドは各ログドライバーの実装の Log メソッドを実行する。

// sendLogMsgToDest sends a single line of log message to destination.
func (l *Logger) sendLogMsgToDest(
	line []byte,
	source string,
	isPartialMsg, isLastPartial bool,
	partialID string,
	partialOrdinal int,
	msgTimestamp time.Time,
) error {
(snip)

	message := newMessage(line, source, msgTimestamp)
	if isPartialMsg {
		message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
	}
	err := l.Log(message)
	if err != nil {
		return fmt.Errorf("failed to log msg for container %s: %w", l.Info.ContainerName, err)
	}

	return nil
}

// Log sends logs to destination.
func (l *Logger) Log(message *dockerlogger.Message) error {
	return l.Stream.Log(message)
}

転送の実態

ログドライバーの Log メソッドは splunkawslogsfluentd どれも Log インターフェースを満たす Docker の実装を使い回している。

// Client is a wrapper for docker logger's Log method, which is mostly used for testing
// purposes.
type Client interface {
	Log(*dockerlogger.Message) error
}

Docker ログドライバーの Log メソッドによるログの転送は同期実行に思えるが実装によっては非同期である。

non-blocking mode の実装

これは blocking mode の NewLogger をオーバーライドする形で実装される。以降 blocking mode の実装を common と呼ぶ。

初期化

blocking mode で使用した LogDriver インターフェースを引数に受け取る。 bufferReadSize は、common の Read に渡される値であり、前述の buf のサイズを決定する値である。 maxBufferSize はリングバッファの最大バッファサイズを表し、maxSizeInBytes にマップされる。 未送信のログの合計サイズが maxBufferSize を超えるとログは破棄される(欠損する)。

func NewBufferedLogger(l LogDriver, bufferReadSize int, maxBufferSize int, containerID string) LogDriver {
	return &bufferedLogger{
		l:                  l,
		buffer:             newLoggerBuffer(maxBufferSize),
		bufReadSizeInBytes: bufferReadSize,
		containerID:        containerID,
	}
}

開始

common に比べると特徴は 2 つである。

  1. パイプからリングバッファへ、ログをコピーする go routine をパイプごとに実行する
  2. リングバッファからログを読み出し、ログドライバーの処理をする go routine を 1 つ実行する
// Start starts the non-blocking mode logger.
func (bl *bufferedLogger) Start(
	ctx context.Context,
	cleanupTime *time.Duration,
	ready func() error,
) error {
	pipeNameToPipe, err := bl.l.GetPipes()
	if err != nil {
		return err
	}
(snip)
	errGroup, ctx := errgroup.WithContext(ctx)
	// Start the goroutine of underlying log driver to consume logs from ring buffer and
	// send logs to destination when there's any.
	errGroup.Go(func() error {
		debug.SendEventsToLog(DaemonName, "Starting consuming logs from ring buffer", debug.INFO, 0)
		return bl.sendLogMessagesToDestination(cleanupTime)
	})

	// Start reading logs from container pipes.
	for pn, p := range pipeNameToPipe {
		// Copy pn and p to new variables source and pipe, accordingly.
		source := pn
		pipe := p

		errGroup.Go(func() error {
			debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
			logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
			if logErr != nil {
				err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
				debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
				return err
			}
			return nil
		})
	}

	// Signal that the container is ready to be started
	if err := ready(); err != nil {
		return fmt.Errorf("failed to check container ready status: %w", err)
	}

	// Wait() will return the first error it receives.
	return errGroup.Wait()
}

リングバッファ

curSizeInBytes は現在のリングバッファ内の未処理のログのサイズの合計を表す。 curSizeInBytesmaxSizeInBytes を超えると、そのログを破棄する。

// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L128
// as this struct is not exported.
type ringBuffer struct {
	// A mutex lock is used here when writing/reading log messages from the queue
	// as there exists three go routines accessing the buffer.
	lock sync.Mutex
	// A condition variable wait is used here to notify goroutines that get access to
	// the buffer should wait or continue.
	wait *sync.Cond
	// current total bytes stored in the buffer
	curSizeInBytes int
	// maximum bytes capacity provided by the buffer
	maxSizeInBytes int
	// queue saves all the log messages read from pipes exposed by containerd, and
	// is consumed by underlying log driver.
	queue []*dockerlogger.Message
	// closedPipesCount is the number of closed container pipes for a single container.
	closedPipesCount int
	// isClosed indicates if ring buffer is closed.
	isClosed bool
}

パイプからリングバッファへ

bl.Read は common の Read を実行するだけである。 common との違いは、引数のログを転送する関数が saveSingleLogMessageToRingBuffer という点である。 これにより、パイプから読み込んだログは、ログドライバーの転送処理ではなく、リングバッファへの転送処理がされる。

// saveLogMessagesToRingBuffer saves container log messages to ring buffer.
func (bl *bufferedLogger) saveLogMessagesToRingBuffer(
	ctx context.Context,
	f io.Reader,
	source string,
) error {
	if err := bl.Read(ctx, f, source, bl.bufReadSizeInBytes, bl.saveSingleLogMessageToRingBuffer); err != nil {
		err := fmt.Errorf("failed to read logs from %s pipe: %w", source, err)
		debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
		return err
	}

	// No messages in the pipe, send signal to closed pipe channel.
	debug.SendEventsToLog(DaemonName, fmt.Sprintf("Pipe %s is closed", source), debug.INFO, 1)
	bl.buffer.closedPipesCount++
	// If both container pipes are closed, wake up the Dequeue goroutine which is waiting on wait.
	if bl.buffer.closedPipesCount == expectedNumOfPipes {
		bl.buffer.isClosed = true
		bl.buffer.wait.Broadcast()
	}

	return nil
}

// Read reads log messages from container pipe and saves them to ring buffer line by line.
func (bl *bufferedLogger) Read(
	ctx context.Context,
	pipe io.Reader,
	source string,
	bufferSizeInBytes int,
	sendLogMsgToDest sendLogToDestFunc,
) error {
	return bl.l.Read(ctx, pipe, source, bufferSizeInBytes, sendLogMsgToDest)
}

saveSingleLogMessageToRingBuffer は、改行で分割された一つのログメッセージ([]byte)を Docker 形式のログメッセージに変換し、リングバッファへエンキューする。

// saveSingleLogMessageToRingBuffer enqueues a single line of log message to ring buffer.
func (bl *bufferedLogger) saveSingleLogMessageToRingBuffer(
	line []byte,
	source string,
	isPartialMsg, isLastPartial bool,
	partialID string,
	partialOrdinal int,
	msgTimestamp time.Time,
) error {
(snip)
	message := newMessage(line, source, msgTimestamp)
	if isPartialMsg {
		message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
	}
	err := bl.buffer.Enqueue(message)
	if err != nil {
		return fmt.Errorf("failed to save logs to buffer: %w", err)
	}

	return nil
}

エンキュー

既にキューにメッセージがあり、未処理のログとこれから処理するログの合計サイズが maxBufferSize(maxSizeInBytes) を超える場合、Dequeue(ログの送信用)の go routine を起動させる。 この場合、引数のログメッセージはエンキューされないためログは欠損する(最も古いログメッセージが破棄されるわけではない点に注意)。

maxBufferSize(maxSizeInBytes)を超えない場合、queue にメッセージを追加および未処理のログのサイズを加算し、ログの送信用の go routine を起動させる。

// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L155
// as messageRing struct is not exported.
// Enqueue adds a single log message to the tail of intermediate buffer.
func (b *ringBuffer) Enqueue(msg *dockerlogger.Message) error {
	b.lock.Lock()
	defer b.lock.Unlock()

	lineSizeInBytes := len(msg.Line)
	// If there is already at least one log message in the queue and not enough space left
	// for the new coming log message to take up, drop this log message. Otherwise, save this
	// message to ring buffer anyway.
	if len(b.queue) > 0 &&
		b.curSizeInBytes+lineSizeInBytes > b.maxSizeInBytes {
		if debug.Verbose {
			debug.SendEventsToLog(DaemonName,
				"buffer is full/message is too long, waiting for available bytes",
				debug.DEBUG, 0)
			debug.SendEventsToLog(DaemonName,
				fmt.Sprintf("message size: %d, current buffer size: %d, max buffer size %d",
					lineSizeInBytes,
					b.curSizeInBytes,
					b.maxSizeInBytes),
				debug.DEBUG, 0)
		}

		// Wake up "Dequeue" or the other "Enqueue" go routine (called by the other pipe)
		// waiting on current mutex lock if there's any
		b.wait.Signal()
		return nil
	}

	b.queue = append(b.queue, msg)
	b.curSizeInBytes += lineSizeInBytes
	// Wake up "Dequeue" or the other "Enqueue" go routine (called by the other pipe)
	// waiting on current mutex lock if there's any
	b.wait.Signal()

	return nil
}

リングバッファからログの転送

Start の go routine で実行された sendLogMessagesToDestination は、パイプがクローズするまで for-loop で sendLogMessageToDestination を実行する。

// sendLogMessagesToDestination consumes logs from ring buffer and use the
// underlying log driver to send logs to destination.
func (bl *bufferedLogger) sendLogMessagesToDestination(cleanupTime *time.Duration) error {
	// Keep sending log message to destination defined by the underlying log driver until
	// the ring buffer is closed.
	for !bl.buffer.isClosed {
		if err := bl.sendLogMessageToDestination(); err != nil {
			debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
			return err
		}
	}
	// If both container pipes are closed, flush messages left in ring buffer.
	debug.SendEventsToLog(DaemonName, "All pipes are closed, flushing buffer.", debug.INFO, 0)
	if err := bl.flushMessages(); err != nil {
		debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
		return err
	}

	// Sleep sometime to let shim logger clean up, for example, to allow enough time for the last
	// few log messages be flushed to destination like CloudWatch.
	debug.SendEventsToLog(DaemonName,
		fmt.Sprintf("Sleeping %s for cleanning up.", cleanupTime.String()),
		debug.INFO, 0)
	time.Sleep(*cleanupTime)
	return nil
}

sendLogMessageToDestination はリングバッファからログメッセージをデキューし、ログドライバーの Log を実行する。

// sendLogMessageToDestination dequeues a single log message from buffer and sends to destination.
func (bl *bufferedLogger) sendLogMessageToDestination() error {
	msg, err := bl.buffer.Dequeue()
	// Do an early return if ring buffer is closed.
	if bl.buffer.isClosed {
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to read logs from buffer: %w", err)
	}

	err = bl.Log(msg)
	if err != nil {
		return fmt.Errorf("failed to send logs to destination: %w", err)
	}

	return nil
}

// Log lets underlying log driver send logs to destination.
func (bl *bufferedLogger) Log(message *dockerlogger.Message) error {
	if debug.Verbose {
		debug.SendEventsToLog(DaemonName,
			fmt.Sprintf("[BUFFER] Sending message: %s", string(message.Line)),
			debug.DEBUG, 0)
	}
	return bl.l.Log(message)
}

デキュー

もし、キューにメッセージがなければ go routine を一時停止する。 メッセージがあれば、一つ取り出してそれを返す。また、現在の未処理のログメッセージのサイズを減算する。

// Dequeue gets a line of log message from the head of intermediate buffer.
func (b *ringBuffer) Dequeue() (*dockerlogger.Message, error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	// If there is no log yet in the buffer, and the ring buffer is still open, wait
	// suspends current go routine.
	for len(b.queue) == 0 && !b.isClosed {
		if debug.Verbose {
			debug.SendEventsToLog(DaemonName,
				"No messages in queue, waiting...",
				debug.DEBUG, 0)
		}
		b.wait.Wait()
	}

	// Directly return if ring buffer is closed.
	if b.isClosed {
		return nil, nil //nolint: nilnil // swallow the error
	}

	// Get and remove the oldest message saved in buffer/queue from head and update
	// the current used bytes of buffer.
	msg := b.queue[0]
	b.queue = b.queue[1:]
	b.curSizeInBytes -= len(msg.Line)

	return msg, nil
}

キューのサイズ

cap が 1000 としてスライスが作成されるためデフォルトは 1000 である。 また、append でキューへ追加されるため、もし 1000 を超える場合も自動で拡張される。

	ringCap = 1000
(snip)

func newLoggerBuffer(maxBufferSize int) *ringBuffer {
	rb := &ringBuffer{
		maxSizeInBytes:   maxBufferSize,
		queue:            make([]*dockerlogger.Message, 0, ringCap),
		closedPipesCount: 0,
		isClosed:         false,
	}
	rb.wait = sync.NewCond(&rb.lock)

	return rb
}

Docker の Log メソッド

ecs shim logger でも使用されているログドライバーの Log メソッドを見ていく。

fluentd

fluent-logger-golang の実装に依存しておりシンプルである。

Log メソッド

import (
(snip)
	"github.com/fluent/fluent-logger-golang/fluent"


const (
	defaultBufferLimit = 1024 * 1024

(snip)

func (f *fluentd) Log(msg *logger.Message) error {
	data := map[string]string{
		"container_id":   f.containerID,
		"container_name": f.containerName,
		"source":         msg.Source,
		"log":            string(msg.Line),
	}
(snip)

	// fluent-logger-golang buffers logs from failures and disconnections,
	// and these are transferred again automatically.
	return f.writer.PostWithTime(f.tag, ts, data)
}

PostWithTime は最終的に postRawData を呼び出す。 Async が有効であればバッファリングされる。Async は Docker の fluentd-async に対応している(デフォルトで無効)。

Async が無効であれば、バッファリングされず(UDS かもしれないが)ネットワーク越しに送信を実行するため、この送信によって律速される可能性がある。

func (f *Fluent) postRawData(msg *msgToSend) error {
	if f.Config.Async {
		return f.appendBuffer(msg)
	}

	// Synchronous write
	if f.closed {
		return fmt.Errorf("fluent#postRawData: Logger already closed")
	}
	return f.writeWithRetry(context.Background(), msg)
}

バッファサイズ(キューサイズ pending)は、デフォルトでは Docker 側の定義の 1,048,576 である。 ログの合計バイトではなくキューのサイズなので、このキューで問題になることはなさそうであるが、もしバッファサイズを超える場合そのログは破棄されるため、メソッドはブロックされない。

しかし、キューを溢れた場合や同期の writeWithRetry が失敗した場合、そのエラーは Log まで伝播しさらに ecs shim logger まで伝播しそうである。 結果として、ecs shim logger は停止し、パイプのバッファが一杯になり、アプリケーションがブロックしそうである。

func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
(snip)
	if config.Async {
		ctx, cancel := context.WithCancel(context.Background())

		f = &Fluent{
			Config:         config,
			dialer:         d,
			stopRunning:    make(chan struct{}),
			cancelDialings: cancel,
			pending:        make(chan *msgToSend, config.BufferLimit),
			pendingMutex:   sync.RWMutex{},
			muconn:         sync.RWMutex{},
		}

		f.wg.Add(1)
		go f.run(ctx)
(snip)

// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
	f.pendingMutex.RLock()
	defer f.pendingMutex.RUnlock()
	if f.closed {
		return fmt.Errorf("fluent#appendBuffer: Logger already closed")
	}
	select {
	case f.pending <- msg:
	default:
		return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
	}
	return nil
}

awslogs

awslogs の Log は、fluentd とは異なりキュー(messages)にメッセージを送信して終了するため、常に成功する。 messages のサイズは、デフォルト(defaultMaxBufferedEvents)で 4096 である。 select を使用していないため、キューが一杯の場合、Log はブロックされる。

Log メソッド

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.closed {
		return errors.New("awslogs is closed")
	}
	l.messages <- msg
	return nil
}

New によって起動する go routine が、キューからログを読み出し CloudWatch に送信する。

func New(info logger.Info) (logger.Logger, error) {

(snip)
	go containerStream.collectBatch(creationDone)

	return containerStream, nil
}

awslogs-multiline-pattern が指定されていない場合、messages からログを読み出して processEvent を実行する。 newEventBatch は未送信のログイベントとその合計サイズを保持する変数を生成する関数であり、変数 batchが使用される。 processEventbatch にログイベントを追加をするが、これにより API が実行されるとは限らない。 そのため、デフォルトの 5 秒のタイマーで publishBatch を実行し、未送信のログイベントに対して API を実行させる。

func (l *logStream) collectBatch(created chan bool) {
	// Wait for the logstream/group to be created
	<-created
	flushInterval := l.forceFlushInterval
	if flushInterval <= 0 {
		flushInterval = defaultForceFlushInterval
	}
	ticker := newTicker(flushInterval)
	var eventBuffer []byte
	var eventBufferTimestamp int64
	batch := newEventBatch()
	for {
		select {
		case t := <-ticker.C:
(snip)
			l.publishBatch(batch)
			batch.reset()
		case msg, more := <-l.messages:
			if !more {
				// Flush event buffer and release resources
				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
				l.publishBatch(batch)
				batch.reset()
				return
			}
			if eventBufferTimestamp == 0 {
				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
			}
			line := msg.Line
			if l.multilinePattern != nil {
(snip)
			} else {
				l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
				logger.PutMessage(msg)
			}

CloudWatch Logs の PutLogEvents API は一度に複数のログイベントを送信できるが次の制限がある。

  1. 最大のバッチサイズが 1 MiB
  2. 一つのログイベントは最大 256 KiB(だけど無効な UTF-8 シーケンスが含まれているかもしれないので - 26Bytes)
  3. バッチイベントの数は最大で 10,000

そのため制限を超過しないように(API が失敗しないように)しつつ、なるべく一度の API 実行で同時に複数のログを送信をしようとする。

processEvent は引数のログイベントを未送信のログイベントのコレクションの batch に制限を超過しないように追加を試みる。 追加が成功する場合処理を終え、失敗する場合 publishBatch で既存のログイベントを送信し batch をリセットする。 失敗した場合、次の for-loop で引数のログイベントの追加が期待される。 なお、2. の 256 KiB - 26 bytes を超える場合、findValidSplit によって、最大サイズに分割される。

func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
	for len(bytes) > 0 {
		// Split line length so it does not exceed the maximum
		splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
		line := bytes[:splitOffset]
		event := wrappedEvent{
			inputLogEvent: types.InputLogEvent{
				Message:   aws.String(string(line)),
				Timestamp: aws.Int64(timestamp),
			},
			insertOrder: batch.count(),
		}

		added := batch.add(event, lineBytes)
		if added {
			bytes = bytes[splitOffset:]
		} else {
			l.publishBatch(batch)
			batch.reset()
		}
	}
}

func (b *eventBatch) add(event wrappedEvent, size int) bool {
	addBytes := size + perEventBytes

	// verify we are still within service limits
	switch {
	case len(b.batch)+1 > maximumLogEventsPerPut:
		return false
	case b.bytes+addBytes > maximumBytesPerPut:
		return false
	}

	b.bytes += addBytes
	b.batch = append(b.batch, event)

	return true
}


func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for offset, rune := range line {
		splitOffset = offset
		if effectiveBytes+utf8.RuneLen(rune) > maxBytes {
			return splitOffset, effectiveBytes
		}
		effectiveBytes += utf8.RuneLen(rune)
	}
	splitOffset = len(line)
	return
}

publishBatch が API を実行する。ExpectedSequenceToken は現状発生しないので無視して良さそうである。 一方で、もしネットワークの問題等で API が失敗した場合も Docker や ecs shim logger の標準エラーにエラーが出力されるだけに見える。 そのため、ecs shim logger までエラーは伝播されない。 なお、API のリトライは AWS SDK のデフォルトの設定に従いそうである。

func (l *logStream) publishBatch(batch *eventBatch) {
	if batch.isEmpty() {
		return
	}
	cwEvents := unwrapEvents(batch.events())

	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
	if err != nil {
		if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
			// already submitted, just grab the correct sequence token
			nextSequenceToken = apiErr.ExpectedSequenceToken
			log.G(context.TODO()).WithFields(log.Fields{
				"errorCode":     apiErr.ErrorCode(),
				"message":       apiErr.ErrorMessage(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Info("Data already accepted, ignoring error")
			err = nil
		} else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
			nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
		}
	}
	if err != nil {
		log.G(context.TODO()).Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}

Docker の non-blocking mode(リングバッファ)

ecs shim logger ではなく、Docker と non-blocking mode を使用した場合の実装である。 リングバッファの実装は ecs shim logger と大体同じに見える(というより ecs shim logger 側が Docker の実装を参考にしている)。

コンテナの作成時に ecs shim logger と同じようにベースのロガーを引数にリングバッファのロガーにスイッチする。

	l, err := initDriver(info)
	if err != nil {
		return nil, err
	}

	if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock {
		bufferSize := int64(-1)
		if s, exists := cfg.Config["max-buffer-size"]; exists {
			bufferSize, err = units.RAMInBytes(s)
			if err != nil {
				return nil, err
			}
		}
		l = logger.NewRingLogger(l, info, bufferSize)
	}

リングバッファのオーバーライドした Log ではリングバッファにログを Enqueue する。 加えて、ロガーの作成時に、リングバッファから Dequeue し、ログドライバーの Log を実行する go routine を実行する。

func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
	l := &RingLogger{
		buffer:  newRing(maxSize),
		l:       driver,
		logInfo: logInfo,
	}
	l.wg.Add(1)
	go l.run()
	return l
}

// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
// the passed in logger.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
	if maxSize < 0 {
		maxSize = defaultRingMaxSize
	}
	l := newRingLogger(driver, logInfo, maxSize)
	if _, ok := driver.(LogReader); ok {
		return &ringWithReader{l}
	}
	return l
}

まとめ

気が向いたら図に起こすかもしれないです。

fluentd

変数blockingnon-blocking備考
読み出しバッファ(buf16KiB16KiB
一度にパイプから読み出す最大サイズ(defaultMaxReadBytes2KiB2KiB
メッセージキュー(pending1,048,5761,048,576Async 時のみ、超えると破棄される、fluentd-buffer-limit で変更可
リングバッファメッセージキュー(queueN/A1000+自動で拡張される
ecs リングバッファ(maxSizeInBytesN/A1MiBmax-buffer-size で変更可、超えると破棄される

awslogs

複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes

変数blockingnon-blocking備考
読み出しバッファ(buf256KiB-26bytes256KiB
一度にパイプから読み出す最大サイズ(defaultMaxReadBytes2KiB2KiB
メッセージキュー(messages40964096awslogs-max-buffered-events で変更可
ログイベント集約送信バッファ(batch複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes
リングバッファメッセージキュー(queueN/A1000+自動で拡張される
ecs リングバッファ(maxSizeInBytesN/A1MiBmax-buffer-size で変更可、超えると破棄される

buf のサイズが微妙に違うためエッジケースではあるが次の場合、動作が変わると思われる。 具体的に 256 KiB + 1 Bytes のデータを送信する場合を考える。

blocking の場合、shim logger のバッファ(buf)に律速され 256KiB - 26Bytes の 262118 Bytes だけ読み出されて Docker の Log が実行される。 Docker の Log では、制限には引っかからないため、262118 Bytes のログが転送される。そして残りの 27 Bytes が転送される。

non-blocking の場合、shim logger のバッファ(buf)に律速され 256 KiB だけ読み出されて Docker の Log が実行される。 Docker の Log では、制限に該当するため、262118 Bytes と 26 Bytes に分割されて送信される。そして残りの 1 Bytes が転送される。 したがって、262118 Bytes のログと 26 Bytes のログと、1 Bytes のログの 3 つに分割される。

その他

その他のバッファ

その他のバッファで言えば、stdout への書き込みは一般的に glibc などの実装によりバッファリングされ、stderr はバッファリングされない。 また TCP や UDS などでログを転送する場合、TCP の送信バッファによってバッファリングされる。 具体的に net.ipv4.tcp_wmem などで設定可能である。

containerd の fifo と file

binary の場合、パイプからのログの読み出しは shim が実行するバイナリ(ecs shim logger)が行なっていた。 一方で、fifo と file の場合、shim がパイプからログを読み出し、containerd が作成した fifo や shim が作成したファイルへコピーする。

引数の stdoutstderr は、file であればファイルのパス、fifo であれば fifo のパスである。 rio はコンテナの stdout/err に対応したパイプを保持している。

fifo であれば fifo.OpenFifo、file であれば os.OpenFile で開き、io.CopyBuffer でコンテナのパイプから、fifo もしくはファイルへコピーする(go routine を実行する)。 また、コピーに際して bufPool を使用してバッファリングしている。 stdout/err に対応する fifo はブロッキングモードなので、fifo から読み出されない場合、コンテナのログ出力は fifo へ書き込みできずブロッキングされる可能性がある。

copyPipes 関数

func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
	var sameFile *countingWriteCloser
	for _, i := range []struct {
		name string
		dest func(wc io.WriteCloser, rc io.Closer)
	}{
		{
			name: stdout,
			dest: func(wc io.WriteCloser, rc io.Closer) {
				wg.Add(1)
				cwg.Add(1)
				go func() {
					cwg.Done()
					p := bufPool.Get().(*[]byte)
					defer bufPool.Put(p)
					if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
						log.G(ctx).Warn("error copying stdout")
					}
					wg.Done()
					wc.Close()
					if rc != nil {
						rc.Close()
					}
				}()
			},
		}, {
			name: stderr,
			dest: func(wc io.WriteCloser, rc io.Closer) {
				wg.Add(1)
				cwg.Add(1)
				go func() {
					cwg.Done()
					p := bufPool.Get().(*[]byte)
					defer bufPool.Put(p)
					if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
						log.G(ctx).Warn("error copying stderr")
					}
					wg.Done()
					wc.Close()
					if rc != nil {
						rc.Close()
					}
				}()
			},
		},
	} {
		ok, err := fifo.IsFifo(i.name)
		if err != nil {
			return err
		}
		var (
			fw io.WriteCloser
			fr io.Closer
		)
		if ok {
			if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil {
				return fmt.Errorf("containerd-shim: opening w/o fifo %q failed: %w", i.name, err)
			}
			if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil {
				return fmt.Errorf("containerd-shim: opening r/o fifo %q failed: %w", i.name, err)
			}
		} else {
			if sameFile != nil {
				sameFile.count++
				i.dest(sameFile, nil)
				continue
			}
			if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil {
				return fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err)
			}
			if stdout == stderr {
				sameFile = &countingWriteCloser{
					WriteCloser: fw,
					count:       1,
				}
			}
		}
		i.dest(fw, fr)
	}
	if stdin == "" {
		return nil
	}
	f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
	if err != nil {
		return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
	}
	cwg.Add(1)
	go func() {
		cwg.Done()
		p := bufPool.Get().(*[]byte)
		defer bufPool.Put(p)

		io.CopyBuffer(rio.Stdin(), f, *p)
		rio.Stdin().Close()
		f.Close()
	}()
	return nil
}