Docker のリングバッファとか ECS Shim Logger とか
- はじめに
- shim-loggers-for-containerd
- コンテナの stdout/err
- blocking mode と non-blocking mode
- blocking mode の実装
- non-blocking mode の実装
- Docker の Log メソッド
- Docker の non-blocking mode(リングバッファ)
- まとめ
- その他
はじめに
Docker のログドライバーの実装や、Containerd の shim logger 実装の一つである shim-loggers-for-containerd の実装をまとめる。
shim-loggers-for-containerd
ログドライバーは、コンテナの stdout/err をキャプチャし、対応した送信先にログを転送する。 転送先は、サイドカーやデーモンとして動作する fluentd や CloudWatch の実装である awslogs、ローカルに JSON 形式で保存する json などがある。 ecs shim logger は awslogs や、fluentd など Docker のログドライバーの一部に対応している。
ecs shim logger では、non-blocking mode に対応した NewBufferedLogger とブロッキングされる可能性のある NewLogger がある。
type LogDriver
func NewBufferedLogger(l LogDriver, bufferReadSize int, maxBufferSize int, containerID string) LogDriver
func NewLogger(options ...Opt) (LogDriver, error)
type LogDriver interface {
// Start functions starts sending container logs to destination.
Start(context.Context, *time.Duration, func() error) error
// GetPipes gets pipes of container that exposed by containerd.
GetPipes() (map[string]io.Reader, error)
// Log sends logs to destination.
Log(*dockerlogger.Message) error
// Read reads a single log message from container pipe and sends it to
// destination or saves it to ring buffer, depending on the mode of log
// driver.
Read(context.Context, io.Reader, string, int, sendLogToDestFunc) error
}
type Logger
func (l *Logger) GetPipes() (map[string]io.Reader, error)
func (l *Logger) Log(message *dockerlogger.Message) error
func (l *Logger) Read(ctx context.Context, pipe io.Reader, source string, bufferSizeInBytes int, ...) error
func (l *Logger) Start(ctx context.Context, cleanupTime *time.Duration, ready func() error) error
コンテナの stdout/err
コンテナの stdout(fd 1)と stderr(fd 2)は コンテナの作成時に shim が作成したパイプの fd に置き換わっている。 コンテナが stdout/err に書き込むと、それは上記パイプにつながっている。 そのため、ランタイムがパイプから定期的に入力を読み出し適切な処理を行うことでログドライバーを実現している。 なお、containerd ではパイプの繋がる先は、binary、fifo と file が提供されている。 ecs の実装では binary である。
binary は具体的に、
- shim が stdout/err 用のパイプ(pip2 システムコール)を作成する。
- shim が ecs shim logger のバイナリを実行する。実行時に ecs shim logger の fd 3 と 4 に 1. のパイプの reader を渡す
- shim は runc を通じて コンテナの fd 1 と 2 にパイプの writer を渡す
- ecs shim logger は fd 3 と 4 からコンテナのログを定期的に読み込み処理をする
// NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
(snip)
out, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
closers = append(closers, out.Close)
serr, err := newPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
closers = append(closers, serr.Close)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
closers = append(closers, r.Close, w.Close)
cmd := NewBinaryCmd(uri, id, ns)
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start binary process: %w", err)
}
ecs shim logger 側の実装(shim のライブラリを使用)
func Run(fn LoggerFunc) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := &Config{
ID: os.Getenv("CONTAINER_ID"),
Namespace: os.Getenv("CONTAINER_NAMESPACE"),
Stdout: os.NewFile(3, "CONTAINER_STDOUT"),
Stderr: os.NewFile(4, "CONTAINER_STDERR"),
}
func runFluentdDriver(globalArgs *logger.GlobalArgs) {
args := getFluentdArgs()
loggerArgs := fluentd.InitLogger(globalArgs, args)
logging.Run(loggerArgs.RunLogDriver)
}
blocking mode と non-blocking mode
前述の通り、ecs shim logger は stdout/err とつながっているパイプから定期的にログを読み出し、適切な処理を行うことでログドライバーを実現している。
パイプにはバッファサイズがある。多くの Linux 環境では、64 KB がデフォルトだと思われる。設定可能な最大サイズは /proc/sys/fs/pipe-max-size
で確認できる。
このパイプはノンブロッキングではないため、バッファが一杯の時の stdout/err への書き込みはブロックされる。 ブロックの発生はコンテナのアプリケーションの処理が停止を意味する。 特にシングルスレッドの同期 I/O のアプリケーションであれば、アプリケーションの処理が完全に停止してしまう。
上記パイプへの書き込みのブロックは、コンテナによる stdout/err への書き込みに対して、ランタイムによるパイプからのデータの読み出しが追いつかない時に発生する。 特にアプリケーションのログ出力のスループットが高く、ログドライバーがインターネット越しの API を実行する場合、ログの出力に対して API 実行のレイテンシーが大きいので発生する可能性がある。
上記問題に対して、ecs shim logger では non-blocking mode を使用することで、ログの欠損の可能性がありつつもアプリケーションのブロックを回避できる可能性が高くなる。 具体的に ecs shim logger は内部でリングバッファを用意し、パイプからリングバッファへコピーする go routine と リングバッファからデータを読み出しログドライバーの処理をする go routine を実行する。 簡単に言えば、内部実装としてキュー(リングバッファ)を介したキューワーカーアーキテクチャと言える。 留意点としてリングバッファの性質上、パイプからリングバッファへの書き込みに対して、リングバッファからの読み込みが追いつかない場合、ログが欠損する可能性がある。 なお、リングバッファのサイズはユーザが指定可能である。
blocking mode の実装
サイズを決定する変数として、パイプからの一度で読み込み可能な最大サイズと、パイプから読み出したログを保持するバッファのサイズがある。
デフォルトでは、前者が defaultMaxReadBytes
として 2KiB、後者が DefaultBufSizeInBytes
として 16 KiB である。
これはログドライバーの実装よって上書きされる可能性がある。
初期化
NewLogger
は LogDriver インターフェースを返す。デフォルト値を上書きする場合、Opt で指定する。
// NewLogger creates a LogDriver with the provided LoggerOpt.
func NewLogger(options ...Opt) (LogDriver, error) {
l := &Logger{
Info: &dockerlogger.Info{},
bufferSizeInBytes: DefaultBufSizeInBytes,
maxReadBytes: defaultMaxReadBytes,
}
for _, opt := range options {
opt(l)
}
return l, nil
}
開始
Start
はパイプからの読み出しとログドライバーの処理を開始する。
具体的に stdout/err のパイプ(io.Reader)に対応した go routine をそれぞれ実行する。
Start
は全ての go routine の終了を待つが、それはパイプが全てクローズ(コンテナが終了)した場合か何かしらの問題が発生した場合である。
func (l *Logger) Start(
ctx context.Context,
cleanupTime *time.Duration,
ready func() error,
) error {
pipeNameToPipe, err := l.GetPipes()
(snip)
errGroup, ctx := errgroup.WithContext(ctx)
for pn, p := range pipeNameToPipe {
// Copy pn and p to new variables source and pipe, accordingly.
source := pn
pipe := p
errGroup.Go(func() error {
logErr := l.sendLogs(ctx, pipe, source, cleanupTime)
if logErr != nil {
err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
return nil
})
}
// Signal that the container is ready to be started
if err := ready(); err != nil {
return fmt.Errorf("failed to check container ready status: %w", err)
}
// Wait() will return the first error it receives.
return errGroup.Wait()
送信
sendLogs
は Start
とさほど変わらない。
Read
はパイプから読み出せる限り読み出す。Read
が終了するのはパイプが close した場合か、何か問題が生じた場合である。
終了した場合、最後のログが十分に転送できるよう待機する。
// sendLogs sends logs to destination.
func (l *Logger) sendLogs(
ctx context.Context,
f io.Reader,
source string,
cleanupTime *time.Duration,
) error {
if err := l.Read(ctx, f, source, l.bufferSizeInBytes, l.sendLogMsgToDest); err != nil {
err := fmt.Errorf("failed to read logs from %s pipe: %w", source, err)
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
// Sleep sometime to let shim logger clean up, for example, to allow enough time for the last
// few log messages be flushed to destination like CloudWatch.
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("Pipe %s is closed. Sleeping %s for cleanning up.", source, cleanupTime.String()),
debug.INFO,
0)
time.Sleep(*cleanupTime)
return nil
}
buf
はパイプから読み出したログを保持する変数である。デフォルトは DefaultBufSizeInBytes
の 16 KiB のバッファを確保する。
partial
系 は、一つのログのサイズが大きすぎる場合に分割する際に、分割したことを表すフラグである。
bytesInBuffer
は buf
内で、どこまでが未処理のログであるかを示すカーソルとなる。
readFromContainerPipe
がパイプからログを読み出し buf
に格納する役割を担う。
// Read gets container logs, saves them to our own buffer. Then we will read logs line by line
// and send them to destination. In non-blocking mode, the destination is the ring buffer. More
// log messages will be sent in verbose mode for debugging.
func (l *Logger) Read(
ctx context.Context,
pipe io.Reader,
source string,
bufferSizeInBytes int,
sendLogMsgToDest sendLogToDestFunc,
) error {
var (
msgTimestamp time.Time
bytesInBuffer int
err error
eof bool
)
// Initiate an in-memory buffer to hold bytes read from container pipe.
buf := make([]byte, bufferSizeInBytes)
// isFirstPartial indicates if current message saved in buffer is not a complete line,
// and is the first partial of the whole log message. Initialize to true.
isFirstPartial := true
// isPartialMsg indicates if current message is a partial log message. Initialize to false.
isPartialMsg := false
// isLastPartial indicates if this message completes a partial message
isLastPartial := false
// partialID is a random ID given to each split message
partialID := ""
// partialOrdinal orders the split messages and count up from 1
partialOrdinal := 1
for {
select {
case <-ctx.Done():
(snip)
default:
eof, bytesInBuffer, err = readFromContainerPipe(pipe, buf, bytesInBuffer, l.maxReadBytes)
if err != nil {
return err
}
(snip)
readFromContainerPipe
が読み込むバイト数は、デフォルト 2KiB
もしくは bytesInBuffer - デフォルト 16KiB
の小さい方となる。
言い換えると、最大で 2KiB
であり、もし buf
の残りサイズが 2KiB
にも満たなければ buf
の残りの容量のサイズとなる。
読み込んだバイト数を bytesInBuffer
に加算して終える。
EOF はパイプが close した時に返される。もし、パイプにデータがない場合、pipe.Read
はブロックされる。
// readFromContainerPipe reads bytes from container pipe, upto max read size in bytes of 2048.
func readFromContainerPipe(pipe io.Reader, buf []byte, bytesInBuffer, maxReadBytes int) (bool, int, error) {
// eof indicates if we have already met EOF error.
eof := false
// Decide how many bytes we can read from container pipe for this iteration. It's either
// the current bytes in buffer plus 2048 bytes or the available spaces left, whichever is
// smaller.
readBytesUpto := int(math.Min(float64(bytesInBuffer+maxReadBytes), float64(cap(buf))))
// Read logs from container pipe if there are available spaces for new log messages.
if readBytesUpto > bytesInBuffer {
readBytesFromPipe, err := pipe.Read(buf[bytesInBuffer:readBytesUpto])
if err != nil {
if err != io.EOF {
return false, bytesInBuffer, fmt.Errorf("failed to read log stream from container pipe: %w", err)
}
// Pipe is closed, set flag to true.
eof = true
}
atomic.AddUint64(&bytesReadFromSrc, uint64(readBytesFromPipe))
bytesInBuffer += readBytesFromPipe
}
return eof, bytesInBuffer, nil
}
head
は buf
内の未処理のログの先頭を示すカーソルなので、buf[head:bytesInBuffer]
が未処理のログを表す。
未処理のログのうち、改行単位でログを転送するため bytes.IndexByte
で改行の位置(lenOfLine
)を見つけ、buf[head:head+lenOfLine]
のログを転送する。
ログを転送したら head
の位置を head + lenOfLine + 1
までずらし、buf[head:bytesInBuffer]
に改行が含まれている限り同じ処理をする。
バッファに複数の改行を含むログがある場合、改行がなくなるまでパイプからは読み出されない。
// If container pipe is closed and no bytes left in our buffer, directly return.
if eof && bytesInBuffer == 0 {g
return nil
}
// Iteratively scan the unread part in our own buffer and read logs line by line.
// Then send it to destination.
head := 0
// This function returns -1 if '\n' in not present in buffer.
lenOfLine := bytes.IndexByte(buf[head:bytesInBuffer], newline)
for lenOfLine >= 0 {
// If this is the end of a partial message
// use the existing timestamp, so that all
// partials split from the same message have the same timestamp
// If not, new timestamp.
if isPartialMsg {
isLastPartial = true
} else {
msgTimestamp = time.Now().UTC()
}
curLine := buf[head : head+lenOfLine]
err = sendLogMsgToDest(
curLine,
source,
isPartialMsg,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
}
atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
atomic.AddUint64(&numberOfNewLineChars, 1)
// Since we have found a newline symbol, it means this line has ended.
// Reset flags.
isFirstPartial = true
isPartialMsg = false
isLastPartial = false
partialID = ""
partialOrdinal = 1
// Update the index of head of next line message.
head += lenOfLine + 1
lenOfLine = bytes.IndexByte(buf[head:bytesInBuffer], newline)
}
Read
に戻ると未処理のログについて、改行単位のログの転送が終了したら、パイプが close している(EOF)か、バッファが一杯であるか確認する。
バッファが一杯とは head
が 0 かつ bytesInBuffer == len(buf)
が成り立つ時、つまり buf
が全て改行を含まない未処理のログである。
改行を含まないため、isPartialMsg
フラグを有効にしてバッファのログを転送する。
なお、partial フラグが有効化され、次の読み込みで改行が含まれる場合、次のログは isLastPartial
を有効化し転送する。
転送すると head
と bytesInBuffer
を 0 にクリアする。
// If the pipe is closed and the last line does not end with a newline symbol, send whatever left
// in the buffer to destination as a single log message. Or if our buffer is full but there is
// no newline symbol yet, record it as a partial log message and send it as a single log message
// to destination.
if eof || bufferIsFull(buf, head, bytesInBuffer) {
// Still bytes left in the buffer after we identified all newline symbols.
if head < bytesInBuffer {
curLine := buf[head:bytesInBuffer]
// Record as a partial message.
isPartialMsg = true
if isFirstPartial {
msgTimestamp = time.Now().UTC()
partialID, err = generateRandomID()
}
if err != nil {
return err
}
err = sendLogMsgToDest(
curLine,
source,
isPartialMsg,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
}
atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
// reset head and bytesInBuffer
head = 0
bytesInBuffer = 0
// increment partial flags
partialOrdinal++
if isFirstPartial {
// if this was the first partial message
// the next one is not the first if it is also partial
isFirstPartial = false
}
}
// If pipe is closed after we send all bytes left in buffer, then directly return.
if eof {
return nil
}
}
Read
の最後について、改行のログを処理し EOF やバッファが一杯ではない場合、未処理のログをバッファの先頭にコピーする。
そのため、次のパイプからの読みでは常に head = 0
から bytesInBuffer
までが未処理のログとなる。
// If there are any bytes left in the buffer, move them to the head and handle them in the
// next round.
if head > 0 {
copy(buf[0:], buf[head:bytesInBuffer])
bytesInBuffer -= head
}
}
}
}
sendLogMsgToDest
はパイプから読み出したログを転送する関数であり、[]byte
のログを Docker の Message
に変換して Log
メソッドを実行する。
Log
メソッドは各ログドライバーの実装の Log
メソッドを実行する。
// sendLogMsgToDest sends a single line of log message to destination.
func (l *Logger) sendLogMsgToDest(
line []byte,
source string,
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
(snip)
message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}
err := l.Log(message)
if err != nil {
return fmt.Errorf("failed to log msg for container %s: %w", l.Info.ContainerName, err)
}
return nil
}
// Log sends logs to destination.
func (l *Logger) Log(message *dockerlogger.Message) error {
return l.Stream.Log(message)
}
転送の実態
ログドライバーの Log
メソッドは splunk、awslogs、fluentd どれも Log インターフェースを満たす Docker の実装を使い回している。
// Client is a wrapper for docker logger's Log method, which is mostly used for testing
// purposes.
type Client interface {
Log(*dockerlogger.Message) error
}
Docker ログドライバーの Log メソッドによるログの転送は同期実行に思えるが実装によっては非同期である。
non-blocking mode の実装
これは blocking mode の NewLogger をオーバーライドする形で実装される。以降 blocking mode の実装を common と呼ぶ。
初期化
blocking mode で使用した LogDriver インターフェースを引数に受け取る。
bufferReadSize
は、common の Read
に渡される値であり、前述の buf
のサイズを決定する値である。
maxBufferSize
はリングバッファの最大バッファサイズを表し、maxSizeInBytes
にマップされる。
未送信のログの合計サイズが maxBufferSize
を超えるとログは破棄される(欠損する)。
func NewBufferedLogger(l LogDriver, bufferReadSize int, maxBufferSize int, containerID string) LogDriver {
return &bufferedLogger{
l: l,
buffer: newLoggerBuffer(maxBufferSize),
bufReadSizeInBytes: bufferReadSize,
containerID: containerID,
}
}
開始
common に比べると特徴は 2 つである。
- パイプからリングバッファへ、ログをコピーする go routine をパイプごとに実行する
- リングバッファからログを読み出し、ログドライバーの処理をする go routine を 1 つ実行する
// Start starts the non-blocking mode logger.
func (bl *bufferedLogger) Start(
ctx context.Context,
cleanupTime *time.Duration,
ready func() error,
) error {
pipeNameToPipe, err := bl.l.GetPipes()
if err != nil {
return err
}
(snip)
errGroup, ctx := errgroup.WithContext(ctx)
// Start the goroutine of underlying log driver to consume logs from ring buffer and
// send logs to destination when there's any.
errGroup.Go(func() error {
debug.SendEventsToLog(DaemonName, "Starting consuming logs from ring buffer", debug.INFO, 0)
return bl.sendLogMessagesToDestination(cleanupTime)
})
// Start reading logs from container pipes.
for pn, p := range pipeNameToPipe {
// Copy pn and p to new variables source and pipe, accordingly.
source := pn
pipe := p
errGroup.Go(func() error {
debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
if logErr != nil {
err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
return nil
})
}
// Signal that the container is ready to be started
if err := ready(); err != nil {
return fmt.Errorf("failed to check container ready status: %w", err)
}
// Wait() will return the first error it receives.
return errGroup.Wait()
}
リングバッファ
curSizeInBytes
は現在のリングバッファ内の未処理のログのサイズの合計を表す。
curSizeInBytes
が maxSizeInBytes
を超えると、そのログを破棄する。
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L128
// as this struct is not exported.
type ringBuffer struct {
// A mutex lock is used here when writing/reading log messages from the queue
// as there exists three go routines accessing the buffer.
lock sync.Mutex
// A condition variable wait is used here to notify goroutines that get access to
// the buffer should wait or continue.
wait *sync.Cond
// current total bytes stored in the buffer
curSizeInBytes int
// maximum bytes capacity provided by the buffer
maxSizeInBytes int
// queue saves all the log messages read from pipes exposed by containerd, and
// is consumed by underlying log driver.
queue []*dockerlogger.Message
// closedPipesCount is the number of closed container pipes for a single container.
closedPipesCount int
// isClosed indicates if ring buffer is closed.
isClosed bool
}
パイプからリングバッファへ
bl.Read
は common の Read
を実行するだけである。
common との違いは、引数のログを転送する関数が saveSingleLogMessageToRingBuffer
という点である。
これにより、パイプから読み込んだログは、ログドライバーの転送処理ではなく、リングバッファへの転送処理がされる。
// saveLogMessagesToRingBuffer saves container log messages to ring buffer.
func (bl *bufferedLogger) saveLogMessagesToRingBuffer(
ctx context.Context,
f io.Reader,
source string,
) error {
if err := bl.Read(ctx, f, source, bl.bufReadSizeInBytes, bl.saveSingleLogMessageToRingBuffer); err != nil {
err := fmt.Errorf("failed to read logs from %s pipe: %w", source, err)
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
// No messages in the pipe, send signal to closed pipe channel.
debug.SendEventsToLog(DaemonName, fmt.Sprintf("Pipe %s is closed", source), debug.INFO, 1)
bl.buffer.closedPipesCount++
// If both container pipes are closed, wake up the Dequeue goroutine which is waiting on wait.
if bl.buffer.closedPipesCount == expectedNumOfPipes {
bl.buffer.isClosed = true
bl.buffer.wait.Broadcast()
}
return nil
}
// Read reads log messages from container pipe and saves them to ring buffer line by line.
func (bl *bufferedLogger) Read(
ctx context.Context,
pipe io.Reader,
source string,
bufferSizeInBytes int,
sendLogMsgToDest sendLogToDestFunc,
) error {
return bl.l.Read(ctx, pipe, source, bufferSizeInBytes, sendLogMsgToDest)
}
saveSingleLogMessageToRingBuffer
は、改行で分割された一つのログメッセージ([]byte
)を Docker 形式のログメッセージに変換し、リングバッファへエンキューする。
// saveSingleLogMessageToRingBuffer enqueues a single line of log message to ring buffer.
func (bl *bufferedLogger) saveSingleLogMessageToRingBuffer(
line []byte,
source string,
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
(snip)
message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}
err := bl.buffer.Enqueue(message)
if err != nil {
return fmt.Errorf("failed to save logs to buffer: %w", err)
}
return nil
}
エンキュー
既にキューにメッセージがあり、未処理のログとこれから処理するログの合計サイズが maxBufferSize
(maxSizeInBytes
) を超える場合、Dequeue(ログの送信用)の go routine を起動させる。
この場合、引数のログメッセージはエンキューされないためログは欠損する(最も古いログメッセージが破棄されるわけではない点に注意)。
maxBufferSize
(maxSizeInBytes
)を超えない場合、queue
にメッセージを追加および未処理のログのサイズを加算し、ログの送信用の go routine を起動させる。
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L155
// as messageRing struct is not exported.
// Enqueue adds a single log message to the tail of intermediate buffer.
func (b *ringBuffer) Enqueue(msg *dockerlogger.Message) error {
b.lock.Lock()
defer b.lock.Unlock()
lineSizeInBytes := len(msg.Line)
// If there is already at least one log message in the queue and not enough space left
// for the new coming log message to take up, drop this log message. Otherwise, save this
// message to ring buffer anyway.
if len(b.queue) > 0 &&
b.curSizeInBytes+lineSizeInBytes > b.maxSizeInBytes {
if debug.Verbose {
debug.SendEventsToLog(DaemonName,
"buffer is full/message is too long, waiting for available bytes",
debug.DEBUG, 0)
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("message size: %d, current buffer size: %d, max buffer size %d",
lineSizeInBytes,
b.curSizeInBytes,
b.maxSizeInBytes),
debug.DEBUG, 0)
}
// Wake up "Dequeue" or the other "Enqueue" go routine (called by the other pipe)
// waiting on current mutex lock if there's any
b.wait.Signal()
return nil
}
b.queue = append(b.queue, msg)
b.curSizeInBytes += lineSizeInBytes
// Wake up "Dequeue" or the other "Enqueue" go routine (called by the other pipe)
// waiting on current mutex lock if there's any
b.wait.Signal()
return nil
}
リングバッファからログの転送
Start
の go routine で実行された sendLogMessagesToDestination
は、パイプがクローズするまで for-loop で sendLogMessageToDestination
を実行する。
// sendLogMessagesToDestination consumes logs from ring buffer and use the
// underlying log driver to send logs to destination.
func (bl *bufferedLogger) sendLogMessagesToDestination(cleanupTime *time.Duration) error {
// Keep sending log message to destination defined by the underlying log driver until
// the ring buffer is closed.
for !bl.buffer.isClosed {
if err := bl.sendLogMessageToDestination(); err != nil {
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
}
// If both container pipes are closed, flush messages left in ring buffer.
debug.SendEventsToLog(DaemonName, "All pipes are closed, flushing buffer.", debug.INFO, 0)
if err := bl.flushMessages(); err != nil {
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
// Sleep sometime to let shim logger clean up, for example, to allow enough time for the last
// few log messages be flushed to destination like CloudWatch.
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("Sleeping %s for cleanning up.", cleanupTime.String()),
debug.INFO, 0)
time.Sleep(*cleanupTime)
return nil
}
sendLogMessageToDestination
はリングバッファからログメッセージをデキューし、ログドライバーの Log
を実行する。
// sendLogMessageToDestination dequeues a single log message from buffer and sends to destination.
func (bl *bufferedLogger) sendLogMessageToDestination() error {
msg, err := bl.buffer.Dequeue()
// Do an early return if ring buffer is closed.
if bl.buffer.isClosed {
return nil
}
if err != nil {
return fmt.Errorf("failed to read logs from buffer: %w", err)
}
err = bl.Log(msg)
if err != nil {
return fmt.Errorf("failed to send logs to destination: %w", err)
}
return nil
}
// Log lets underlying log driver send logs to destination.
func (bl *bufferedLogger) Log(message *dockerlogger.Message) error {
if debug.Verbose {
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("[BUFFER] Sending message: %s", string(message.Line)),
debug.DEBUG, 0)
}
return bl.l.Log(message)
}
デキュー
もし、キューにメッセージがなければ go routine を一時停止する。 メッセージがあれば、一つ取り出してそれを返す。また、現在の未処理のログメッセージのサイズを減算する。
// Dequeue gets a line of log message from the head of intermediate buffer.
func (b *ringBuffer) Dequeue() (*dockerlogger.Message, error) {
b.lock.Lock()
defer b.lock.Unlock()
// If there is no log yet in the buffer, and the ring buffer is still open, wait
// suspends current go routine.
for len(b.queue) == 0 && !b.isClosed {
if debug.Verbose {
debug.SendEventsToLog(DaemonName,
"No messages in queue, waiting...",
debug.DEBUG, 0)
}
b.wait.Wait()
}
// Directly return if ring buffer is closed.
if b.isClosed {
return nil, nil //nolint: nilnil // swallow the error
}
// Get and remove the oldest message saved in buffer/queue from head and update
// the current used bytes of buffer.
msg := b.queue[0]
b.queue = b.queue[1:]
b.curSizeInBytes -= len(msg.Line)
return msg, nil
}
キューのサイズ
cap が 1000 としてスライスが作成されるためデフォルトは 1000 である。 また、append でキューへ追加されるため、もし 1000 を超える場合も自動で拡張される。
ringCap = 1000
(snip)
func newLoggerBuffer(maxBufferSize int) *ringBuffer {
rb := &ringBuffer{
maxSizeInBytes: maxBufferSize,
queue: make([]*dockerlogger.Message, 0, ringCap),
closedPipesCount: 0,
isClosed: false,
}
rb.wait = sync.NewCond(&rb.lock)
return rb
}
Docker の Log メソッド
ecs shim logger でも使用されているログドライバーの Log メソッドを見ていく。
fluentd
fluent-logger-golang の実装に依存しておりシンプルである。
import (
(snip)
"github.com/fluent/fluent-logger-golang/fluent"
const (
defaultBufferLimit = 1024 * 1024
(snip)
func (f *fluentd) Log(msg *logger.Message) error {
data := map[string]string{
"container_id": f.containerID,
"container_name": f.containerName,
"source": msg.Source,
"log": string(msg.Line),
}
(snip)
// fluent-logger-golang buffers logs from failures and disconnections,
// and these are transferred again automatically.
return f.writer.PostWithTime(f.tag, ts, data)
}
PostWithTime は最終的に postRawData を呼び出す。 Async が有効であればバッファリングされる。Async は Docker の fluentd-async に対応している(デフォルトで無効)。
Async が無効であれば、バッファリングされず(UDS かもしれないが)ネットワーク越しに送信を実行するため、この送信によって律速される可能性がある。
func (f *Fluent) postRawData(msg *msgToSend) error {
if f.Config.Async {
return f.appendBuffer(msg)
}
// Synchronous write
if f.closed {
return fmt.Errorf("fluent#postRawData: Logger already closed")
}
return f.writeWithRetry(context.Background(), msg)
}
バッファサイズ(キューサイズ pending
)は、デフォルトでは Docker 側の定義の 1,048,576 である。
ログの合計バイトではなくキューのサイズなので、このキューで問題になることはなさそうであるが、もしバッファサイズを超える場合そのログは破棄されるため、メソッドはブロックされない。
しかし、キューを溢れた場合や同期の writeWithRetry
が失敗した場合、そのエラーは Log まで伝播しさらに ecs shim logger まで伝播しそうである。
結果として、ecs shim logger は停止し、パイプのバッファが一杯になり、アプリケーションがブロックしそうである。
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
(snip)
if config.Async {
ctx, cancel := context.WithCancel(context.Background())
f = &Fluent{
Config: config,
dialer: d,
stopRunning: make(chan struct{}),
cancelDialings: cancel,
pending: make(chan *msgToSend, config.BufferLimit),
pendingMutex: sync.RWMutex{},
muconn: sync.RWMutex{},
}
f.wg.Add(1)
go f.run(ctx)
(snip)
// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
f.pendingMutex.RLock()
defer f.pendingMutex.RUnlock()
if f.closed {
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
}
select {
case f.pending <- msg:
default:
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
}
return nil
}
awslogs
awslogs の Log は、fluentd とは異なりキュー(messages
)にメッセージを送信して終了するため、常に成功する。
messages
のサイズは、デフォルト(defaultMaxBufferedEvents
)で 4096 である。
select を使用していないため、キューが一杯の場合、Log はブロックされる。
// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if l.closed {
return errors.New("awslogs is closed")
}
l.messages <- msg
return nil
}
New
によって起動する go routine が、キューからログを読み出し CloudWatch に送信する。
func New(info logger.Info) (logger.Logger, error) {
(snip)
go containerStream.collectBatch(creationDone)
return containerStream, nil
}
awslogs-multiline-pattern が指定されていない場合、messages
からログを読み出して processEvent
を実行する。
newEventBatch
は未送信のログイベントとその合計サイズを保持する変数を生成する関数であり、変数 batch
が使用される。
processEvent
は batch
にログイベントを追加をするが、これにより API が実行されるとは限らない。
そのため、デフォルトの 5 秒のタイマーで publishBatch
を実行し、未送信のログイベントに対して API を実行させる。
func (l *logStream) collectBatch(created chan bool) {
// Wait for the logstream/group to be created
<-created
flushInterval := l.forceFlushInterval
if flushInterval <= 0 {
flushInterval = defaultForceFlushInterval
}
ticker := newTicker(flushInterval)
var eventBuffer []byte
var eventBufferTimestamp int64
batch := newEventBatch()
for {
select {
case t := <-ticker.C:
(snip)
l.publishBatch(batch)
batch.reset()
case msg, more := <-l.messages:
if !more {
// Flush event buffer and release resources
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
l.publishBatch(batch)
batch.reset()
return
}
if eventBufferTimestamp == 0 {
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
}
line := msg.Line
if l.multilinePattern != nil {
(snip)
} else {
l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
logger.PutMessage(msg)
}
CloudWatch Logs の PutLogEvents は一度に複数のログイベントを送信できるが次の制限がある。
- 最大のバッチサイズが 1 MiB
- 一つのログイベントは最大 256 KiB(だけど無効な UTF-8 シーケンスが含まれているかもしれないので - 26Bytes)
- バッチイベントの数は最大で 10,000
そのため制限を超過しないように(API が失敗しないように)しつつ、なるべく一度の API 実行で同時に複数のログを送信をしようとする。
processEvent
は引数のログイベントを未送信のログイベントのコレクションの batch
に制限を超過しないように追加を試みる。
追加が成功する場合処理を終え、失敗する場合 publishBatch
で既存のログイベントを送信し batch
をリセットする。
失敗した場合、次の for-loop で引数のログイベントの追加が期待される。
なお、2. の 256 KiB - 26 bytes を超える場合、findValidSplit
によって、最大サイズに分割される。
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
for len(bytes) > 0 {
// Split line length so it does not exceed the maximum
splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
line := bytes[:splitOffset]
event := wrappedEvent{
inputLogEvent: types.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(timestamp),
},
insertOrder: batch.count(),
}
added := batch.add(event, lineBytes)
if added {
bytes = bytes[splitOffset:]
} else {
l.publishBatch(batch)
batch.reset()
}
}
}
func (b *eventBatch) add(event wrappedEvent, size int) bool {
addBytes := size + perEventBytes
// verify we are still within service limits
switch {
case len(b.batch)+1 > maximumLogEventsPerPut:
return false
case b.bytes+addBytes > maximumBytesPerPut:
return false
}
b.bytes += addBytes
b.batch = append(b.batch, event)
return true
}
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
for offset, rune := range line {
splitOffset = offset
if effectiveBytes+utf8.RuneLen(rune) > maxBytes {
return splitOffset, effectiveBytes
}
effectiveBytes += utf8.RuneLen(rune)
}
splitOffset = len(line)
return
}
publishBatch
が API を実行する。ExpectedSequenceToken は現状発生しないので無視して良さそうである。
一方で、もしネットワークの問題等で API が失敗した場合も Docker や ecs shim logger の標準エラーにエラーが出力されるだけに見える。
そのため、ecs shim logger までエラーは伝播されない。
なお、API のリトライは AWS SDK のデフォルトの設定に従いそうである。
func (l *logStream) publishBatch(batch *eventBatch) {
if batch.isEmpty() {
return
}
cwEvents := unwrapEvents(batch.events())
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
if err != nil {
if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
// already submitted, just grab the correct sequence token
nextSequenceToken = apiErr.ExpectedSequenceToken
log.G(context.TODO()).WithFields(log.Fields{
"errorCode": apiErr.ErrorCode(),
"message": apiErr.ErrorMessage(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Info("Data already accepted, ignoring error")
err = nil
} else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
}
}
if err != nil {
log.G(context.TODO()).Error(err)
} else {
l.sequenceToken = nextSequenceToken
}
}
Docker の non-blocking mode(リングバッファ)
コンテナの作成時に ecs shim logger と同じようにベースのロガーを引数にリングバッファのロガーにスイッチする。
l, err := initDriver(info)
if err != nil {
return nil, err
}
if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock {
bufferSize := int64(-1)
if s, exists := cfg.Config["max-buffer-size"]; exists {
bufferSize, err = units.RAMInBytes(s)
if err != nil {
return nil, err
}
}
l = logger.NewRingLogger(l, info, bufferSize)
}
リングバッファの実装は ecs shim logger と大体同じに見える(というより ecs shim logger 側が Docker の実装を参考にしている)。
リングバッファのオーバーライドした Log ではリングバッファにログを Enqueue する。 加えて、ロガーの作成時に、リングバッファから Dequeue し、ログドライバーの Log を実行する go routine を実行する。
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
l := &RingLogger{
buffer: newRing(maxSize),
l: driver,
logInfo: logInfo,
}
l.wg.Add(1)
go l.run()
return l
}
// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
// the passed in logger.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
if maxSize < 0 {
maxSize = defaultRingMaxSize
}
l := newRingLogger(driver, logInfo, maxSize)
if _, ok := driver.(LogReader); ok {
return &ringWithReader{l}
}
return l
}
まとめ
気が向いたら図に起こすかもしれないです。
fluentd
変数 | blocking | non-blocking | 備考 |
---|---|---|---|
読み出しバッファ(buf ) | 16KiB | 16KiB | |
一度にパイプから読み出す最大サイズ(defaultMaxReadBytes ) | 2KiB | 2KiB | |
メッセージキュー(pending ) | 1,048,576 | 1,048,576 | Async 時のみ、超えると破棄される、fluentd-buffer-limit で変更可 |
リングバッファメッセージキュー(queue ) | N/A | 1000+ | 自動で拡張される |
ecs リングバッファ(maxSizeInBytes ) | N/A | 1MiB | max-buffer-size で変更可、超えると破棄される |
awslogs
複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes
変数 | blocking | non-blocking | 備考 |
---|---|---|---|
読み出しバッファ(buf ) | 256KiB-26bytes | 256KiB | |
一度にパイプから読み出す最大サイズ(defaultMaxReadBytes ) | 2KiB | 2KiB | |
メッセージキュー(messages ) | 4096 | 4096 | awslogs-max-buffered-events で変更可 |
ログイベント集約送信バッファ(batch ) | 複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes | 複数のログイベントに対してイベントの総数が 10,000 or 合計が 1MiB、単一のログイベントに対して最大サイズが 256KiB-26Bytes | |
リングバッファメッセージキュー(queue ) | N/A | 1000+ | 自動で拡張される |
ecs リングバッファ(maxSizeInBytes ) | N/A | 1MiB | max-buffer-size で変更可、超えると破棄される |
buf
のサイズが微妙に違うためエッジケースではあるが次のケースにて動作が変わると思われる。
具体的に 256 KiB + 1 Bytes のデータを送信する場合を考える。
blocking の場合、shim logger のバッファ(buf
)に律速され 256KiB - 26Bytes の 262118 Bytes だけ読み出されて Docker の Log が実行される。
Docker の Log では、制限には引っかからないため、262118 Bytes のログが転送される。そして残りの 27 Bytes が転送される。
non-blocking の場合、shim logger のバッファ(buf
)に律速され 256 KiB だけ読み出されて Docker の Log が実行される。
Docker の Log では、制限に該当するため、262118 Bytes と 26 Bytes に分割されて送信される。そして残りの 1 Bytes が転送される。
したがって、262118 Bytes のログと 26 Bytes のログと、1 Bytes のログの 3 つに分割される。
その他
その他のバッファ
その他のバッファで言えば、stdout への書き込みは一般的に glibc などの実装によりバッファリングされ、stderr はバッファリングされない。
また TCP や UDS などでログを転送する場合、TCP の送信バッファによってバッファリングされる。
具体的に net.ipv4.tcp_wmem
などで設定可能である。
containerd の fifo と file
binary の場合、パイプからのログの読み出しは shim が実行するバイナリ(ecs shim logger)が行なっていた。 一方で、fifo と file の場合、shim がパイプからログを読み出し、containerd が作成した fifo や shim が作成したファイルへコピーする。
引数の stdout
や stderr
は、file であればファイルのパス、fifo であれば fifo のパスである。
rio
はコンテナの stdout/err に対応したパイプを保持している。
fifo であれば fifo.OpenFifo
、file であれば os.OpenFile
で開き、io.CopyBuffer
でコンテナのパイプから、fifo もしくはファイルへコピーする(go routine を実行する)。
また、コピーに際して bufPool
を使用してバッファリングしている。
stdout/err に対応する fifo はブロッキングモードなので、fifo から読み出されない場合、コンテナのログ出力は fifo へ書き込みできずブロッキングされる可能性がある。
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
var sameFile *countingWriteCloser
for _, i := range []struct {
name string
dest func(wc io.WriteCloser, rc io.Closer)
}{
{
name: stdout,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
}, {
name: stderr,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
log.G(ctx).Warn("error copying stderr")
}
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
},
} {
ok, err := fifo.IsFifo(i.name)
if err != nil {
return err
}
var (
fw io.WriteCloser
fr io.Closer
)
if ok {
if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening w/o fifo %q failed: %w", i.name, err)
}
if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening r/o fifo %q failed: %w", i.name, err)
}
} else {
if sameFile != nil {
sameFile.count++
i.dest(sameFile, nil)
continue
}
if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil {
return fmt.Errorf("containerd-shim: opening file %q failed: %w", i.name, err)
}
if stdout == stderr {
sameFile = &countingWriteCloser{
WriteCloser: fw,
count: 1,
}
}
}
i.dest(fw, fr)
}
if stdin == "" {
return nil
}
f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(rio.Stdin(), f, *p)
rio.Stdin().Close()
f.Close()
}()
return nil
}