Flume Data Collection into HBase

In this post, we discuss collecting data directly into HBase through a Flume agent. In our previous posts under the Flume category, we covered the setup of Flume agents for the file roll, logger and HDFS sink types. Here we explore the details of the HBase sink and its setup with a live example.

As we have already covered the File channel, Memory channel and JDBC channel, we will use the Spillable Memory channel in this agent setup, so that all Flume-supported channels are covered. This does not mean that the HBase sink needs the spillable memory channel; it works equally well with the other channel types.

Likewise for sources: we have already covered the Netcat, Exec, Avro and Sequence Generator source types, so we will explain one more source (the Spooling Directory source) in this agent setup.

Now let's create our agent, Agent5, in the flume.conf properties file under the <FLUME_HOME>/conf directory.

Flume data collection into HBase – Spooling Directory Source, HBase Sink and Spillable Memory Channel:

flume.conf file creation:

Add configuration properties to the flume.conf file to create Agent5 with a Spooling Directory source, a Spillable Memory channel and an HBase sink.
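As a minimal sketch of what such a configuration might look like, the snippet below appends an Agent5 definition to flume.conf. The table, column family and spool directory come from this post; the component names (spooldir-source, spillmem-channel, hbase-sink), the capacity values and the checkpoint/data paths are assumptions chosen for illustration, and the script falls back to the current directory when FLUME_HOME is not set.

```shell
# Assumption: FLUME_HOME points at the Flume install; "." is used otherwise.
CONF_DIR="${FLUME_HOME:-.}/conf"
mkdir -p "$CONF_DIR"

# Quoted 'EOF' keeps the shell from expanding anything inside the config.
cat >> "$CONF_DIR/flume.conf" <<'EOF'
# Agent5: spooling directory source -> spillable memory channel -> HBase sink
Agent5.sources = spooldir-source
Agent5.channels = spillmem-channel
Agent5.sinks = hbase-sink

# Source: read files dropped into the spool directory
Agent5.sources.spooldir-source.type = spooldir
Agent5.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
Agent5.sources.spooldir-source.channels = spillmem-channel

# Channel: in-memory queue that overflows to disk (illustrative capacities)
Agent5.channels.spillmem-channel.type = SPILLABLEMEMORY
Agent5.channels.spillmem-channel.memoryCapacity = 10000
Agent5.channels.spillmem-channel.overflowCapacity = 1000000
# Assumed paths, not taken from the original setup
Agent5.channels.spillmem-channel.checkpointDir = /var/flume/checkpoint
Agent5.channels.spillmem-channel.dataDirs = /var/flume/data

# Sink: write each event body into test_table, column family test_cf
Agent5.sinks.hbase-sink.type = hbase
Agent5.sinks.hbase-sink.table = test_table
Agent5.sinks.hbase-sink.columnFamily = test_cf
Agent5.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
Agent5.sinks.hbase-sink.channel = spillmem-channel
EOF
```

Note that a source takes a channels property (plural) while a sink takes channel (singular), since a sink drains exactly one channel.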

Configuration Before Agent Start up:

Before starting this agent, we need to make sure the following things are ready.

  • Start the Hadoop and YARN daemons, and then the HBase daemons. Make sure all the daemons are running properly; otherwise we will run into a whole lot of issues. For help with Hadoop installation and running its daemons, refer to our previous posts under the Hadoop category; for HBase, refer to the HBase installation post.
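The daemon start-up can be sketched as a small helper. This is only an outline: start_daemons is a hypothetical function name, and it assumes the standard start-dfs.sh, start-yarn.sh and start-hbase.sh scripts and the JDK's jps tool are on the PATH.

```shell
# Hypothetical helper: start HDFS, YARN and HBase in order, then list the
# running Java processes so we can confirm the daemons are up.
start_daemons() {
  start-dfs.sh   || return 1   # HDFS daemons
  start-yarn.sh  || return 1   # YARN daemons
  start-hbase.sh || return 1   # HBase daemons
  jps                          # lists running JVM processes
}

# Usage:
#   start_daemons
```

Stopping at the first failure keeps HBase from being started on top of a broken HDFS.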

  • In HBase, create the table with the column family specified in the flume.conf file.

Below is a screenshot of the terminal showing the creation of the HBase table through the HBase shell after starting all daemons. In our agent, test_table and test_cf are the table and column family, respectively.

Hbase Table creation
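For readers who cannot see the screenshot, the table creation might look like the sketch below. The helper name create_test_table is hypothetical; it assumes the hbase launcher is on the PATH and relies on the HBase shell reading commands from standard input.

```shell
# Hypothetical helper: create test_table with the test_cf column family,
# then list tables to confirm it exists.
create_test_table() {
  printf "create 'test_table', 'test_cf'\nlist\n" | hbase shell
}

# Usage:
#   create_test_table
```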

  • Create the folder specified as the spooling directory path, and make sure that the flume user has read, write and execute access to it. In our agent, it is the /usr/lib/flume/spooldir directory.
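A rough way to prepare such a directory is sketched below. prepare_spool_dir is a hypothetical helper; it assumes it is run as the user that will run the Flume agent, so the access check reflects that user's permissions.

```shell
# Hypothetical helper: create the directory, grant the owner rwx, and
# succeed only if the current user can read, write and enter it.
prepare_spool_dir() {
  dir="$1"
  mkdir -p "$dir" || return 1
  chmod u+rwx "$dir" || return 1
  [ -r "$dir" ] && [ -w "$dir" ] && [ -x "$dir" ]
}

# Usage with the path from this post (may need elevated privileges):
#   prepare_spool_dir /usr/lib/flume/spooldir
```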

  • We will copy our input files into the spool directory, from which Flume will write each line as a new row into the HBase table. We will copy the input file wordcount.hql into the spooling directory; below are the contents of the wordcount.hql file.

We will also copy one more input file, id.pig, into the spooling directory; below are its contents.

So there are a total of 10 (7+3) lines of input across the two files, and each line will be treated as one event in Flume.

Below is a snapshot of the spool directory before starting the agent:
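Since each line becomes one event, the expected number of rows can be estimated before the agent starts by counting lines across the spool directory. The sketch below uses a hypothetical helper, count_expected_events, and assumes the default line-based deserializer and a directory containing only plain input files.

```shell
# Hypothetical helper: count lines across all (non-hidden) files in a
# directory. awk also counts a final line that lacks a trailing newline.
count_expected_events() {
  awk 'END { print NR }' "$1"/*
}

# Usage:
#   count_expected_events /usr/lib/flume/spooldir
```

Comparing this number with the row count in HBase after the run gives a quick end-to-end check.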

spooldir in

  • Make sure that the flume user has full access to the directories specified in the checkpointDir and dataDirs properties of the spillable memory channel configuration.

Start Agent:

Once we have confirmed that all the above configuration is in place and there are no issues, we are ready to start the agent; otherwise we will end up with unwanted error messages or Java exceptions.

Start the agent with the command below.

Flume Agent5
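A start command along these lines would typically be used; treat it as a sketch rather than the exact command from the screenshot. The helper name start_agent5 is hypothetical, flume-ng is assumed to be on the PATH, and the current directory is used when FLUME_HOME is not set.

```shell
# Hypothetical helper: run Agent5 in the foreground with console logging.
start_agent5() {
  flume_home="${FLUME_HOME:-.}"
  flume-ng agent \
    --conf "$flume_home/conf" \
    --conf-file "$flume_home/conf/flume.conf" \
    --name Agent5 \
    -Dflume.root.logger=INFO,console
}

# Usage:
#   start_agent5
```

The value passed to --name must match the agent prefix used in flume.conf (Agent5 here); otherwise the agent starts with no components configured.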

After a few seconds (the two small input files need only a few seconds to be copied into HBase), stop the agent by pressing Ctrl+C.

Verify the Output:

We can verify the agent's work at both the source and the sink.

Verification at source:

Once all the events in a file have been successfully copied into the channel, the source renames that file with a .COMPLETED suffix. So if we check the spool directory after the agent has been started and stopped, we can see that all the input files have been replaced by their corresponding .COMPLETED files. Below is a snapshot of the spool directory after the agent is stopped.

spooldir out

In the above screen, we can see that id.pig.COMPLETED and wordcount.hql.COMPLETED have been created in place of the original input files.
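The same check can be scripted by listing any files that have not yet been renamed. This is a small sketch with a hypothetical helper name, list_unprocessed, and it assumes the default .COMPLETED suffix is in use.

```shell
# Hypothetical helper: print regular files in the spool directory that do
# not carry the .COMPLETED suffix yet. Empty output means all were consumed.
list_unprocessed() {
  find "$1" -maxdepth 1 -type f ! -name '*.COMPLETED'
}

# Usage:
#   list_unprocessed /usr/lib/flume/spooldir
```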

Verification at sink:

We can verify the events at the sink by connecting to the HBase shell and scanning the contents of test_table.

In the screenshot below, we can see all 10 lines of input inserted into the pCol column of the test_cf column family in test_table.

Hbase sink op
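The verification at the sink might be run as in the sketch below. scan_test_table is a hypothetical helper; it assumes the hbase launcher is on the PATH and pipes the commands into the HBase shell.

```shell
# Hypothetical helper: print every row of test_table, then its row count.
scan_test_table() {
  printf "scan 'test_table'\ncount 'test_table'\n" | hbase shell
}

# Usage:
#   scan_test_table
```

For this example, the count should match the 10 input lines.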

As all 10 input lines were inserted as 10 rows into our destination table in HBase, we have successfully configured the Flume agent to deliver data to the HBase sink.

In the sections below, we describe in detail each component used to configure the above Flume agent.

Component Details:

Spooling Directory Source:

With the Spooling Directory source, we configure a spool directory into which users and applications place the files that need to be processed by the Flume agent. The source watches the specified directory for new files and parses events out of them as they appear. Once a given file has been successfully read into the channel, it is either renamed with a .COMPLETED suffix or deleted.

Only uniquely named files should be dropped into the spooling directory. The source will report a problem and stop processing in the following scenarios.

  1. If a file is written to after it has been placed into the spooling directory.
  2. If a file name is reused at a later time.

To avoid the above issues, it would be useful to add a unique identifier such as a timestamp to log file names when they are moved into the spooling directory.
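One way to follow this advice is to finish writing the file elsewhere and then move it into the spool directory under a timestamped name. The sketch below uses a hypothetical helper, spool_file; the timestamp has one-second resolution, so it assumes the same file name is not spooled twice within a second.

```shell
# Hypothetical helper: move a finished file into the spool directory,
# appending a timestamp so that the name is never reused.
spool_file() {
  src="$1"
  spool_dir="$2"
  stamp="$(date +%Y%m%d%H%M%S)"
  mv "$src" "$spool_dir/$(basename "$src").$stamp"
}

# Usage:
#   spool_file /tmp/app.log /usr/lib/flume/spooldir
```

When source and destination are on the same filesystem, mv is a rename, so the source never sees a half-written file.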

Below is the property table for the spooling directory source; required properties are in bold.

| Property Name | Default | Description |
|---|---|---|
| **type** | - | The component type name, needs to be spooldir. |
| **spoolDir** | - | The directory from which to read files. |
| fileSuffix | .COMPLETED | Suffix to append to completely ingested files. |
| deletePolicy | never | When to delete completed files: never or immediate. |
| fileHeader | FALSE | Whether to add a header storing the absolute path filename. |
| fileHeaderKey | file | Header key to use when appending the absolute path filename to the event header. |
| basenameHeader | FALSE | Whether to add a header storing the basename of the file. |
| basenameHeaderKey | basename | Header key to use when appending the basename of the file to the event header. |
| ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). |
| trackerDir | .flumespool | Directory to store metadata related to the processing of files. If this path is not an absolute path, it is interpreted as relative to the spoolDir. |
| consumeOrder | oldest | The order in which files in the spooling directory are consumed: oldest, youngest or random. For oldest and youngest, the last modified time of the files is used to compare them. In case of a tie, the file with the smallest lexicographical order is consumed first. For random, any file is picked randomly. |
| maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. |
| batchSize | 100 | Granularity at which to batch transfer to the channel. |
| inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
| decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence. |
| deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |

Spillable Memory Channel:

The Spillable Memory Channel can be treated as a combination of the memory channel and the file channel. It was introduced to overcome the memory channel's limitation of losing events when the memory queue is full. It uses the two storage mechanisms below.

  • In-memory queue
  • Disk

It stores events primarily in an in-memory queue; once the queue is full, additional incoming events are stored on disk, backed by the file channel.

This channel is ideal for flows that need the high throughput of the memory channel during normal operation but, at the same time, need the larger capacity of the file channel for better tolerance of intermittent sink-side outages.

Below is the property table for the spillable memory channel; required properties are in bold.

| Property Name | Default | Description |
|---|---|---|
| **type** | - | The component type name, needs to be SPILLABLEMEMORY. |
| memoryCapacity | 10000 | Maximum number of events stored in the memory queue. To disable use of the in-memory queue, set this to zero. |
| overflowCapacity | 100000000 | Maximum number of events stored in overflow disk (i.e. the file channel). To disable use of overflow, set this to zero. |
| overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up. |
| byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
| byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. |
| avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel. |
| <file channel properties> | see file channel | Use ‘overflowCapacity’ to set the File channel’s capacity. |

  • To disable the in-memory queue and make the channel function like a file channel, set memoryCapacity = 0 and provide the overflowCapacity, checkpointDir and dataDirs properties of the channel.
  • To disable the overflow disk and make the channel function like an in-memory channel, set overflowCapacity = 0. In this case checkpointDir and dataDirs can be omitted, but memoryCapacity must be set to a non-zero value.
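The two modes can be sketched as channel definitions. In the snippet below, the channel names (disk-only, mem-only), the capacity values, the directory paths and the output file name are all hypothetical; only the property names come from the table above.

```shell
# Write two illustrative channel definitions to a sample properties file.
cat > spillable-modes.properties <<'EOF'
# Mode 1: in-memory queue disabled, behaves like a file channel
Agent5.channels.disk-only.type = SPILLABLEMEMORY
Agent5.channels.disk-only.memoryCapacity = 0
Agent5.channels.disk-only.overflowCapacity = 1000000
Agent5.channels.disk-only.checkpointDir = /var/flume/checkpoint
Agent5.channels.disk-only.dataDirs = /var/flume/data

# Mode 2: disk overflow disabled, behaves like a memory channel
Agent5.channels.mem-only.type = SPILLABLEMEMORY
Agent5.channels.mem-only.memoryCapacity = 100000
Agent5.channels.mem-only.overflowCapacity = 0
EOF
```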

HBase Sink:

This sink reads events from a channel and writes them to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase.

This sink supports batch reading of events from the channel, to minimize the number of flushes on the HBase tables. To use this sink, it has to be configured with certain mandatory parameters:

  • table: The name of the table in HBase to write to.
  • columnFamily: The column family in HBase to write to.

This sink will commit each transaction if the table’s write buffer size is reached or if the number of events in the current transaction reaches the batch size, whichever comes first.

Flume provides two serializers for the HBase sink. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in HBase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into a different column.
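To illustrate the regex serializer, the sketch below writes a sink definition that splits comma-separated event bodies into two columns. The sink name, the output file name, the regex and the column names (id, name) are hypothetical; serializer.regex and serializer.colNames are the serializer's configuration keys as we understand them, so check them against the Flume version in use.

```shell
# Write an illustrative HBase sink definition using the regex serializer.
cat > hbase-regex-sink.properties <<'EOF'
Agent5.sinks.hbase-sink.type = hbase
Agent5.sinks.hbase-sink.table = test_table
Agent5.sinks.hbase-sink.columnFamily = test_cf
Agent5.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Hypothetical layout: an event body "42,alice" becomes id=42, name=alice
Agent5.sinks.hbase-sink.serializer.regex = ([^,]*),([^,]*)
Agent5.sinks.hbase-sink.serializer.colNames = id,name
EOF
```

Each capture group in the regex maps, in order, to one entry in colNames, so the two lists need the same length.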

Below is the property table for the HBase sink; required properties are in bold.

| Property Name | Default | Description |
|---|---|---|
| **type** | - | The component type name, needs to be hbase. |
| **table** | - | The name of the table in HBase to write to. |
| **columnFamily** | - | The column family in HBase to write to. |
| zookeeperQuorum | - | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml. |
| znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml. |
| batchSize | 100 | Number of events to be written per txn. |
| coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. |
| serializer | see description | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer. Default increment column = “iCol”, payload column = “pCol”. |

About Siva

Senior Hadoop developer with 4 years of experience in designing and architecting solutions for the Big Data domain, who has been involved in several complex engagements. Technical strengths include Hadoop, YARN, MapReduce, Hive, Sqoop, Flume, Pig, HBase, Phoenix, Oozie, Falcon, Kafka, Storm, Spark, MySQL and Java.
