這個地址圖文會更清晰:https://www.jianshu.com/p/537e87c64ac7
單機系統的時候,客戶端和連接都有同一台服務器管理。

在本地維護一份userId到connetciont的映射
服務器可以根據userId找出對應的連接,然后把消息push出去

但是集群環境下,連接分布在不同的機器,小明向小張發消息時跨了機器

小明向小張發的消息,需要小張的對應連接的服務器才能推送
要完成這個需求需要解決兩個問題:
1、聊天服務器這么多,怎么才能知道小張連接了哪一台機器?
2、知道是哪一台服務器,怎么才能把消息精准發送到對應的機器?

有一種解決方案,就是把消息全量廣播,然后每台機器自己去判斷小張是不是在本機有連接,然后對應推送消息。但是這個方案明顯不可取,因為這樣會造成網絡風暴,滿天飛羽的消息在廣播。
個人解決方案
一、對於第一個問題,怎么才能知道小張連接了哪一台機器,可以用redis去做一個map映射。
每台機器啟動的時候都分配一個機器id,
當用戶創建連接的時候,在redis放一個key-value,key值放的是userId,value放的是機器id。
這樣就可以根據userId找對應的連接機器

二、對於第二個問題,怎么才能把消息精准發送到對應的機器,可以讓每台機器啟動的時候都有一個專屬的mq和專屬的routingkey,使用rabbitMq的TopicExchange交換器,這樣消息就能精准投遞到對應的機器,routingKey可以用上面定義的機器id。
同時queue的熟悉選專屬隊列,這樣服務器重啟后,連接斷開后,舊的隊列會自動刪除

全過程就是:
1、小明要發消息給小張
2、根據小張userId找到小張所在的機器b
3、構建messageBody,把routingKey設置成b
4、向mq發送消息,rabbitMq根據routingKey把消息分發給機器b;
以下是代碼:
1、新建springboot工程,maven如下
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2、新建Constant做固定配置
public interface Constant { //隊列名稱,這個用uuid String QUEUE = "queue:"+UUID.randomUUID().toString(); //路由鍵,作為機器id String ROUTING_KEY = "routingKey:"+new Date().getTime(); //固定值,代表消息主題 String TOPIC = "topic"; //redsikey的前綴 String REDIS_KEY = "redisKey:"; }
3、配置RabbitmqConfig
@Configuration public class RabbitmqConfig { //新建topic交換器 @Bean TopicExchange initExchange() { return new TopicExchange(Constant.TOPIC); } //新建隊列,要設置獨占隊列,exclusive @Bean public Queue initQueue() { Queue queue = QueueBuilder.durable(Constant.QUEUE).exclusive().autoDelete().build(); return queue; } //交換器和主題綁定 @Bean Binding bindingiTopicExchange() { return BindingBuilder.bind(initQueue()).to(initExchange()).with(Constant.ROUTING_KEY); } //新建消費者 @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 指定消費者 container.setMessageListener(listener); // 指定監聽的隊列 container.setQueueNames(Constant.QUEUE); // 設置消費者的 ack 模式為手動確認模式 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setPrefetchCount(300); //connection return container; } }
4、配置消費者Consumer
@Slf4j @Component public class Consumer implements ChannelAwareMessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Override public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); // 代表投遞的標識符,唯一標識了當前信道上的投遞,通過 deliveryTag ,消費者就可以告訴 RabbitMQ 確認收到了當前消息,見下面的方法 long deliveryTag = messageProperties.getDeliveryTag(); // 如果是重復投遞的消息,redelivered 為 true Boolean redelivered = messageProperties.getRedelivered(); // 獲取生產者發送的原始消息 Object originalMessage = rabbitTemplate.getMessageConverter().fromMessage(message); log.info("接受到消息:{}",originalMessage); // 代表消費者確認收到當前消息,第二個參數表示一次是否 ack 多條消息 channel.basicAck(deliveryTag, false); } }
5、設置UserController
@Slf4j
@RequestMapping
@RestController
public class UserController { @Autowired private RedisTemplate redisTemplate; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg") public String sendMsg(@RequestParam Integer sendUserId,@RequestParam Integer recUserId,String msg ){ MessageProperties messageProperties = new MessageProperties(); String messageBody = msg; Message message = rabbitTemplate.getMessageConverter().toMessage(messageBody, messageProperties); String key = Constant.REDIS_KEY+recUserId; Object o = redisTemplate.opsForValue().get(key); if(o == null){ throw new RuntimeException("recUserId未建立連接:"+recUserId); } rabbitTemplate.convertAndSend(Constant.TOPIC, o.toString(),message); return "success"; } @GetMapping("/buildRoutingKey") public String buildRoutingKey(@RequestParam Integer recUserId){ String key = Constant.REDIS_KEY+recUserId; log.info("key={},value={}",key,Constant.ROUTING_KEY); redisTemplate.opsForValue().set(key,Constant.ROUTING_KEY); return "success"; } }
6、實驗環節,需要部署兩個jar包
分別在2台服務器發起請求初始化連接。
在A服務器輸入 curl localhost:8081//buildRoutingKey?recUserId=1
在B服務器輸入 curl localhost:8082//buildRoutingKey?recUserId=2
然后發送消息
在任意一台服務器輸入 curl localhost:8082/sendMsg?sendUserId=2&recUserId=1&msg=xiaozhangHello
在任意一台服務器輸入 curl localhost:8082/sendMsg?sendUserId=1&recUserId=2&msg=xiaoMingHello


代碼地址
https://github.com/hd-eujian/rabbitmq-share.git
https://gitee.com/guoeryyj/rabbitmq-share.git