-
Notifications
You must be signed in to change notification settings - Fork 529
How to convert a PayloadRegex MultiDecoder to a SandboxDecoder using an LPeg Grammar
Below is a user's configuration file that parses entries from a syslog file using a MultiDecoder setup. It extracts three unique message types and leaves everything else in the message payload. This tutorial is the result of a question posed on IRC about how to turn it into a single SandboxDecoder using LPeg. The regexes are pretty typical of what we see in the real world, so they will be left as-is even though some optimizations are possible.
# Input section: points at the "syslog" file under /var/log and names
# ServiceDecoder as the decoder for what it reads.
[LogstreamerInput]
log_directory = "/var/log"
file_match = "syslog"
decoder = "ServiceDecoder"
# MultiDecoder that delegates to the four sub-decoders listed in `subs`.
# NOTE(review): "first-wins" presumably means the sub-decoders are tried in
# list order and the first one that matches is used, which is why the
# catch-all ElseDecoder is listed last -- confirm against the Heka docs.
[ServiceDecoder]
type = "MultiDecoder"
subs = ['SphinxRequestDecoder', 'SphinxErrorDecoder', 'GearmanAdminDecoder', 'ElseDecoder']
cascade_strategy = "first-wins"
# Matches sphinx "REQUEST:" lines and extracts Hostname, Timestamp, Uuid,
# Path, Remoteaddr and Headers via named capture groups.
[SphinxRequestDecoder]
type = "PayloadRegexDecoder"
match_regex = '.+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] REQUEST: path=(?P<Path>\S+) remoteaddr=(?P<Remoteaddr>\S+) (?P<Headers>.+)'
timestamp_layout = "2006/01/02 15:04:05"
# Each %Name% below refers to the capture group of the same name in
# match_regex; the spelling (including case) must match the group name.
[SphinxRequestDecoder.message_fields]
Type = "SphinxRequest"
# NOTE(review): "{{ ... }}" looks like a template placeholder rendered before
# Heka reads this file -- confirm.
dsn = "{{ pillar.sentry_dsn }}"
Hostname = "%Hostname%"
Uuid = "%Uuid%"
Path = "%Path%"
# Was "%remoteaddr%", which does not match the capture group name "Remoteaddr".
Remoteaddr|ipv4 = "%Remoteaddr%"
Headers = "%Headers%"
Payload = ""
# Matches sphinx "ERROR:" lines and extracts Hostname, Timestamp, Uuid and
# the free-form Message via named capture groups.
[SphinxErrorDecoder]
type = "PayloadRegexDecoder"
match_regex = '.+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] ERROR: (?P<Message>.+)'
timestamp_layout = "2006/01/02 15:04:05"
# Each %Name% below refers to the capture group of the same name in match_regex.
[SphinxErrorDecoder.message_fields]
Type = "SphinxError"
# NOTE(review): "{{ ... }}" looks like a template placeholder rendered before
# Heka reads this file -- confirm.
dsn = "{{ pillar.sentry_dsn }}"
Hostname = "%Hostname%"
Uuid = "%Uuid%"
Message = "%Message%"
Payload = ""
# Matches "Job <name> finished|failed, marking complete: <data>" lines and
# captures everything after the colon as Data. No timestamp or hostname is
# extracted by this regex.
[GearmanAdminDecoder]
type = "PayloadRegexDecoder"
match_regex = 'Job \S+ (?:finished|failed), marking complete: (?P<Data>.+)'
[GearmanAdminDecoder.message_fields]
Type = "GearmanAdmin"
# The "|json" suffix tags the field's representation as json.
Data|json = "%Data%"
Payload = ""
# Just to prevent logspam - otherwise heka outputs a log line for everything it can't match
# Catch-all: the regex '(.*)' accepts any payload and the message is typed
# "Ignore". Payload is not overridden here, unlike in the other sub-decoders.
[ElseDecoder]
type = "PayloadRegexDecoder"
match_regex = '(.*)'
[ElseDecoder.message_fields]
Type = "Ignore"
Sample input (we are only interested in the syslog message part): Feb 9 14:17:01 trink-x230 sphinx:
2006/01/02 15:04:05 [BD48B609-8922-4E59-A358-C242075CE088] REQUEST: path=/var/tmp remoteaddr=192.168.1.1 header data
Regex (from above): .+ (?P<Hostname>\S+) sphinx:
(?P<Timestamp>.+) \[(?P<Uuid>.+)\] REQUEST: path=(?P<Path>\S+) remoteaddr=(?P<Remoteaddr>\S+) (?P<Headers>.+)
timestamp * sp * uuid * sp * request * sp * path * sp * remoteaddr * sp * headers
- Use the strftime grammar builder
dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S")
- Convert the time to a Unix nanosecond timestamp
dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns
- Capture the value as "Timestamp"
l.Cg(dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns, "Timestamp")
l.space
- Define a block of four hex digits (x4)
l.xdigit * l.xdigit * l.xdigit * l.xdigit
- Construct the UUID grammar: bracket followed by a UUID captured as "Uuid" followed by a bracket
l.P"[" * l.Cg(x4 * x4 * "-" * x4 * "-" * x4 * "-" * x4 * "-" * x4 * x4 * x4, "Uuid") * "]"
- A literal "REQUEST:" followed by a constant capture setting the "Type"
l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type")
- A literal 'path=' followed by 1 or more printable US ASCII values captured into "Path"
l.P"path=" * l.Cg(l.R"!~"^1, "Path")
- Use the ip_address module:
ip.v4
- Capture zero or more characters until the end of the line into "Headers"
l.Cg(l.P(1)^0, "Headers")
http://lpeg.trink.com/share/6395413193107056095
Sample input: Feb 9 14:17:01 trink-x230 sphinx:
2006/01/02 15:04:05 [BD48B609-8922-4E59-A358-C242075CE088] ERROR: bad things happened
Regex (from above): .+ (?P<Hostname>\S+) sphinx:
(?P<Timestamp>.+) \[(?P<Uuid>.+)\] ERROR: (?P<Message>.+)
timestamp * sp * uuid * sp * err * sp * message
- A literal "ERROR:" followed by a constant capture setting the "Type"
l.P"ERROR:" * l.Cg(l.Cc"SphinxError", "Type")
- Capture zero or more characters until the end of the line into "Message"
l.Cg(l.P(1)^0, "Message")
http://lpeg.trink.com/share/12793449786352980261
Sample input: Feb 9 14:17:01 trink-x230 sphinx:
Job DoSomething finished, marking complete: 100 widgets were processed
Regex (from above): Job \S+ (?:finished|failed), marking complete: (?P<Data>.+)
job * sp * status * data_header * sp * data
- A literal 'Job' followed by a space, followed by 1 or more non space characters with a constant capture setting the "Type"
l.P"Job" * sp * (l.P(1) - sp)^1 * l.Cg(l.Cc("GearmanAdmin"), "Type")
- finished or failed
l.P"finished" + "failed"
- A literal ', marking complete:'
l.P", marking complete:"
- Capture zero or more characters until the end of the line into "Data"
l.Cg(l.P(1)^0, "Data")
http://lpeg.trink.com/share/18152294687818003697
# Single SandboxDecoder that replaces the MultiDecoder setup above.
# NOTE(review): filename presumably refers to the Lua script listed below --
# confirm the path relative to Heka's configured script directory.
[SphinxDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/sphinx.lua"
# Values in this section are made available to the Lua script via read_config().
[SphinxDecoder.config]
# Read by the script with read_config("dsn").
dsn = "{{ pillar.sentry_dsn }}"
# NOTE(review): the Lua script shown below never calls read_config("tz"), so
# this setting appears to be unused -- confirm whether it is needed.
tz = "America/Los_Angeles"
local dt = require "date_time"
local ip = require "ip_address"
local l = require 'lpeg'
local syslog = require "syslog"
l.locale(l)
-- Message table reused for every call to process_message; the listed keys
-- start out nil and are filled in per message (process_message also sets
-- msg.Type).
local msg = {
Timestamp = nil,
Hostname = nil,
Payload = nil,
Pid = nil,
Fields = nil
}
-- "dsn" from the decoder's config section; defaults to the empty string.
local dsn = read_config("dsn") or ""
-- Grammar for the outer syslog line, built from an rsyslog template string.
local syslog_grammar = syslog.build_rsyslog_grammar("%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n")
-- A single whitespace character (separator between the parts below).
local sp = l.space
-- "YYYY/MM/DD HH:MM:SS", passed through dt.time_to_ns and captured as "Timestamp".
local timestamp = l.Cg(dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns, "Timestamp")
-- Four hexadecimal digits; building block for the UUID.
local x4 = l.xdigit * l.xdigit * l.xdigit * l.xdigit
-- "[" + 8-4-4-4-12 hex digits + "]"; only the part inside the brackets is captured as "Uuid".
local uuid = l.P"[" * l.Cg(x4 * x4 * "-" * x4 * "-" * x4 * "-" * x4 * "-" * x4 * x4 * x4, "Uuid") * "]"
-- Literal markers that also set the "Type" capture to a constant.
local request = l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type")
local err = l.P"ERROR:" * l.Cg(l.Cc"SphinxError", "Type")
-- "path=" followed by one or more characters in the range "!" to "~", captured as "Path".
local path = l.P"path=" * l.Cg(l.R"!~"^1, "Path")
-- "remoteaddr=" followed by an IPv4 address. The capture is a table
-- {value = <matched address>, representation = "ipv4"} stored under the
-- key "Remoteaddr" (note the exact spelling of the key).
local remoteaddr = l.P"remoteaddr=" * l.Cg(l.Ct(l.Cg(ip.v4, "value") * l.Cg(l.Cc"ipv4", "representation")), "Remoteaddr")
-- Everything remaining in the input (possibly empty), captured as "Headers" / "Message".
local headers = l.Cg(l.P(1)^0, "Headers")
local message = l.Cg(l.P(1)^0, "Message")
-- "Job", whitespace, then one or more non-whitespace characters (not
-- captured); sets "Type" to "GearmanAdmin".
local job = l.P"Job" * sp * (l.P(1) - sp)^1 * l.Cg(l.Cc("GearmanAdmin"), "Type")
-- Either literal "finished" or literal "failed" (not captured).
local status = l.P"finished" + "failed"
local data_header = l.P", marking complete:"
-- Everything remaining in the input (possibly empty), captured as "Data".
local data = l.Cg(l.P(1)^0, "Data")
-- Prefix shared by the two sphinx message types: timestamp and bracketed UUID.
local sphinx_header = timestamp * sp * uuid * sp
local request_type = request * sp * path * sp * remoteaddr * sp * headers
local error_type = err * sp * message
local job_type = job * sp * status * data_header * sp * data
-- Top-level grammar: a sphinx header followed by a request or an error, or
-- else a job line. On success the named captures are returned in one table.
local msg_grammar = l.Ct((sphinx_header * (request_type + error_type)) + job_type)
--- Decoder entry point: parse one syslog line and inject the resulting message.
--
-- Reads the raw line from the current message's Payload, parses the syslog
-- envelope with `syslog_grammar`, then tries `msg_grammar` on the syslog
-- message part to recognise one of the three known message types. Lines
-- that match none of them are injected with Type "Ignore" and the syslog
-- message part as Payload.
--
-- @return -1 if the line does not match the syslog grammar (nothing is
--         injected), 0 after the message has been injected.
function process_message ()
    local log = read_message("Payload")
    local fields = syslog_grammar:match(log)
    if not fields then return -1 end

    -- Move the syslog envelope values out of `fields` and into the message
    -- header; whatever is left in `fields` becomes the message Fields.
    msg.Timestamp = fields.timestamp
    fields.timestamp = nil

    fields.programname = fields.syslogtag.programname
    msg.Pid = fields.syslogtag.pid or nil
    fields.syslogtag = nil

    msg.Hostname = fields.hostname
    fields.hostname = nil

    local m = msg_grammar:match(fields.msg)
    if m then
        msg.Type = m.Type
        msg.Payload = nil
        if m.Type == "SphinxRequest" then
            -- The timestamp embedded in the sphinx line replaces the syslog one.
            msg.Timestamp = m.Timestamp
            fields.Uuid = m.Uuid
            fields.Path = m.Path
            -- The grammar stores this capture under "Remoteaddr"; the previous
            -- lookup used "RemoteAddr", which is always nil, so the field was
            -- never set.
            fields.Remoteaddr = m.Remoteaddr
            fields.Headers = m.Headers
            fields.dsn = dsn
        elseif m.Type == "SphinxError" then
            msg.Timestamp = m.Timestamp
            fields.Uuid = m.Uuid
            fields.Message = m.Message
            fields.dsn = dsn
        elseif m.Type == "GearmanAdmin" then
            -- Job lines carry no timestamp of their own, so the syslog
            -- timestamp set above is kept.
            fields.Data = {}
            fields.Data.value = m.Data
            fields.Data.representation = "json"
        end
    else
        -- Unrecognised line: keep the syslog message text as the payload.
        msg.Type = "Ignore"
        msg.Payload = fields.msg
    end
    fields.msg = nil
    msg.Fields = fields
    inject_message(msg)
    return 0
end
The output isn't identical but it is very close. The SandboxDecoder adds the Uuid as a field instead of in the header. Also, the SandboxDecoder always processes the syslog message variables so the 'Job' message will receive the correct timestamp and hostname.
9436 messages per second
31257 messages per second