1. Introduction to Flume
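Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data. It was originally developed by Cloudera and is now an Apache project. Flume supports customizable data senders inside the logging system for collecting data, and it can apply simple processing to the data before writing it to a variety of customizable data receivers.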
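- agent
  An agent is a Java process that runs on a log collection node, that is, a server node. An agent contains three core components, source -> channel -> sink, an architecture similar to producer, warehouse, and consumer.
- source
  The source component collects data. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.
- channel
  After the source collects data, the data is stored temporarily in the channel. The channel is the agent's buffer for collected data, and it can be backed by memory, jdbc, file, and so on.
- sink
  The sink component sends data to its destination. Destinations include hdfs, logger, avro, thrift, irc, file, null, HBase, solr, and custom sinks.
- event
  An event wraps the data being transported and is the basic unit of data transfer in Flume. For a text file, an event is usually one line. The event is also the basic unit of a transaction. An event flows from source to channel and then to sink. Its body is a byte array, and it can carry headers. An event represents the smallest complete unit of data, coming from an external source and going to an external destination.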
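The source -> channel -> sink wiring described above can be sketched as a minimal agent definition. This is only an illustration: the agent name `a1`, the component names `r1`, `c1`, `k1`, and the netcat port are assumptions chosen for the example, not values from this article.

```properties
# Hypothetical agent "a1" with one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: listens on a TCP port; each received line becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# A source can write to several channels, hence the plural "channels"
a1.sources.r1.channels = c1

# Channel: buffers events in memory between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Sink: logs each event; a sink reads from exactly one channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

Note the asymmetry between `channels` on the source and `channel` on the sink; mixing the two up is a common reason an agent fails to start.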
2. Kafka Channel and Kafka Sink
2.1 Kafka Channel
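The Kafka channel can be used in several scenarios:
- With Flume source and sink:
  it provides a reliable and highly available channel for events;
- With Flume source and interceptor but no sink:
  it lets Flume events be written into a Kafka topic for use by other applications;
- With Flume sink, but no source:
  it is a low-latency, fault-tolerant way to send events from Kafka to other sinks such as HDFS, HBase, or Solr.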
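As an illustration of the third scenario (a sink but no source), the sketch below defines an agent that only drains an existing Kafka topic. The agent name `a1`, the broker host names, and the topic `app-events` are hypothetical. It assumes the topic is written by ordinary Kafka producers rather than by a Flume source, which is why `parseAsFlumeEvent` is set to `false`.

```properties
# Hypothetical agent "a1": no sources, Kafka topic -> Kafka channel -> sink
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# Placeholder broker addresses; replace with your own cluster
a1.channels.c1.kafka.bootstrap.servers = broker1.example.com:9092,broker2.example.com:9092
# Placeholder topic that other (non-Flume) producers write to
a1.channels.c1.kafka.topic = app-events
# Messages are plain payloads, not Avro-encoded Flume events
a1.channels.c1.parseAsFlumeEvent = false

# Logger sink used only to keep the sketch short; HDFS, HBase, or Solr would go here
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```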
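- Kafka Channel configuration
The required properties, shown in bold in the Flume user guide, are type and kafka.bootstrap.servers.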
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port |
kafka.topic | flume-channel | Kafka topic which the channel will use |
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss. |
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact |
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled. |
pollTimeout | 500 | The amount of time(in milliseconds) to wait in the “poll()” call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | – | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId. |
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted). earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw an exception to the consumer if no previous offset is found for the consumer’s group; anything else: throw an exception to the consumer. |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka. |
more producer/consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer. |
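To make the table concrete, here is a hedged sketch of a Kafka channel sitting between an ordinary source and sink. The agent name `a1`, the component names, the broker host names, and the spool directory path are placeholders invented for the example; the property names are the ones listed in the table above.

```properties
# Hypothetical agent "a1": spooldir source -> Kafka channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
# Placeholder directory
a1.sources.r1.spoolDir = /var/log/app/spool
a1.sources.r1.channels = c1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# Two brokers listed, following the HA recommendation (placeholder hosts)
a1.channels.c1.kafka.bootstrap.servers = broker1.example.com:9092,broker2.example.com:9092
# The values below are the documented defaults, written out for clarity
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.kafka.consumer.group.id = flume
a1.channels.c1.parseAsFlumeEvent = true
a1.channels.c1.kafka.consumer.auto.offset.reset = latest

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

Because a Flume source writes into the channel here, `parseAsFlumeEvent` stays at `true` so that event headers survive the trip through Kafka.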
2.2 Kafka Sink
Flume can publish data to a Kafka topic. Kafka 0.9.x is currently supported.
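- Kafka Sink configuration
The required properties, shown in bold in the Flume user guide, are type and kafka.bootstrap.servers.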
Property Name | Default | Description |
---|---|---|
type | – | Must be set to org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port |
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named “header”. (If using the substitution, it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.) |
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency. |
kafka.producer.acks | 1 | How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure. |
useFlumeEventFormat | false | By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side. |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | – | When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId. |
allowTopicOverride | true | When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided). |
topicHeader | topic | When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback. |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
more producer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer. |
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms |
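A sink definition using the properties from this table might look like the following sketch. The agent name `a1`, sink name `k1`, channel name `c1`, broker host names, and topic name are assumptions made for illustration; `linger.ms` and `compression.type` are standard Kafka producer settings passed through with the `kafka.producer.` prefix.

```properties
# Hypothetical Kafka sink "k1" on agent "a1", reading from channel "c1"
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
# Placeholder brokers and topic
a1.sinks.k1.kafka.bootstrap.servers = broker1.example.com:9092,broker2.example.com:9092
a1.sinks.k1.kafka.topic = app-logs

# Larger batches raise throughput at the cost of latency
a1.sinks.k1.flumeBatchSize = 100
# -1 waits for all replicas, trading latency for durability
a1.sinks.k1.kafka.producer.acks = -1

# Any Kafka producer property can be passed with the kafka.producer. prefix
a1.sinks.k1.kafka.producer.linger.ms = 5
a1.sinks.k1.kafka.producer.compression.type = snappy
```

Whether `acks = -1` is worth the extra latency depends on how much data loss the pipeline can tolerate during a leader failure.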
3. Flume to Kafka configuration example
Change to the flume/conf directory and edit the configuration file (example.conf here):
agent.sources = s1
agent.channels = c1
agent.sinks = k1
# Source config: read completed log files from a spooling directory
agent.sources.s1.type = spooldir
agent.sources.s1.channels = c1
agent.sources.s1.spoolDir = /home/usr/tomcat-test/logs
# includePattern is a regular expression, not a shell glob
agent.sources.s1.includePattern = ^.*[.]log$
# Sink config: publish events to Kafka
# The sink name must match the one declared in agent.sinks (k1)
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.kafka.topic = test_tomcat_logs
agent.sinks.k1.kafka.bootstrap.servers = 192.168.100.105:9092
# Channel config
agent.channels.c1.type = memory
agent.channels.c1.keep-alive = 10
agent.channels.c1.capacity = 65535
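The configuration file tells us the following:
- Flume reads the log files under the directory /home/usr/tomcat-test/logs;
- Flume connects to Kafka at 192.168.100.105:9092; take care to configure this address correctly;
- Flume publishes the collected content to the Kafka topic test_tomcat_logs, so after starting ZooKeeper and Kafka, open a terminal and consume the topic test_tomcat_logs. You can then see Flume and Kafka working together.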
4. Running
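To run Flume, change to the Flume directory and run the following command. The --name value must match the agent name used in the configuration file (agent here), and --conf-file must point to the file edited above:
$ bin/flume-ng agent --conf conf --conf-file conf/example.conf --name agent -Dflume.root.logger=INFO,console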
References:
[1] Flume User Guide: http://flume.apache.org/FlumeUserGuide.html#kafka-channel