RABBITMQ(Maven)


簡單模式(一個生產者,一個隊列,一個消費者)

1.導入jar包

              <dependency>

                     <groupId>com.rabbitmq</groupId>

                     <artifactId>amqp-client</artifactId>

                     <version>3.4.1</version>

              </dependency>

 

2.創建rabbitmq連接的工具類

package com.rabbitmq.util;

 

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

 *   創建rabbitmq連接的工具類

 * @author KFS

 *

 */

public class ConnectionUtil {

      

       public static Connection getConnection() throws Exception{

              //創建連接工廠

              ConnectionFactory connectionFactory=new ConnectionFactory();

              //設置參數

              connectionFactory.setHost("127.0.0.1");//主機ip

              connectionFactory.setVirtualHost("/taotao");//虛擬主機名

              connectionFactory.setUsername("admin");//賬號

              connectionFactory.setPassword("admin");//密碼

              //創建連接

              Connection newConnection = connectionFactory.newConnection();

              return newConnection;

       }

}

 

3.simple模式發送消息

package com.simple.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   simple模式發送消息

 * @author KFS

 *

 */

public class Send {

 

       public static void main(String[] args) throws Exception{

              //通過rabbitmq工具類得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               *   創建消息隊列(如果有可以不用創建,但創建會覆蓋之前的)

               *   第一參數:隊列名稱

               *   第二參數:隊列是否持久化(存儲到磁盤)

               *   第三參數:隊列是否被獨占

               *   第四參數:隊列是否自動刪除

               *   第五參數:

               */

              channel.queueDeclare("test_simple_queue", false, false, false, null);

              //創建消息

              String message="simple_queue";

              /*

               *   發送消息

               *   第一參數:交換機名(簡單模式不用交換機,但不能用null)

               *   第二參數:隊列名稱

               *   第三參數:

               *   第四參數:消息(字節流)

               *

               */

              channel.basicPublish("", "test_simple_queue", null, message.getBytes());

              System.out.println("發送的消息:"+message);

              //關閉資源

              channel.close();

              connection.close();

       }

 

}

 

4.simple模式接受消息

package com.simple.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   simple模式接受消息

 * @author KFS

 *

 */

public class Receive {

 

       public static void main(String[] args) throws Exception{

              //通過rabbitmq工具類得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               *   創建消息隊列(如果有可以不用創建,但創建會覆蓋之前的)

               *   第一參數:隊列名稱

               *   第二參數:隊列是否持久化(存儲到磁盤)

               *   第三參數:隊列是否被獨占

               *   第四參數:隊列是否自動刪除

               *   第五參數:

               */

              channel.queueDeclare("test_simple_queue", false, false, false, null);

              //定義消費者

              QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

              /*

               *   監聽隊列

               *   第一參數:隊列名稱

               *   第二參數:是否自動回復完成接受

               *   第三參數:消費者名稱

               */

              channel.basicConsume("test_simple_queue",true, queueingConsumer);

             

              while(true) {

                     //獲取消息

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     //打印消息

                     String message=new String(nextDelivery.getBody());

                     System.out.println(message);

              }

             

       }

 

}

 

 

work模式(一個生產者,一個隊列,多個消費者)

1.work模式發送消息

package com.work.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式發送消息

 * @author KFS

 *

 */

public class Send {

 

       public static void main(String[] args) throws Exception{

              //通過rabbitmq連接工具類得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               * 創建消息隊列(如果有就不用創建,但創建不會錯)

               * 第一參數:隊列名稱

               * 第二參數:該隊列是否持久化

               * 第三參數:該隊列是否被獨占

               * 第四參數:該隊列是否自動刪除

               * 第五參數:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              for(int i=1;i<=100;i++) {

                     //創建消息

                     String message="work_queue"+i;

                     /*

                      * 發送消息

                      * 第一參數:交換機名

                      * 第二參數:隊列名稱

                      * 第三參數:

                      * 第四參數:消息(字節流)

                      *

                      */

                     channel.basicPublish("", "test_work_queue", null, message.getBytes());

                     System.out.println("work_發送的消息:"+message);

              }

              //關閉資源

              channel.close();

              connection.close();

       }

 

}

 

a)      work模式平均接受消息(一個就是一個消費者,可以創建多個消費者,不過是平均分配消息。)

package com.work.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式接受消息1

 * @author KFS

 *

 */

public class Receive1 {

 

       public static void main(String[] args) throws Exception{

              //得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               * 創建消息隊列(如果有就不用創建,但創建不會錯)

               * 第一參數:隊列名稱

               * 第二參數:該隊列是否持久化

               * 第三參數:該隊列是否被獨占

               * 第四參數:該隊列是否自動刪除

               * 第五參數:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              //創建隊列消費者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              /*

               * 監聽隊列中的內容

               * 第一參數:隊列名稱

               * 第二參數:是否自動回復

               *

               */

              channel.basicConsume("test_work_queue",true, queueingConsumer);

             

              //獲取消息

              while(true) {

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     String message=new String(nextDelivery.getBody());

                     Thread.sleep(1000);

                     System.out.println("接受消息:"+message);

              }

       }

 

}

 

b)      work模式能者多勞接受消息(一個生產者,一個隊列,可以創建多個消費者,不過這是能者多勞的)

package com.work.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式接受消息1

 * @author KFS

 *

 */

public class Receive1 {

 

       public static void main(String[] args) throws Exception{

              //得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               * 創建消息隊列(如果有就不用創建,但創建不會錯)

               * 第一參數:隊列名稱

               * 第二參數:該隊列是否持久化

               * 第三參數:該隊列是否被獨占

               * 第四參數:該隊列是否自動刪除

               * 第五參數:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              //創建隊列消費者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //設置同一時刻只會發送一條消息給消費者(要放在監聽隊列內容之上)

              channel.basicQos(1);

              /*

               * 監聽隊列中的內容

               * 第一參數:隊列名稱

               * 第二參數:是否自動回復(能者多勞需要手動回復)

               *

               */

              channel.basicConsume("test_work_queue",false, queueingConsumer);

             

             

              //獲取消息

              while(true) {

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     String message=new String(nextDelivery.getBody());

                     Thread.sleep(10);

                     System.out.println("接受消息:"+message);

                     //手動回復

                     channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

              }

       }

 

}

 

訂閱模式,交換機,exchange:fanout(一個生產者,一個交換機,多個隊列,多個消費者)

1.訂閱模式發送消息

package com.fanout.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   訂閱模式(exchange,交換機)發送消息

 * @author KFS

 *

 */

public class Send {

 

       public static void main(String[] args) throws Exception{

              //通過rabbitmq連接工具類得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               * 創建交換機exchange

               * 第一參數:交換機名稱

               * 第二參數:交換機類型:

               *

               */

              channel.exchangeDeclare("test_fanout", "fanout");

             

              //消息內容

              String message="testFanout";

              /*

               * 發送消息

               * 第一參數:交換機名稱

               * 第二參數:

               * 第三參數:

               * 第四參數:消息(字節流)

               *

               */

              channel.basicPublish("test_fanout", "", null, message.getBytes());

              System.out.println("發送消息:"+message);

             

              //關閉資源

              channel.close();

              connection.close();

       }

 

}

 

2.訂閱模式接收消息(一個生產者,一個交換機,多個隊列,隊列名稱不同,多個消費者,消費者都能收到消息)

package com.fanout.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   訂閱模式接受消息

 * @author KFS

 *

 */

public class Receive1 {

 

       public static void main(String[] args) throws Exception{

              //通過rabbitmq連接工具類得到連接

              Connection connection=ConnectionUtil.getConnection();

              //創建通道

              Channel channel = connection.createChannel();

              /*

               * 創建交換機exchange

               * 第一參數:交換機名稱

               * 第二參數:交換機類型:

               *

               */

              channel.exchangeDeclare("test_fanout", "fanout");

              //創建隊列

              channel.queueDeclare("test_fanout_queue1", false, false, false, null);

              /*

               * 綁定隊列到交換機

               * 第一參數:隊列名稱

               * 第二參數:交換機名稱

               * 第三參數:

               *

               */

              channel.queueBind("test_fanout_queue1", "test_fanout", "");

        // 同一時刻服務器只會發一條消息給消費者

        channel.basicQos(1);

       

        //定義消費者

        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //監聽隊列

        channel.basicConsume("test_fanout_queue1", false,queueingConsumer);

       

        while(true) {

               //獲取消息

               Delivery nextDelivery = queueingConsumer.nextDelivery();

               String message=new String(nextDelivery.getBody());

               System.out.println(message);

               Thread.sleep(10);

              

               //手動回復完成

               channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

        }

       

       

       }

 

}

 

路由模式,exchange:direct(和訂閱模式有相同點,一個生產者,一個交換機,多個隊列,不同的隊列名,多個消費者)

1.路由模式發送消息

package com.direct.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

 

/**

 *   路由模式redirect發送消息

 * @author KFS

 *

 */

public class Send {

 

       public static void main(String[] args) throws Exception{

              //1.通過rabbitmq連接工具類創建連接

              Connection connection = ConnectionUtil.getConnection();

              //2通過連接.創建通道

              Channel channel = connection.createChannel();

              /*

               * 3.創建交換機

               * 第一參數:交換機名稱

               * 第二參數:交換機類型

               *

               */

              channel.exchangeDeclare("test_direct_exchange", "direct");

             

              String message="test_direct";

              /*

               * 4.發送消息

               * 第一參數:交換機名稱

               * 第二參數:鑰匙(接受方的是這個的才能接受)

               * 第三參數:

               * 第四參數:消息(字節流)

               *

               */

              channel.basicPublish("test_direct_exchange", "key1", null, message.getBytes());

              System.out.println("發送消息:"+message);

             

              //關閉資源

              channel.close();

              connection.close();

       }

 

}

 

2.路由模式接收消息(一個生產者,一個交換機,多個隊列,隊列名稱不同,多個消費者,消費者都能收到消息)

 

package com.direct.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

 

/**

 *   路由模式接收消息1

 * @author KFS

 *

 */

public class Receive1 {

 

       public static void main(String[] args) throws Exception{

              //1.通過rabbitmq連接工具類創建連接

              Connection connection = ConnectionUtil.getConnection();

              //2.通過連接創建通道

              Channel channel = connection.createChannel();

              /*

               * 3.通過通道創建隊列

               *

               */

              channel.queueDeclare("test_direct_queue1", false, false, false, null);

              /*

               * 4.綁定隊列和交換機

               * 第一參數:隊列名稱

               * 第二參數:交換機名稱

               * 第三參數:鑰匙

               *

               */

              channel.queueBind("test_direct_queue1", "test_direct_exchange", "key1");

              //同一時刻服務器只發一條消息

              channel.basicQos(1);

             

              //定義隊列消費者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //監聽隊列,設置手動回復

              channel.basicConsume("test_direct_queue1",false, queueingConsumer);

             

              //獲取消息

              Delivery nextDelivery = queueingConsumer.nextDelivery();

              String message=new String(nextDelivery.getBody());

              System.out.println(message);

             

              //手動回復

              channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

       }

 

}

 

通配符模式(一個發送者,一個交換機,多個隊列,多個消費者)

1.通配符發送消息

package com.topic.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

 

/**

 *   通配符模式topic發送消息

 * @author KFS

 *

 */

public class Send {

 

       public static void main(String[] args) throws Exception{

              //1.通過rabbitmq連接工具類創建連接

              Connection connection = ConnectionUtil.getConnection();

              //2.通過連接創建通道

              Channel channel = connection.createChannel();

              //3.通過通道創建交換機

              channel.exchangeDeclare("test_topic_exchange", "topic");

              String message="test_topic";

              //4.發送消息

              channel.basicPublish("test_topic_exchange", "key.1", null, message.getBytes());

              System.out.println("發送消息:"+message);

             

              //關閉資源

              channel.close();

              connection.close();

       }

 

}

 

2.通配符接收消息

package com.topic.rabbitmq;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

 

/**

 *   通配符模式topic接收消息1

 * @author KFS

 *

 */

public class Receive1 {

 

       public static void main(String[] args) throws Exception{

              //1.通過rabbitmq連接工具類創建連接

              Connection connection = ConnectionUtil.getConnection();

              //2.通過連接創建通道

              Channel channel = connection.createChannel();

              //3.通過通道創建隊列

              channel.queueDeclare("test_topic_queue1", false, false, false, null);

              //4.通過通道綁定隊列

              channel.queueBind("test_topic_queue1", "test_topic_exchange", "key.*");

              //同一時刻服務器只發送一條消息

              channel.basicQos(1);

              //5.通過通道創建消費者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //6.監聽隊列

              channel.basicConsume("test_topic_queue1", queueingConsumer);

             

              //7.接收消息

              Delivery nextDelivery = queueingConsumer.nextDelivery();

              String message=new String(nextDelivery.getBody());

              System.out.println(message);

             

       }

 

}

 

RabbitMQ和Spring整合

1.監聽者類方法

package com.spring.rabbitmq;

/**

 *   監聽類方法

 * @author KFS

 *

 */

public class SpringReceive1 {

       public void receive1(String msg) {

              System.out.println("接收消息:"+msg);

       }

}

 

2.spring和rabbitmq整合xml:applicationContent-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

      

       <!-- 發送者 -->

       <!-- 定義rabbitmq連接工廠 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- 創建rabbitmq管理 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 創建交換機: -->

       <rabbit:fanout-exchange name="springExchange" auto-declare="true">

       </rabbit:fanout-exchange>

       <!-- 創建rabbitmq模板 -->

       <rabbit:template id="template" exchange="springExchange" connection-factory="factory"/>

      

      

       <!-- 接收者 -->

       <!-- 定義隊列:隊列和交換機綁定去網頁上綁定 -->

       <rabbit:queue name="fanoutQueue" auto-declare="true"/>

      

       <!-- 隊列監聽 -->

       <rabbit:listener-container connection-factory="factory">

              <rabbit:listener ref="springReceive1" method="receive1" queue-names="fanoutQueue"/>

       </rabbit:listener-container>

       <!-- 定義監聽類 -->

       <bean id="springReceive1" class="com.spring.rabbitmq.SpringReceive1"/>

      

</beans>

 

3.發送和接收代碼

package com.spring.rabbitmq;

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.support.AbstractApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

 

/**

 *   rabbitmq和spring整合發送消息和監聽接收消息

 * @author KFS

 *

 */

public class SpringSend {

 

       public static void main(String[] args) throws Exception{

              //讀取xml文件

              AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("classpath:applicationContent-rabbitmq.xml");

              //獲取rabbitmq模板對象

              RabbitTemplate rabbitTemplate=(RabbitTemplate)applicationContext.getBean("template");

              //發送消息

              rabbitTemplate.convertAndSend("spring_send");

              //監聽者會一直監聽

             

              //休眠

              Thread.sleep(1000);

              //摧毀容器

              applicationContext.destroy();

       }

 

}

 

RabbitMQ的使用(后台系統修改,刪除,添加商品會觸發消息隊列,在搜索系統接收消息在進行相應操作)

1.后台系統

a)      需要的rabbitmq與spring的整合文件:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

       <!-- rabbitmq生產者 -->

       <!-- 配置連接工廠 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- MQ的管理器:管理隊列和交換機 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 定義交換機 -->

       <rabbit:topic-exchange name="taotao_item_topic" auto-declare="true">

       </rabbit:topic-exchange>

       <!-- rabbitmq模板 -->

       <rabbit:template id="rabbitTemplate" connection-factory="factory" exchange="taotao_item_topic"></rabbit:template>

      

</beans>

 

b)      商品業務層代碼

package com.taotao.manage.service;

 

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

 

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.taotao.manage.bean.Item;

import com.taotao.manage.bean.ItemDesc;

import com.taotao.manage.bean.ItemParamItem;

@Service

public class ItemServiceImp extends BaseServiceImp<Item> implements ItemService{

       @Autowired

       private ItemDescService itemDescService;

       @Autowired

       private ItemParamItemService itemParamItemService;

       @Autowired

       private RabbitTemplate rabbitTemplate;

       private ObjectMapper objectMapper=new ObjectMapper();

      

       /**

        *   添加商品和商品描述

        * @param item

        * @param desc

        */

       @Override

       public void insertItem(Item item, String desc,String itemParams) {

              item.setStatus(1);

              this.insert(item);

             

              ItemDesc record=new ItemDesc();

              record.setItemDesc(desc);

              record.setItemId(item.getId());

              itemDescService.insert(record);

             

              ItemParamItem itemParamItem=new ItemParamItem();

              itemParamItem.setItemId(item.getId());

              itemParamItem.setParamData(itemParams);

              itemParamItemService.insert(itemParamItem);

             

              //以下是消息隊列

              Map<String, Object> message=new HashMap<>();

              message.put("type", "item.insert");

              message.put("date", new Date());

              try {

                     message.put("item", objectMapper.writeValueAsString(item));

                     message.put("itemDesc", objectMapper.writeValueAsString(record));

                     message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                     rabbitTemplate.convertAndSend("item.insert", message);

              } catch (JsonProcessingException e) {

                     e.printStackTrace();

              }

             

       }

 

       /**

        *   修改商品和商品描述

        */

       @Override

       public void updateItem(Item item, String desc,String itemParams,Long itemParamId) {

              item.setStatus(1);

              this.update(item);

             

              ItemDesc itemDesc=new ItemDesc();

              itemDesc.setItemId(item.getId());

              itemDesc.setItemDesc(desc);

              itemDescService.update(itemDesc);

             

              ItemParamItem itemParamItem=new ItemParamItem();

              itemParamItem.setId(itemParamId);

              itemParamItem.setParamData(itemParams);

              itemParamItemService.update(itemParamItem);

             

              //以下是消息隊列

              Map<String, Object> message=new HashMap<>();

              message.put("type", "item.update");

              message.put("date", new Date());

              try {

                     message.put("item", objectMapper.writeValueAsString(item));

                     message.put("itemDesc", objectMapper.writeValueAsString(itemDesc));

                     message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                     rabbitTemplate.convertAndSend("item.update", message);

              } catch (JsonProcessingException e) {

                     e.printStackTrace();

              }

       }

 

       /**

        *   批量修改商品狀態status:1-正常,2-下架,3-刪除

        */

       @Override

       public void updateStatus(Long[] ids,Integer status) {

              Item item=new Item();

              item.setStatus(status);

              for(Long id:ids) {

                     item.setId(id);

                     this.update(item);

                    

                     //以下是消息隊列

                     Map<String, Object> message=new HashMap<>();

                     message.put("type", "item.delete");

                     message.put("date", new Date());

                     try {

                            message.put("item", objectMapper.writeValueAsString(item));

                            rabbitTemplate.convertAndSend("item.delete", message);

                     } catch (JsonProcessingException e) {

                            e.printStackTrace();

                     }

              }

       }

      

}

 

2.搜索系統

a)      rabbitmq與spring的整合文件:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

       <!-- rabbitmq消費者 -->

       <!-- 配置連接工廠 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- MQ的管理器:管理隊列和交換機 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 配置隊列 -->

       <rabbit:queue name="taotao_item_search" auto-declare="true"></rabbit:queue>

       <bean id="itemsListener" class="com.taotao.search.listener.ItemsListener">

       </bean>

       <!-- 隊列監聽 -->

       <rabbit:listener-container connection-factory="factory">

              <rabbit:listener ref="itemsListener" method="execut" queue-names="taotao_item_search"/>

       </rabbit:listener-container>

      

</beans>

 

b)      監聽代碼:rabbitmq傳消息可以傳Object類,但監聽只能監聽到String,其他的會無限循環。

package com.taotao.search.listener;

 

import java.util.Map;

 

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;

 

import com.fasterxml.jackson.databind.ObjectMapper;

import com.taotao.manage.bean.Item;

import com.taotao.search.service.SearchService;

 

/**

 *   rabbitmq監聽類

 * @author KFS

 *

 */

public class ItemsListener {

       @Autowired

       private SearchService searchService;

       private ObjectMapper objectMapper=new ObjectMapper();

 

       public void execut(Map<String, Object> map) throws Exception{

              /*

               * rabbitmq傳消息可以傳Object類,但監聽只能監聽到String,其他的會無限循環。

               */

              String type=map.get("type").toString();

              if(StringUtils.equals(type, "item.insert") || StringUtils.equals(type, "item.update")) {

                     Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                     com.taotao.search.bean.Item record=new com.taotao.search.bean.Item();

                     record.setId(item.getId());

                     record.setTitle(item.getTitle());

                     record.setSellPoint(item.getSellPoint());

                     record.setPrice(item.getPrice());

                     record.setNum(item.getNum());

                     record.setBarcode(item.getBarcode());

                     record.setImage(item.getImage());

                     record.setCid(item.getCid());

                     record.setStatus(item.getStatus());

                     searchService.insert(record);

              }else if(StringUtils.equals(type, "item.delete")) {

                     Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                     searchService.delete(item.getId());

              }

             

       }

 

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM