上一篇文章講了如何用spring cloud stream集成kafka,並且跑起來一個demo,如果這一次宣傳spring cloud stream的文章,其實到這里就可以啦。但實際上,工程永遠不是簡單的技術會還是不會的問題,在實際的開發中,我們會遇到很多的細節問題(簡稱坑),這篇文章,會把其中一些很小的點說一下,算是用實例告訴大家,工程的復雜性,往往體現在實際的繁瑣步驟中。
1、group的配置
在發送消息的配置里面,group是不用配置的
關於這一點的證明,可以在源代碼的注釋里面看到
org.springframework.cloud.stream.config.BindingProperties
2、修改topic的partitions
配置文件如下
bindings:
output:
binder: kafka
destination: wph-d-2 #消息發往的目的地,對應topic content-type: text/plain #消息的格式 producer: partitionCount: 7
partitionCount是用來設置partition的數量,默認是1,如果這個topic已經建了,修改partitionCount無效,會提示錯誤
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions` at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4] at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4] at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4] ... 14 common frames omitted
根據錯誤的提示,添加autoAddPartitions
kafka:
binder:
brokers: #Kafka的消息中間件服務器地址 - localhost:9092 autoAddPartitions: true
再次啟動就可以看到partitions數已經改了
autoAddPartitions屬性對應的類是org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
設置partitionCount屬性的類是org.springframework.cloud.stream.binder.ProducerProperties
3、發送json報錯
用postman發送sendMessage/complexType報錯
在服務器端的報錯內容是:
Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]
原因是數據傳輸格式傳輸錯誤,要改一下postman發送數據的格式
然后就能happy的發出去了
4、正確的發送json並轉換成對象
如果我們需要傳輸json的信息,那么在發送消息端需要設置content-type為json(其實可以不寫,默認content-type就是json,后面會講)
bindings:
output:
binder: kafka
destination: wph-d-2 #消息發往的目的地,對應topic content-type: application/json #消息的格式
然后通過producer發送這個消息
@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST) public String publishMessageComplextType(@RequestBody ChatMessage payload) { logger.info(payload.toString()); producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build()); return "success"; }
這里需要注意的一點是ChatMessage的field name必須要有getter和settr方法,兩者有一就可以了,否則json轉換成對象的時候,field name收不到值。
在訂閱消息的時候,application.yml里面content-type可以不用配置,這個值默認就是“application/json”,這一點可以在org.springframework.cloud.stream.config.BindingProperties類的注釋里面看到
和上面一樣,ChatMessage的field name需要有getter或者setter的方法,二者之一就行。
接收json並轉換成類的方法如下:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'") public void handle(ChatMessage message) { logger.info(message.toString()); }
有坑警告:如果我們把發送消息端的content-type設置成text/plain,消息訂閱端的content-type設置成application/json,就會在消息訂閱端報這個錯誤
Caused by: java.lang.IllegalStateException: argument type mismatch Endpoint [com.wphmoon.kscsclient.Consumer]
如果顛倒過來的話,發送消息端的content-type設置成application/json,消息訂閱端設置成text/plain,我實際測試過,是可以收到消息,並且能轉換成ChatMessage對象,沒有問題。