spring cloud stream 3.1.2 源碼搭配rocketmq學習 (一)


新版建議用響應式函數編程 即Function/Supplier/Consumer方式, 后續簡稱為Function

文章中使用的外部消息中間件是rockermq, 所以后續外部消息中間件都稱為rockermq.
如: (先從圖簡單看看stream和外部消息中間件之間的綁定關系)
-> (先從圖簡單看看stream和rocketmq之間的綁定關系)

先從圖簡單看看stream和rocketmq之間的綁定關系

image

先粗略梳理一下流程:

初始化

  1. functionBindingRegistrar的Bean初始化, 根據applicaion.yaml中的配置把Function配置注冊成Bean, 並且根據配置創建出對應的Input/Ouput的MessageChannel.

    (MessageChannel的具體實現方法會new MessageDispatcher的實現, MessageDispatcher是MessageChannel中的一個屬性).

  2. functionInitializer的Bean初始化.找到步驟1中注冊的Bean, 並把對應的Function進行處理后注冊進MessageDispatcher中
  3. BindingLifecycle的Bean初始化並調用其中的start方法, 將MessageChannel傳入, MessageChannel和rocketmq進行綁定.

調用

  1. rocketmq接收到消息.找到初始化第三步中綁定的MessageChannel調用send把消息發送.
  2. MessageChannel會調用子類中的方法找到對應的MessageDispatcher,並調用初始化第二步中注冊進MessageDispatcher的方法, 完成消息的消費.

以上就是一個簡單版本的 spring cloud stream 和 rocketmq 的關聯關系

總結

  1. Function配置進行處理生成對應的MessageChannel並把Fucntion注冊進MessageDispatcher中.
  2. 把MessageChannel和rocketmq綁定.
  3. rocketmq接受到消息后發送到對應的MessageChannel中消費.

后續文章會從源碼的角度一步步看上續流程

Wish.
Do.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM