簡單模式(一個生產者,一個隊列,一個消費者)
1.導入jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> |
2.創建rabbitmq連接的工具類
package com.rabbitmq.util;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 創建rabbitmq連接的工具類 * @author KFS * */ public class ConnectionUtil {
public static Connection getConnection() throws Exception{ //創建連接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置參數 connectionFactory.setHost("127.0.0.1");//主機ip connectionFactory.setVirtualHost("/taotao");//虛擬主機名 connectionFactory.setUsername("admin");//賬號 connectionFactory.setPassword("admin");//密碼 //創建連接 Connection newConnection = connectionFactory.newConnection(); return newConnection; } } |
3.simple模式發送消息
package com.simple.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil; /** * simple模式發送消息 * @author KFS * */ public class Send {
public static void main(String[] args) throws Exception{ //通過rabbitmq工具類得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建消息隊列(如果有可以不用創建,但創建會覆蓋之前的) * 第一參數:隊列名稱 * 第二參數:隊列是否持久化(存儲到磁盤) * 第三參數:隊列是否被獨占 * 第四參數:隊列是否自動刪除 * 第五參數: */ channel.queueDeclare("test_simple_queue", false, false, false, null); //創建消息 String message="simple_queue"; /* * 發送消息 * 第一參數:交換機名(簡單模式不用交換機,但不能用null) * 第二參數:隊列名稱 * 第三參數: * 第四參數:消息(字節流) * */ channel.basicPublish("", "test_simple_queue", null, message.getBytes()); System.out.println("發送的消息:"+message); //關閉資源 channel.close(); connection.close(); }
} |
4.simple模式接受消息
package com.simple.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil; /** * simple模式接受消息 * @author KFS * */ public class Receive {
public static void main(String[] args) throws Exception{ //通過rabbitmq工具類得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建消息隊列(如果有可以不用創建,但創建會覆蓋之前的) * 第一參數:隊列名稱 * 第二參數:隊列是否持久化(存儲到磁盤) * 第三參數:隊列是否被獨占 * 第四參數:隊列是否自動刪除 * 第五參數: */ channel.queueDeclare("test_simple_queue", false, false, false, null); //定義消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); /* * 監聽隊列 * 第一參數:隊列名稱 * 第二參數:是否自動回復完成接受 * 第三參數:消費者名稱 */ channel.basicConsume("test_simple_queue",true, queueingConsumer);
while(true) { //獲取消息 Delivery nextDelivery = queueingConsumer.nextDelivery(); //打印消息 String message=new String(nextDelivery.getBody()); System.out.println(message); }
}
} |
work模式(一個生產者,一個隊列,多個消費者)
1.work模式發送消息
package com.work.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil; /** * work模式發送消息 * @author KFS * */ public class Send {
public static void main(String[] args) throws Exception{ //通過rabbitmq連接工具類得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建消息隊列(如果有就不用創建,但創建不會錯) * 第一參數:隊列名稱 * 第二參數:該隊列是否持久化 * 第三參數:該隊列是否被獨占 * 第四參數:該隊列是否自動刪除 * 第五參數: * */ channel.queueDeclare("test_work_queue", false, false, false, null); for(int i=1;i<=100;i++) { //創建消息 String message="work_queue"+i; /* * 發送消息 * 第一參數:交換機名 * 第二參數:隊列名稱 * 第三參數: * 第四參數:消息(字節流) * */ channel.basicPublish("", "test_work_queue", null, message.getBytes()); System.out.println("work_發送的消息:"+message); } //關閉資源 channel.close(); connection.close(); }
} |
a) work模式平均接受消息(一個就是一個消費者,可以創建多個消費者,不過是平均分配消息。)
package com.work.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil; /** * work模式接受消息1 * @author KFS * */ public class Receive1 {
public static void main(String[] args) throws Exception{ //得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建消息隊列(如果有就不用創建,但創建不會錯) * 第一參數:隊列名稱 * 第二參數:該隊列是否持久化 * 第三參數:該隊列是否被獨占 * 第四參數:該隊列是否自動刪除 * 第五參數: * */ channel.queueDeclare("test_work_queue", false, false, false, null); //創建隊列消費者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel); /* * 監聽隊列中的內容 * 第一參數:隊列名稱 * 第二參數:是否自動回復 * */ channel.basicConsume("test_work_queue",true, queueingConsumer);
//獲取消息 while(true) { Delivery nextDelivery = queueingConsumer.nextDelivery(); String message=new String(nextDelivery.getBody()); Thread.sleep(1000); System.out.println("接受消息:"+message); } }
} |
b) work模式能者多勞接受消息(一個生產者,一個隊列,可以創建多個消費者,不過這是能者多勞的)
package com.work.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil; /** * work模式接受消息1 * @author KFS * */ public class Receive1 {
public static void main(String[] args) throws Exception{ //得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建消息隊列(如果有就不用創建,但創建不會錯) * 第一參數:隊列名稱 * 第二參數:該隊列是否持久化 * 第三參數:該隊列是否被獨占 * 第四參數:該隊列是否自動刪除 * 第五參數: * */ channel.queueDeclare("test_work_queue", false, false, false, null); //創建隊列消費者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel); //設置同一時刻只會發送一條消息給消費者(要放在監聽隊列內容之上) channel.basicQos(1); /* * 監聽隊列中的內容 * 第一參數:隊列名稱 * 第二參數:是否自動回復(能者多勞需要手動回復) * */ channel.basicConsume("test_work_queue",false, queueingConsumer);
//獲取消息 while(true) { Delivery nextDelivery = queueingConsumer.nextDelivery(); String message=new String(nextDelivery.getBody()); Thread.sleep(10); System.out.println("接受消息:"+message); //手動回復 channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false); } }
} |
訂閱模式,交換機,exchange:fanout(一個生產者,一個交換機,多個隊列,多個消費者)
1.訂閱模式發送消息
package com.fanout.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil; /** * 訂閱模式(exchange,交換機)發送消息 * @author KFS * */ public class Send {
public static void main(String[] args) throws Exception{ //通過rabbitmq連接工具類得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建交換機exchange * 第一參數:交換機名稱 * 第二參數:交換機類型: * */ channel.exchangeDeclare("test_fanout", "fanout");
//消息內容 String message="testFanout"; /* * 發送消息 * 第一參數:交換機名稱 * 第二參數: * 第三參數: * 第四參數:消息(字節流) * */ channel.basicPublish("test_fanout", "", null, message.getBytes()); System.out.println("發送消息:"+message);
//關閉資源 channel.close(); connection.close(); }
} |
2.訂閱模式接收消息(一個生產者,一個交換機,多個隊列,隊列名稱不同,多個消費者,消費者都能收到消息)
package com.fanout.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil; /** * 訂閱模式接受消息 * @author KFS * */ public class Receive1 {
public static void main(String[] args) throws Exception{ //通過rabbitmq連接工具類得到連接 Connection connection=ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); /* * 創建交換機exchange * 第一參數:交換機名稱 * 第二參數:交換機類型: * */ channel.exchangeDeclare("test_fanout", "fanout"); //創建隊列 channel.queueDeclare("test_fanout_queue1", false, false, false, null); /* * 綁定隊列到交換機 * 第一參數:隊列名稱 * 第二參數:交換機名稱 * 第三參數: * */ channel.queueBind("test_fanout_queue1", "test_fanout", ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1);
//定義消費者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel); //監聽隊列 channel.basicConsume("test_fanout_queue1", false,queueingConsumer);
while(true) { //獲取消息 Delivery nextDelivery = queueingConsumer.nextDelivery(); String message=new String(nextDelivery.getBody()); System.out.println(message); Thread.sleep(10);
//手動回復完成 channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false); }
}
} |
路由模式,exchange:direct(和訂閱模式有相同點,一個生產者,一個交換機,多個隊列,不同的隊列名,多個消費者)
1.路由模式發送消息
package com.direct.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil;
/** * 路由模式redirect發送消息 * @author KFS * */ public class Send {
public static void main(String[] args) throws Exception{ //1.通過rabbitmq連接工具類創建連接 Connection connection = ConnectionUtil.getConnection(); //2通過連接.創建通道 Channel channel = connection.createChannel(); /* * 3.創建交換機 * 第一參數:交換機名稱 * 第二參數:交換機類型 * */ channel.exchangeDeclare("test_direct_exchange", "direct");
String message="test_direct"; /* * 4.發送消息 * 第一參數:交換機名稱 * 第二參數:鑰匙(接受方的是這個的才能接受) * 第三參數: * 第四參數:消息(字節流) * */ channel.basicPublish("test_direct_exchange", "key1", null, message.getBytes()); System.out.println("發送消息:"+message);
//關閉資源 channel.close(); connection.close(); }
} |
2.路由模式接收消息(一個生產者,一個交換機,多個隊列,隊列名稱不同,多個消費者,消費者都能收到消息)
package com.direct.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil;
/** * 路由模式接收消息1 * @author KFS * */ public class Receive1 {
public static void main(String[] args) throws Exception{ //1.通過rabbitmq連接工具類創建連接 Connection connection = ConnectionUtil.getConnection(); //2.通過連接創建通道 Channel channel = connection.createChannel(); /* * 3.通過通道創建隊列 * */ channel.queueDeclare("test_direct_queue1", false, false, false, null); /* * 4.綁定隊列和交換機 * 第一參數:隊列名稱 * 第二參數:交換機名稱 * 第三參數:鑰匙 * */ channel.queueBind("test_direct_queue1", "test_direct_exchange", "key1"); //同一時刻服務器只發一條消息 channel.basicQos(1);
//定義隊列消費者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel); //監聽隊列,設置手動回復 channel.basicConsume("test_direct_queue1",false, queueingConsumer);
//獲取消息 Delivery nextDelivery = queueingConsumer.nextDelivery(); String message=new String(nextDelivery.getBody()); System.out.println(message);
//手動回復 channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false); }
} |
通配符模式(一個發送者,一個交換機,多個隊列,多個消費者)
1.通配符發送消息
package com.topic.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtil;
/** * 通配符模式topic發送消息 * @author KFS * */ public class Send {
public static void main(String[] args) throws Exception{ //1.通過rabbitmq連接工具類創建連接 Connection connection = ConnectionUtil.getConnection(); //2.通過連接創建通道 Channel channel = connection.createChannel(); //3.通過通道創建交換機 channel.exchangeDeclare("test_topic_exchange", "topic"); String message="test_topic"; //4.發送消息 channel.basicPublish("test_topic_exchange", "key.1", null, message.getBytes()); System.out.println("發送消息:"+message);
//關閉資源 channel.close(); connection.close(); }
} |
2.通配符接收消息
package com.topic.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.util.ConnectionUtil;
/** * 通配符模式topic接收消息1 * @author KFS * */ public class Receive1 {
public static void main(String[] args) throws Exception{ //1.通過rabbitmq連接工具類創建連接 Connection connection = ConnectionUtil.getConnection(); //2.通過連接創建通道 Channel channel = connection.createChannel(); //3.通過通道創建隊列 channel.queueDeclare("test_topic_queue1", false, false, false, null); //4.通過通道綁定隊列 channel.queueBind("test_topic_queue1", "test_topic_exchange", "key.*"); //同一時刻服務器只發送一條消息 channel.basicQos(1); //5.通過通道創建消費者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel); //6.監聽隊列 channel.basicConsume("test_topic_queue1", queueingConsumer);
//7.接收消息 Delivery nextDelivery = queueingConsumer.nextDelivery(); String message=new String(nextDelivery.getBody()); System.out.println(message);
}
} |
RabbitMQ和Spring整合
1.監聽者類方法
package com.spring.rabbitmq; /** * 監聽類方法 * @author KFS * */ public class SpringReceive1 { public void receive1(String msg) { System.out.println("接收消息:"+msg); } } |
2.spring和rabbitmq整合xml:applicationContent-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!-- 發送者 --> <!-- 定義rabbitmq連接工廠 --> <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>
<!-- 創建rabbitmq管理 --> <rabbit:admin connection-factory="factory"/> <!-- 創建交換機: --> <rabbit:fanout-exchange name="springExchange" auto-declare="true"> </rabbit:fanout-exchange> <!-- 創建rabbitmq模板 --> <rabbit:template id="template" exchange="springExchange" connection-factory="factory"/>
<!-- 接收者 --> <!-- 定義隊列:隊列和交換機綁定去網頁上綁定 --> <rabbit:queue name="fanoutQueue" auto-declare="true"/>
<!-- 隊列監聽 --> <rabbit:listener-container connection-factory="factory"> <rabbit:listener ref="springReceive1" method="receive1" queue-names="fanoutQueue"/> </rabbit:listener-container> <!-- 定義監聽類 --> <bean id="springReceive1" class="com.spring.rabbitmq.SpringReceive1"/>
</beans> |
3.發送和接收代碼
package com.spring.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;
/** * rabbitmq和spring整合發送消息和監聽接收消息 * @author KFS * */ public class SpringSend {
public static void main(String[] args) throws Exception{ //讀取xml文件 AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("classpath:applicationContent-rabbitmq.xml"); //獲取rabbitmq模板對象 RabbitTemplate rabbitTemplate=(RabbitTemplate)applicationContext.getBean("template"); //發送消息 rabbitTemplate.convertAndSend("spring_send"); //監聽者會一直監聽
//休眠 Thread.sleep(1000); //摧毀容器 applicationContext.destroy(); }
} |
RabbitMQ的使用(后台系統修改,刪除,添加商品會觸發消息隊列,在搜索系統接收消息在進行相應操作)
1.后台系統
a) 需要的rabbitmq與spring的整合文件:applicationContext-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <!-- rabbitmq生產者 --> <!-- 配置連接工廠 --> <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>
<!-- MQ的管理器:管理隊列和交換機 --> <rabbit:admin connection-factory="factory"/> <!-- 定義交換機 --> <rabbit:topic-exchange name="taotao_item_topic" auto-declare="true"> </rabbit:topic-exchange> <!-- rabbitmq模板 --> <rabbit:template id="rabbitTemplate" connection-factory="factory" exchange="taotao_item_topic"></rabbit:template>
</beans> |
b) 商品業務層代碼
package com.taotao.manage.service;
import java.util.Date; import java.util.HashMap; import java.util.Map;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.taotao.manage.bean.Item; import com.taotao.manage.bean.ItemDesc; import com.taotao.manage.bean.ItemParamItem; @Service public class ItemServiceImp extends BaseServiceImp<Item> implements ItemService{ @Autowired private ItemDescService itemDescService; @Autowired private ItemParamItemService itemParamItemService; @Autowired private RabbitTemplate rabbitTemplate; private ObjectMapper objectMapper=new ObjectMapper();
/** * 添加商品和商品描述 * @param item * @param desc */ @Override public void insertItem(Item item, String desc,String itemParams) { item.setStatus(1); this.insert(item);
ItemDesc record=new ItemDesc(); record.setItemDesc(desc); record.setItemId(item.getId()); itemDescService.insert(record);
ItemParamItem itemParamItem=new ItemParamItem(); itemParamItem.setItemId(item.getId()); itemParamItem.setParamData(itemParams); itemParamItemService.insert(itemParamItem);
//以下是消息隊列 Map<String, Object> message=new HashMap<>(); message.put("type", "item.insert"); message.put("date", new Date()); try { message.put("item", objectMapper.writeValueAsString(item)); message.put("itemDesc", objectMapper.writeValueAsString(record)); message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem)); rabbitTemplate.convertAndSend("item.insert", message); } catch (JsonProcessingException e) { e.printStackTrace(); }
}
/** * 修改商品和商品描述 */ @Override public void updateItem(Item item, String desc,String itemParams,Long itemParamId) { item.setStatus(1); this.update(item);
ItemDesc itemDesc=new ItemDesc(); itemDesc.setItemId(item.getId()); itemDesc.setItemDesc(desc); itemDescService.update(itemDesc);
ItemParamItem itemParamItem=new ItemParamItem(); itemParamItem.setId(itemParamId); itemParamItem.setParamData(itemParams); itemParamItemService.update(itemParamItem);
//以下是消息隊列 Map<String, Object> message=new HashMap<>(); message.put("type", "item.update"); message.put("date", new Date()); try { message.put("item", objectMapper.writeValueAsString(item)); message.put("itemDesc", objectMapper.writeValueAsString(itemDesc)); message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem)); rabbitTemplate.convertAndSend("item.update", message); } catch (JsonProcessingException e) { e.printStackTrace(); } }
/** * 批量修改商品狀態status:1-正常,2-下架,3-刪除 */ @Override public void updateStatus(Long[] ids,Integer status) { Item item=new Item(); item.setStatus(status); for(Long id:ids) { item.setId(id); this.update(item);
//以下是消息隊列 Map<String, Object> message=new HashMap<>(); message.put("type", "item.delete"); message.put("date", new Date()); try { message.put("item", objectMapper.writeValueAsString(item)); rabbitTemplate.convertAndSend("item.delete", message); } catch (JsonProcessingException e) { e.printStackTrace(); } } }
} |
2.搜索系統
a) rabbitmq與spring的整合文件:applicationContext-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <!-- rabbitmq消費者 --> <!-- 配置連接工廠 --> <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>
<!-- MQ的管理器:管理隊列和交換機 --> <rabbit:admin connection-factory="factory"/> <!-- 配置隊列 --> <rabbit:queue name="taotao_item_search" auto-declare="true"></rabbit:queue> <bean id="itemsListener" class="com.taotao.search.listener.ItemsListener"> </bean> <!-- 隊列監聽 --> <rabbit:listener-container connection-factory="factory"> <rabbit:listener ref="itemsListener" method="execut" queue-names="taotao_item_search"/> </rabbit:listener-container>
</beans> |
b) 監聽代碼:rabbitmq傳消息可以傳Object類,但監聽只能監聽到String,其他的會無限循環。
package com.taotao.search.listener;
import java.util.Map;
import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired;
import com.fasterxml.jackson.databind.ObjectMapper; import com.taotao.manage.bean.Item; import com.taotao.search.service.SearchService;
/** * rabbitmq監聽類 * @author KFS * */ public class ItemsListener { @Autowired private SearchService searchService; private ObjectMapper objectMapper=new ObjectMapper();
public void execut(Map<String, Object> map) throws Exception{ /* * rabbitmq傳消息可以傳Object類,但監聽只能監聽到String,其他的會無限循環。 */ String type=map.get("type").toString(); if(StringUtils.equals(type, "item.insert") || StringUtils.equals(type, "item.update")) { Item item = objectMapper.readValue(map.get("item").toString(), Item.class); com.taotao.search.bean.Item record=new com.taotao.search.bean.Item(); record.setId(item.getId()); record.setTitle(item.getTitle()); record.setSellPoint(item.getSellPoint()); record.setPrice(item.getPrice()); record.setNum(item.getNum()); record.setBarcode(item.getBarcode()); record.setImage(item.getImage()); record.setCid(item.getCid()); record.setStatus(item.getStatus()); searchService.insert(record); }else if(StringUtils.equals(type, "item.delete")) { Item item = objectMapper.readValue(map.get("item").toString(), Item.class); searchService.delete(item.getId()); }
}
} |