Using an HDFS Sink and rollInterval in Flume-ng to batch up 90 seconds of log information

briano1945849 picture briano1945849 · Jan 3, 2013 · Viewed 14.2k times · Source

I am trying to use Flume-ng to grab 90 seconds of log information and put it into a file in HDFS. I have flume working to look at the log file via an exec and tail however it is creating a file every 5 seconds instead of what I am trying to configure as every 90 seconds.

My flume.conf is as follows:

# example.conf: A single-node Flume configuration                                                                                                                  
# Name the components on this agent                                                                                                                                
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1                                                                                                                                       
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log

# Describe sink1                                                                                                                                                   
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# this parameter seems to be getting overridden                                                                                                                    
agent1.sinks.sink1.hdfs.hdfs.rollCount = 0

# Use a channel which buffers events in memory                                                                                                                     
agent1.channels.channel1.type = memory

# Bind the source and sink to the channel                                                                                                                          
agent1.sources.source1.channels = channel1 = channel1

I am attempting to control the file size by the parameter - agent1.sinks.sink1.hdfs.rollInterval=90.

Running this config produces:

13/01/03 09:43:02 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume-ng/conf/flume.conf
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: agent1
13/01/03 09:43:03 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration  for agents: [agent1]
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: Creating channels
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: channel1, registered successfully.
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: created channel channel1
13/01/03 09:43:03 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
13/01/03 09:43:03 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: sink1, registered successfully.
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a50ca0c counterGroup:{ name:null counters:{} } }} channels:{{name: channel1}} }
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel channel1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink sink1
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Source source1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
13/01/03 09:43:03 INFO source.ExecSource: Exec source starting with command:tail -f /home/cloudera/LogCreator/fortune_log.log
13/01/03 09:43:07 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186506.tmp
13/01/03 09:43:08 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186506.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186506
13/01/03 09:43:08 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186507.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186507.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186507
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186508.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186508.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186508
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186509.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186509.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186509
13/01/03 09:43:18 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186510.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186510.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186510

As you can see by the timestamps it is creating a file about every 5 seconds or so. This creates to many small files.

I would like to be able to create the file on a larger time interval (90 seconds).


briano1945849 picture briano1945849 · Jan 4, 2013

A rewrite of the config file specifying a more complete selection of parameters did the trick. This example will write after 10k records or 10 min which ever comes first. In addition I changed from a memory channel to a file channel to aid in reliability on the data flow.

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1                                                                                                                                                                                                                 
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log

# Describe sink1                                                                                                                                                                                                                             
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)                                                                                                                                              
agent1.sinks.sink1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size)                                                                                                                                                                     
agent1.sinks.sink1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events)                                                                                                                                                
agent1.sinks.sink1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS                                                                                                                                                                                 
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy                                                                                                                                                                          
# hdfs.codeC = gzip                                                                                                                                                                                                                          
#format: currently SequenceFile, DataStream or CompressedStream                                                                                                                                                                              
#(1)DataStream will not compress output file and please don't set codeC                                                                                                                                                                      
#(2)CompressedStream requires set hdfs.codeC with an available codeC                                                                                                                                                                         
agent1.sinks.sink1.hdfs.fileType = DataStream
# -- "Text" or "Writable"                                                                                                                                                                                                                    
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)                                                                                                                                                                        
# Number of threads per HDFS sink for scheduling timed file rolling                                                                                                                                                                          
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1
# hdfs.kerberosPrin--cipal Kerberos user principal for accessing secure HDFS                                                                                                                                                                 
# hdfs.kerberosKey--tab Kerberos keytab for accessing secure HDFS                                                                                                                                                                            
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)                                                                                                                         
# hdfs.roundValue1 Rounded down to the highest multiple of this (in the unit configured using                                                                                                                                                
# hdfs.roundUnit), less than current time.                                                                                                                                                                                                   
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.                                                                                                                                                           
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.                                                                                 
# serializer.*                                                                                                                                                                                                                               

# Use a channel which buffers events to a file                                                                                                                                                                                               
# -- The component type name, needs to be FILE.                                                                                                                                                                                              
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored                                                                                                                                          
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored                                                                                                                                                           
# The maximum size of transaction supported by the channel                                                                                                                                                                                   
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints                                                                                                                                                                                             
agent1.channels.channel1.checkpointInterval 30000
# Max size (in bytes) of a single log file                                                                                                                                                                                                   
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel                                                                                                                                                                                                            
agent1.channels.channel1.capacity 10000000
#keep-alive 3 Amount of time (in sec) to wait for a put operation                                                                                                                                                                            
#write-timeout 3 Amount of time (in sec) to wait for a write operation                                                                                                                                                                       

# Bind the source and sink to the channel                                                                                                                                                                                                    
agent1.sources.source1.channels = channel1 = channel1