Kafka分布式消息系統


1.簡介

Kafka是一個分布式消息系統,使用Scala語言進行編寫,具有高水平擴展以及高吞吐量特性。

目前流行的消息隊列主要有三種:ActiveMQ、RabbitMQ、Kafka

ActiveMQ、RabbitMQ均支持AMQP協議,Kafka使用仿AMQP協議,目前Flume、Storm、Spark、Elasticsearch都支持與Kafka進行集成。

動態擴容:在不需停止服務的前提下動態的增加或減少節點,Kafka的動態擴容是通過zookeeper實現的,zookeeper上保存着kafka的相關狀態信息(topic、partition等)

 

2.關於AMQP

AMQP全稱 高級消息隊列協議,是一個統一消息服務的應用層協議,為面向消息的中間件所設計,基於此協議的客戶端與消息中間件可相互傳遞消息,並不受客戶端和中間件的產品以及開發語言不同所限制。

 

Producer負責把消息發送給Broker。

Broker負責接收並存儲Producer發送的消息。

Consumer負責從Broker中消費消息。

*Broker是消息隊列中最小的運行單元,一個Broker的運行就代表着一個Kafka實例。

 

3.Kafka的模型

 

1.Broker

Broker中可以包含多個Topic,每個Topic下又包含多個Partition。

一個Topic(主題)類似於新聞中的體育、娛樂、等分類概念,在實際開發中通常一個業務對應一個Topic。

一個Topic下由多個Partition組成,每個Partition都是一個First In First Out的隊列,用於存放Topic中的消息。

 

每個消息在Partition中都有一個offset(偏移量),是消息在分區中的唯一標識。

每個Consumer都需要維護一份自己的offset,用於記錄當前消費的進度,然后保存到Kafka當中(Consumer可以以任意的位置開始進行讀取,只需要設置offset即可)

在一個可配置的時間段內,Kafka集群將保留所有發送的消息,不管這些消息是否被被消費。

Kafka的分區是提高Kafka性能的關鍵手段,當Kafka集群的性能不高時,可以試着往topic中添加分區。

 

Kafka的分區備份

Topic下的每個Partition在Kafka集群中都有備份,在邏輯相關的一組Partition中,都有一個作為Leader,其余作為Follower,Leader和Follwer的選舉都是隨機的,當Follower接收到請求時首先會發送給Leader,由Leader負責消息的讀和寫並把消息同步給各個Follower,如果Leader所在的節點宕機,Follower中的一台則會自動成為Leader。

 

比如搭建一個Kafka集群,存在3個節點,同時設置Topic的分區數以及分區的備份數是3,現往Broker1中創建一個New Topic,那么在每個Broker實例中都會存在一個New Topic,同時每個New Topic下都會包含3個Partition,在邏輯相關的一組Partition中,都有一個作為Leader,其余作為Follower。

 

2.Producer

Producer向Broker中指定的Topic發送消息,消息將會根據負載均衡策略進入相應的Partition。

*Producer向Broker發送消息時,除了指定Topic以及Message以外,還可以指定一個Key,用於Partition的散列,Key相同的消息將會保存到同一個Partition當中。

 

3.Consumer組

 

Kafka提供了Consumer組的概念,一個Consumer組下可以包含多個Consumer。

Kafka規定,Topic下的每一個Partition都只能被Consumer組下的唯一一個Consumer進行消費,以確保消費的順序性,因此Consumer組下的Consumer數量不能超過Partition的數量,否則將會處於空閑狀態。


隊列模式

若所有的Consumer都在同一個Consumer組中下則成為隊列模式,Topic中的各個Partition都只能被組中的唯一一個Consumer進行消費,組下的Consumer共同競爭Topic中的Partition。

廣播模式

若所有的Consumer都不在同一個Consumer組中則成為廣播模式,Topic中各個Partition的消息都會廣播給所有的Consumer組。

 

4.Kafka的應用場景

1.解耦

比如存在一個應用A,它需要接收請求並且對請求進行處理,那么此時可以利用Kafka進行解耦,應用A只負責接收請求,同時將請求中的數據封裝為Message,然后保存在Kafka的Topic當中,后續由應用B來進行消費,以達到解耦的目的。

 

2.削流

如果有大量並發的寫請求直接去到數據庫,那么將會導致數據庫的奔潰(618/11.11),此時可以利用Kafka進行削流,將所有的寫請求封裝成Message,然后保存到Kafka的Topic當中,后續再通過Consumer以一定的速率進行消費(隊列模式)

 

3.通知

首先各個被通知者都消費Kafka中指定的一個Topic,當需要進行通知時,往Kafka中指定的Topic發送消息,那么此時所有的被通知者就能夠收到通知(廣播模式)

 

5.關於Kafka中消息的順序性

Kafka只能保證在同一個Partition中的消息是有序的,因為Kafka規定了Topic下的每個Partation都只能被Consumer組中的唯一一個Consumer進行消費,同時Kafka也沒有實現在一個Consumer中使用多線程進行消費,Partition之間的消息是不能夠保證有序的

Kafka是通過指定消息的Key來保證消息的順序性的,因為Kafka會對Key進行散列,Key相同的消息將會保存在同一個Partation當中,因此當消息需要有序時可以通過指定相同的Key放入到同一個Partation當中。

如果需要在一個Consumer中使用多線程去消費Partation中的消息,那么需要自己實現,可以把Consumer拉取過來的消息全部提交到線程池中進行處理,那么消費的速度完全取決於線程池中線程的個數。

 

6.Kafka的使用

 

1.安裝

由於Kafka使用scala語言編寫,scale語言運行在JVM中,因此需要先安裝JDK並且配置好環境變量。

由於Kafka中的狀態信息都保存在zk上,雖然Kafka自帶zk,但一般是使用外置的zk集群,因此需要先安裝zk服務並且配置好zk集群關系。

從Kafka官網中下載安裝包並進行解壓。

 

2.配置文件

config目錄是Kafka配置文件的存放目錄

Broker端配置

Kafka在啟動時需要連接ZK,共同連接同一個ZK集群的Kafka自動構成集群關系(broker.id在集群中不能重復)

Kafka中的消息是保存到磁盤的(log.dirs目錄下),每個Topic下的Partition都對應log.dirs中的一個目錄(topic-num),每個Partition目錄下都有log文件用於存放消息,當Partition有新的消息時會往該log文件后進行追加。

如果創建的Topic其備份數大於1 ,那么在Kafka集群備份數個Broker中也會創建此Topic,因此在其log.dirs目錄下也會存在該topic的目錄。

 

Producer端配置

 

Consumer端配置

 

3.啟動Kafka

 

1.啟動zk集群

#啟動zk節點
zkServer.sh start
#查看節點角色
zkServer.sh status 

 

2.啟動Kafka集群

./kafka-server-start.sh -daemon ../config/server.properties

 

3.創建Topic

1.創建名為chat的Topic,Topic的分區數以及備份數都為3。

./kafka-topic.sh --create --zookeeper 192.168.1.80:2181,192.168.1.80:2182,192.168.1.80:2183 --partitions 3 --replication-factor 3 --topic "chat"

創建Topic時需指定ZK服務地址,ZK中保存了Topic的分區數以及備份數(元數據),Kafka集群中的其他節點再從ZK服務中獲取Topic的元數據來創建Topic。

 

2.查看各個broker中的log.dirs,可見在該目錄下都生成了chat-0、chat-1、chat2分別表示chat Topic的第一個、第二個、第三個partation,每個partation中都有.log文件用於存放Partation中的消息。

 

3.查看Kafka集群中chat Topic下各個Partation的狀態

./kafka-topic.sh --describe --zookeeper 192.168.1.80:2181 --topic chat

Leader:充當Leader的Broker節點(broker.id)

Replicas:存在備份的Broker節點(broker.id,不管節點是否存活)

Isr:存在備份的同時存活的Broker節點。

 

4.Producer發送消息

往Kafka集群中的chat主題發送消息

./kafka-console-producer.sh --broker-list 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat

如果不指定Key,那么消息將會根據負載均衡策略進入相應的Partation。

 

5.Consumer消費消息

啟動Consumer

./kafka-console-consumer.sh --bootstrap-server 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat --from-beginning

由於使用腳本文件啟動Consumer並沒有指定使用的配置文件,所以三個Consumer都不在同一個Consumer組中,因此三個Consumer都能夠消費chat主題下各個Partation中的消息。


啟動Consumer並指定配置文件

./kafka-console-consumer.sh --bootstrap-server 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat --from-beginning --consumer.config ../config/consumer.properties

啟動了3個Consumer並指定使用的配置文件,默認的group.id為test-consumer-group,因此這3個Consumer都在同一個Consumer組下,Topic中各個Partation僅能被組下的唯一一個Consumer進行消費。

在啟動第一個Consumer時,Consumer組下只有一個Consumer,因此消息都會被此Consumer進行消費,當往Consumer組中添加新的Consumer時,將會重新分配擁有Partation的權利。

 

7.JAVA中使用Kafka

 

1.導入依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>0.11.0.1</version>
</dependency>

 

2.創建Topic

ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 創建一個名為chat的主題其包含2個分區,備份數是3
AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$);
zkUtils.close();

 

3.Producer發送消息

//創建Properties對象用於封裝配置項
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
//創建消息實體ProducerRecord,並指定消息上傳的topic、消息的Key、消息的Value
ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value");
//發送消息
producer.send(record);
//關閉連接
producer.close();

KafkaProducer是線程安全的,在線程之間可以共享單個生產者實例。

send()方法是異步的,一旦消息被保存在待發送緩沖區時此方法就立即返回,其返回Future<RecordMetadata>實例,當調用該實例的get()方法時將會阻塞直到服務器對請求進行應答(阻塞時長跟acks配置項有關),當服務器處理異常時將拋出異常。

消息由Key和Value組成,Key相同的Message將會保存在同一個Partation當中(根據Key進行散列)

 

4.Consumer消費消息

//創建Properties對象用於封裝配置項
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092");
props.put("group.id", "consumerA");
//自動提交Consumer的偏移量給Kafka服務
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題,一個消費者實例可以訂閱多個主題
consumer.subscribe(Arrays.asList("chat", "hello"));
//接收數據,消息存放在ConsumerRecords消息集合中
ConsumerRecords<String, String> records = consumer.poll(1000*5);
//遍歷消費端消息集合獲取ConsumerRecord消費端消息實體,一個消費端消息實體包含偏移量、消息Key值、消息Value值
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

poll(long blockTime)方法用於接收topic中的消息,當沒有消息時將會等待blockTime的時間 (單位:毫秒),執行結果需結合auto.offset.reset配置項。

使用commitSync()方法可以手動同步消費者的偏移量給Kafka 。

使用seek(TopicPartition , long)方法手動設置消費者的偏移量。

 

8.Spring Kafka

 

1.導入依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.0.RELEASE</version>
</dependency>

 

2.創建Kafka Producer配置類

@Configuration
@EnableKafka
public class KafkaProducerConfiguration {

    /**
     * Producer配置
     */
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer工廠
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    /**
     * KafkaTemplate
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

3.創建Kafka Consumer配置類

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    /**
     * Consumer配置
     */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "GroupA");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Consumer工廠
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    /**
     * Kafka監聽器
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

 

4.Producer發送消息 

@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    public void send() throws Exception {
        kafkaTemplate.send("topic", "key","value");
    }
}

 

5.Consumer消費消息

@Component
public class Consumer {

    @KafkaListener(groupId="GroupA",topics = "topic",concurrency="3")
    public void consumer(String value) {
             System.out.println(String.format("消費消息,value:%s", value));
    }
}

@KafkaListener注解中的concurrency數量為開啟的Consumer數量,也就是在Consumer組下存在多少個Consumer。

 

9.使用Kafka要解決的問題

 

1.Consumer端消息丟失

當Consumer消費消息后,自動提交了offset,如果后續程序處理出錯,那么消息將會丟失,此時可以通過手動提交offset的方式進行解決。

 

2.重復消費

當Consumer讀取消息后,程序也成功進行處理,如果手動提交offset時出錯,則會導致重復消費,同時如果Producer重復發送消息也會導致重復消費,當發生重復消費時只需要保證冪等性即可(多次執行的結果保持一致)

 

如何保證冪等性

1.每次保存時,都先從數據庫查一次,如果數據已存在則表示重復消費(針對並發不大同時實時性不高的場景)

2.數據庫表添加唯一約束,當重復消費時將會插入失敗(針對沒有分庫分表的場景)

3.添加消息表並在字段上加上唯一約束,每當消費完一條消息就往表里插入一條記錄,當重復消息時將會插入失敗(針對有分庫分表的場景)

4.每次保存時,都先從Redis中查一次,如果Redis中已存在則表示重復消費(不太靠譜,除非過期時間設置很久)

*如果並發很高,需要借助Redis或者Zookeeper通過分布式鎖來進行控制(比如Producer發送了2條相同的消息,如果沒有指定Key,假設這兩條消息分別坐落在不同的Partation當中,然后剛好被兩個Consumer線程同時消費,此時就存在同步問題,需要通過分布式鎖來進行控制)

 

3.Broker端消息丟失

當Partition中的Leader重選舉時,也就是說Leader掛了,那么有可能導致消息未來得及同步給其他的Follower,最終導致消息丟失,此時只需要在Producer中設置acks等於all,那么Producer必須等待Leader將消息同步給所有的Follower后再進行返回。

 

4.消息堆積

消息堆積主要是因為Consumer消費的速度太慢了,可以通過為Topic新增Partation同時新增Consumer來進行消費從而提高消費的速度,或者將Consumer拉取的數據放入到線程池中進行處理,那么消費的速度就取決於線程池中線程的數量(要注意內存溢出),但是就不能通過Kafka監控工具中來判斷是否存在消息堆積的現象了

 

5.消費失敗

當消費失敗時,可以將消息放入到一個隊列當中,比如使用Redis的list結構,后續專門有一個線程來處理消費失敗的消息(定時任務)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM