Skip to content

Commit

Permalink
ParseContainerLogLine when no new line (#2090)
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard authored Feb 13, 2025
1 parent eb8c8ed commit a2e2282
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 3 deletions.
8 changes: 5 additions & 3 deletions plugins/input/docker/stdout/docker_stdout_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,11 @@ func (p *DockerStdoutProcessor) Process(fileBlock []byte, noChangeInterval time.

// no new line
if nowIndex == 0 && len(fileBlock) > 0 {
l := &LogMessage{Time: "_time_", StreamType: "_source_", Content: fileBlock}
p.collector.AddRawLogWithContext(p.newRawLogBySingleLine(l), map[string]interface{}{"source": p.source})
processedCount = len(fileBlock)
thisLog := p.ParseContainerLogLine(fileBlock)
if p.StreamAllowed(thisLog) {
p.collector.AddRawLogWithContext(p.newRawLogBySingleLine(thisLog), map[string]interface{}{"source": p.source})
processedCount = len(fileBlock)
}
}
return processedCount
}
Expand Down
152 changes: 152 additions & 0 deletions plugins/input/docker/stdout/docker_stdout_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,158 @@ func (s *inputProcessorTestSuite) TestNormal(c *check.C) {
}
}

func (s *inputProcessorTestSuite) TestStreamAllowed(c *check.C) {
stdoutAllowed := true
stderrAllowed := false
// docker
{
// stdout
{
// 合法的json
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine := `{"log":"123456","stream":"stdout", "time":"2018-05-16T06:28:41.2195434Z"}`
processor := NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes := []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, "123456")
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "2018-05-16T06:28:41.2195434Z")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "stdout")

// 非法的json
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")
splitedlogNoNewLine = `{"log":"123456","stream":"stdout", "time":"2018-05-16`
processor = NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes = []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, `{"log":"123456","stream":"stdout", "time":"2018-05-16`)
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "")
}
// stderr
{
// 合法的json
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine := `{"log":"123456","stream":"stderr", "time":"2018-05-16T06:28:41.2195434Z"}`
processor := NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes := []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))
c.Assert(len(s.collector.Logs), check.Equals, 0)

// 非法的json
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine = `{"log":"123456","stream":"stderr", "time":"2018-05-16T06:28`
processor = NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes = []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, `{"log":"123456","stream":"stderr", "time":"2018-05-16T06:28`)
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "")
}
}
// containerd
{
// stdout
{
// 合法的containerd日志
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine := `2018-05-16T06:28:41.2195434Z stdout F 123456`
processor := NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes := []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, "123456")
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "2018-05-16T06:28:41.2195434Z")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "stdout")

// 非法的containerd日志
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine = `2018-05-16T06:28:41.2195434Z stdout`
processor = NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes = []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, `2018-05-16T06:28:41.2195434Z stdout`)
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "")
}
// stderr
{
// 合法的containerd日志
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine := `2018-05-16T06:28:41.2195434Z stderr F 123456`
processor := NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes := []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))
c.Assert(len(s.collector.Logs), check.Equals, 0)

// 非法的containerd日志
s.collector.Logs = s.collector.Logs[:0]
s.context.InitContext("project", "logstore", "config")

splitedlogNoNewLine = `2018-05-16T06:28:41.2195434Z stderr`
processor = NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, stdoutAllowed, stderrAllowed, &s.context, &s.collector, s.tag, s.source)
splitedlogNoNewLineBytes = []byte(splitedlogNoNewLine)
processor.Process(splitedlogNoNewLineBytes, time.Duration(0))

c.Assert(len(s.collector.Logs), check.Equals, 1)

c.Assert(s.collector.Logs[0].Contents[0].GetKey(), check.Equals, "content")
c.Assert(s.collector.Logs[0].Contents[1].GetKey(), check.Equals, "_time_")
c.Assert(s.collector.Logs[0].Contents[2].GetKey(), check.Equals, "_source_")

c.Assert(s.collector.Logs[0].Contents[0].GetValue(), check.Equals, `2018-05-16T06:28:41.2195434Z stderr`)
c.Assert(s.collector.Logs[0].Contents[1].GetValue(), check.Equals, "")
c.Assert(s.collector.Logs[0].Contents[2].GetValue(), check.Equals, "")
}
}
}

func (s *inputProcessorTestSuite) TestSplitedLine(c *check.C) {
processor := NewDockerStdoutProcessor(nil, time.Second, 0, 512*1024, true, true, &s.context, &s.collector, s.tag, s.source)
splitedlog1Bytes := []byte(splitedlog1)
Expand Down

0 comments on commit a2e2282

Please sign in to comment.