這個文章主要介紹簡單的消費者的實現,rabbitMQ實現的消費者可以對消息服務器進行實時監聽,當有消息(生產者把消息推到服務器上之后),消費者可以自動去消費它,這通常是開啟一個進程去維護這個對話,它與消息服務器保持一個TCP的長連接,整個這個過程於rabbitMQ為我們提供,程序開發人員只需要實現自己的回調方法即可.
簡單的rabbitMQ消費者
/// <summary> /// 消息消費者 /// </summary> public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase { private readonly string exchangeName; private readonly string queueName; private readonly IConnection connection; private readonly IModel channel; private bool disposed; /// <summary> /// 從消息服務器拉到消息后觸發 /// </summary> public event EventHandler<MessageReceivedEventArgs> MessageReceived; /// <summary> /// Initializes a new instance of <c>RabbitMqMessageSubscriber</c> class. /// </summary> /// <param name="uri"></param> /// <param name="exchangeName"></param> /// <param name="queueName"></param> public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "") { this.exchangeName = exchangeName; this.queueName = queueName; var factory = new ConnectionFactory() { Uri = uri }; if (!string.IsNullOrWhiteSpace(userName)) factory.UserName = userName; if (!string.IsNullOrWhiteSpace(password)) factory.Password = password; this.connection = factory.CreateConnection(); this.channel = connection.CreateModel(); } public void Subscribe() { channel.QueueDeclare( queue: this.queueName, durable: false,//持久化 exclusive: false, //獨占,只能被一個consumer使用 autoDelete: false,//自己刪除,在最后一個consumer完成后刪除它 arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var json = Encoding.UTF8.GetString(body); var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); this.OnMessageReceived(new MessageReceivedEventArgs(message)); channel.BasicAck(e.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); } private void OnMessageReceived(MessageReceivedEventArgs e) { this.MessageReceived?.Invoke(this, e); } protected override void Finalize(bool disposing) { if (disposing) { if (!disposed) { this.channel.Dispose(); this.connection.Dispose(); disposed = true; } } } }
簡單調用
class Program { static void Main(string[] args) { var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl"); subscriber.MessageReceived += Subscriber_MessageReceived; subscriber.Subscribe(); Console.ReadKey(); } private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e) { Console.WriteLine("消費者2->消費了一個消息{0}", e.Message); Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消費者2->消費了一個消息{0}" + e.Message); Thread.Sleep(2000); } }
實時拉消息
RabbitMQ消息模型
通過上面圖我們可以更容易和清晰的去理解rabbitmq的工作流程.