1.簡介
Kafka是一個分布式消息系統,使用Scala語言進行編寫,具有高水平擴展以及高吞吐量特性。
目前流行的消息隊列主要有三種:ActiveMQ、RabbitMQ、Kafka
ActiveMQ、RabbitMQ均支持AMQP協議,Kafka使用仿AMQP協議,目前Flume、Storm、Spark、Elasticsearch都支持與Kafka進行集成。
動態擴容:在不需停止服務的前提下動態的增加或減少節點,Kafka的動態擴容是通過zookeeper實現的,zookeeper上保存着kafka的相關狀態信息(topic、partition等)
2.關於AMQP
AMQP全稱 高級消息隊列協議,是一個統一消息服務的應用層協議,為面向消息的中間件所設計,基於此協議的客戶端與消息中間件可相互傳遞消息,並不受客戶端和中間件的產品以及開發語言不同所限制。
Producer負責把消息發送給Broker。
Broker負責接收並存儲Producer發送的消息。
Consumer負責從Broker中消費消息。
*Broker是消息隊列中最小的運行單元,一個Broker的運行就代表着一個Kafka實例。
3.Kafka的模型
1.Broker
Broker中可以包含多個Topic,每個Topic下又包含多個Partition。
一個Topic(主題)類似於新聞中的體育、娛樂、等分類概念,在實際開發中通常一個業務對應一個Topic。
一個Topic下由多個Partition組成,每個Partition都是一個First In First Out的隊列,用於存放Topic中的消息。
每個消息在Partition中都有一個offset(偏移量),是消息在分區中的唯一標識。
每個Consumer都需要維護一份自己的offset,用於記錄當前消費的進度,然后保存到Kafka當中(Consumer可以以任意的位置開始進行讀取,只需要設置offset即可)
在一個可配置的時間段內,Kafka集群將保留所有發送的消息,不管這些消息是否被被消費。
Kafka的分區是提高Kafka性能的關鍵手段,當Kafka集群的性能不高時,可以試着往topic中添加分區。
Kafka的分區備份
Topic下的每個Partition在Kafka集群中都有備份,在邏輯相關的一組Partition中,都有一個作為Leader,其余作為Follower,Leader和Follwer的選舉都是隨機的,當Follower接收到請求時首先會發送給Leader,由Leader負責消息的讀和寫並把消息同步給各個Follower,如果Leader所在的節點宕機,Follower中的一台則會自動成為Leader。
比如搭建一個Kafka集群,存在3個節點,同時設置Topic的分區數以及分區的備份數是3,現往Broker1中創建一個New Topic,那么在每個Broker實例中都會存在一個New Topic,同時每個New Topic下都會包含3個Partition,在邏輯相關的一組Partition中,都有一個作為Leader,其余作為Follower。
2.Producer
Producer向Broker中指定的Topic發送消息,消息將會根據負載均衡策略進入相應的Partition。
*Producer向Broker發送消息時,除了指定Topic以及Message以外,還可以指定一個Key,用於Partition的散列,Key相同的消息將會保存到同一個Partition當中。
3.Consumer組
Kafka提供了Consumer組的概念,一個Consumer組下可以包含多個Consumer。
Kafka規定,Topic下的每一個Partition都只能被Consumer組下的唯一一個Consumer進行消費,以確保消費的順序性,因此Consumer組下的Consumer數量不能超過Partition的數量,否則將會處於空閑狀態。
隊列模式
若所有的Consumer都在同一個Consumer組中下則成為隊列模式,Topic中的各個Partition都只能被組中的唯一一個Consumer進行消費,組下的Consumer共同競爭Topic中的Partition。
廣播模式
若所有的Consumer都不在同一個Consumer組中則成為廣播模式,Topic中各個Partition的消息都會廣播給所有的Consumer組。
4.Kafka的應用場景
1.解耦
比如存在一個應用A,它需要接收請求並且對請求進行處理,那么此時可以利用Kafka進行解耦,應用A只負責接收請求,同時將請求中的數據封裝為Message,然后保存在Kafka的Topic當中,后續由應用B來進行消費,以達到解耦的目的。
2.削流
如果有大量並發的寫請求直接去到數據庫,那么將會導致數據庫的奔潰(618/11.11),此時可以利用Kafka進行削流,將所有的寫請求封裝成Message,然后保存到Kafka的Topic當中,后續再通過Consumer以一定的速率進行消費(隊列模式)
3.通知
首先各個被通知者都消費Kafka中指定的一個Topic,當需要進行通知時,往Kafka中指定的Topic發送消息,那么此時所有的被通知者就能夠收到通知(廣播模式)
5.關於Kafka中消息的順序性
Kafka只能保證在同一個Partition中的消息是有序的,因為Kafka規定了Topic下的每個Partation都只能被Consumer組中的唯一一個Consumer進行消費,同時Kafka也沒有實現在一個Consumer中使用多線程進行消費,Partition之間的消息是不能夠保證有序的
Kafka是通過指定消息的Key來保證消息的順序性的,因為Kafka會對Key進行散列,Key相同的消息將會保存在同一個Partation當中,因此當消息需要有序時可以通過指定相同的Key放入到同一個Partation當中。
如果需要在一個Consumer中使用多線程去消費Partation中的消息,那么需要自己實現,可以把Consumer拉取過來的消息全部提交到線程池中進行處理,那么消費的速度完全取決於線程池中線程的個數。
6.Kafka的使用
1.安裝
由於Kafka使用scala語言編寫,scale語言運行在JVM中,因此需要先安裝JDK並且配置好環境變量。
由於Kafka中的狀態信息都保存在zk上,雖然Kafka自帶zk,但一般是使用外置的zk集群,因此需要先安裝zk服務並且配置好zk集群關系。
從Kafka官網中下載安裝包並進行解壓。
2.配置文件
config目錄是Kafka配置文件的存放目錄
Broker端配置
Kafka在啟動時需要連接ZK,共同連接同一個ZK集群的Kafka自動構成集群關系(broker.id在集群中不能重復)
Kafka中的消息是保存到磁盤的(log.dirs目錄下),每個Topic下的Partition都對應log.dirs中的一個目錄(topic-num),每個Partition目錄下都有log文件用於存放消息,當Partition有新的消息時會往該log文件后進行追加。
如果創建的Topic其備份數大於1 ,那么在Kafka集群備份數個Broker中也會創建此Topic,因此在其log.dirs目錄下也會存在該topic的目錄。
Producer端配置
Consumer端配置
3.啟動Kafka
1.啟動zk集群
#啟動zk節點
zkServer.sh start
#查看節點角色
zkServer.sh status
2.啟動Kafka集群
./kafka-server-start.sh -daemon ../config/server.properties
3.創建Topic
1.創建名為chat的Topic,Topic的分區數以及備份數都為3。
./kafka-topic.sh --create --zookeeper 192.168.1.80:2181,192.168.1.80:2182,192.168.1.80:2183 --partitions 3 --replication-factor 3 --topic "chat"
創建Topic時需指定ZK服務地址,ZK中保存了Topic的分區數以及備份數(元數據),Kafka集群中的其他節點再從ZK服務中獲取Topic的元數據來創建Topic。
2.查看各個broker中的log.dirs,可見在該目錄下都生成了chat-0、chat-1、chat2分別表示chat Topic的第一個、第二個、第三個partation,每個partation中都有.log文件用於存放Partation中的消息。
3.查看Kafka集群中chat Topic下各個Partation的狀態
./kafka-topic.sh --describe --zookeeper 192.168.1.80:2181 --topic chat
Leader:充當Leader的Broker節點(broker.id)
Replicas:存在備份的Broker節點(broker.id,不管節點是否存活)
Isr:存在備份的同時存活的Broker節點。
4.Producer發送消息
往Kafka集群中的chat主題發送消息
./kafka-console-producer.sh --broker-list 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat
如果不指定Key,那么消息將會根據負載均衡策略進入相應的Partation。
5.Consumer消費消息
啟動Consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat --from-beginning
由於使用腳本文件啟動Consumer並沒有指定使用的配置文件,所以三個Consumer都不在同一個Consumer組中,因此三個Consumer都能夠消費chat主題下各個Partation中的消息。
啟動Consumer並指定配置文件
./kafka-console-consumer.sh --bootstrap-server 192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092 --topic chat --from-beginning --consumer.config ../config/consumer.properties
啟動了3個Consumer並指定使用的配置文件,默認的group.id為test-consumer-group,因此這3個Consumer都在同一個Consumer組下,Topic中各個Partation僅能被組下的唯一一個Consumer進行消費。
在啟動第一個Consumer時,Consumer組下只有一個Consumer,因此消息都會被此Consumer進行消費,當往Consumer組中添加新的Consumer時,將會重新分配擁有Partation的權利。
7.JAVA中使用Kafka
1.導入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>0.11.0.1</version> </dependency>
2.創建Topic
ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 創建一個名為chat的主題其包含2個分區,備份數是3 AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close();
3.Producer發送消息
//創建Properties對象用於封裝配置項 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); //創建消息實體ProducerRecord,並指定消息上傳的topic、消息的Key、消息的Value ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value"); //發送消息 producer.send(record); //關閉連接 producer.close();
KafkaProducer是線程安全的,在線程之間可以共享單個生產者實例。
send()方法是異步的,一旦消息被保存在待發送緩沖區時此方法就立即返回,其返回Future<RecordMetadata>實例,當調用該實例的get()方法時將會阻塞直到服務器對請求進行應答(阻塞時長跟acks配置項有關),當服務器處理異常時將拋出異常。
消息由Key和Value組成,Key相同的Message將會保存在同一個Partation當中(根據Key進行散列)
4.Consumer消費消息
//創建Properties對象用於封裝配置項 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("group.id", "consumerA"); //自動提交Consumer的偏移量給Kafka服務 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<>(props); //訂閱主題,一個消費者實例可以訂閱多個主題 consumer.subscribe(Arrays.asList("chat", "hello")); //接收數據,消息存放在ConsumerRecords消息集合中 ConsumerRecords<String, String> records = consumer.poll(1000*5); //遍歷消費端消息集合獲取ConsumerRecord消費端消息實體,一個消費端消息實體包含偏移量、消息Key值、消息Value值 for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
poll(long blockTime)方法用於接收topic中的消息,當沒有消息時將會等待blockTime的時間 (單位:毫秒),執行結果需結合auto.offset.reset配置項。
使用commitSync()方法可以手動同步消費者的偏移量給Kafka 。
使用seek(TopicPartition , long)方法手動設置消費者的偏移量。
8.Spring Kafka
1.導入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.0.RELEASE</version> </dependency>
2.創建Kafka Producer配置類
@Configuration @EnableKafka public class KafkaProducerConfiguration { /** * Producer配置 */ private Map<String, Object> senderProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } /** * Producer工廠 */ @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(senderProps()); } /** * KafkaTemplate */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
3.創建Kafka Consumer配置類
@Configuration @EnableKafka public class KafkaConsumerConfiguration { /** * Consumer配置 */ private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "GroupA"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * Consumer工廠 */ @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProps()); } /** * Kafka監聽器 */ @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
4.Producer發送消息
@Component public class Producer { @Autowired private KafkaTemplate<String,String> kafkaTemplate; public void send() throws Exception { kafkaTemplate.send("topic", "key","value"); } }
5.Consumer消費消息
@Component public class Consumer { @KafkaListener(groupId="GroupA",topics = "topic",concurrency="3") public void consumer(String value) { System.out.println(String.format("消費消息,value:%s", value)); } }
@KafkaListener注解中的concurrency數量為開啟的Consumer數量,也就是在Consumer組下存在多少個Consumer。
9.使用Kafka要解決的問題
1.Consumer端消息丟失
當Consumer消費消息后,自動提交了offset,如果后續程序處理出錯,那么消息將會丟失,此時可以通過手動提交offset的方式進行解決。
2.重復消費
當Consumer讀取消息后,程序也成功進行處理,如果手動提交offset時出錯,則會導致重復消費,同時如果Producer重復發送消息也會導致重復消費,當發生重復消費時只需要保證冪等性即可(多次執行的結果保持一致)
如何保證冪等性
1.每次保存時,都先從數據庫查一次,如果數據已存在則表示重復消費(針對並發不大同時實時性不高的場景)
2.數據庫表添加唯一約束,當重復消費時將會插入失敗(針對沒有分庫分表的場景)
3.添加消息表並在字段上加上唯一約束,每當消費完一條消息就往表里插入一條記錄,當重復消息時將會插入失敗(針對有分庫分表的場景)
4.每次保存時,都先從Redis中查一次,如果Redis中已存在則表示重復消費(不太靠譜,除非過期時間設置很久)
*如果並發很高,需要借助Redis或者Zookeeper通過分布式鎖來進行控制(比如Producer發送了2條相同的消息,如果沒有指定Key,假設這兩條消息分別坐落在不同的Partation當中,然后剛好被兩個Consumer線程同時消費,此時就存在同步問題,需要通過分布式鎖來進行控制)
3.Broker端消息丟失
當Partition中的Leader重選舉時,也就是說Leader掛了,那么有可能導致消息未來得及同步給其他的Follower,最終導致消息丟失,此時只需要在Producer中設置acks等於all,那么Producer必須等待Leader將消息同步給所有的Follower后再進行返回。
4.消息堆積
消息堆積主要是因為Consumer消費的速度太慢了,可以通過為Topic新增Partation同時新增Consumer來進行消費從而提高消費的速度,或者將Consumer拉取的數據放入到線程池中進行處理,那么消費的速度就取決於線程池中線程的數量(要注意內存溢出),但是就不能通過Kafka監控工具中來判斷是否存在消息堆積的現象了
5.消費失敗
當消費失敗時,可以將消息放入到一個隊列當中,比如使用Redis的list結構,后續專門有一個線程來處理消費失敗的消息(定時任務)