springboot整合kafka
參考:
配置
依賴
需要web和kafka
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
注意,springboot版本對kafka版本影響不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
版本不對都會導致項目無法啟動
yml配置
#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring:
kafka:
#指定kafka server的地址,集群配多個,中間,逗號隔開,或者使用 列表格式
# - 服務1
# - 服務2 ....
bootstrap-servers: 192.168.88.128:9092
#=============== provider =======================
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
acks: 1
#這個值只能大不能小了,否則會影響sleuth。可以使用的最大內存來緩存等待發送到server端的消息
buffer-memory: 1048576 # 這是最小的?
retries: 0
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 單個請求的最大大小(以字節為單位)
max.request.size: 2097152
# 從發送請求到收到ACK確認等待的最長時間(超時時間)
request.timeout.ms: 40000
# 這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,
# 我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。
linger.ms: 1
# 消息發送失敗的情況下,重試發送的次數 存在消息發送是成功的,只是由於網絡導致ACK沒收到的重試,會出現消息被重復發送的情況
message.send.max.retries: 0
consumer:
# 指定默認消費者group id
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
測試
消息實體
@Data
@Accessors(chain = true)
@NoArgsConstructor
public class Message {
/**
* id
*/
private Long id;
/**
* 消息
*/
private String msg;
/**
* 時間戳
*/
private Date sendTime;
}
發送消息
/** 消息發送方
* @author jingshiyu
* @date 2019/7/31 14:04:21
* @desc
*/
@RestController
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public void send(@RequestParam String msg) {
Message message=new Message();
message.setId(123L).setMsg(msg).setSendTime(new Date());
kafkaTemplate.send("kafka_one", JSON.toJSONString(message));
}
}
就這樣,發送消息代碼就實現了。
這里關鍵的代碼為 kafkaTemplate.send()
方法,kafka_one
是 Kafka 里的 topic ,這個 topic 在 Java 程序中是不需要提前在 Kafka 中設置的,因為它會在發送的時候自動創建你設置的 topic, JSON.toJSONString(message)
是消息內容
接收消息
/**
* 監聽服務器上的kafka是否有相關的消息發過來
*/
@Component
@Slf4j
public class KafkaReceiver {
/**
* 定義此消費者接收topics = {"kafka_one"}的消息,與controller中的topic對應上即可
* @param record 變量代表消息本身,可以通過ConsumerRecord<?,?>類型的record變量來打印接收的消息的各種信息
*/
@KafkaListener(topics = {"kafka_one"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
客戶端 consumer 接收消息特別簡單,直接用@KafkaListener
注解即可,並在監聽中設置監聽的 topic ,topics 是一個數組所以是可以綁定多個主題的,上面的代碼中修改為 @KafkaListener(topics = {"zhisheng","tian"})
就可以同時監聽兩個 topic 的消息了。需要注意的是:這里的 topic 需要和消息發送類 KafkaSender.java 中設置的 topic 一致。
發送消息
啟動項目之后,調用接口發送消息
http://192.168.0.173:8083/send?msg=測試消息
將會接收到消息
record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"測試消息","sendTime":1564556254808})
message ={"id":123,"msg":"測試消息","sendTime":1564556254808}
kafka查看
./kafka-topics.sh --list --zookeeper localhost:2181 在kafka上查看topic列表
就會發現剛才我們程序中的 kafka_one
已經自己創建了