rabbitMQ 消息队列(MQ)


 
软件开发的根本就是降低软件开发的复杂性
 
采用可复用的软件设计模型,采用合适的软件架构搭建自己的系统。
 
消息队列提供了一个异步通信协议,消息的发送者不用一直等待知道消息被成功的处理。消息被暂存于队列中,对信息感兴趣的消费者会订阅消息,并处理他们。
 
使用消息队列不是 杀鸡用牛刀 ,而是一种未雨筹谋。随着系统不断升级,你将从中获益。
 
消息队列(MQ)使用消息将应用程序连接。通过 RabbitMQ 消息代理服务器在应用程序之间路由。
 
一种通用的软件“总线”,解决应用程序之间繁重的消息通信工作
 
 
只要消息按照这些规则发布出去,任何消费者应用都能订阅感兴趣的消息。现在信息的生产者和消费者可以完全解耦合。
 
从任何发布者  到任何感兴趣的消费者   之间的信息,通过一条软件总线 动态的连接
 
 
Erlang   面向并发的编程语言
 
RabbitMQ 在应用程序和服务器之间 扮演着路由角色。
 
producer 创建消息,然后发布到代理服务器(RabbitMQ)
 
 
其实原理很简单:
生产者创建消息,消费者接受消息。你的应用可以作为生产者,也可以作为消费者。
 
“信道”才能连接
 
重要的记者消费者和接受者是消息发送和接受概念的体现,而不是客户端和服务端。
从底层开始构造,队列
AMQP,即Advanced Message Queuing Protocol
高级消息队列协议。
 
AMQP 消息路由必须有3个部分,交换器、队列、绑定。
 
生产者把消息发布到交换器上,消息最终到达队列,并被消费者接受。
绑定决定了消息如何从路由器到达特定的队列。
 
通过AMQP 的 basic.consume 命令订阅,会将信道置为接收模式,直到取消对队列订阅为止。
 
向队列单条消息通过 AMQP 的 basic.get 命令实现。
 
消费者收到每一条消息都必须进行确认,消费者必须通过AMQP 的 basic.act 参数设置为 true 。
同时 RabbitMQ 才能安全的把消息从队列中删除。
 
如果应用程序有 bug 而忘记确认消息的话,Rabbit将不会给消费者发送更多的消息了。因为在上一条消息确认前,RabbitMQ 会认为这个消费者并没有做好接受下一条消息。
 
 
 
 
 

实战 rabbitMQ

 
一、MQ(消息队列)
 
RabbitMQ(兔子消息队列)
 
MQ 为 Message  Queue ,消息队列是应用程序和应用程序之间的通信方法。
 
RabbitMQ 是一个开源的,在 AMQP 基础上完整的,可复用的企业消息系统。
 
AMQP 
  消息队列的一个协议,类似 tcp 、udp
 
RabbitMQ 是实现 AMQP 协议的具体一个东西。
 
多种开发语言的支持,java python .net
其实就是一种驱动。
 
rabbit.com 官网
 
MQ的其他产品还有很多:
ActiveMQ 、 Kafka (分布式消息订阅系统)等等
 
二、搭建 rabbitMQ 环境
首先要安装 Erlang 环境,然后在这个基础上才是 
rabbitMQ 环境
 
用户名、计算机名最好都是中文,不然可能安装失败。
 
安装成功后 可以浏览器访问
端口号: 15672
 
默认用户  gust / gust
 
其实在 docker 上安装镜像就好了
 
进入界面后 ,注册一个新号。
Demo/demo 
然后新建一个映射路径(virtual hosts)。  /demo
 
 
docker run -d --hostname localhost --name myrabbit -p 15672:15672 -p 5672:5672 rabbitmq
 
rabitmq  端口
   5672  15672  25672
 
 
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq
 
 
端口号简绍
 
5672   AMQP协议的端口 
 
25672   集群端口
 
15672  管理界面的端口
 
Docker 启动 rabbitMQ 
 
1、启动在 docker容器里
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
使用 docker logs 查看
docker logs some-rabbit
 
2、
 
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
 
3、
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 rabbitmq
 
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
 
本地Mac 电脑启动 rabbitMQ 
1、进入文件夹
/usr/local/Cellar/rabbitmq/
 
cd  3.7.8
 
2、启动服务
sbin/rabbitmq-server 
3、浏览器访问
浏览器 http://localhost:15672可进入rabbitmq控制终端登录页面
默认用户名和密码为 
guest/guest.
 
 
三、简单队列
生产者将消息发送到队列,消费者从队列中获取消息。
 
1、先创建 pom.xml 文件
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>cn.itcast.rabbitmq</groupId>
   <artifactId>itcast-rabbitmq</artifactId>
   <version>0.0.1-SNAPSHOT</version>
 
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>3.4.1</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>1.7.7</version>
      </dependency>
      <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
         <version>3.3.2</version>
      </dependency>
      <dependency>
           <groupId>org.springframework.amqp</groupId>
           <artifactId>spring-rabbit</artifactId>
           <version>1.4.0.RELEASE</version>
       </dependency>
   </dependencies>
</project>
 
创建单独的消息通道
package cn.itcast.rabbitmq.util;
 
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
 
public class ConnectionUtil {
 
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
 
        //设置服务地址
        factory.setHost("localhost”);
 
        //端口
        factory.setPort(5672);
 
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/taotao");
        factory.setUsername("taotao");
        factory.setPassword("taotao”);
 
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
 
创建生产者代码
 
package cn.itcast.rabbitmq.simple;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
public class Send {
 
    private final static String QUEUE_NAME = "test_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
 
        // 从连接中创建通道
        Channel channel = connection.createChannel();
 
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
 
        System.out.println(" [x] Sent '" + message + "'");
 
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
 
创建消费者的代码
package cn.itcast.rabbitmq.simple;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv {
 
    private final static String QUEUE_NAME = "test_queue";
 
    public static void main(String[] argv) throws Exception {
 
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
 
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
 
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
 
 
四、work模式
 
交给两个对象,一个前台、一个后台。
 
创建生产者
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
public class Send {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
 
            Thread.sleep(i * 10);
        }
 
        channel.close();
        connection.close();
    }
}
 
创建消费者一:
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
 
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
 
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
创建消费者 2
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv2 {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
 
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);
 
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
 
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
Work 模式的 能者多劳 模式
 
 
消息的确认模式
 
消息从队列中获取信息,服务端如何知道消息已经被消费呢?
 
看情况选择合适的 模式。
 
模式1-自动模式
    只要消息从队列中获取,无论取到后是否为成功消息,都认为消息是成功消息。
    // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME,true, consumer);
 
模式2-手动模式
  消费者从中获取到消息后,服务器就会标记其为不可用状态,等待消费者反馈,如果该消息一直没有反馈,那么一直不可用。
 
      // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);
如果为 true ,就是自动模式
 
如果为 false ,就是手动模式,同时还要反馈信息。
 
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 
订阅模式
 
一个生产者,多个消费者
每个消费者都有自己的一个队列
生产者没有将消息直接发送到队列,而是发送到了交换机
每个队列都要绑定交换机
生产者发送的消息,经过交换机。实现一个消息被多个消费者获取的目的
 
 
后面的模式都是由前面的创新而来的。
 
路由模式
 
选择性的接受数据、消息。
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM