基於 RabbitMQ 實現跨語言的消息調度
微服務的盛行,使我們由原來的單機”巨服務“的項目拆分成了不同的業務相對獨立的模塊,以及與業務不相關的中間件模塊。這樣我們免不了在公司不同的模塊項目使用不同的團隊,而各自的團隊所擅長的開發語言也會不一致(當然,我想大多數都是統一了語言體系)。但是在微服務體系下,使用各自語言的優勢開發對應的模塊是最合適也是合理的訴求。
現在以消息中間件為例子,我們用 rabbitmq 將 .NET 和 Golang 連接起來。
前提
RabbitMQ 的准備工作這里省略,用 docker 可以很快的搭建出來,詳情請移步谷歌。這里我也給一個我查資料的記錄:Docker 安裝運行 Rabbitmq
.NET
關於 .NET 的 RabbitMQ 的消息中間件組件我們使用 EasyNetQ 對消息進行管理調度。我們以新建一個 MQ.EasyNetQ.Producer
api 項目。我們根據 EasyNetQ 官方文檔的 Quick-Start 的例子在 Program.cs
新建一個 RabbitMQ 連接並推送消息:
using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=guest;password=guest"))
{
var input = "";
Console.WriteLine("Enter a message. 'Quit' to quit.");
while ((input = Console.ReadLine()) != "Quit")
{
bus.Publish(new TextMessage
{
Text = input
});
}
}
然后新建一個消費端項目 MQ.EasyNETQ.Customer
,繼續在 Program.cs
建立與 RabbitMQ 的連接並開啟訂閱:
using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=guest;password=guest"))
{
bus.PubSub.Subscribe<TextMessage>("test", HandleTextMessage);
}
static void HandleTextMessage(TextMessage textMessage)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("Got message: {0}", textMessage.Text);
Console.ResetColor();
}
運行發現沒有問題。
需要注意一下,安裝成功之后 RabbitMQ 自帶消息重試,以及持久化的錯誤消息隊列,以便后續的消息恢復。具體詳見 RabbitMQ 的官方文檔。
ok,.NET 這塊對 RabbitMQ 消息的調度管理初步成功。接下來我們嘗試用 Go
Go
Go 下的 RabbitMQ 組件我們用官方推薦的 amqp 庫。同樣我們新建一個生產者在 src/producer
文件夾下的 producer.go
下。
由於本身 go 的一些限制還有為了方便起見,我把兩個項目放在同一個目錄下以不同的文件夾命名來區分。
同樣我們根據資料以及官方示例 demo 很容易入門在 main 函數寫下如下代碼片段:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "RabbitMQ 連接失敗!")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "打開通信通道失敗!")
defer ch.Close()
// 申明隊列
queue, err := declareQueue(ch)
failOnError(err, "隊列申明失敗")
// 申明交換機
declareExchange(ch)
// 綁定交換機
err = ch.QueueBind(queue.Name, queue.Name, "MQ.Shared.Messages.CreateUserMessage, MQ.Shared", false, nil)
failOnError(err, "綁定隊列失敗")
// 發送消息
err = publish(ch, queue, &src.CreateUserMessage{"marsonshine", 27, true, "marson@163.com", time.Now()})
failOnError(err, "發送消息失敗")
如何申明交換機和隊列以及綁定操作我這里就省略了,然后是發送消息函數
func publish(ch *amqp.Channel, queue amqp.Queue, body interface{}) error {
var network bytes.Buffer
gob.Register(src.CreateUserMessage{})
enc := gob.NewEncoder(&network)
err := enc.Encode(body)
if err != nil {
return err
}
err = ch.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: network.Bytes(),
})
log.Printf("[x] 發送消息 %s", body)
return err
}
這里我用的高性能的序列化插件 encoding/gob,這里就是我后面與 .NET 交互時候遇到的問題,后續在說明。
借來是消費端,代碼路徑在 src/customer/customer.go
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
... 省略
ch, err := conn.Channel()
declareQueue(ch)
// 消費隊列信息
err = consumer(ch, queue)
failOnError(err, "接受消息失敗")
消費消息代碼如下:
func consumer(ch *amqp.Channel, queue amqp.Queue) error {
msgs, err := ch.Consume(queue.Name, "", true, false, false, false, nil)
failOnError(err, "消費者注冊失敗")
forever := make(chan bool)
go func() {
for d := range msgs {
buf := bytes.NewBuffer(d.Body)
dec := gob.NewDecoder(buf)
var user = src.CreateUserMessage{}
err := dec.Decode(&user)
if err != nil {
log.Printf("接受消息失敗: %s", err.Error())
} else {
log.Printf("Received a message: %v", user)
}
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
return err
}
運行項目發現也沒有問題。
在使用兩邊各自的 RabbitMQ 客戶端組件沒有問題之后,我們開始考慮處理下一個核心問題:如何實現 Go 段服務發消息,應用端 .NET 如何消費。這理論上是很好解決的,因為 .Net 與 Golang 用的消息中間件都是 RabbitMQ,只要.Net 與 Golang 都實現了 RabbitMQ 的消息協議(比如 AMQP 協議)就能完成一方消息的推送,另一方消費的目的。
考慮這個問題並不是空穴來風,因為 Go 是用作處理底層平台 rpc 模塊,除了底層平台級不同模塊之間的通信外,各大應用端也要訂閱平台的基礎數據。
Go 推送消息,Net 消費及其出現的問題
到這一步的時候,出現問題了,登錄 RabbitMQ 管理 UI 發現 Go 有正常發出消息,queue 以及 exchange 都是對應上的,在 .NET 的訂閱方式就如上面寫的代碼一樣。在 queue 中的消息在重試一段時間之后如果還是失敗,EasyNetQ 會將無法正常消費的消息轉到錯誤隊列中去。並且可以查看發生具體的錯誤消息,結果發現都是報 ArgumentNullException:typeName is null
類型錯誤。奇怪的是我斷點調試也進不來斷點,說明 EasyNetQ 在消費消息的時候壓根沒有運行這段訂閱代碼:
using (var bus = RabbitHutch.CreateBus("host=localhost:5672;username=guest;password=guest"))
{
bus.PubSub.Subscribe<CreateUserMessage>("test", HandleCreateUserMessage);
}
static void HandleCreateUserMessage(CreateUserMessage message) {
Logger.LogInformation($"接收消息:{JsonSerializer.Serialize(message)} 時間:{DateTimeOffset.Now}");
}
后來也去翻 EasyNetQ 源碼,得知是因為還沒到我寫的這個訂閱階段的代碼,而是在這段訂閱代碼 IDisposable Consume(IQueue queue, MessageHandler onMessage, Action<IConsumerConfiguration> configure)
。這里面有個核心的參數就是 onMessage
,從建立連接到消費具體隊列的消息,這個參數是一直傳遞下去的。EasyNetQ 會根據初始化與 RabbitMQ 連接的參數來創建消費,比如建立隊列時傳遞 isExclusive = true
就會創建一個瞬時消費者,只有當前連接能訪問,並且關閉時會自動刪除。EasyNetQ 默認會初始化一個持久化消費者 PersistentConsumer,然后觸發內部消費者構造一個 BasicConsumer 共給 RabbitMQ.Client 調用觸發方法 HandleBasicDeliver
,由 RabbitMQ.Client 調用傳遞所需要的參數,而報的錯誤也是在這里,因為從 Go 發出的消息,.NET 接收無法解析到對應的元數據信息,所以獲取的 IBasicProperties 對象是空的,由此觸發了參數檢查造成報錯。
我們把消費端改成這樣就能發現 content
能正常接收
bus.Advanced.Consume(queue, (body, properties, info) =>
{
string content = Encoding.UTF8.GetString(body);
var userMessage = System.Text.Json.JsonSerializer.Deserialize<CreateUserMessage>(body);
Logger.LogInformation($"接收消息:{System.Text.Json.JsonSerializer.Serialize(userMessage)} 時間:{DateTimeOffset.Now}");
});
斷點能進來了,就能繼續往下進行了,隨后就會又碰到序列化失敗的問題,因為 content 接收的內容是亂碼的,跨語言之間經常出現的問題就是編碼,所以我把目光又瞄向了 Go,現在我們再來看下 Go 的發消息的那段代碼:
var network bytes.Buffer
gob.Register(src.CreateUserMessage{})
enc := gob.NewEncoder(&network)
err := enc.Encode(body)
...
err = ch.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: network.Bytes(),
})
...
Go 編碼庫 encoding/gob
我首先在網上查資料發現 gob 這個庫編碼是用的 gbk 編碼,實則不然,翻看源碼就知道是用的 utf-8,並且也查明 gob 這個庫是不能指定編碼格式的。無論我是改 ContentType 的類型,在 .Net 消費端依舊無法正常接收。難道只能用 json 序列化傳遞消息?為了弄明白這個,我開始查閱這個 gob 庫是否支持跨語言,也就是說 gob 這個庫是否實現了外界公共協議。最后在官網博客下查到了,encoding/gob 只適用於 Go 語言環境,所以在性能方面非常突出。在這里我貼出博客中的一小段原話,引自 https://blog.golang.org/gob
First, and most obvious, it had to be very easy to use. First, because Go has reflection, there is no need for a separate interface definition language or "protocol compiler". The data structure itself is all the package should need to figure out how to encode and decode it. On the other hand, this approach means that gobs will never work as well with other languages, but that's OK: gobs are unashamedly Go-centric.
既然不支持跨語言,那就心安理得的用 json 了,如果用不了 gob,想追求高性能的化,那么其實還可以用 protobuf 協議或是其它二進制協議來序列化,核心就是雙方語言協議格式統一即可。現在的 publish 函數如下
func publish(ch *amqp.Channel, queue amqp.Queue, body interface{}) error {
buffer, err := json.Marshal(body)
if err != nil {
return err
}
err = ch.Publish(
"",
queue.Name,
false,
false,
amqp.Publishing{
ContentType: "applicaton/json",
Body: buffer,
})
log.Printf("[x] 發送消息 %s", body)
return err
}
這樣 .NET 消費端就能成功接收消息了。
封裝 EasyNetQ 與最佳實踐
從前面的使用來看,我們把業務處理都放在 Program 明顯是不合適的,這里應該只關心模塊,與業務無關的。
幸好 EasyNetQ 考慮到了這點,提供了自動訂閱機制。雖然官網只給出了 Windsor 的例子,但是也很容易就能做到類似下面的封裝代碼
// EasyNetRabbitMQICollectionExtensions.cs
public static RabbitMQEasyNetBuilder EasyNetRabbitMQBuilder(this IServiceCollection services, IConfiguration configuration)
{
string username = configuration["RabbitMQ:UserName"];
string password = configuration["RabbitMQ:Password"];
var connectionString = (ConnectionString)$"host={configuration["RabbitMQ:Server"]},{configuration["RabbitMQ:Server"]}:5673;username={username};password={password}";
// publisherConfirms = true 為開啟推送消息確認,建議開啟,性能剛高
// 因為不加上則當 rabbitmq 不可用時,發送消息會系統錯誤,而開啟發送確認則不會,更具有伸縮性
connectionString.Append("publisherConfirms=true");
var bus = RabbitHutch.CreateBus(connectionString);
services.AddSingleton(bus);
return new RabbitMQEasyNetBuilder(services);
}
然后開啟自動訂閱:
// RabbitMQEasyNetBuilder.cs
public void UseAutoSubscriber(string subscriptionIdPrefix)
{
_services.AddSingleton<MessageDispatcher>();
_services.AddSingleton<AutoSubscriber>(provider =>
{
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), subscriptionIdPrefix)
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>()
};
return subscriber;
});
}
這里注入的 MessageDispatcher
類跟 WindsorMessageDispatcher
差不多,依葫蘆畫瓢。
最后在提供 Configure 觸發自動訂閱:
// IApplicationBuilderExtensions.cs
public static void UseAutoSubscriber(this IApplicationBuilder app,Assembly[] assemblies)
{
var subscriber = app.ApplicationServices.GetService<AutoSubscriber>();
subscriber.Subscribe(assemblies);
...
}
這樣我們就可以直接定義 IConsumer<Message>
的處理程序類即可,完全解耦了業務:
public class UserMessageHandler : IConsumeAsync<CreateUserMessage>
{
private readonly ILoggerFactory _loggerFactory;
public UserMessageHandler(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
}
public ILogger Logger => _loggerFactory.CreateLogger<UserMessageHandler>();
[ForTopic(Consts.Topic.User)]
public async Task ConsumeAsync(CreateUserMessage message, CancellationToken cancellationToken = default)
{
Logger.LogInformation($"接收消息:{JsonSerializer.Serialize(message)} 時間:{DateTimeOffset.Now}");
//throw new NotSupportedException();
await Task.Yield();
}
}
還沒結束,除了這種推送訂閱方式,EasyNetQ 還提供了 Request/Response,RPC 模式。本質上還是通過 exchange 對 queue 進行消息調度。只是 EasyNetQ 內部做了很多工作,以至於讓我們使用非常方便。那么針對這種模式也是可以做到完全解耦的,重點來了,這個是官網沒有的姿勢啊,且看下面代碼
public interface IResponder
{
void Subscribe();
}
public abstract class ResponderBase : IResponder
{
private readonly IBus _bus;
private ILogger _logger;
public IBus Bus => _bus;
public ILogger Logger
{
get { return _logger ??= NullLogger.Instance; }
set { _logger = value; }
}
protected ResponderBase(IBus bus)
{
_bus = bus;
}
public abstract void Subscribe();
}
先建立一個規約 IResponder
,並給一個基類實現。然后在拓展方法 IApplicationBuilderExtensions.UseAutoSubscriber
中加入如 AutoSubscriber 機制的代碼即可,完整的方法如下:
public static void UseAutoSubscriber(this IApplicationBuilder app,Assembly[] assemblies)
{
var subscriber = app.ApplicationServices.GetService<AutoSubscriber>();
subscriber.Subscribe(assemblies);
var requests = app.ApplicationServices.GetServices<IResponder>();
foreach (var request in requests)
{
request.Subscribe();
}
var advancedSubscribers = app.ApplicationServices.GetServices<IAdvancedSubscriber>();
foreach (var advanced in advancedSubscribers)
{
advanced.Subscribe();
}
}
這樣 Request/Response 與 EasyNetQ 高級 API 都能與業務很好的解耦了。只需要定義各自的 MessageHandler 即可。
最后
總體來說雖然踩坑了(明確來說不是庫的坑,而是對其不熟導致的),但是也如願解決了問題點。在實施多語言交互時,一定要注意彼此之間的差異,要定義好規范協議,在解決基本的交互問題之后,就開始繼續深入進行重構。雖然目前只是項目演示階段,等項目真正執行下去肯定還會碰到更多問題,特別是 Go,才接觸一星期,公司決定用 Go 作為底層核心 rpc 模塊,我個人還是很擔心的,因為我的 go 之道還有很有很長的路要走。
整個 mq 示例源碼地址托管在 https://github.com/MS-Practice/mq