POM
<!--MQ start-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!--MQ end-->
provider
yml
spring:
kafka:
# 指定kafkaserver的地址,集群配多個,中間,逗號隔開
bootstrap-servers: 127.0.0.1:9092
producer:
#procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
#acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
acks: 1
# 每次批量發送消息的數量,produce積累到一定數據,一次發送
batch-size: 16384
# produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
buffer-memory: 33554432
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
retries: 0
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
controller
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@ApiOperation(value = "kafka provider", notes = "kafka provider")
@GetMapping("/message/send")
public boolean send(@RequestParam String message){
kafkaTemplate.send("testTopic",message);
return true;
}
}
consumer
@Component
public class ConsumerListener {
@KafkaListener(topics = "testTopic")
public void onMessage(String message){
//insertIntoDb(buffer);//這里為插入數據庫代碼
System.out.println(message);
}
}