RabbitMQ是一個消息代理,從“生產者”接收消息並傳遞消息至“消費者”,期間可根據規則路由、緩存、持久化消息。“生產者”也即message發送者以下簡稱P,相對應的“消費者”乃message接收者以下簡稱C,message通過queue由P到C,queue存在於RabbitMQ,可存儲盡可能多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message。
應用場景1-“Hello Word”
一個P向queue發送一個message,一個C從該queue接收message並打印。
producer,連接至RabbitMQ Server,聲明隊列,發送message,關閉連接,退出。
consumer,連接至RabbitMQ Server,聲明隊列,接收消息並進行處理這里為打印出消息,退出。
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class Send 6 { 7 public static void Main() 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.QueueDeclare(queue: "hello", 14 durable: false, 15 exclusive: false, 16 autoDelete: false, 17 arguments: null); 18 19 string message = "Hello World!"; 20 var body = Encoding.UTF8.GetBytes(message); 21 22 channel.BasicPublish(exchange: "", 23 routingKey: "hello", 24 basicProperties: null, 25 body: body); 26 Console.WriteLine(" [x] Sent {0}", message); 27 } 28 29 Console.WriteLine(" Press [enter] to exit."); 30 Console.ReadLine(); 31 } 32 }
1 using RabbitMQ.Client; 2 using RabbitMQ.Client.Events; 3 using System; 4 using System.Text; 5 6 class Receive 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "hello", 15 durable: false, 16 exclusive: false, 17 autoDelete: false, 18 arguments: null); 19 20 var consumer = new EventingBasicConsumer(channel); 21 consumer.Received += (model, ea) => 22 { 23 var body = ea.Body; 24 var message = Encoding.UTF8.GetString(body); 25 Console.WriteLine(" [x] Received {0}", message); 26 }; 27 channel.BasicConsume(queue: "hello", 28 noAck: true, 29 consumer: consumer); 30 31 Console.WriteLine(" Press [enter] to exit."); 32 Console.ReadLine(); 33 } 34 } 35 }
應用場景2-work queues
將耗時的消息處理通過隊列分配給多個consumer來處理,我們稱此處的consumer為worker,我們將此處的queue稱為Task Queue,其目的是為了避免資源密集型的task的同步處理,也即立即處理task並等待完成。相反,調度task使其稍后被處理。也即把task封裝進message並發送到task queue,worker進程在后台運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。
建立連接,聲明隊列,發送可以模擬耗時任務的message,斷開連接、退出。
建立連接,聲明隊列,不斷的接收message,處理任務,進行確認。
應用場景3-Publish/Subscribe
在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。現在我們設法將一個message傳遞給多個consumer。這種模式被稱為publish/subscribe。此處以一個簡單的日志系統為例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息可以被所有運行的log接收者接收。因此,我們可以運行一個log接收者直接在屏幕上顯示log,同時運行另一個log接收者將log寫入磁盤文件。
日志消息接收者:建立連接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。
日志消息發送者:建立連接,聲明fanout類型的exchange,通過exchage向queue發送日志消息,消息被廣播給所有接收者,關閉連接,退出。
應用場景4-Routing
應用場景3中構建了簡單的log系統,可以將log message廣播至多個receiver。現在我們將考慮只把指定的message類型發送給其subscriber,比如,只把error message寫到log file而將所有log message顯示在控制台。
log message接收者:建立連接,聲明direct類型的exchange,聲明queue,使用提供的參數作為routing_key將queue綁定到exchange,開始循環接收log message並打印。
log message發送者:建立連接,聲明direct類型的exchange,生成並發送log message到exchange,關閉連接,退出。
應用場景5-topic
應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不同的log message發送給不同的subscriber(也即分別通過不同的routing_key將queue綁定到exchange,這樣exchange便可將不同的message根據message內容路由至不同的queue)。但仍然存在限制,不能根據多個規則路由消息,比如接收者要么只能收error類型的log message要么只能收info類型的message。如果我們不僅想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。為了達到此目的,需要topic類型的exchange。topic類型的exchange中routing_key中可以包含兩個特殊字符:“*”用於替代一個詞,“#”用於0個或多個詞。
log message接收者:建立連接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。
log message發送者:建立連接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉連接,退出。
應用場景6-PRC
在應用場景2中描述了如何使用work queue將耗時的task分配到不同的worker中。但是,如果我們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個完全不同的故事。這一模式被稱為遠程過程調用。現在,我們將構建一個RPC系統,包含一個client和可擴展的RPC server,通過返回斐波那契數來模擬RPC service。
RPC server:建立連接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求后調用自己的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的連接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應后的處理函數on_response根據響應關聯的correlation_id屬性作出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30為參數發起遠程過程調用。
消息隊列的選擇:kafka、rabbitmq、zeromq
一、rabbitmq
首先是百科里的一段話,Rabbitmq是流行的開源消息隊列系統,使用erlang語言進行開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。可以說從功能上rabbitmq基本上是符號這次項目要求的工具。
它的優點有:
1、完整的消息隊列系統,支持多種消息隊列模式,包括競爭消費;
2、基於AMQP
3、支持集群模式,擴展集群容量和性能比較方便,同時集成了集群的監控和管理;
4、支持消息的持久化;
缺點是:
1、需要學習比較復雜的接口和協議,比較耗費時間;
2、性能不是特別理想大概在1wqps左右;
3、使用Erlang語言,以前沒聽說過,出了問題不會排查;
二、zeromq
以前經常在內網中使用,號稱是最快的消息隊列,由於它支持的模式非常多:tcp、ipc、inproc、multicas,基本已經達到了替代標准socket的地步了,聽說linux內核已經准備將zeromq納入標准內核中了。
zeromq是一個智能傳輸層,它並不是對socket的封裝,而是在其之上有一套自己的協議,可以使用非常豐富的開發模式像扇出(fanout)、發布訂閱(pub-sub)、任務分發(task distribution)、請求響應(request-reply)等。
優點:
1、缺省為異步I/O交互,封裝了連接的維護操作,消息處理並行化;
2、性能非常不錯;
3、編程簡單,上手很快;
缺點:
1、消息無法持久化,除非自己在實現一個中間件,否則消息傳遞完成就刪除了;
2、擴展性不是很好,其實是一個消息庫,並不算是MQ;
三、kafka
日志團隊正在使用的工具,是一個消息發布訂閱系統。生產者向某個隊列發送一個數據,消費者訂閱一個隊列,一旦這個隊列內產生新的數據了,中間人就會將數據發送給所有訂閱隊列的消費者。
用術語來說生產者就是producer、消費者就是consumer、中間人就是broker,kafka主要就是這三者之間進行聯系的。
優點:
1、高吞吐量率,每秒能處理幾十萬條消息;
2、分布式架構,能夠以集群進行處理;
3、日志團隊已經建立了kafka集群,可以蹭一蹭;
缺點:
1、以前沒有使用過,需要一定的熟悉時間,和開發工作;
四、結論
日志團隊的強烈推薦,和強大的技術支持,最后決定使用kafka了,它提供的特點和優勢確實也使人心動,不過這次的調研也讓我了解了一些開源軟件的設計思路和軟件選擇的看法,后面在寫幾篇記錄一下。