Flume Data Collection into HDFS with Avro Serialization


In this post, we will provide a proof of concept for Flume data collection into HDFS with Avro serialization, using an HDFS sink with the Avro event serializer on Sequence Files compressed with Snappy. We will also use formatting escape sequences to organize the events under the HDFS path.

We will create a Flume agent with a Spooling Directory source, a JDBC channel and an HDFS sink.

Now let's create our agent, Agent7, in the flume.conf properties file under the <FLUME_HOME>/conf directory.

Flume Data Collection into HDFS with Avro Serialization – Flume Agent – Spooling Directory Source, HDFS Sink (Formatting Escape Sequence, Avro Serializer, Sequence Files & Snappy Compression):

Add the configuration properties below to the flume.conf file to create Agent7 with an HDFS sink.
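The original post shows this configuration as a terminal screenshot; a minimal sketch of Agent7, based only on the properties discussed in this post, would look roughly as follows (the component names source1, channel1 and sink1 are placeholders):

Agent7.sources = source1
Agent7.channels = channel1
Agent7.sinks = sink1

# Spooling Directory Source
Agent7.sources.source1.type = spooldir
Agent7.sources.source1.spoolDir = /usr/lib/flume/spooldir
Agent7.sources.source1.channels = channel1

# JDBC Channel
Agent7.channels.channel1.type = jdbc

# HDFS Sink with escape sequences, Sequence Files, Avro serializer and Snappy compression
Agent7.sinks.sink1.type = hdfs
Agent7.sinks.sink1.channel = channel1
Agent7.sinks.sink1.hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S
Agent7.sinks.sink1.hdfs.useLocalTimeStamp = true
Agent7.sinks.sink1.hdfs.fileType = SequenceFile
Agent7.sinks.sink1.serializer = avro_event
Agent7.sinks.sink1.serializer.compressionCodec = snappy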

In the above HDFS sink setup, observe the following properties:

  • We have used hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S to set up the path with formatting escape sequences. Here %y-%m-%d/%H%M/%S creates sub-directories in the format Year-Month-Day/HourMinuteSecond under the parent directory /user/flume/events.
  • These directories are created based on the timestamp present in the header of each event from the channel. If there is no timestamp in the event headers, we need to set hdfs.useLocalTimeStamp = true, otherwise Flume throws an EventDeliveryException. When this property is set, the local time is used instead of the timestamp from the event header while replacing the escape sequences.
  • hdfs.fileType = SequenceFile tells the HDFS sink to create output files in Sequence File format.
  • We have also specified the avro_event serializer with the snappy compression codec, so the output files are stored in Avro-serialized format and the sequence files are compressed with the Snappy codec.
Configuration Before Agent Start up:
  • Start the Hadoop and YARN daemons. Also create the HDFS path and make sure the flume user has write access to it. The commands below are helpful for this.
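A rough sketch of those commands (the flume user and group names, and the use of chown, are assumptions; adjust them to your environment):

start-dfs.sh
start-yarn.sh
hadoop fs -mkdir -p /user/flume/events
hadoop fs -chown -R flume:flume /user/flume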

Below is a screenshot from the terminal performing the above activities.

HDFS Path

  • Create the folder specified for the spooling directory path, and make sure that the flume user has read+write+execute access to that folder. In our agent, it is the /usr/lib/flume/spooldir directory.
Start Agent:
  • Start the agent with the command below.
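A typical start command for this agent looks like the following (assuming flume-ng is on the PATH and FLUME_HOME points to the Flume installation):

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume.conf --name Agent7 -Dflume.root.logger=INFO,console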

Flume Agent7

  • Copy our input file into the spooling directory and wait for it to be absorbed by the source, as sketched below.
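For example, assuming the input file is /var/log/alternatives.log (the source location is an assumption; the spooling directory is the one configured above):

cp /var/log/alternatives.log /usr/lib/flume/spooldir/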

Spool input

After a few seconds, the file absorption is completed, as shown below.

Spool out

Once we are done feeding input files, we can stop the agent with a Ctrl+C key press.

Verify the Output:
  • We can verify the output of this agent at the target HDFS path /user/flume/events/, as shown below.
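A recursive listing of the target path shows the per-second sub-directories and the files written into them:

hadoop fs -ls -R /user/flume/events/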

HDFS out

In the above output we can clearly observe that there are 6 sub-directories, one for each second from the 54th through the 59th second of 19:16 (Hour:Minute). So, the Flume agent took a total of 6 seconds to deliver our input file alternatives.log to the HDFS sink. The HDFS sink used the local timestamp because there was no timestamp present in the input event headers.

  • We can confirm that the output files are Sequence Files with the hadoop fs -cat command, as shown in the command sketch and screenshot below.

In the output of the above command, we can observe that the first three characters of the content are SEQ, which denotes that the file is in Sequence File format.

  • We can view the contents of a Snappy-compressed sequence file with the hadoop fs -text command if our Hadoop installation supports Snappy decompression. From Hadoop 2.0.0 onwards, Snappy is included in the Hadoop distribution itself, so we should not face any issues viewing Snappy-compressed file contents. If there are any issues, please refer to the post on the Snappy codec.
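Sketches of the two commands (the exact sub-directory and file names under /user/flume/events depend on the run, so placeholders are used here):

hadoop fs -cat /user/flume/events/<sub-directory>/<file-name> | head -c 3
hadoop fs -text /user/flume/events/<sub-directory>/<file-name>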

A screenshot of the terminal performing the above two commands can be seen below.

HDFS Seq2

As the file is in Avro-serialized format, the contents are not plain human-readable text.

So, we can use these Avro-serialized, Snappy-compressed sequence files for further processing inside HDFS with MapReduce programs.

Details of Components Used with HDFS Sink in this post:
Event Serializers: 

Flume supports serialization of events only with the File Roll sink and the HDFS sink, but we can write custom event serializers by implementing the EventSerializer interface.

As of now, Flume provides the two serializers described below.

Body Text Serializer:

By default, events are text serialized (text): this serializer writes the body of the event to the output stream without any transformation or modification. The event headers are ignored.

Property Name    Default   Description
appendNewline    true      Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

Example for agent named a1:

a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

Avro Event Serializer:

avro_event: This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism. This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name        Default   Description
syncIntervalBytes    2048000   Avro sync interval, in approximate bytes.
compressionCodec     null      Avro compression codec, e.g. snappy.
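Following the same pattern as the Body Text Serializer example above, a sketch for an agent named a1 that uses the Avro Event Serializer on an HDFS sink would look like this:

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy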
