Flume Data Collection into HDFS 2


In this post, we will discuss the setup of an agent for Flume data collection into HDFS.

We will configure an agent with a Sequence Generator source, a Memory channel and an HDFS sink, then start the agent and verify its functionality.


Flume Agent – Sequence Generator Source, HDFS Sink and Memory channel:
  • Add the below configuration properties to the flume.conf file to create Agent4 with a Sequence Generator source, a Memory channel and an HDFS sink.
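
A sketch of what the flume.conf entries could look like is shown below. Only the agent name Agent4, the seq source, the memory channel, the HDFS sink, the destination path /user/flume, the log file prefix and hdfs.rollCount = 10000 come from this post; the component names (seqsource, memchannel, hdfssink), the NameNode URI and the DataStream file type are assumptions.

    # flume.conf -- sketch of Agent4 (component names and NameNode URI are assumptions)
    Agent4.sources = seqsource
    Agent4.channels = memchannel
    Agent4.sinks = hdfssink

    # Sequence Generator source
    Agent4.sources.seqsource.type = seq

    # Memory channel
    Agent4.channels.memchannel.type = memory

    # HDFS sink: plain text files with prefix "log", rolled every 10000 events
    Agent4.sinks.hdfssink.type = hdfs
    Agent4.sinks.hdfssink.hdfs.path = hdfs://localhost:8020/user/flume
    Agent4.sinks.hdfssink.hdfs.filePrefix = log
    Agent4.sinks.hdfssink.hdfs.rollCount = 10000
    Agent4.sinks.hdfssink.hdfs.fileType = DataStream

    # Bind the source and sink to the channel
    Agent4.sources.seqsource.channels = memchannel
    Agent4.sinks.hdfssink.channel = memchannel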

  • Now, start the HDFS daemons with the start-dfs.sh command, then create the destination folder /user/flume in HDFS and give other users write access to it. Below are the commands that can be used for this.
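
A sketch, assuming the hadoop fs CLI (hdfs dfs works equally well) and a blanket 777 permission grant; a more restrictive mode may be preferable:

    $ start-dfs.sh
    $ hadoop fs -mkdir -p /user/flume
    $ hadoop fs -chmod 777 /user/flume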

  • Start the Flume agent Agent4 with the below command.
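
A sketch of the command, assuming Flume's bin directory is on the PATH, Flume is installed under $FLUME_HOME and the configuration above is saved as $FLUME_HOME/conf/flume.conf:

    $ flume-ng agent --conf $FLUME_HOME/conf \
          --conf-file $FLUME_HOME/conf/flume.conf \
          --name Agent4 \
          -Dflume.root.logger=INFO,console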

[Screenshot: Flume agent Agent4 startup console output]

Once the agent has started successfully, stop it (Ctrl+C) within a few seconds; otherwise the sequence generator source will cause a large number of files to be created in the destination directory. We have set hdfs.rollCount to 10000, so each log file will contain 10000 lines, and each output file will be created with the prefix log (i.e. the files match log.*).

  • Validate the output in the destination folder /user/flume in HDFS.
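
For example (the exact file names will differ, since the HDFS sink appends a timestamp to the log prefix):

    $ hadoop fs -ls /user/flume
    $ hadoop fs -cat /user/flume/log.* | head -20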

[Screenshot: Flume agent output files in the HDFS destination directory]

In the above screenshots, we can see the sequence numbers (0, 1, 2, 3, 4, 5) created by the sequence generator source and successfully pushed into the HDFS destination directory.

Flume gives the file currently being written a .tmp extension, which makes it easy to differentiate the completed files from in-progress files when specifying which files to process via MapReduce jobs.
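
If it is inconvenient to exclude the .tmp suffix in a job's input glob, one common alternative (an assumption here, not something configured in this agent) is to hide in-progress files behind an underscore prefix, which Hadoop's FileInputFormat ignores by default:

    # hide in-progress files from MapReduce input globs
    Agent4.sinks.hdfssink.hdfs.inUsePrefix = _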

So, we have successfully set up an agent to collect sample data into an HDFS sink.

Component Details:

Below are the additional details of the components used in the above agent.

Sequence Generator Source:

A simple sequence generator that continuously generates events with a counter that starts from 0 and increments by 1. Useful mainly for testing.

The type property is required.

Property Name  | Default     | Description
type           | –           | The component type name, needs to be seq
selector.type  |             | replicating or multiplexing
selector.*     | replicating | Depends on the selector.type value
interceptors   | –           | Space-separated list of interceptors
interceptors.* |             |
batchSize      | 1           | Max number of events to attempt to process per request loop

HDFS Sink:

The HDFS sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating both text and sequence files, as well as compression for both file types. These files can be rolled (close the current file and create a new one) periodically based on the elapsed time, the size of data or the number of events.

It can also partition data by attributes like the timestamp or the source machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events.

The following are the escape sequences supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, …)
%A locale’s full weekday name (Monday, Tuesday, …)
%b locale’s short month name (Jan, Feb, …)
%B locale’s long month name (January, February, …)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
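
As an illustration (not part of the agent configured above), a sink path partitioned by date and hour could look like the snippet below; time-based escape sequences need a timestamp in the event header, which can be supplied by setting hdfs.useLocalTimeStamp = true or by adding a timestamp interceptor:

    # hypothetical date/hour-partitioned destination path
    Agent4.sinks.hdfssink.hdfs.path = hdfs://localhost:8020/user/flume/%Y-%m-%d/%H
    Agent4.sinks.hdfssink.hdfs.useLocalTimeStamp = true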

The HDFS sink supports many properties; a few of them are listed below. The type and hdfs.path properties are required.

Property Name          | Default      | Description
type                   | –            | The component type name, needs to be hdfs
hdfs.path              | –            | HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
hdfs.filePrefix        | FlumeData    | Name prefixed to files created by Flume in the HDFS directory
hdfs.fileSuffix        | –            | Suffix to append to the file (e.g. .avro – note: the period is not automatically added)
hdfs.inUsePrefix       | –            | Prefix used for temporary files that Flume actively writes into
hdfs.inUseSuffix       | .tmp         | Suffix used for temporary files that Flume actively writes into
hdfs.rollInterval      | 30           | Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
hdfs.rollSize          | 1024         | File size to trigger a roll, in bytes (0 = never roll based on file size)
hdfs.rollCount         | 10           | Number of events written to the file before it is rolled (0 = never roll based on number of events)
hdfs.idleTimeout       | 0            | Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize         | 100          | Number of events written to the file before it is flushed to HDFS
hdfs.codeC             | –            | Compression codec, one of: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType          | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles      | 5000         | Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas  | –            | Specify the minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat       | Writable     | Format for sequence file records. One of "Text" or "Writable" (the default).
hdfs.callTimeout       | 10000        | Number of milliseconds allowed for HDFS operations such as open, write, flush and close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize   | 10           | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize | 1            | Number of threads per HDFS sink for scheduling timed file rolling
hdfs.round             | false        | Should the timestamp be rounded down (if true, affects all time-based escape sequences except %t)
hdfs.roundValue        | 1            | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than the current time
serializer             | TEXT         | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
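
For example, a hypothetical sink tuned to roll files every 10 minutes or at 128 MB (whichever comes first) and to write gzip-compressed output would combine these properties as follows:

    # roll on time or size only; disable count-based rolling
    Agent4.sinks.hdfssink.hdfs.rollInterval = 600
    Agent4.sinks.hdfssink.hdfs.rollSize = 134217728
    Agent4.sinks.hdfssink.hdfs.rollCount = 0
    # compressed output requires both fileType and codeC
    Agent4.sinks.hdfssink.hdfs.fileType = CompressedStream
    Agent4.sinks.hdfssink.hdfs.codeC = gzip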
