spring boot與kafka集成(spring boot 1.5.1版本)


from http://www.jianshu.com/p/a313ffd19a2e

隨着spring boot 1.5版本的發布,在spring項目中與kafka集成更為簡便。

引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

具體spring-kafka的版本由spring boot的當前版本決定。

application.properties配置文件
spring.kafka.bootstrap-servers=192.168.1.107:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

最簡化的配置僅需指定kafka主機和消息者組名即可。這里使用的是單節點kafka,集群環境中配置多個kafka主機地址即可。例如:

spring.kafka.bootstrap-servers=192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092

以下4項配置指定消息key和消息體的編解碼方式。

spring.kafka.consumer.key-deserializer spring.kafka.consumer.value-deserializer spring.kafka.producer.key-serializer spring.kafka.producer.value-serializer
消息對象
import java.util.Date; public class Message { private Long id; private String msg; private Date sendTime; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; } }
消息生產者
import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @Component public class Sender { @Autowired private KafkaTemplate kafkaTemplate; private Gson gson = new GsonBuilder().create(); public void sendMessage(){ Message m = new Message(); m.setId(System.currentTimeMillis()); m.setMsg(UUID.randomUUID().toString()); m.setSendTime(new Date()); kafkaTemplate.send("test1", gson.toJson(m)); } }
消息消費者
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @Component public class Receiver { private Gson gson = new GsonBuilder().create(); @KafkaListener(topics = "test1") public void processMessage(String content) { Message m = gson.fromJson(content, Message.class); } }
運行
@SpringBootApplication public class AppStart { public static void main(String[] args) throws InterruptedException { ApplicationContext app = SpringApplication.run(AppStart.class, args); while(true){ Sender sender = app.getBean(Sender.class); sender.sendMessage(); Thread.sleep(500); } } }
通過上面的示例可以發現,相對於spring boot 1.4.x版本,1.5集成kafka主要是將以前需要手工編碼進行設置的kafka配置改由spring配置文件定義。
注意

我使用的spring boot版本是1.5.1,spring-kafka版本1.1.2,jdk1.8,該組合似乎不支持低版本的kafka。之前我使用kafka版本為2.11-0.10.0.0,向kafka發送消息時一直產生異常,后來升級kafka版本至2.11-0.10.2.0故障消失。由於測試時間有限,未作進一步分析。希望查明原因的同學能私信我。謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM