Guys... we are stuck. Bail us out! :-)
We have a 3-step log aggregation pipeline using Fluentd:
[#1 - Tail logs (raw logs)] --(TCP)--> [#2 - Parse the read logs into JSON] --(TCP)--> [#3 - Filter and output to Redis & Mongo]
We are NOT converting the tailed logs into JSON in the first step, mainly because we want to avoid any extra CPU consumption on that server. Our log lines are fairly complex, and parsing is intentionally deferred to step #2 (which runs on a different cluster of servers).
So phase #1 emits time, tag and record (the raw log line). We use the in_tail plugin here, so by default the 'time' attribute is the time the line was read from the file. Under load, this read time may therefore not match the log line's actual timestamp.
In the 2nd phase, once the log has been parsed into JSON, we want to override the 'time' attribute sent by phase #1 with the time attribute from the JSON record.
We use fluent-plugin-parser in step #2 (https://github.com/tagomoris/fluent-plugin-parser).
How can we override the time attribute and make Fluentd use it instead of the 'time' that was set in step #1?
Yes, you can do this with fluent-plugin-parser's undocumented feature "time_key" like this:
<source>
type exec
run_interval 3s
format json
command echo '{"message":"hello,2013-03-03 12:00:13"}'
tag first
</source>
<match first>
type parser
key_name message
time_key my_time
time_format %Y-%m-%d %H:%M:%S
format /^(?<some_field>[^,]*),(?<my_time>.*)/
tag second
</match>
<match second>
type stdout
</match>
What the above config does:
1. The exec source emits {"message":"hello,2013-03-03 12:00:13"} every 3 seconds with the tag "first". This is only for testing purposes.
2. <match first> hands those events to the parser plugin, which parses the field called "message" with the regular expression. In your case, it would be format json.
3. time_key my_time tells the parser plugin to look for a field named my_time in the parsed value of the "message" field and, if it exists, to parse that field with time_format %Y-%m-%d %H:%M:%S. From this point on, that value is the new time.
4. The events, re-tagged as "second", are printed by the stdout output.
If you run the above conf, you should get output like this:
root@ae4a398d41ef:/home/fluentd# fluentd -c fluent.conf
2014-05-31 00:01:19 +0000 [info]: starting fluentd-0.10.46
2014-05-31 00:01:19 +0000 [info]: reading config file path="fluent.conf"
2014-05-31 00:01:19 +0000 [info]: gem 'fluent-plugin-parser' version '0.3.4'
2014-05-31 00:01:19 +0000 [info]: gem 'fluentd' version '0.10.46'
2014-05-31 00:01:19 +0000 [info]: using configuration file: <ROOT>
<source>
type exec
run_interval 3s
format json
command echo '{"message":"hello,2013-03-03 12:00:13"}'
tag first
</source>
<match first>
type parser
key_name message
time_key my_time
time_format %Y-%m-%d %H:%M:%S
format /^(?<some_field>[^,]*),(?<my_time>.*)/
tag second
</match>
<match second>
type stdout
</match>
</ROOT>
2014-05-31 00:01:19 +0000 [info]: adding source type="exec"
2014-05-31 00:01:19 +0000 [info]: adding match pattern="first" type="parser"
2014-05-31 00:01:19 +0000 [info]: adding match pattern="second" type="stdout"
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
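To make the mechanism concrete, here is a rough Python emulation of what the parser block does with time_key. It is not the plugin's code: the function names are hypothetical, timestamps are assumed to be UTC (the plugin most likely uses the host's local time zone, which happens to be UTC in the log above), and the JSON field names in the second call are made up to mirror the question's step #2. As in the real output, my_time is removed from the record once it has been used as the event time.

```python
import json
import re
from calendar import timegm
from datetime import datetime, timezone


def parse_with_time_key(tag, event_time, record, key_name, parse,
                        time_key, time_format, new_tag):
    """Hypothetical emulation of fluent-plugin-parser's time_key handling.

    Parses record[key_name]; if the parsed dict contains time_key, that
    value replaces the incoming event time and is removed from the record.
    """
    parsed = parse(record[key_name])
    if parsed is None:
        return None  # sketch choice: drop records the parser cannot handle
    if time_key in parsed:
        raw_time = parsed.pop(time_key)
        # Assumption: timestamps are UTC (the sample log shows +0000).
        event_time = timegm(datetime.strptime(raw_time, time_format).timetuple())
    # Assumption: without time_key, keep the time set by the previous stage.
    return new_tag, event_time, parsed


def regex_parser(text):
    # Python spells named groups (?P<name>...); Ruby/Fluentd use (?<name>...).
    m = re.match(r"^(?P<some_field>[^,]*),(?P<my_time>.*)", text)
    return m.groupdict() if m else None


def json_parser(text):
    try:
        value = json.loads(text)
    except ValueError:
        return None
    return value if isinstance(value, dict) else None


def show(event):
    # Format an event the way the stdout output above prints it.
    tag, t, rec = event
    stamp = datetime.fromtimestamp(t, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %z")
    return f"{stamp} {tag}: {json.dumps(rec, separators=(',', ':'))}"


# The answer's config; 1401494479 stands in for the read time (2014-05-31 00:01:19 UTC).
regex_event = parse_with_time_key(
    "first", 1401494479, {"message": "hello,2013-03-03 12:00:13"},
    key_name="message", parse=regex_parser,
    time_key="my_time", time_format="%Y-%m-%d %H:%M:%S", new_tag="second")
print(show(regex_event))  # 2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}

# The question's case (format json); the "time" and "level" fields are hypothetical.
json_event = parse_with_time_key(
    "raw", 1401494479, {"message": '{"time":"2013-03-03 12:00:13","level":"info"}'},
    key_name="message", parse=json_parser,
    time_key="time", time_format="%Y-%m-%d %H:%M:%S", new_tag="parsed")
print(show(json_event))  # 2013-03-03 12:00:13 +0000 parsed: {"level":"info"}
```

The first call reproduces the stdout lines above: the read time is discarded and the timestamp embedded in the message becomes the event time.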