在linux安裝rabbitmq
依賴
| 依賴 |
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
配置
| 配置 |
| #rabbitmq spring.rabbitmq.host=10.110.3.62 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #消費者 spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 #一次取的消費者數量 spring.rabbitmq.listener.simple.prefetch= 1 #消費者自動啟動 spring.rabbitmq.listener.simple.auto-startup=true #消費失敗后重新消費 spring.rabbitmq.listener.simple.default-requeue-rejected= true #發布后重試 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 #每隔多久進行重試 spring.rabbitmq.template.retry.multiplier=1.0 |
提供者
| 提供者-MQSender |
| package com.cxl.shop.rabbitmq;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.cxl.shop.redis.RedisService;
@Service public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired AmqpTemplate amqpTemplate ;
public void sendMiaoshaMessage(MiaoshaMessage mm) { String msg = RedisService.beanToString(mm); log.info("send message:"+msg); amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg); }
// public void send(Object message) { // String msg = RedisService.beanToString(message); // log.info("send message:"+msg); // amqpTemplate.convertAndSend(MQConfig.QUEUE, msg); // } // // public void sendTopic(Object message) { // String msg = RedisService.beanToString(message); // log.info("send topic message:"+msg); // amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1"); // amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2"); // } // // public void sendFanout(Object message) { // String msg = RedisService.beanToString(message); // log.info("send fanout message:"+msg); // amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg); // } // // public void sendHeader(Object message) { // String msg = RedisService.beanToString(message); // log.info("send fanout message:"+msg); // MessageProperties properties = new MessageProperties(); // properties.setHeader("header1", "value1"); // properties.setHeader("header2", "value2"); // Message obj = new Message(msg.getBytes(), properties); // amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj); // }
}
|
消費者
| 消息消費者 |
| package com.cxl.shop.rabbitmq;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import com.cxl.shop.domain.MiaoshaOrder; import com.cxl.shop.domain.MiaoshaUser; import com.cxl.shop.redis.RedisService; import com.cxl.shop.service.GoodsService; import com.cxl.shop.service.MiaoshaService; import com.cxl.shop.service.OrderService; import com.cxl.shop.vo.GoodsVo;
@Service public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@Autowired RedisService redisService;
@Autowired GoodsService goodsService;
@Autowired OrderService orderService;
@Autowired MiaoshaService miaoshaService;
@RabbitListener(queues=MQConfig.MIAOSHA_QUEUE) public void receive(String message) { log.info("receive message:"+message); MiaoshaMessage mm = RedisService.stringToBean(message, MiaoshaMessage.class); MiaoshaUser user = mm.getUser(); long goodsId = mm.getGoodsId();
GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId); int stock = goods.getStockCount(); if(stock <= 0) { return; } //判斷是否已經秒殺到了 MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId); if(order != null) { return; } //減庫存 下訂單 寫入秒殺訂單 miaoshaService.miaosha(user, goods); }
// @RabbitListener(queues=MQConfig.QUEUE) // public void receive(String message) { // log.info("receive message:"+message); // } // // @RabbitListener(queues=MQConfig.TOPIC_QUEUE1) // public void receiveTopic1(String message) { // log.info(" topic queue1 message:"+message); // } // // @RabbitListener(queues=MQConfig.TOPIC_QUEUE2) // public void receiveTopic2(String message) { // log.info(" topic queue2 message:"+message); // } // // @RabbitListener(queues=MQConfig.HEADER_QUEUE) // public void receiveHeaderQueue(byte[] message) { // log.info(" header queue message:"+new String(message)); // } //
}
|
配置類
| MQ的配置類 MQconfig |
| package com.cxl.shop.rabbitmq;
import java.util.HashMap; import java.util.Map;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MQConfig {
public static final String MIAOSHA_QUEUE = "miaosha.queue"; public static final String QUEUE = "queue"; public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String HEADER_QUEUE = "header.queue"; public static final String TOPIC_EXCHANGE = "topicExchage"; public static final String FANOUT_EXCHANGE = "fanoutxchage"; public static final String HEADERS_EXCHANGE = "headersExchage";
/** * Direct模式 交換機Exchange * */ @Bean public Queue queue() { return new Queue(QUEUE, true); }
/** * Topic模式 交換機Exchange * */ @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); } @Bean public TopicExchange topicExchage(){ return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#"); } /** * Fanout模式 交換機Exchange * */ @Bean public FanoutExchange fanoutExchage(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding FanoutBinding1() { return BindingBuilder.bind(topicQueue1()).to(fanoutExchage()); } @Bean public Binding FanoutBinding2() { return BindingBuilder.bind(topicQueue2()).to(fanoutExchage()); } /** * Header模式 交換機Exchange * */ @Bean public HeadersExchange headersExchage(){ return new HeadersExchange(HEADERS_EXCHANGE); } @Bean public Queue headerQueue1() { return new Queue(HEADER_QUEUE, true); } @Bean public Binding headerBinding() { Map<String, Object> map = new HashMap<String, Object>(); map.put("header1", "value1"); map.put("header2", "value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match(); }
}
|
輔助類
| 工具類 |
| public static <T> String beanToString(T value) { if(value == null) { return null; } Class<?> clazz = value.getClass(); if(clazz == int.class || clazz == Integer.class) { return ""+value; }else if(clazz == String.class) { return (String)value; }else if(clazz == long.class || clazz == Long.class) { return ""+value; }else { return JSON.toJSONString(value); } }
@SuppressWarnings("unchecked") public static <T> T stringToBean(String str, Class<T> clazz) { if(str == null || str.length() <= 0 || clazz == null) { return null; } if(clazz == int.class || clazz == Integer.class) { return (T)Integer.valueOf(str); }else if(clazz == String.class) { return (T)str; }else if(clazz == long.class || clazz == Long.class) { return (T)Long.valueOf(str); }else { return JSON.toJavaObject(JSON.parseObject(str), clazz); } } |
Contoller
| @Autowired MQSender sender; |
| // @RequestMapping("/mq/header") // @ResponseBody // public Result<String> header() { // sender.sendHeader("hello,imooc"); // return Result.success("Hello,world"); // } // // @RequestMapping("/mq/fanout") // @ResponseBody // public Result<String> fanout() { // sender.sendFanout("hello,imooc"); // return Result.success("Hello,world"); // } // // @RequestMapping("/mq/topic") // @ResponseBody // public Result<String> topic() { // sender.sendTopic("hello,imooc"); // return Result.success("Hello,world"); // } // // @RequestMapping("/mq") // @ResponseBody // public Result<String> mq() { // sender.send("hello,imooc"); // return Result.success("Hello,world"); // } |
問題
啟動時候報錯沒有權限,這個時候不能用guest用戶來進行遠程登錄

找到rabbitMQ的 目錄,創建配置文件
rabbitmq.config
里面配置上
[{rabbit, [{loopback_users, []}]}].
官方文檔:http://www.rabbitmq.com/access-control.html
