Flume Data Collection into HDFS with Avro Serialization


In this post, we will provide a proof of concept for Flume data collection into HDFS with Avro serialization, using an HDFS sink with the Avro event serializer on Sequence Files with Snappy compression. We will also use formatting escape sequences to store the events under the HDFS path.

For this, we will create a Flume agent with a Spooling Directory source, a JDBC channel and an HDFS sink.

Now let's create our agent Agent7 in the flume.conf properties file under the <FLUME_HOME>/conf directory.

Flume Data Collection into HDFS with Avro Serialization – Flume Agent – Spooling Directory Source, HDFS Sink (Formatting Escape Sequence, Avro Serializer, Sequence Files & Snappy Compression):

Add the below configuration properties to the flume.conf file to create Agent7 with the HDFS sink.
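Below is a minimal sketch of such a configuration; the component names (spool-src, jdbc-ch and hdfs-sink) are just placeholders, while the property values follow the settings described in this post.

    # Agent7: Spooling Directory Source -> JDBC Channel -> HDFS Sink
    Agent7.sources  = spool-src
    Agent7.channels = jdbc-ch
    Agent7.sinks    = hdfs-sink

    # Spooling Directory source reading from /usr/lib/flume/spooldir
    Agent7.sources.spool-src.type     = spooldir
    Agent7.sources.spool-src.spoolDir = /usr/lib/flume/spooldir
    Agent7.sources.spool-src.channels = jdbc-ch

    # JDBC channel
    Agent7.channels.jdbc-ch.type = jdbc

    # HDFS sink: escape sequences in the path, local timestamp,
    # Sequence File format, Avro event serializer and Snappy compression
    Agent7.sinks.hdfs-sink.type                   = hdfs
    Agent7.sinks.hdfs-sink.channel                = jdbc-ch
    Agent7.sinks.hdfs-sink.hdfs.path              = /user/flume/events/%y-%m-%d/%H%M/%S
    Agent7.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
    Agent7.sinks.hdfs-sink.hdfs.fileType          = SequenceFile
    Agent7.sinks.hdfs-sink.hdfs.codeC             = snappy
    Agent7.sinks.hdfs-sink.serializer             = avro_event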

In the above HDFS sink setup, observe the below properties:

  • We have used hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S to set up the path with formatting escape sequences. Here %y-%m-%d/%H%M/%S creates sub-directories in the format Year-Month-Day/HourMinuteSecond under the parent directory /user/flume/events.
  • These directories are created based on the timestamp present in the header of each event taken from the channel. If the event headers carry no timestamps, we need to set hdfs.useLocalTimeStamp = true, otherwise Flume throws an EventDeliveryException. When this property is set, the local time is used instead of the timestamp from the event header while replacing the escape sequences.
  • hdfs.fileType = SequenceFile tells the HDFS sink to create the output files in Sequence File format.
  • We have also specified the avro_event serializer along with the snappy compression codec, so the output files are stored in Avro-serialized format and the Sequence Files are compressed with Snappy.
Configuration Before Agent Startup:
  • Start the Hadoop and YARN daemons. Also create the HDFS path and make sure the flume user has write access to it. The below commands are helpful for this.
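A sketch of those commands is given below; the flume user and group names, and the assumption that the Hadoop start-up scripts are on the PATH, depend on the local setup.

    # start HDFS and YARN daemons
    $ start-dfs.sh
    $ start-yarn.sh

    # create the target HDFS path and give the flume user write access
    # (run as the HDFS superuser; user/group names are illustrative)
    $ hadoop fs -mkdir -p /user/flume/events
    $ hadoop fs -chown -R flume:flume /user/flume/events
    $ hadoop fs -chmod -R 770 /user/flume/events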

Below is the screenshot from the terminal performing the above activities.

HDFS Path

  • Create the folder specified as the spooling directory path, and make sure the flume user has read+write+execute access to that folder. In our agent, it is the /usr/lib/flume/spooldir directory; the commands below show one way to set this up.
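Assuming the agent runs as a local user named flume:

    $ sudo mkdir -p /usr/lib/flume/spooldir
    $ sudo chown -R flume:flume /usr/lib/flume/spooldir
    $ sudo chmod -R 770 /usr/lib/flume/spooldir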
Start Agent:
  • Start the agent with the below command.
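Assuming flume-ng is on the PATH and flume.conf lives under <FLUME_HOME>/conf, the command looks like this:

    $ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume.conf \
        --name Agent7 -Dflume.root.logger=INFO,console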

Flume Agent7

  • Copy our input file into the spool directory and wait for it to get absorbed by the source.
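For example, assuming the input file is /var/log/alternatives.log:

    # copy the input file into the spooling directory
    $ cp /var/log/alternatives.log /usr/lib/flume/spooldir/

With the default spooling directory settings, the source renames the file with a .COMPLETED suffix once it has been fully ingested.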

Spool input

After a few seconds, the file absorption is completed.

Spool out

Once we are done feeding input files, we can stop the agent with a Ctrl+C key press.

Verify the Output:
  • We can verify the output of this agent at the target HDFS path /user/flume/events/ as shown below.
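For instance, a recursive listing of the target directory shows the generated sub-directories and files:

    $ hadoop fs -ls -R /user/flume/events/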

HDFS out

In the above output we can clearly observe that 6 sub-directories were created, one for each second (the 54th through the 59th second of 19:16 (Hour:Minute)). So, the Flume agent took a total of 6 seconds to process our input file alternatives.log into the HDFS sink. The HDFS sink used the local timestamp, as there was no timestamp present in the event headers of the input file.

  • We can confirm that the output files are Sequence Files with the hadoop fs -cat command as shown below.
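A Sequence File starts with the magic header bytes "SEQ", so printing the first few bytes of one of the generated files confirms the format; the file path below is only a placeholder for one of the files listed above.

    # <sub-directory>/<FlumeData-file> stands for an actual output file under /user/flume/events/
    $ hadoop fs -cat /user/flume/events/<sub-directory>/<FlumeData-file> | head -c 16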