Kafka: Distributed Messaging
1. Message middleware: producers and consumers. Core concepts: producer, consumer, data stream (messages).
- Publish and subscribe to streams of messages
- Store message records in a fault-tolerant way
- Process streams of data
Kafka architecture:
producer: publishes messages
consumer: consumes (subscribes to) messages
broker: fault-tolerant storage; a Kafka server node
topic: category / label under which messages are published
consumer group: within one group, each partition is consumed by at most one consumer, so extra consumers beyond the partition count sit idle; for full parallelism, number of consumers = number of partitions
Sequential disk reads and writes avoid seek time and therefore give high throughput.
Zero-copy: data is not copied back and forth between kernel space and user space (sendfile).
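On the JVM, the zero-copy primitive behind this is FileChannel.transferTo, which delegates to sendfile on Linux so file bytes move from the page cache to the socket without entering a user-space buffer. A minimal sketch (the log file name and the localhost:9999 endpoint are placeholders, not anything Kafka-specific):
import java.io.FileInputStream
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object ZeroCopyDemo {
  def main(args: Array[String]): Unit = {
    // transferTo hands the copy to the kernel (sendfile); no user-space buffer is involved
    val fileChannel = new FileInputStream("00000000000000000000.log").getChannel
    val socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))
    var position = 0L
    val size = fileChannel.size()
    while (position < size) {
      position += fileChannel.transferTo(position, size - position, socket)
    }
    fileChannel.close()
    socket.close()
  }
}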
/opt/bigdata/kafka_2.11-1.0.0/kafka-log2s/logkafka-0/00000000000000000000.index — each index entry records a message's relative offset within the corresponding log file.
The offset index is sparse. To find a message by offset: first locate the log segment that covers the offset, compute the relative index as offset - (first offset of that log file - 1), then look that value up in the .index file. If there is no exact entry, take the nearest preceding entry and scan forward from the corresponding position in the .log file.
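An illustrative Scala sketch of that two-step lookup (not Kafka's actual implementation; the segment offsets and index entries are made-up inputs):
// segmentFirstOffsets: offset of the first message in each .log segment
// sparseIndex: sparse (relativeOffset, bytePosition) entries of the chosen segment
def locate(offset: Long,
           segmentFirstOffsets: Seq[Long],
           sparseIndex: Seq[(Long, Long)]): (Long, Long) = {
  // 1. segment whose first offset is the largest one <= the target offset
  val firstOffset = segmentFirstOffsets.filter(_ <= offset).max
  // 2. relative index inside the segment: offset - (first offset of the log file - 1)
  val relative = offset - (firstOffset - 1)
  // 3. nearest index entry at or below the relative index; the .log file is then
  //    scanned forward from the returned byte position to find the exact message
  val (_, bytePos) = sparseIndex.filter(_._1 <= relative).lastOption.getOrElse((0L, 0L))
  (firstOffset, bytePos)
}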
acks (producer setting):
 0: the producer does not wait for any acknowledgement from the broker (non-blocking)
 1: the partition leader has saved the message
-1 (all): the partition leader and its in-sync followers have all saved the message
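The same setting on the producer side, as a sketch (the broker address is a placeholder):
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

// acks: "0" = fire-and-forget, "1" = leader persisted, "all" (same as -1) = leader + in-sync followers
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.ACKS_CONFIG, "all")   // the -1 / strongest durability setting
val safeProducer = new KafkaProducer[String, String](props)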
Start ZooKeeper first (e.g. zkServer.sh start for each of the three instances).
Start Kafka: kafkaStart.sh
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server0.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server1.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server2.properties &
Create a topic:
kafka-topics.sh --create --zookeeper bigdata:2181,bigdata:2182,bigdata:2183 --replication-factor 1 --partitions 1 --topic logkafka
--partitions raises consumer-side parallelism (each partition can be read by one consumer in a group)
Inspect topics:
kafka-topics.sh --list --zookeeper bigdata:2181
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic test (describes the given topic; omit --topic to describe all topics)
Send messages:
kafka-console-producer.sh --broker-list localhost:9092 --topic logkafka
Consume messages:
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic logkafka --from-beginning (--from-beginning: consume from the start of the log rather than only new messages)
Stop Kafka: kafkaStop.sh
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server0.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server1.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server2.properties &
Two ways for Spark Streaming to connect to Kafka. Roughly: the Receiver approach consumes the Kafka queue through ZooKeeper, while the Direct approach connects straight to the Kafka brokers to fetch data.
Receiver:
1. Kafka topic partitions have nothing to do with the partitions of Spark RDDs. Raising the per-topic count passed to KafkaUtils.createStream() only increases the number of threads a single Receiver uses to read the topic's partitions; it does not increase Spark's parallelism when processing the data.
2. Multiple Kafka input DStreams can be created, with different consumer groups and topics, to receive data in parallel through multiple receivers.
3. If the write-ahead log is enabled on a fault-tolerant file system such as HDFS, every received batch is also copied into the WAL. The persistence level passed to KafkaUtils.createStream() is therefore set to StorageLevel.MEMORY_AND_DISK_SER.
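A minimal sketch of the receiver-based stream (assumes the spark-streaming-kafka-0-8 artifact; the ZooKeeper quorum, group id, topic and thread count are placeholders):
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("ReceiverDemo"), Seconds(5))
    val zkQuorum = "bigdata:2181,bigdata:2182,bigdata:2183"
    val topics = Map("logkafka" -> 1)                // topic -> receiver thread count, not Spark parallelism
    val stream = KafkaUtils.createStream(ssc, zkQuorum, "logkafka-group", topics,
      StorageLevel.MEMORY_AND_DISK_SER)              // storage level suggested above when the WAL is enabled
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}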
Direct:
1. Simplified parallel reads: to read multiple partitions you no longer need to create multiple input DStreams and union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. High performance: with the receiver-based approach, guaranteeing zero data loss requires the WAL, which is inefficient because the data is effectively written twice: Kafka already replicates it, and it is copied once more into the WAL. The direct approach has no Receiver and needs no WAL; as long as Kafka retains the data, it can be recovered from Kafka's own replicas.
3. Exactly-once semantics: the receiver-based approach uses Kafka's high-level API and stores the consumed offsets in ZooKeeper. This is the traditional way of consuming Kafka; combined with the WAL it guarantees zero data loss, but not exactly-once processing, because Spark and ZooKeeper can fall out of sync and a record may be processed twice. The direct approach uses Kafka's simple API; Spark Streaming tracks the consumed offsets itself and stores them in the checkpoint, so they stay consistent and each record is consumed exactly once. Since the offsets live only in the checkpoint, anyone who later wants to consume the data with the Kafka high-level API must update the offsets in ZooKeeper manually.
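And the direct counterpart, as a sketch under the same spark-streaming-kafka-0-8 assumption (broker list and checkpoint directory are placeholders):
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectDemo"), Seconds(5))
    ssc.checkpoint("/tmp/direct-checkpoint")         // offsets are tracked by Spark and saved here
    val kafkaParams = Map("metadata.broker.list" -> "bigdata:9092,bigdata:9093,bigdata:9094")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("logkafka"))             // one RDD partition per Kafka partition
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}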
2. API usage
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
Scala Producer:
package com.kafka

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object producer {
  def main(args: Array[String]): Unit = {
    // Command-line arguments
    if (args.length < 4) {
      System.err.println("Usage: producer <metadataBrokerList> <topics> <messageSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topics, messageSec, wordsPerMessage) = args

    // Kafka producer configuration: bootstrap servers and key/value serializers
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Kafka producer: 10 rounds, messageSec messages per round, wordsPerMessage random digits per message
    val producer = new KafkaProducer[String, String](props)
    for (_ <- 1 to 10) {
      (1 to messageSec.toInt).foreach { _ =>
        val msg = (1 to wordsPerMessage.toInt).map(_ => scala.util.Random.nextInt(10).toString).mkString(" ")
        val record = new ProducerRecord[String, String](topics, null, msg)
        producer.send(record)
      }
      Thread.sleep(100)
    }
    producer.close()
  }
}
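For completeness, a matching consumer sketch against the same kafka_2.11 1.0.0 client (the broker address, group id and topic are placeholders; poll(Long) is the pre-2.0 signature):
package com.kafka

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConverters._

object consumer {
  def main(args: Array[String]): Unit = {
    // Consumer configuration: where to connect, which group to join, how to deserialize
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "logkafka-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")   // like --from-beginning for a new group

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("logkafka"))
    while (true) {
      // poll returns whatever records arrived since the last call (up to the timeout)
      val records = consumer.poll(1000)
      records.asScala.foreach { r =>
        println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
      }
    }
  }
}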
3. Flume integration:
conf1:exec-mem-avro.conf
# Name the components on this agent
a1.sources = exec-source
a1.channels = memory-channel
a1.sinks = avro-sink

# configure for sources
a1.sources.exec-source.type = exec
a1.sources.exec-source.command = tail -F /opt/datas/log-collect-system/log_server.log

# configure for channels
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100

# configure for sinks
a1.sinks.avro-sink.type = avro
a1.sinks.avro-sink.hostname = localhost
a1.sinks.avro-sink.port = 44444

# configure
a1.sinks.avro-sink.channel = memory-channel
a1.sources.exec-source.channels = memory-channel
Kafka conf:exec-memory-kafka.cnf
# Name the components on this agent
a1.sources = avro-source
a1.channels = memory-channel
a1.sinks = kafka-sink

# configure for sources
a1.sources.avro-source.type = avro
a1.sources.avro-source.bind = localhost
a1.sources.avro-source.port = 44444

# configure for channels
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100

# configure for sinks
a1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.kafka-sink.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094
a1.sinks.kafka-sink.brokerList = bigdata:9092,bigdata:9093,bigdata:9094
a1.sinks.kafka-sink.topic = logkafka

# configure
a1.sinks.kafka-sink.channel = memory-channel
a1.sources.avro-source.channels = memory-channel
