新版建議用響應式函數編程 即Function/Supplier/Consumer方式, 后續簡稱為Function
文章中使用的外部消息中間件是rockermq, 所以后續外部消息中間件都稱為rockermq.
如: (先從圖簡單看看stream和外部消息中間件之間的綁定關系)
-> (先從圖簡單看看stream和rocketmq之間的綁定關系)
先從圖簡單看看stream和rocketmq之間的綁定關系
先粗略梳理一下流程:
初始化
- functionBindingRegistrar的Bean初始化, 根據applicaion.yaml中的配置把Function配置注冊成Bean, 並且根據配置創建出對應的Input/Ouput的MessageChannel.
(MessageChannel的具體實現方法會new MessageDispatcher的實現, MessageDispatcher是MessageChannel中的一個屬性).
- functionInitializer的Bean初始化.找到步驟1中注冊的Bean, 並把對應的Function進行處理后注冊進MessageDispatcher中
- BindingLifecycle的Bean初始化並調用其中的start方法, 將MessageChannel傳入, MessageChannel和rocketmq進行綁定.
調用
- rocketmq接收到消息.找到初始化第三步中綁定的MessageChannel調用send把消息發送.
- MessageChannel會調用子類中的方法找到對應的MessageDispatcher,並調用初始化第二步中注冊進MessageDispatcher的方法, 完成消息的消費.
以上就是一個簡單版本的 spring cloud stream 和 rocketmq 的關聯關系
總結
- 把Function配置進行處理生成對應的MessageChannel並把Fucntion注冊進MessageDispatcher中.
- 把MessageChannel和rocketmq綁定.
- rocketmq接受到消息后發送到對應的MessageChannel中消費.
后續文章會從源碼的角度一步步看上續流程