In this post, we provide a proof of concept for collecting data into HDFS with Flume, using the HDFS sink with the Avro event serializer to write Snappy-compressed Sequence Files. We also use formatting escape sequences to build the HDFS path under which events are stored.
For this, we create a Flume agent with a Spooling Directory source, a JDBC channel and an HDFS sink.
Now let's create our agent, Agent7, in the flume.conf properties file under the <FLUME_HOME>/conf directory.
In the above HDFS sink setup, observe the following properties:
We have used hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S to set up the path with formatting escape sequences. Here, %y-%m-%d/%H%M/%S creates subdirectories in the format Year-Month-Day/HourMinute/Second (with a two-digit year) under the parent directory /user/flume/events.
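To make the path layout concrete, here is a minimal sketch of how the escape sequences used in this post expand for one event timestamp. It handles only %y, %m, %d, %H, %M and %S (Flume supports more), and the `tz` argument stands in for the sink's hdfs.timeZone setting; without it, Flume uses the agent's local time zone. The function name and example timestamp are hypothetical.

```python
from datetime import datetime, timezone, tzinfo

# Escape sequences used in this post's hdfs.path; each maps to the strftime code of the same letter.
ESCAPES = set("ymdHMS")

def expand_hdfs_path(template: str, ts_millis: int, tz: tzinfo = timezone.utc) -> str:
    """Replace %y %m %d %H %M %S in template with fields of ts_millis (epoch milliseconds)."""
    moment = datetime.fromtimestamp(ts_millis / 1000, tz=tz)
    out, i = [], 0
    while i < len(template):
        if template[i] == "%" and i + 1 < len(template) and template[i + 1] in ESCAPES:
            out.append(moment.strftime("%" + template[i + 1]))
            i += 2
        else:
            out.append(template[i])  # any other character is copied through unchanged
            i += 1
    return "".join(out)

if __name__ == "__main__":
    # Hypothetical event time: 19:16:54 UTC on 20 Aug 2014.
    ts = int(datetime(2014, 8, 20, 19, 16, 54, tzinfo=timezone.utc).timestamp() * 1000)
    print(expand_hdfs_path("/user/flume/events/%y-%m-%d/%H%M/%S", ts))
```

For that timestamp the result is /user/flume/events/14-08-20/1916/54: one directory level per day, one per minute, and one per second.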
These directories are created based on the timestamp in the header of each event taken from the channel. If the events have no timestamp header, we need to set hdfs.useLocalTimeStamp = true; otherwise Flume throws an EventDeliveryException. When this property is set, the local time is used instead of the timestamp from the event header when replacing the escape sequences.
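The choice between the header timestamp and the local clock can be sketched as follows. The `timestamp` header key is the one Flume's timestamp interceptor sets; `MissingTimestampError` is a hypothetical stand-in for the delivery failure described above, not a Flume class.

```python
import time
from typing import Mapping, Optional

class MissingTimestampError(Exception):
    """Hypothetical stand-in for the failure Flume reports when no timestamp is available."""

def resolve_timestamp(headers: Mapping[str, str], use_local_timestamp: bool,
                      now_millis: Optional[int] = None) -> int:
    """Pick the timestamp (epoch ms) used to expand the hdfs.path escape sequences."""
    if use_local_timestamp:
        # hdfs.useLocalTimeStamp = true: the sink's own clock is used, even if a header exists.
        return now_millis if now_millis is not None else int(time.time() * 1000)
    if "timestamp" in headers:
        # Default behaviour: the event's 'timestamp' header (epoch millis) decides the path.
        return int(headers["timestamp"])
    raise MissingTimestampError("no 'timestamp' header and hdfs.useLocalTimeStamp is false")
```

Events read from a spooled log file carry no `timestamp` header, which is why this agent relies on hdfs.useLocalTimeStamp = true.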
hdfs.fileType = SequenceFile specifies that the HDFS sink should write its output files in Sequence File format.
We have also specified the avro_event serializer with the snappy compression codec, so that events are stored in Avro-serialized format and the Sequence Files are compressed with Snappy.
Configuration Before Agent Startup:
Start the Hadoop and YARN daemons. Also create the HDFS path and make sure the flume user has write access to it. The commands below are helpful for this.
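A minimal sketch of these preparation steps follows. It assumes that start-dfs.sh and start-yarn.sh (from $HADOOP_HOME/sbin) are on the PATH, that the agent runs as a user named flume, and that the chown step is executed by the HDFS superuser. It only prints the commands unless `dry_run` is switched off.

```python
import shlex
import subprocess
from typing import List

EVENTS_DIR = "/user/flume/events"  # target HDFS path from this post
FLUME_USER = "flume"               # assumed user that runs the agent

def setup_commands(events_dir: str = EVENTS_DIR, user: str = FLUME_USER) -> List[List[str]]:
    """Commands that start the daemons and prepare a writable HDFS path for the sink."""
    return [
        ["start-dfs.sh"],                                    # NameNode / DataNode daemons
        ["start-yarn.sh"],                                   # ResourceManager / NodeManager daemons
        ["hadoop", "fs", "-mkdir", "-p", events_dir],        # create the target directory tree
        ["hadoop", "fs", "-chown", "-R", user, events_dir],  # let the flume user write there
    ]

def run(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print each command; execute it only when dry_run is False (stops on the first failure)."""
    for cmd in commands:
        print("$", shlex.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)

if __name__ == "__main__":
    run(setup_commands())
```

Granting ownership is one option; `hadoop fs -chmod` on a shared group would work just as well if several users write to the path.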
Below is a screenshot of the terminal performing the above activities.
Create the folder specified as the spooling directory path, and make sure that the flume user has read, write and execute access to it. In our agent, it is the /usr/lib/flume/spooldir directory.
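A small sketch of this step, assuming the script runs as the flume user itself. Changing ownership to flume (for example with chown) is left out because it normally requires root. Write access matters because the source renames ingested files and keeps its .flumespool tracker directory inside the spool folder.

```python
import os
import stat

def prepare_spool_dir(path: str) -> None:
    """Create the spool directory with rwx for its owner (assumed to be the flume user)."""
    os.makedirs(path, exist_ok=True)
    # Owner rwx plus group r-x: Flume must list the folder, read files, rename
    # ingested ones to *.COMPLETED and keep its .flumespool tracker directory there.
    os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP)
    if not os.access(path, os.R_OK | os.W_OK | os.X_OK):
        raise PermissionError(f"current user lacks read/write/execute access to {path}")

if __name__ == "__main__":
    import tempfile
    with tempfile.TemporaryDirectory() as root:
        spool = os.path.join(root, "spooldir")  # stand-in for /usr/lib/flume/spooldir
        prepare_spool_dir(spool)
        print(oct(stat.S_IMODE(os.stat(spool).st_mode)))
```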
Copy the input file into the spool directory and wait for the source to absorb it.
After a few seconds, file absorption is complete: the Spooling Directory source renames the ingested file with the .COMPLETED suffix.
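Copying a file in and waiting for it to be absorbed can be scripted by polling for the renamed file. This is a sketch that assumes the source's default .COMPLETED suffix and a simple timeout; the function name and parameters are hypothetical.

```python
import os
import shutil
import time

def copy_and_wait(src_file: str, spool_dir: str, suffix: str = ".COMPLETED",
                  timeout_s: float = 60.0, poll_s: float = 0.5) -> str:
    """Copy src_file into spool_dir, then poll until the source renames it with suffix."""
    target = os.path.join(spool_dir, os.path.basename(src_file))
    # copyfile copies contents only. For large files, copy elsewhere on the same
    # filesystem and mv into the spool directory instead: the source expects files
    # to stay unchanged once they appear.
    shutil.copyfile(src_file, target)
    done = target + suffix
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if os.path.exists(done):
            return done
        time.sleep(poll_s)
    raise TimeoutError(f"{target} was not renamed to {done} within {timeout_s} s")
```

Polling for the suffix avoids guessing a fixed "few seconds" wait, which depends on file size and channel throughput.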
Once we are done feeding input files, we can stop the agent by pressing Ctrl+C.
Verify the Output:
We can verify the agent's output at the target HDFS path, /user/flume/events/, as shown below.
In the above output, we can clearly see that six subdirectories were created, one for each second from the 54th through the 59th second of 19:16 (hour:minute). So the Flume agent took roughly 6 seconds in total to deliver our input file, alternatives.log, to the HDFS sink. The HDFS sink used the local timestamp because the events from the input file had no timestamp header.
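The "roughly 6 seconds" estimate comes from counting per-second directories. A sketch of that arithmetic, parsing paths laid out as <root>/%y-%m-%d/%H%M/%S back into times; the date 14-08-20 in the example is hypothetical, since only the time of day is given above.

```python
from datetime import datetime
from typing import Iterable, Tuple

def processing_span(paths: Iterable[str]) -> Tuple[int, float]:
    """Return (number of per-second directories, seconds covered) for
    paths laid out as <root>/%y-%m-%d/%H%M/%S."""
    moments = []
    for p in paths:
        day, hour_minute, second = p.rstrip("/").split("/")[-3:]
        moments.append(datetime.strptime(f"{day} {hour_minute}{second}", "%y-%m-%d %H%M%S"))
    moments.sort()
    # Each directory is a one-second bucket, so the last bucket adds one more second.
    span = (moments[-1] - moments[0]).total_seconds() + 1
    return len(moments), span

if __name__ == "__main__":
    dirs = [f"/user/flume/events/14-08-20/1916/{s}" for s in range(54, 60)]
    print(processing_span(dirs))
```

Seconds 54 through 59 give six buckets and a span of 59 - 54 + 1 = 6 seconds. This is an upper bound on elapsed time at one-second resolution, not a precise measurement.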
We can confirm that the output files are Sequence Files with the hadoop fs -cat command, as shown below.
In the output of the above command, we can observe that the first three characters displayed are SEQ, which indicates that the file is in Sequence File format.
We can view the contents of a Snappy-compressed Sequence File with the hadoop fs -text command if our Hadoop installation supports Snappy decompression. From Hadoop 2.0.0 onwards, Snappy is included in the Hadoop distribution itself by default, so we should not face any issues viewing Snappy-compressed file contents. If there are any issues, please refer to the post on the snappy codec.
A screenshot of the terminal performing the above two commands can be seen below.
As the file is in Avro-serialized format, the contents look a bit garbled.
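Beyond spotting SEQ, the Sequence File header also records the key/value classes and the compression codec, so Snappy can be confirmed directly. A minimal sketch of a header parser, assuming a version-6 header (the current format) and class names shorter than 128 bytes, so each length is a single-byte Hadoop vint. The bytes could come from a local copy fetched with `hadoop fs -get`.

```python
from typing import Dict, Tuple

SNAPPY_CODEC = "org.apache.hadoop.io.compress.SnappyCodec"

def _read_text(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read a Hadoop Text string (vint length + UTF-8 bytes); single-byte vints only."""
    length = buf[pos]
    if length > 127:
        raise ValueError("multi-byte vint lengths are not handled in this sketch")
    start = pos + 1
    return buf[start:start + length].decode("utf-8"), start + length

def parse_seqfile_header(buf: bytes) -> Dict[str, object]:
    """Parse magic, version, key/value classes, compression flags and codec class."""
    if buf[:3] != b"SEQ":
        raise ValueError("not a SequenceFile: missing 'SEQ' magic")
    version = buf[3]
    if version < 6:
        raise ValueError(f"only version-6 headers are handled, got {version}")
    pos = 4
    key_class, pos = _read_text(buf, pos)
    value_class, pos = _read_text(buf, pos)
    compressed, block_compressed = buf[pos] != 0, buf[pos + 1] != 0
    pos += 2
    codec = None
    if compressed:  # the codec class name is present only for compressed files
        codec, pos = _read_text(buf, pos)
    return {"version": version, "key_class": key_class, "value_class": value_class,
            "compressed": compressed, "block_compressed": block_compressed, "codec": codec}
```

If `codec` comes back as SnappyCodec's class name, the sink's codec setting took effect; metadata and the sync marker that follow the codec name are not parsed here.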
We can then use these Avro-serialized, Snappy-compressed Sequence Files for further processing inside HDFS with MapReduce programs.
Details of Components Used with HDFS Sink in this post:
Flume supports event serializers only with the File_Roll sink and the HDFS sink. However, we can write custom event serializers by implementing the EventSerializer interface.
At the time of writing, Flume provides the two serializers shown below.
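The real extension point is Flume's Java `EventSerializer` interface (lifecycle methods afterCreate, afterReopen, write, flush, beforeClose and supportsReopen), exposed to the sink through an `EventSerializer.Builder` whose fully-qualified class name goes in the `serializer` property. The Python below is only an analogue of that lifecycle for illustration, not Flume's API. `HeaderPrefixSerializer` and its output format are hypothetical.

```python
import io
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import BinaryIO, Dict

@dataclass
class Event:
    """Minimal stand-in for a Flume event: string headers plus a byte body."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)

class EventSerializerSketch(ABC):
    """Python analogue of the lifecycle in Flume's Java EventSerializer interface."""

    def __init__(self, out: BinaryIO):
        self.out = out

    def after_create(self) -> None:
        """Called once after a new output file is created (e.g. to write a file header)."""

    def after_reopen(self) -> None:
        """Called after an existing file is reopened for append."""

    @abstractmethod
    def write(self, event: Event) -> None:
        """Serialize one event to the output stream."""

    def flush(self) -> None:
        self.out.flush()

    def before_close(self) -> None:
        """Called just before the output file is closed (e.g. to write a trailer)."""

    def supports_reopen(self) -> bool:
        return True

class HeaderPrefixSerializer(EventSerializerSketch):
    """Hypothetical custom serializer: writes 'k=v;...|body' lines, headers sorted by key."""

    def write(self, event: Event) -> None:
        prefix = ";".join(f"{k}={v}" for k, v in sorted(event.headers.items()))
        self.out.write(prefix.encode("utf-8") + b"|" + event.body + b"\n")

if __name__ == "__main__":
    buf = io.BytesIO()
    s = HeaderPrefixSerializer(buf)
    s.after_create()
    s.write(Event(b"hello", {"host": "h1"}))
    s.flush()
    s.before_close()
    print(buf.getvalue())
```

Unlike the built-in text serializer, a custom serializer like this one can keep the event headers in the output.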
Body Text Serializer:
By default, events are serialized as text (alias text): this serializer writes the body of each event to an output stream without any transformation or modification. The event headers are ignored.
Its appendNewline property controls whether a newline is appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.
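The behaviour of the text serializer fits in a few lines. This Python function is a hypothetical model of it, not Flume code. On a sink, the option would be set through the serializer's sub-properties, e.g. `serializer.appendNewline = false`.

```python
from typing import Iterable, Mapping, Tuple

def body_text_serialize(events: Iterable[Tuple[Mapping[str, str], bytes]],
                        append_newline: bool = True) -> bytes:
    """Model of the text serializer: write each body unchanged, drop the headers,
    and optionally append a newline after every event (appendNewline)."""
    out = bytearray()
    for _headers, body in events:  # headers are deliberately ignored
        out += body
        if append_newline:
            out += b"\n"
    return bytes(out)
```

With appendNewline left at true, a body that already ends in a newline would produce a blank line, which is why the default assumes newline-free events.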
Avro Event Serializer:
Alias avro_event. This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism. This serializer inherits from the AbstractAvroEventSerializer class.