Kafka是一種分布式的,基於發布/訂閱的消息系統。
Kafka的組成包括:
- Kafka將消息以topic為單位進行歸納。
- 將向Kafka topic發布消息的程序成為producers.
- 將預訂topics並消費消息的程序成為consumer.
- Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker.
Kafka的下載:https://kafka.apache.org/downloads 下載最新版本
解壓后修改配置(config/server.properties:broker.id、log.dirs)
vim config/server.properties
broker.id = 1
log.dirs = "日志目錄地址"
啟動服務:
- Kafka用到了Zookeeper,所有首先啟動Zookper,下面簡單的啟用一個單實例的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啟動后離開控制台。
bin/zookeeper-server-start.sh config/zookeeper.properties &
- 現在啟動Kafka
bin/kafka-server-start.sh config/server.properties
- 創建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 啟動consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Springboot中整合Kafka:
pom文件中:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
application.yml中:
spring:
# KAFKA
kafka:
# ָkafka服務器地址,可以指定多個
bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
#=============== producer生產者配置 =======================
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
# 緩存容量
buffer-memory: 33554432
# ָ指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#=============== consumer消費者配置 =======================
consumer:
#指定默認消費者的group id
group-id: test-app
#earliest
#當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
#latest
#當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
#none
#topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
auto-offset-reset: latest
enable-auto-commit: true
auto-commit-interval: 100ms
#指定消費key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
定義一個生產者類:KafkaSender 負責消息推送:
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
public void send(String topic, String taskid, String jsonStr) {
//發送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
//推送成功
public void onSuccess(SendResult<String, Object> result) {
logger.info(topic + " 生產者 發送消息成功:" + result.toString());
}
@Override
//推送失敗
public void onFailure(Throwable ex) {
logger.info(topic + " 生產者 發送消息失敗:" + ex.getMessage());
}
});
}
}
定義一個消費者類:KafkaCustomer 用來接收消息
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//下面的主題是一個數組,可以同時訂閱多主題,只需按數組格式即可,也就是用“,”隔開
@KafkaListener(topics = {"testTopic"})
public void receive(ConsumerRecord<?, ?> record){
logger.info("消費得到的消息---key: " + record.key());
logger.info("消費得到的消息---value: " + record.value().toString());
}
}
