一、kafka簡介
kafka,ActiveMQ,RabbitMQ是當今最流行的分布式消息中間件,其中kafka在性能及吞吐量方面是三者中的佼佼者,不過最近查閱官網時,官方與它的定義為一個分布式流媒體平台。kafka最主要有以下幾個方面作用:
-
- 發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
- 以容錯持久的方式存儲記錄流。
- 處理記錄發生的流
kafka有四個比較核心的API 分別為:
producer:允許應用程序發布一個消息至一個或多個kafka的topic中
consumer:允許應用程序訂閱一個或多個主題,並處理所產生的對他們記錄的數據流
stream-api: 允許應用程序從一個或多個主題上消費數據然后將消費的數據輸出到一個或多個其他的主題當中,有效地變換所述輸入流,以輸出流。類似於數據中轉站的作用
connector-api:允許構建或運行可重復使用的生產者或消費者,將topic鏈接到現有的應用程序或數據系統。官網給我們的示意圖:
kafka關鍵名詞解釋:
- producer:生產者。
- consumer:消費者。
- topic: 消息以topic為類別記錄,每一類的消息稱之為一個主題(Topic)。為了提高吞吐量,每個消息主題又會有多個分區
- broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發布的消息。
每個消息(也叫作record記錄,也被稱為消息)是由一個key,一個value和時間戳構成。
主題與日志:
每一個分區(partition)都是一個順序的、不可變的消息隊列,並且可以持續的添加。分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。Kafka集群保持所有的消息,直到它們過期,無論消息是否被消費了。實際上消費者所持有的僅有的元數據就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常情況當消費者消費消息的時候,偏移量也線性的的增加。但是實際偏移量由消費者控制,消費者可以將偏移量重置為更老的一個偏移量,重新讀取消息。 可以看到這種設計對消費者來說操作自如, 一個消費者的操作不會影響其它消費者對此log的處理。 再說說分區。Kafka中采用分區可以處理更多的消息,不受單台服務器的限制。Topic擁有多個分區意味着它可以不受限的處理更多的數據。
二、kafka速成
1、下載kafka並解壓
kafka下載地址,注意kafka需要zookeeper的服務,因此請確保kafka服務啟動之前先運行zookeeper,請參考這篇文章。在kafka的bin目錄下有 windows的文件夾 用於在windows環境下啟動kafka
2、啟動kafka服務
> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
3、創建一個主題
我們用一個分區和一個副本創建一個名為“test”的主題:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
然后我們可以運行如下命令查看是否已經創建成功:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
當發送的主題不存在且想自動創建主題時,我們可以編輯config/server.properties
auto.create.topics.enable=true default.replication.factor=3
4、發送消息
Kafka附帶一個命令行客戶端,它將從文件或標准輸入中獲取輸入,並將其作為消息發送到Kafka集群。默認情況下,每行將作為單獨的消息發送。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
5、消費消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
6、集群搭建
首先我們為每個代理創建一個配置文件(在Windows上使用該copy
命令):
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
分別編輯上述文件:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
分別啟動:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
現在創建一個復制因子為三的新主題:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
我們可以通過以下命令查看狀態:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
7、外網配置kafka注意事項
請編輯server.properties添加如下配置:
broker.id主要做集群時區別的編號 port 默認kafka端口號 host.name 設置為雲內網地址 advertised.host.name 設置為雲外網映射地址
三、spring中使用kafka
1、編輯gradle配置文件:

dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-context compile group: 'org.springframework', name: 'spring-context', version: '5.0.4.RELEASE' // https://mvnrepository.com/artifact/org.springframework/spring-web compile group: 'org.springframework', name: 'spring-web', version: '5.0.4.RELEASE' // https://mvnrepository.com/artifact/org.springframework/spring-context-support compile group: 'org.springframework', name: 'spring-context-support', version: '5.0.4.RELEASE' // https://mvnrepository.com/artifact/org.springframework/spring-webmvc compile group: 'org.springframework', name: 'spring-webmvc', version: '5.0.4.RELEASE' // https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.4.RELEASE' // https://mvnrepository.com/artifact/org.slf4j/slf4j-api compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' // https://mvnrepository.com/artifact/ch.qos.logback/logback-core compile group: 'ch.qos.logback', name: 'logback-core', version: '1.2.3' // https://mvnrepository.com/artifact/ch.qos.logback/logback-classic testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' testCompile group: 'junit', name: 'junit', version: '4.12' }
2、編寫AppConfig配置文件類:

package com.hzgj.lyrk.spring.study.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka @ComponentScan public class AppConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<>(8); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory(), true); } @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(8); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConcurrency(3); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Component static class Listener { @KafkaListener(id="client_one",topics = "test") public void receive(String message) { System.out.println("收到的消息為:" + message); } @KafkaListener(id="client_two",topics = "test1") public void receive(Integer message) { System.out.println("收到的的Integer消息為:" + message); } } }
3. 編寫Main方法

package com.hzgj.lyrk.spring.study; import com.hzgj.lyrk.spring.study.config.AppConfig; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; public class Main { public static void main(String[] args) { AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(AppConfig.class); KafkaTemplate<String, String> kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); kafkaTemplate.send("test", 0,"msg","{\"id\":2}").addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable ex) { ex.printStackTrace(); } @Override public void onSuccess(SendResult<String, String> result) { System.out.println("發送消息成功...."); } }); } }
執行成功后得到如下結果: