一、硬件環境
假設有4台機,IP及主機名如下:
192.168.100.105 c1 192.168.100.110 c2 192.168.100.115 c3 192.168.100.120 c4
二、軟件環境
操作系統:Ubuntu Server 18.04
JDK:1.8.0
1.安裝JDK
https://www.cnblogs.com/live41/p/14235891.html
2.安裝ZooKeeper
https://www.cnblogs.com/live41/p/15522363.html
* 新版Kafka已內置了ZooKeeper,如果沒有其它大數據組件需要使用ZooKeeper的話,直接用內置的會更方便維護。
* 使用內置ZK的資料:
https://www.cnblogs.com/caoweixiong/p/11060533.html
三、搭建分布式Kafka
* 先登錄root賬號再進行以下操作
1.下載安裝包
http://kafka.apache.org/downloads
這里下載的是kafka_2.12-3.0.0.tgz。
* 以下步驟在每台機都要執行
2.上傳安裝包到服務器
假設安裝在home目錄
cd /home
rz
3.解壓
tar -xvf kafka_2.12-3.0.0.tgz mv kafka_2.12-3.0.0 kafka
4.配置系統環境變量
vim ~/.bashrc
添加以下內容:
export PATH=$PATH:/home/kafka/bin
保存退出后,更新環境變量:
source ~/.bashrc
5.編輯Kafka配置文件
cd /home/kafka/config
vim server.properties
添加以下內容:
vim server.properties broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 zookeeper.connect=c1:2181,c2:2181,c3:2181
其中0.0.0.0是同時監聽localhost(127.0.0.1)和內網IP(例如c1或192.168.100.105),建議改為localhost或c1或192.168.100.105。
每台機的broker.id要設置一個唯一的值,例如c1機是1、c2機是2、c3機是3、c4機是4,只要唯一即可,不一定要按順序。
6.啟動
先逐台機啟動ZooKeeper
zkServer.sh start
再逐台機啟動Kafka
kafka-server-start.sh -daemon home/kafka/config/server.properties
7.檢查
jps
會看到jps、QuorumPeerMain、Kafka
8.Kafka命令測試
#創建topic kafka-topics.sh --bootstrap-server c1:9092 --create --topic topic1 --partitions 8 --replication-factor 2 #列出所有topic kafka-topics.sh --bootstrap-server c1:9092 --list #列出所有topic的信息 kafka-topics.sh --bootstrap-server c1:9092 --describe #列出指定topic的信息 kafka-topics.sh --bootstrap-server c1:9092 --describe --topic topic1 #生產者(消息發送程序) kafka-console-producer.sh --broker-list c1:9092 --topic topic1 #消費者(消息接收程序) kafka-console-consumer.sh --bootstrap-server c1:9092 --topic topic1
其中,topic1是topic名,可自定義。
* 由於Apache開發團隊的版本升級原因,不同版本的命令會有所區別。
https://www.cnblogs.com/live41/p/15522207.html
9.Java代碼測試
(1) 配置maven
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>
(2) 調用代碼
public class KafkaHandler { public static void main(String[] args) { try { // 先監聽,再發送消息 consume(); produce(); } catch (Exception e) { System.out.println(e); } } private static void produce() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "c1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); try { kafkaProducer.send(new ProducerRecord<String, String>("topic1", "這是測試文本")); } finally { kafkaProducer.close(); } } private static void consume() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "c1:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); try { consumer.subscribe(Arrays.asList("topic1")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
10.停止
kafka-server-stop.sh
附1:
1.acks參數
acks = 1 只保證leader保存成功,如果剛好leader掛了,數據丟失 acks = 0 使用異步模式,該模式下kafka無法保證消息,可能會丟失 acks = all 所有副本都寫入成功並確認
2.數據丟失問題的相關參數
acks = all 所有副本都寫入成功並確認 retries = n 重試次數,設置為3或以上 min.insync.replicas = 2 消息至少要被寫入到2個副本才算成功 unclean.leader.election.enable = false 關閉ubclean leader選舉,不允許非ISR中的副本被選舉為leader,防止數據不一致的情況
unclean.leader.election.enable參數的資料:
https://honeypps.com/mq/kafka-params-analysis-of-unclean-leader-election-enable/
附2:
利用腳本批量操作
https://www.cnblogs.com/live41/p/15636926.html