heka-stream-aggregator

Stream aggregation plugin for Mozilla Heka

StreamAggregatorFilter

This filter aggregates multiple payloads into a single message. Each payload is first run through the configured encoder, and the encoded payloads are then combined, separated by the delimiter config value.

Config:

  • stream_aggregator_tag: Optional tag for identifying the new pack further down the pipeline, since any Fields held by the original messages are lost. This setting creates a new Heka Field called "StreamAggregatorTag" whose value is the value of this option. Defaults to "aggregated".

  • flush_interval: Interval, in milliseconds, at which accumulated payloads are flushed as a single aggregated message. Defaults to 1000 (i.e. one second).

  • flush_bytes: Number of payloads that, once processed, triggers a flush. Defaults to 10.

  • encoder: Name of the encoder that each payload is run through prior to aggregating. (required)

  • delimiter: String used to separate the aggregated payloads. Defaults to "\n".
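The interaction between these options can be illustrated with a small Python model. This is only a sketch under stated assumptions, not the plugin's Go implementation: the class name `StreamAggregator`, its methods, and the `flush_count` parameter are hypothetical, and the flush threshold is modelled as a payload count, following the description of flush_bytes above.

```python
import json
import time


class StreamAggregator:
    """Toy model of the filter: encode, buffer, and flush payloads.

    All names here are hypothetical; the real plugin is written in Go.
    """

    def __init__(self, encode, delimiter="\n", flush_count=10,
                 flush_interval_ms=1000, tag="aggregated",
                 clock=time.monotonic):
        self.encode = encode
        self.delimiter = delimiter
        self.flush_count = flush_count          # assumed: threshold in payloads
        self.flush_interval_ms = flush_interval_ms
        self.tag = tag
        self.clock = clock
        self.buffer = []
        self.last_flush = clock()

    def add(self, payload):
        """Encode and buffer one payload; flush if the threshold is reached."""
        self.buffer.append(self.encode(payload))
        if len(self.buffer) >= self.flush_count:
            return self.flush()
        return None

    def tick(self):
        """Call periodically; flushes once flush_interval_ms has elapsed."""
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        if self.buffer and elapsed_ms >= self.flush_interval_ms:
            return self.flush()
        return None

    def flush(self):
        """Emit one aggregated message carrying only the tag field."""
        if not self.buffer:
            return None
        message = {
            "Payload": self.delimiter.join(self.buffer),
            "Fields": {"StreamAggregatorTag": self.tag},
        }
        self.buffer = []
        self.last_flush = self.clock()
        return message


aggregator = StreamAggregator(encode=json.dumps, flush_count=2)
aggregator.add({"host": "web01"})
print(aggregator.add({"host": "web02"}))
```

The aggregated message carries only the StreamAggregatorTag field, which mirrors the note above that Fields from the original messages are not preserved.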

Example:

[filter_stream_aggregator]
type = "StreamAggregatorFilter"
message_matcher = "Fields[decoded] == 'True'"
stream_aggregator_tag = "aggregated"
flush_interval = 30000
flush_bytes = 1000000
encoder = "encoder_json"
delimiter = "\n" # Default

Example 2, for bulk Elasticsearch inserts:

[filter_stream_aggregator]
type = "StreamAggregatorFilter"
message_matcher = "Fields[decoded] == 'True'"
stream_aggregator_tag = "aggregated"
flush_interval = 30000
flush_bytes = 1000000
encoder = "ESLogstashV0Encoder"
delimiter = ""

[ESLogstashV0Encoder]
index = "logstash-%{program}-%{2006.01.02}"
type_name = "%{program}"
es_index_from_timestamp = true
id = "%{id}"

HTTP output for the Heka instance:

[HttpOutput]
message_matcher = "Fields[StreamAggregatorTag] == 'aggregated'"
address = "http://es01.foo.bar:9200/_bulk"
encoder = "encoder_payload"
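To see why an empty delimiter suits this setup, it helps to look at the body the Elasticsearch `_bulk` endpoint expects: newline-delimited JSON in which every action line and every document line ends with a newline. The Python sketch below assumes that each encoded payload already arrives as such a newline-terminated action/document pair, so the entries can simply be concatenated. The `bulk_entry` helper and the sample index, type, and id values are hypothetical and only illustrate the format.

```python
import json


def bulk_entry(index, type_name, doc_id, doc):
    """Build one newline-terminated action/document pair for the bulk API."""
    action = {"index": {"_index": index, "_type": type_name, "_id": doc_id}}
    return json.dumps(action) + "\n" + json.dumps(doc) + "\n"


entries = [
    bulk_entry("logstash-sshd-2015.01.01", "sshd", "1", {"message": "login ok"}),
    bulk_entry("logstash-sshd-2015.01.01", "sshd", "2", {"message": "logout"}),
]

# Each entry already ends with "\n", so joining with "" yields a valid body.
body = "".join(entries)
print(body)
```

Joining the same entries with "\n" would insert blank lines between the pairs, which is the reason for using an empty delimiter in this sketch.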

StreamSplitterDecoder

Used inside a MultiDecoder to split a payload generated by a StreamAggregatorFilter back into its individual payloads.

Example:

[multi_decoder]
type = "MultiDecoder"
order = ['zlib_decoder', 'split_decoder', 'json_decoder']

[multi_decoder.subs.zlib_decoder]
type = "ZlibDecoder"

[multi_decoder.subs.split_decoder]
type = "StreamSplitterDecoder"
delimiter = "\n"

[multi_decoder.subs.json_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "/usr/share/heka/lua_decoders/json_decoder.lua"
preserve_data = true
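Conceptually, the split step reverses the aggregation: the payload is cut at each delimiter and every piece is handed to the next decoder in the chain. The Python sketch below illustrates that idea only. The function name `split_stream` is hypothetical, and dropping empty pieces (for example after a trailing delimiter) is an assumption of this sketch, not confirmed behaviour of the plugin.

```python
import json


def split_stream(payload, delimiter="\n"):
    """Split an aggregated payload back into individual payloads."""
    if delimiter == "":
        # An empty delimiter cannot be split on; return the payload whole.
        return [payload] if payload else []
    # Assumption: empty pieces (e.g. from a trailing delimiter) are dropped.
    return [part for part in payload.split(delimiter) if part]


aggregated = '{"host": "web01"}\n{"host": "web02"}\n'
for piece in split_stream(aggregated):
    # Stand-in for the JSON decoder that follows the splitter in the chain.
    print(json.loads(piece))
```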

To Build

See Building hekad with External Plugins for compiling in plugins.

Edit the cmake/plugin_loader.cmake file and add:

add_external_plugin(git https://github.com/michaelgibson/heka-stream-aggregator master)

Build Heka: . ./build.sh
