Architecture:
Data Flow model:
A Flume event is defined as
a unit of data flow having a byte payload and an optional set of string
attributes. A Flume agent is a (JVM) process that hosts the components through
which events flow from an external source to the next destination (hop).
A Flume source consumes
events delivered to it by an external source like a web server. The external
source sends events to Flume in a format that is recognized by the target Flume
source. For example, an Avro Flume source can be used to receive Avro events
from Avro clients or other Flume agents in the flow that send events from an
Avro sink. A similar flow can be defined using a Thrift Flume source to receive
events from a Thrift sink, a Flume Thrift RPC client, or Thrift clients
written in any language generated from the Flume Thrift protocol. When a Flume
source receives an event, it stores it into one or more channels. The channel
is a passive store that keeps the event until it’s consumed by a Flume sink.
The file channel is one example – it is backed by the local filesystem. The
sink removes the event from the channel and puts it into an external repository
like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next
Flume agent (next hop) in the flow. The source and sink within the given agent
run asynchronously with the events staged in the channel.
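To make the model above concrete, the following minimal sketch shows how a downstream agent could be wired with an Avro source (receiving events from the previous hop's Avro sink), a memory channel, and a logger sink. The agent name, component names, bind address, and port are illustrative assumptions only; the configuration actually used in this setup appears in the Configuration File section below.
# Hypothetical downstream agent "hop2"; names and port are placeholders
hop2.sources = avroSrc
hop2.channels = memCh
hop2.sinks = logSink
# Avro source receives events sent by the previous hop's Avro sink
hop2.sources.avroSrc.type = avro
hop2.sources.avroSrc.bind = 0.0.0.0
hop2.sources.avroSrc.port = 4141
hop2.sources.avroSrc.channels = memCh
# Passive store that holds events until the sink consumes them
hop2.channels.memCh.type = memory
# Logger sink simply logs each event (useful for testing the flow)
hop2.sinks.logSink.type = logger
hop2.sinks.logSink.channel = memCh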
Install Flume:
yum install mapr-flume
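If the node uses RPM packages (as the yum command implies), a quick way to confirm the package is installed is:
rpm -qa | grep mapr-flume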
Configuration File:
# Name the components on this agent: the sources, the channels and the sinks.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /opt/mapr/logs/configure.log
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.kerberosPrincipal = kuser1
agent1.sinks.sink1.hdfs.kerberosKeytab = /tmp/krb5new_0.keytab
agent1.sinks.sink1.hdfs.path = /user/root/flume/logtest4_kerberosTKT/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.sink1.hdfs.rollInterval = 6
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.sink1.hdfs.rollSize = 0
# Number of events written to the file before it is rolled (0 = never roll based on number of events)
agent1.sinks.sink1.hdfs.rollCount = 10000
# Number of events written to the file before it is flushed to HDFS
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# Compression codec: one of gzip, bzip2, lzo, snappy
# hdfs.codeC = gzip
# File format: currently SequenceFile, DataStream or CompressedStream
# (1) DataStream does not compress the output file; do not set hdfs.codeC with it
# (2) CompressedStream requires hdfs.codeC to be set to an available codec
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.maxOpenFiles=50
# -- "Text" or "Writable"
#hdfs.writeFormat
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
agent1.sinks.sink1.hdfs.threadsPoolSize=100
# Number of threads per HDFS sink for scheduling timed file rolling
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1
# hdfs.kerberosPrincipal    Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKeytab       Kerberos keytab for accessing secure HDFS
# hdfs.round       false    Should the timestamp be rounded down (if true, affects all time-based escape sequences except %t)
# hdfs.roundValue  1        Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
# hdfs.roundUnit   second   The unit of the round-down value: second, minute or hour
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
# serializer.*
# Use a channel which buffers events to a file
# -- The component type name, needs to be FILE.
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where the checkpoint file will be stored
# dataDirs ~/.flume/file-channel/data The directories where log files will be stored
# The maximum size of transaction supported by the channel
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints
agent1.channels.channel1.checkpointInterval = 30000
# Max size (in bytes) of a single log file
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel
agent1.channels.channel1.capacity = 10000000
# keep-alive     3    Amount of time (in sec) to wait for a put operation
# write-timeout  3    Amount of time (in sec) to wait for a write operation
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
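With the configuration saved (for example as flume-agent1.conf; the file name and conf directory below are placeholders), the agent can be started with the standard flume-ng launcher, assuming flume-ng is on the PATH. The --name value must match the agent name used in the properties file (agent1 here):
flume-ng agent --conf /path/to/conf --conf-file /path/to/flume-agent1.conf --name agent1 -Dflume.root.logger=INFO,console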