Kafka的安裝與部署


一、硬件環境

假設有4台機,IP及主機名如下:

192.168.100.105 c1
192.168.100.110 c2
192.168.100.115 c3
192.168.100.120 c4

 

二、軟件環境

操作系統:Ubuntu Server 18.04

JDK:1.8.0

1.安裝JDK

https://www.cnblogs.com/live41/p/14235891.html

 

2.安裝ZooKeeper

https://www.cnblogs.com/live41/p/15522363.html

* 新版Kafka已內置了ZooKeeper,如果沒有其它大數據組件需要使用ZooKeeper的話,直接用內置的會更方便維護。

* 使用內置ZK的資料:

https://www.cnblogs.com/caoweixiong/p/11060533.html

 

三、搭建分布式Kafka

* 先登錄root賬號再進行以下操作

1.下載安裝包

http://kafka.apache.org/downloads

這里下載的是kafka_2.12-3.0.0.tgz。

 

* 以下步驟在每台機都要執行

2.上傳安裝包到服務器

假設安裝在home目錄

cd /home
rz

 

3.解壓

tar -xvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 kafka

 

4.配置系統環境變量

vim ~/.bashrc

添加以下內容:

export PATH=$PATH:/home/kafka/bin

保存退出后,更新環境變量:

source ~/.bashrc

 

5.編輯Kafka配置文件

cd /home/kafka/config
vim server.properties

添加以下內容:

vim server.properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
zookeeper.connect=c1:2181,c2:2181,c3:2181

其中0.0.0.0是同時監聽localhost(127.0.0.1)和內網IP(例如c1或192.168.100.105),建議改為localhost或c1或192.168.100.105。

每台機的broker.id要設置一個唯一的值,例如c1機是1、c2機是2、c3機是3、c4機是4,只要唯一即可,不一定要按順序。

 

6.啟動

先逐台機啟動ZooKeeper

zkServer.sh start

再逐台機啟動Kafka

kafka-server-start.sh -daemon home/kafka/config/server.properties

 

7.檢查

jps

會看到jps、QuorumPeerMain、Kafka

 

8.Kafka命令測試

#創建topic
kafka-topics.sh --bootstrap-server c1:9092 --create --topic topic1 --partitions 8 --replication-factor 2

#列出所有topic
kafka-topics.sh --bootstrap-server c1:9092 --list

#列出所有topic的信息
kafka-topics.sh --bootstrap-server c1:9092 --describe

#列出指定topic的信息
kafka-topics.sh --bootstrap-server c1:9092 --describe --topic topic1

#生產者(消息發送程序)
kafka-console-producer.sh --broker-list c1:9092 --topic topic1

#消費者(消息接收程序)
kafka-console-consumer.sh --bootstrap-server c1:9092 --topic topic1

其中,topic1是topic名,可自定義。

* 由於Apache開發團隊的版本升級原因,不同版本的命令會有所區別。

https://www.cnblogs.com/live41/p/15522207.html

 

9.Java代碼測試

(1) 配置maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

 

(2) 調用代碼

public class KafkaHandler
{
    public static void main(String[] args)
    {
        try
        {
            // 先監聽,再發送消息
            consume();
            produce();
        }
        catch (Exception e)
        {
            System.out.println(e);
        }
    }

    private static void produce() throws Exception
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "c1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        try
        {
            kafkaProducer.send(new ProducerRecord<String, String>("topic1", "這是測試文本"));
        }
        finally
        {
            kafkaProducer.close();
        }
    }

    private static void consume() throws Exception
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "c1:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        try
        {
            consumer.subscribe(Arrays.asList("topic1"));
            while (true)
            {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records)
                {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
        finally
        {
            consumer.close();
        }
    }
}

 

10.停止

kafka-server-stop.sh

 

 

附1:

1.acks參數

acks = 1    只保證leader保存成功,如果剛好leader掛了,數據丟失
acks = 0    使用異步模式,該模式下kafka無法保證消息,可能會丟失
acks = all  所有副本都寫入成功並確認

 

2.數據丟失問題的相關參數

acks = all   所有副本都寫入成功並確認
retries = n  重試次數,設置為3或以上
min.insync.replicas = 2  消息至少要被寫入到2個副本才算成功
unclean.leader.election.enable = false  關閉ubclean leader選舉,不允許非ISR中的副本被選舉為leader,防止數據不一致的情況

unclean.leader.election.enable參數的資料:

https://honeypps.com/mq/kafka-params-analysis-of-unclean-leader-election-enable/

 

附2:

利用腳本批量操作

https://www.cnblogs.com/live41/p/15636926.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM