In this post, we will provide a proof of concept for Flume data collection into HDFS with Avro serialization, using the HDFS sink and the Avro serializer on sequence files with Snappy compression. We will also use formatting escape sequences to build the HDFS path under which the events are stored.
For this, we will create a Flume agent with a Spooling Directory source, a JDBC channel and an HDFS sink.
Now let's create our agent, Agent7, in the flume.conf properties file under the <FLUME_HOME>/conf directory.
Table of Contents
- Flume Data Collection into HDFS with Avro Serialization – Flume Agent – Spooling Directory Source, HDFS Sink (Formatting Escape Sequence, Avro Serializer, Sequence Files & Snappy Compression):
- Start Agent :
- Details of Components Used with HDFS Sink in this post:
- Event Serializers:
Flume Data Collection into HDFS with Avro Serialization – Flume Agent – Spooling Directory Source, HDFS Sink (Formatting Escape Sequence, Avro Serializer, Sequence Files & Snappy Compression):
Add the configuration properties below to the flume.conf file to create Agent7 with an HDFS sink.
### Agent7 - HDFS Sink, Spooling Directory Source and JDBC Channel ###
# Name the components on this agent
Agent7.sources = spooldir-source
Agent7.channels = jdbc-channel
Agent7.sinks = hdfs-sink
# Describe/configure Source
Agent7.sources.spooldir-source.type = spooldir
Agent7.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
Agent7.sources.spooldir-source.fileHeader = true
# Describe the sink
Agent7.sinks.hdfs-sink.type = hdfs
Agent7.sinks.hdfs-sink.hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S
Agent7.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent7.sinks.hdfs-sink.hdfs.filePrefix = event
Agent7.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent7.sinks.hdfs-sink.hdfs.rollSize = 0
Agent7.sinks.hdfs-sink.hdfs.rollCount = 10000
Agent7.sinks.hdfs-sink.hdfs.fileType = SequenceFile
Agent7.sinks.hdfs-sink.serializer = avro_event
Agent7.sinks.hdfs-sink.serializer.compressionCodec = snappy
# Use a channel which buffers events in a database (the JDBC channel uses embedded Derby by default)
Agent7.channels.jdbc-channel.type = jdbc
# Bind the source and sink to the channel
Agent7.sources.spooldir-source.channels = jdbc-channel
Agent7.sinks.hdfs-sink.channel = jdbc-channel
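The three roll properties decide when the sink closes its current file and starts a new one: hdfs.rollInterval is in seconds, hdfs.rollSize in bytes and hdfs.rollCount in events, and a value of 0 disables that trigger. So Agent7 rolls a file only after 10000 events. The class below is a simplified, hypothetical model of that rule (not Flume's own code), with defaults set to the values the Flume user guide lists (30 seconds, 1024 bytes, 10 events).

```python
from dataclasses import dataclass


@dataclass
class RollPolicy:
    """Simplified model of the HDFS sink roll settings; 0 disables a trigger."""
    roll_interval_s: int = 30    # hdfs.rollInterval, seconds
    roll_size_bytes: int = 1024  # hdfs.rollSize, bytes
    roll_count: int = 10         # hdfs.rollCount, events

    def should_roll(self, open_for_s: float, bytes_written: int, events_written: int) -> bool:
        # Each trigger can only fire when its threshold is non-zero.
        if self.roll_interval_s > 0 and open_for_s >= self.roll_interval_s:
            return True
        if self.roll_size_bytes > 0 and bytes_written >= self.roll_size_bytes:
            return True
        if self.roll_count > 0 and events_written >= self.roll_count:
            return True
        return False


# Agent7's settings: only the event-count trigger is active.
agent7 = RollPolicy(roll_interval_s=0, roll_size_bytes=0, roll_count=10000)
print(agent7.should_roll(open_for_s=3600, bytes_written=10**9, events_written=9999))  # False
print(agent7.should_roll(open_for_s=1, bytes_written=10, events_written=10000))       # True
```

Because hdfs.path in this agent changes every second, events from each new second also land in a new file in a new directory, independently of these roll settings.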
In the HDFS sink setup above, observe the following properties:
- We have used hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S to set up the path with formatting escape sequences. Here, %y-%m-%d/%H%M/%S creates subdirectories in the format Year-Month-Day/HourMinute/Second (with a two-digit year) under the parent directory /user/flume/events.
- These directories are created based on the timestamp in the header of each event taken from the channel. If the events carry no timestamp header, we need to set hdfs.useLocalTimeStamp = true; otherwise Flume throws an EventDeliveryException. When this property is set, the local time is used instead of the event header timestamp while replacing the escape sequences.
- hdfs.fileType = SequenceFile tells the HDFS sink to create its output files in Sequence File format.
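- We have also specified the avro_event serializer with the snappy compression codec, so that the output is stored in Avro serialized format and compressed with the Snappy codec. Note that the HDFS sink also has its own hdfs.codeC property (for example, hdfs.codeC = snappy) and, for sequence files, an hdfs.writeFormat property (Writable or Text). Depending on the Flume version, the serializer setting may only be honored for the DataStream and CompressedStream file types, so it is worth confirming the result against the HDFS sink documentation for your release.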
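The path expansion and the choice of timestamp described above can be modeled in a few lines of Python. This is a simplified sketch, not Flume's own implementation: it handles only the six escapes used by this agent and raises a plain ValueError where Flume would fail the delivery. The timestamp 1411825614452 is the numeric suffix of the output file name used later in this post, taken purely as a sample value, and the UTC+05:30 host time zone is an assumption that happens to reproduce the directory names seen in the output.

```python
from datetime import datetime, timedelta, timezone

# Subset of Flume's HDFS path escape sequences, mapped to strftime directives.
ESCAPES = {"y": "%y", "m": "%m", "d": "%d", "H": "%H", "M": "%M", "S": "%S"}


def resolve_timestamp_ms(headers: dict, use_local_timestamp: bool, now_ms: int) -> int:
    """Pick the epoch-millis timestamp used for the path, mimicking hdfs.useLocalTimeStamp."""
    if use_local_timestamp:
        return now_ms  # local time wins, whatever the headers say
    if "timestamp" not in headers:
        # Flume fails the delivery here; a plain exception stands in for that.
        raise ValueError("no 'timestamp' header and useLocalTimeStamp is false")
    return int(headers["timestamp"])


def expand_path(template: str, timestamp_ms: int, tz: timezone) -> str:
    """Replace %y %m %d %H %M %S in template using an epoch-millis timestamp."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000, tz)
    out, i = [], 0
    while i < len(template):
        if template[i] == "%" and i + 1 < len(template) and template[i + 1] in ESCAPES:
            out.append(moment.strftime(ESCAPES[template[i + 1]]))
            i += 2
        else:
            out.append(template[i])
            i += 1
    return "".join(out)


IST = timezone(timedelta(hours=5, minutes=30))  # assumed time zone of the agent host
ts = resolve_timestamp_ms({}, use_local_timestamp=True, now_ms=1411825614452)
print(expand_path("/user/flume/events/%y-%m-%d/%H%M/%S", ts, IST))
# /user/flume/events/14-09-27/1916/54
```

The same instant in UTC would expand to /user/flume/events/14-09-27/1346/54, which is why the host time zone matters once hdfs.useLocalTimeStamp is enabled.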
Configuration Before Agent Startup:
- Start the Hadoop and YARN daemons. Also create the HDFS path and make sure the flume user has write access to it. The commands below help with this.
$ hadoop fs -mkdir -p /user/flume/events
$ hadoop fs -chmod -R 777 /user/flume/events
Below is a screenshot of the terminal performing the above steps.
- Create the folder specified as the spooling directory path, and make sure that the flume user has read, write and execute access to it. In our agent, it is the /usr/lib/flume/spooldir directory.
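The spooling directory is usually created by an administrator (for example with mkdir and chown). The helper below is a hypothetical check, not part of Flume, that can be run as the user the agent runs as to confirm it has the read, write and execute access mentioned above.

```python
import os


def check_spool_dir(path: str) -> list:
    """Return which of r, w, x the current user lacks on path, or ["missing"]."""
    if not os.path.isdir(path):
        return ["missing"]
    needed = {"r": os.R_OK, "w": os.W_OK, "x": os.X_OK}
    # os.access checks against the real user id of the running process.
    return [name for name, mode in needed.items() if not os.access(path, mode)]
```

For example, running check_spool_dir("/usr/lib/flume/spooldir") as the flume user should return an empty list before the agent is started.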
Start Agent :
- Start the agent with the command below.
$ flume-ng agent --conf $FLUME_CONF_DIR --conf-file $FLUME_CONF_DIR/flume.conf --name Agent7
- Copy the input file into the spooling directory and wait for the source to consume it.
After a few seconds, the source finishes consuming the file; by default, the spooling directory source then renames it with a .COMPLETED suffix.
Once we are done feeding input files, we can stop the agent by pressing Ctrl+C.
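The Flume user guide warns that the spooling directory source stops processing if a file is written to after being placed in the spooling directory, or if a file name is reused. One way to respect that when copying input files is sketched below: copy into a staging directory first, then rename the finished file into the spool directory. The staging directory and the helper name deliver_to_spool are hypothetical, and the atomic rename assumes both directories are on the same filesystem.

```python
import os
import shutil
import time


def deliver_to_spool(src: str, staging_dir: str, spool_dir: str) -> str:
    """Copy src into spool_dir so it appears complete and under a never-reused name."""
    # A nanosecond suffix keeps repeated deliveries of the same file from reusing a name.
    unique_name = f"{os.path.basename(src)}.{time.time_ns()}"
    staged = os.path.join(staging_dir, unique_name)
    shutil.copyfile(src, staged)           # the slow copy happens outside the spool dir
    target = os.path.join(spool_dir, unique_name)
    os.replace(staged, target)             # same-filesystem rename: Flume sees the file at once
    return target
```

For example, deliver_to_spool("alternatives.log", "/usr/lib/flume/staging", "/usr/lib/flume/spooldir") would hand the input file to Agent7, assuming that staging directory exists next to the spool directory.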
Verify the Output:
- We can verify the output of this agent at the target HDFS path /user/flume/events/, as shown below.
In the above output, we can clearly observe that six subdirectories were created, one for each second from the 54th through the 59th second of 19:16 (hour:minute). So the Flume agent took about 6 seconds to write our input file, alternatives.log, to the HDFS sink. The escape sequences were resolved with the agent host's local time, because the events from the spooling directory source carry no timestamp header.
- We can confirm that the output files are sequence files with the hadoop fs -cat command, as shown below.
$ hadoop fs -cat /user/flume/events/14-09-27/1916/54/event.1411825614452
In the output of the above command, we can observe that the first three characters are SEQ, which indicates that the file is in Sequence File format.
- We can view the contents of a Snappy-compressed sequence file with the hadoop fs -text command if our Hadoop installation supports Snappy decompression. From Hadoop 2.0.0 onwards, Snappy support is included in the Hadoop distribution itself, so we should not face any issues viewing Snappy-compressed file contents, provided the native Hadoop library is loaded (on recent Hadoop 2.x releases, hadoop checknative -a shows whether snappy is available). If there are any issues, please refer to the post on the snappy codec.
A screenshot of the terminal running the above two commands can be seen below.
As the file is in Avro serialized format, the contents look a bit garbled.
We can then process these Avro-serialized, Snappy-compressed sequence files further within HDFS using MapReduce programs.
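To look beyond the SEQ marker, the header of a local copy of an output file (fetched, for example, with hadoop fs -get) can be decoded to show the key and value classes and whether a compression codec was recorded. The sketch below follows the SequenceFile version 6 header layout (magic, version byte, key and value class names as Hadoop Text strings, two compression flags, then the codec class name if compressed); it is an illustration, not Hadoop's reader, and it stops before the metadata and sync marker.

```python
import struct


def read_vlong(data: bytes, pos: int):
    """Decode a Hadoop WritableUtils variable-length long; return (value, new_pos)."""
    first = struct.unpack_from(">b", data, pos)[0]  # first byte is signed
    pos += 1
    if first >= -112:
        return first, pos  # small values fit in the first byte
    negative = first < -120
    size = (-119 - first) if negative else (-111 - first)  # total bytes, incl. the first
    value = 0
    for _ in range(size - 1):
        value = (value << 8) | data[pos]
        pos += 1
    return (~value if negative else value), pos


def read_text(data: bytes, pos: int):
    """Decode a Hadoop Text string: vint length followed by UTF-8 bytes."""
    length, pos = read_vlong(data, pos)
    return data[pos:pos + length].decode("utf-8"), pos + length


def parse_seqfile_header(data: bytes) -> dict:
    """Parse the leading fields of a version 6 SequenceFile header."""
    if data[:3] != b"SEQ":
        raise ValueError("not a SequenceFile: missing 'SEQ' magic")
    version = data[3]
    if version < 6:
        raise ValueError("this sketch only handles version 6 headers")
    key_class, pos = read_text(data, 4)
    value_class, pos = read_text(data, pos)
    compressed, block_compressed = data[pos] != 0, data[pos + 1] != 0
    pos += 2
    codec = None
    if compressed:
        codec, pos = read_text(data, pos)
    return {"version": version, "key_class": key_class, "value_class": value_class,
            "compressed": compressed, "block_compressed": block_compressed, "codec": codec}
```

For example, parse_seqfile_header(open("event.1411825614452", "rb").read(4096)) on a downloaded file reports the codec class name, which is org.apache.hadoop.io.compress.SnappyCodec when the sequence file itself is Snappy-compressed, and None when it is not compressed.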
Details of Components Used with HDFS Sink in this post:
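Event Serializers:
Flume supports event serializers only with the File Roll sink and the HDFS sink. We can also write our own custom event serializers by implementing the EventSerializer interface and referencing its EventSerializer.Builder implementation by fully qualified class name in the serializer property.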
At the time of writing, Flume provides the two serializers described below.
Body Text Serializer:
text: This is the default serializer. It writes the body of each event to the output stream without any transformation or modification; the event headers are ignored.
Configuration options are as follows:
| Property Name | Default | Description |
|---|---|---|
| appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
Example for agent named a1:
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
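In Java, a custom serializer implements org.apache.flume.serialization.EventSerializer (afterCreate, afterReopen, write, flush, beforeClose, supportsReopen) and is registered through its EventSerializer.Builder. The Python below only mirrors the observable behavior of the body text serializer so the effect of appendNewline is easy to see; the Event and BodyTextSerializer classes are hypothetical stand-ins, not Flume code, and the sample bodies are assumed to carry no trailing newline.

```python
import io
from dataclasses import dataclass, field


@dataclass
class Event:
    """Stand-in for a Flume event: string headers plus a byte body."""
    body: bytes
    headers: dict = field(default_factory=dict)


class BodyTextSerializer:
    """Mimics the text serializer; method names echo Flume's EventSerializer."""

    def __init__(self, out, append_newline: bool = True):
        self.out = out
        self.append_newline = append_newline  # plays the role of serializer.appendNewline

    def after_create(self):
        pass  # plain text needs no file header

    def write(self, event: Event):
        self.out.write(event.body)  # headers ignored, body written unchanged
        if self.append_newline:
            self.out.write(b"\n")

    def flush(self):
        self.out.flush()

    def before_close(self):
        pass  # plain text needs no trailer


def serialize(bodies, append_newline: bool) -> bytes:
    """Run the bodies through the serializer and return the bytes produced."""
    buf = io.BytesIO()
    ser = BodyTextSerializer(buf, append_newline)
    ser.after_create()
    for body in bodies:
        # Example header similar to what fileHeader = true adds on a spooling directory source.
        ser.write(Event(body, {"file": "/usr/lib/flume/spooldir/alternatives.log"}))
    ser.flush()
    ser.before_close()
    return buf.getvalue()


print(serialize([b"line one", b"line two"], append_newline=True))   # b'line one\nline two\n'
print(serialize([b"line one", b"line two"], append_newline=False))  # b'line oneline two'
```

With appendNewline = false, as in the a1 example above, bodies without their own newlines run together in the output file.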
Avro Event Serializer:
avro_event: This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism. This serializer inherits from the AbstractAvroEventSerializer class.
Configuration options are as follows:
| Property Name | Default | Description |
|---|---|---|
| syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
| compressionCodec | null | Avro compression codec, e.g. snappy. |
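When the avro_event serializer's output is written directly to a file, that file is an Avro object container: it starts with the magic bytes Obj followed by byte 1, then a metadata map whose avro.codec entry names the compression codec set through compressionCodec (the Avro specification treats a missing entry as "null"). The sketch below reads that map from a local copy of such a file; it follows the Avro container layout (zig-zag varint longs, length-prefixed byte strings, blocked maps) and is an illustration, not the Avro library's reader.

```python
def read_long(data: bytes, pos: int):
    """Decode an Avro zig-zag varint long; return (value, new_pos)."""
    shift, acc = 0, 0
    while True:
        b = data[pos]
        pos += 1
        acc |= (b & 0x7F) << shift
        if not b & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def read_bytes(data: bytes, pos: int):
    """Decode Avro bytes: a long length followed by that many bytes."""
    length, pos = read_long(data, pos)
    return data[pos:pos + length], pos + length


def read_container_metadata(data: bytes) -> dict:
    """Return the file metadata map (e.g. avro.schema, avro.codec) of a container file."""
    if data[:4] != b"Obj\x01":
        raise ValueError("not an Avro object container file")
    pos, meta = 4, {}
    while True:
        count, pos = read_long(data, pos)
        if count == 0:  # a zero count ends the map
            break
        if count < 0:  # negative count: a block size in bytes follows
            count = -count
            _, pos = read_long(data, pos)
        for _ in range(count):
            key, pos = read_bytes(data, pos)
            value, pos = read_bytes(data, pos)
            meta[key.decode("utf-8")] = value
    return meta
```

For example, read_container_metadata(open(path, "rb").read(65536)).get("avro.codec") returns b"snappy" for a Snappy-compressed container, where path is a hypothetical local copy and the read size is simply assumed to cover the embedded schema.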