Rabbitmq實現負載均衡與消息持久化


 

 

Rabbitmq 是對AMQP協議的一種實現。使用范圍也比較廣泛,主要用於消息異步通訊。

一,默認情況下Rabbitmq使用輪詢(round-robin)方式轉發消息。為了較好實現負載,可以在消息接收方指定,每次接收到一條,這樣可以緩解單一服務器壓力。

代碼如下:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);//設置每次接收一條
為了保證消息不丟失,取消自動ACK,改為只有在完全處理消息后再ACK。
如:
Consumer consumer = new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
Thread.sleep(10000);
}
catch(Exception ex)
{

}
System.out.println("received Message:" + message);
channel.basicAck(envelope.getDeliveryTag(), false);//處理完成后ACK
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);//取消自動ACK
二,為了保證在Rabbitmq在宕機后,仍不丟失消息,需要將隊列和發布的消息都聲明為可持久化的。
如:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, bytes);

三,Rabbitmq 的MessageModel(消息模型)
在Rabbitmq的消息模型中,我們決不應該將消息直接發送到queue.事實上,消息發送者並不關心消息是否被路由或被入隊列或被接收並處理。
生產者應只與Exchange(交換器)打交道,
Exchange的作用:從生產者接受消息,向消費者發送消息。
The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.【所以交換器必須准確地知道怎樣處理消息,是否應該加到一個指定的隊列,還是發送到多個隊列,還是應該拋棄該消息。】
指定Exchange的Rule,即以何種方式轉發消息。
Rabbitmq共有四種:direct,topic ,headers和 fanout,NamelessExchange.
fanout,這種方式很簡單,就是一個廣播,把消息轉給所有的訂閱者;有幾個訂閱者,消息就會被復制幾份。
NamelessExchange,無Exchange,消息以輪詢(round-robin)方式,發送給消費者,通過routingKey識別對應的消費者。
【提示】:
rabbitmqctl list_exchanges ,用於查看當前Rabbitmq正在運行的交換器;rabbitmqctl list_bindings,查看當前綁定數
eg:
生產者只負責發送消息,而不關心這些消息是否被處理,也不關心消息是否被拋棄;消息由Exchange根據具體rule處理。
private static final String EXCHANGE_NAME = "logs";
private static final String EXCHANGE_TYPE="fanout";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

 

 如:以fanout方式處理消息:消息會發送給所有的訂閱者,(與routingKey無關)
    private static final String exchangeName="logs";
    private static final String exchangeType = "fanout";
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            channel.exchangeDeclare(exchangeName,exchangeType);
            //創建一個隊列,用於接收消息
            String queueName= channel.queueDeclare().getQueue();
            channel.queueBind(queueName,exchangeName,"");
            System.out.println("Waiting for messages...,over it ,Press CTRL+C ");
            Consumer consumer = new DefaultConsumer(channel) {

                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("received Message:" + message);
                    try {
                        doWork(message);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }

                }
            };
            channel.basicConsume(queueName, false, consumer);

        } finally {

        }
    }

 

Bindings,將隊列綁定到Exchange,說明可以從Exchange接收消息。【A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.】
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

direct,只有當routingKey,與bindingKey相同Exchange才能會推送消息。
如:
生產者:
channel.basicPublish(exchangeName, "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, bytes);//片斷
生產者在發送消息時指定routingKey.

消費者:可以綁定多個Key,以接收來自多個routingKey的消息
for (String bindingKey : typeArr) {
channel.queueBind(queueName, exchangeName, bindingKey);
}
//接收者綁定Key.

topic ,生產者在發送消息時,指定准確的routingKey(多個單詞以.號分隔),當bindingKey模式匹配到routingKey時,則接收消息。
注意:routingKey和bindingKey的長度不能超過255.
*(star),只匹配一個單詞。#(hash),能匹配0個或多個單詞

三RPC
分布式遠程調用。

 
 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM