javaweb消息中間件——rabbitmq入門


概念:RabbitMQ是一款開源的消息中間件系統,由erlang開發,是AMQP的實現。

架構圖大概如上。

broker是消息隊列的服務器,比如在linux上,我們安裝的rabbitmq就是一個broker,可以通過url+username+password連接。

每個消息服務器可以創建多個vhost,默認的vhost是“/”,linux中通過rabbitmqctl add_vhost <vhost> 創建vhost,再給指定用戶授權即可。

生產者首先與broker建立connection(TCP連接),再在該connection上創建channel;channel類似於一個會話,生產者通過它與消息主機通信並發送消息。

消息生產者將消息發送到定義的exchange上,exchange根據自身類型(direct、fanout、topic等)和綁定的路由規則將消息轉發到相應的隊列,消費者選擇一個隊列監聽;如果有多個消費者監聽同一個隊列,默認以輪詢(round-robin)方式分發,使每個消費者收到的消息數量大致相同。

一個簡單的rabbitmq程序:

public class Producer {
    private static final String TEST_VHOST = "testvhost";
    private static final String TEST_QUEUE_NAME = "task_queue";

    private static Connection connection;
    private static Channel channel;

    public static void main(String[] args) throws IOException, TimeoutException, RabbitmqConnectionException {
        try {
            //create connectionFactory with host, username, password, vhost.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("test");
            connectionFactory.setPassword("test");
            connectionFactory.setHost("localhost");
            connectionFactory.setVirtualHost(TEST_VHOST);
            //get connection from connectionFactory
            connection = connectionFactory.newConnection();
            //create an session to communicate with mq host
            channel = connection.createChannel();
            //declare a queue(if not exists, create it)
            channel.queueDeclare(TEST_QUEUE_NAME, true, false, false, null);
            String message = "Hello world";
            System.out.println("sending message : " + message);
            //publish message to the declaring queue
            channel.basicPublish("", TEST_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        }catch (Exception e) {
            throw new RabbitmqConnectionException("Error connection");
        } finally {
            channel.close();
            connection.close();
        }


    }
}

  

/**
 * Minimal RabbitMQ consumer: listens on the durable queue {@code task_queue} in
 * vhost {@code testvhost} and acknowledges every message after printing it.
 */
public class Consumer {
    private static final String TEST_VHOST = "testvhost";
    private static final String TEST_QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker, declares the queue and registers a consumer with
     * manual acknowledgements. The connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        final Channel mqChannel = openConnection().createChannel();

        // Declare the queue we are going to listen on.
        mqChannel.queueDeclare(TEST_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages...");

        // Take one message at a time.
        mqChannel.basicQos(1);

        // autoAck is false: each delivery is acknowledged explicitly in the handler.
        mqChannel.basicConsume(TEST_QUEUE_NAME, false, new DefaultConsumer(mqChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                final String text = new String(body, "UTF-8");
                System.out.println("Received : '" + text + "'");
                mqChannel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    /** Opens a connection to the local broker using the test credentials and vhost. */
    private static Connection openConnection() throws Exception {
        final ConnectionFactory mqFactory = new ConnectionFactory();
        mqFactory.setUsername("test");
        mqFactory.setPassword("test");
        mqFactory.setHost("localhost");
        mqFactory.setVirtualHost(TEST_VHOST);
        return mqFactory.newConnection();
    }
}

 在spring中:

<!-- spring-rabbitmq.xml-->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:mvc="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <description>rabbitmq 連接服務配置</description>

    <!-- Component scanning for the RabbitMQ classes.
         Note: the "mvc" prefix is bound to the Spring *context* namespace above,
         so this element is effectively context:component-scan. -->
    <mvc:component-scan base-package="com.battery.rabbitMq"/>

    <!-- Connection settings; host, credentials, port and vhost come from the mq.* properties -->
    <rabbit:connection-factory id="rabbit-connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>

    <!-- RabbitAdmin bound to the connection factory above -->
    <rabbit:admin connection-factory="rabbit-connectionFactory"/>

    <!-- RabbitTemplate declaration, injected into producer classes; it publishes to the
         test-mq-fanout exchange by default and converts payloads with jsonMessageConverter -->
    <rabbit:template exchange="test-mq-fanout" id="ssoTemplate"  connection-factory="rabbit-connectionFactory"  message-converter="jsonMessageConverter" />

    <!-- Converter that turns message objects into JSON -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Declare a message queue (
        durable: whether the queue is persistent
        exclusive: private queue usable only by its creator, deleted automatically on disconnect
        auto-delete: whether the queue is deleted automatically once all consumers have disconnected) -->
    <rabbit:queue id="test_queue" name="test_queue" durable="true" auto-delete="false" exclusive="false" />

    <!-- Exchange definition
     rabbit:fanout-exchange: declares a fanout exchange, i.e. every message is delivered to all
     bound queues and the routing key is not used for matching.
     rabbit:binding: binds a queue to this exchange (no key is needed for fanout)
     -->
    <rabbit:fanout-exchange name="test-mq-fanout" auto-declare="true" durable="true" auto-delete="false" id="test-mq-fanout">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Consumer configuration -->

    <!-- Queue listener bean -->
    <bean id="queueListener" class="com.battery.rabbitMq.QueueListener"/>

    <!-- Listener container configuration.
         acknowledge="manual": the listener itself is responsible for acknowledging
         (or rejecting) each delivery on the channel it receives. -->
    <rabbit:listener-container connection-factory="rabbit-connectionFactory" acknowledge="manual">
        <rabbit:listener queues="test_queue" ref="queueListener" method="onMessage"/>
    </rabbit:listener-container>

</beans>
 
         
/**
 * {@link MQProducer} implementation that publishes objects through the injected
 * {@link RabbitTemplate}.
 */
@Service
public class MQProducerImpl implements MQProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQProducerImpl.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given Java object to an AMQP message and sends it to the template's
     * default exchange with its default routing key. With a JSON message converter
     * configured on the template, the object is serialized as JSON.
     *
     * <p>Sending is best-effort: failures are logged (with stack trace) and not rethrown.
     *
     * @param object the payload to convert and send
     */
    @Override
    public void sendDataToQueue(Object object) {
        try {
            rabbitTemplate.convertAndSend(object);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace and cause chain are kept;
            // e.getMessage() alone may be null and hides where the failure happened.
            LOGGER.error("Failed to send message to RabbitMQ", e);
        }
    }
}
/**
 * Listener for queue deliveries that logs the message body and acknowledges the
 * delivery on the channel. Intended for a listener container running in manual
 * acknowledge mode, where the listener must ack or reject every delivery itself.
 */
@Component
public class QueueListener implements ChannelAwareMessageListener {

    private static Logger logger = LoggerFactory.getLogger(QueueListener.class);

    /**
     * Handles one delivery: decodes the body as UTF-8, prints and logs it, then acks it.
     * If handling fails, the error is logged and the delivery is rejected without requeue.
     *
     * @param message the received message
     * @param channel the channel the message was delivered on, used to ack/nack
     * @throws Exception if acknowledging or rejecting the delivery on the channel fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode once and reuse; the Charset overload has no checked exception.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.print(body);
            logger.debug("接收到:" + body);
        } catch (Exception e) {
            logger.error("Failed to process message, rejecting delivery " + deliveryTag, e);
            // Reject without requeue so a message that cannot be processed is not
            // redelivered in an endless loop.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Without an explicit ack the delivery would stay unacknowledged on the broker.
        channel.basicAck(deliveryTag, false);
    }
}

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM