Spring Boot + RabbitMQ + 手動確認消息 + 控制(接口、定時任務)消費者上下線


這裡只貼出消費者部分的代碼。

第一部分:手動ack配置

package com.mybatis.plus.config.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ listener configuration.
 *
 * <p>Declares the listener container factory bean and switches it to manual
 * acknowledgement, so listener methods must ack or reject each delivery themselves.
 *
 * @author 官昌洪
 * @date 2021/12/17 11:24
 * @version V1.0
 */
@Configuration
public class MessageListenerConfig {

    /**
     * Builds the listener container factory bean (bean name is the method name,
     * {@code rabbitListenerContainerFactory}).
     *
     * @param connectionFactory the RabbitMQ connection factory injected by Spring
     * @return a factory using {@link AcknowledgeMode#MANUAL} and a
     *         {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        // Manual mode: the container never acks on behalf of the listener.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

}

 

第二部分:消費消息

package com.mybatis.plus.config.mq;

import com.alibaba.fastjson.JSONObject;
import com.mybatis.plus.entity.Log;
import com.mybatis.plus.utils.EurekaUtils;
import com.mybatis.plus.utils.hash.ConsistentHash;
import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class Receiver {

    @Value("${server.port}")
    private String port;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue")
    public void consumer(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(500);

            if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                String msg = new String(message.getBody(), "UTF-8");
                Log parseObject = JSONObject.parseObject(msg, Log.class);
                log.info("消費的消息來自的隊列名為:" + message.getMessageProperties().getConsumerQueue());
                log.info("消息成功消費到  messageId:" + parseObject.getLogUuid() + "  messageData:" + parseObject.getLogTitle() + "  createTime:" + parseObject.getCreateTime());
                log.info("================================");
                // 收到來自主機的消息 進行一致性hash分配 發往不同的服務
                // 獲取服務節點 創建一致hash環
                ConsistentHash consistentHash = InitConfig.consistentHash;
                List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2");
                if (!allServiceAddr.isEmpty()) {
                    for (Map<String, String> stringMap : allServiceAddr) {
                        String instanceId = stringMap.get("routeKey");
                        // 新增1個物理節點和150個對應的虛擬節點
//                        String instanceId = stringMap.get("queueKey");
                        // 如果hash環中沒有該節點 才新增
                        ConsistentHashNode node = consistentHash.getAccurateNode(instanceId);
                        if (null == node) {
                            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId), instanceId), 150);
                        }
                    }
                } else {
                    //沒有服務提供者 將消息返回隊列
                    channel.basicReject(deliveryTag, true);
                    return;
                }

                channel.basicAck(deliveryTag, false); //第二個參數,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有消息
                // 提取消息中的某個代表來源主機的標識 然后在hash環上分配目標節點
                String logUuid = parseObject.getLogUuid();
                ConsistentHashNode node = consistentHash.getNode(logUuid);
                log.info("主機標識:{},分配節點:{}", logUuid, node.getTarget());
                //向指定路由發送消息
                // todo 問題 這里怎么保證隊列預先創建初始化好 解決方案 先從配置文件獲取隊列名稱 新增服務時 需要重啟服務
                rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg);
//                planTwo(parseObject);
                log.info(">>>>>>>>>>>>消費消息成功!");
            }
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>消費消息失敗!失敗消息ID:{}, 失敗原因:{}", deliveryTag, e.getMessage());
            channel.basicReject(deliveryTag, true);
        }
    }
}

 

第三部分:控制消費者開啟,關閉

@Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    /**
     * Starts the listener container registered under the id {@code testDirectQueueId1},
     * bringing that consumer online.
     *
     * @return {@code R.ok()} once the container's {@code start()} has been invoked
     * @throws IllegalStateException if no listener container is registered under that id
     */
    @RequestMapping("/startCustomer")
    public R startCustomer(){
        MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1");
        // The registry returns null for an unknown id; fail with a clear message
        // instead of a bare NullPointerException.
        if (consumer == null) {
            throw new IllegalStateException("No listener container registered with id testDirectQueueId1");
        }
        consumer.start();
        return R.ok();
    }

    /**
     * Stops the listener container registered under the id {@code testDirectQueueId1},
     * taking that consumer offline.
     *
     * @return {@code R.ok()} once the container's {@code stop()} has been invoked
     * @throws IllegalStateException if no listener container is registered under that id
     */
    @RequestMapping("/stopCustomer")
    public R stopCustomer(){
        MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1");
        // The registry returns null for an unknown id; fail with a clear message
        // instead of a bare NullPointerException.
        if (consumer == null) {
            throw new IllegalStateException("No listener container registered with id testDirectQueueId1");
        }
        consumer.stop();
        return R.ok();
    }

 

關鍵在於為 @RabbitListener 註解指定 id 屬性,之後即可透過 RabbitListenerEndpointRegistry 依該 id 取得監聽容器,控制消費者的啟動與停止。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM