關於MQ 消息隊列的通俗理解和 rabbitMQ 使用


 消息隊列,一聽很高大上,現在很多分布式系統都在用這個消息中間件 網上一搜, 說的都是些原理。

 說下我的通俗理解, 你網上買了, 快遞員給你投遞, 會出現什么問題呢? 1  你不定時在家, 快遞員 來了幾次你都不在,不能到你手里。 2. 快遞員很忙,手里一堆貨物, 最后送到你手里就很慢了。

 有問題就要解決,1: 你不定時在家? 1 送到你的房東家,然后你回去挨個找,  2 放到小區的 快遞投遞點,那里很多家 門牌號的 小櫃子 ,你的東西就在那, 你自己去取 。

                       2 : 快遞員很忙? 把快遞放在倉庫, 哪個快遞員有空閑就去給你送。

 這里對應的就是生產消費了。 我們知道, RPC 分布式服務, 把我們的業務系統都給划分了, 而不是之前一個復雜的系統了, 用戶?拆分。 訂單?拆分。 交易?拆分。基礎服務?拆分。關系?拆分。各司其職。 分布式系統好處利大於弊。

 各個業務 明確自己干什么, 另外可以針對性的提示性能 比如 用戶業務訪問的明顯比訂單多, 用戶 業務 服務器加 三台, 如果你是一個復雜的大系統就不好調試了, 跑題了。 各業務必然有關聯, 用戶注冊。必須要通過基礎服務的 發短信, 推送。

  這里就必須要中間件了,消息隊列 有兩種方式,  廣播 ,  排隊,小區快遞櫃對應的各家的, 就是廣播, 你自己拿你自己的。 不耽誤快遞員時間, 不耽誤你時間, 也不耽誤別人的時間, 排隊,就是放在 房東家, 自己挨個去找。 不管如何, 這種異步方式都是

極大的提升了效率。

這里放個RabbitMQ 的簡單示例, 現在各種中間件 都不錯, 各有所長。

1 啟動類

public class RechargeApplication implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(RechargeApplication.class);

    @Autowired
    private Environment env;

    public static void main(String[] args) {
        SpringApplication.run(RechargeApplication.class, args);
//啟動的時候注冊消費者, 一旦 注冊, 消費者就會監聽 隊列,有數據就去拿,基於AMQP協議 , 舉個例子, 你的快遞被放到小區快遞箱了, 手機收到短信, 你的快遞到了。 快去A503 快遞箱收取, 驗證碼 123456. SpringBeanHelper.getBean(Consumer.
class).payConsumer(); } public void run(String... strings) throws Exception { logger.info("Coupon service started, run at {} environment", env.getActiveProfiles()); } }
SpringBeanHelper

public class SpringBeanHelper implements ApplicationContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(SpringBeanHelper.class);
    protected static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringBeanHelper.applicationContext = applicationContext;
    }

    public static <T> T getBean(Class<T> requiredClass) {
        try {
            return applicationContext.getBean(requiredClass);
        } catch (NoSuchBeanDefinitionException e) {
            LOG.warn("no qualifying bean of type: {}", requiredClass);
            return null;
        }
    }

    public static <T> T getBean(String beanName, Class<T> requiredClass) {
        try {
            return applicationContext.getBean(beanName, requiredClass);
        } catch (NoSuchBeanDefinitionException e) {
            LOG.warn("no qualifying bean of name: {}, type: {}", beanName, requiredClass);
            return null;
        }
    }

    public static String projectDir(){
        return System.getProperty("user.dir");
    }

    public static String contextPath() {
        String basePath = applicationContext.getEnvironment().getProperty("server.context-path");
        return StringUtils.isBlank(basePath) ? "" : basePath;
    }

    public static String applicationName() {
        return applicationContext.getEnvironment().getProperty("spring.application.name");
    }

    public static Integer applicationPort(){
        return Integer.parseInt(applicationContext.getEnvironment().getProperty("server.port"));
    }

    public static String applicationEnv() {
        String[] envList = applicationContext.getEnvironment().getActiveProfiles();
        if (!CollectionUtils.isNullOrEmpty(envList)) {
            return envList[0];
        }
        return "";
    }

    /**
     * 判斷給定的 taskNum 是不是本機要同步的房間
     */
    public static boolean isMineTask(long taskNum) {
        DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
        //通過服務發現取所有本服務的實例
        List<ServiceInstance> serviceList = discoveryClient.getInstances(applicationName());
        if (CollectionUtils.isNullOrEmpty(serviceList)) {
            return false;
        }
        SortedSet<EurekaDiscoveryClient.EurekaServiceInstance> esiSet = Sets.newTreeSet((o1, o2) -> {
            if (NetworkUtils.ip2long(o1.getHost()) < NetworkUtils.ip2long(o2.getHost())) {
                return 1;
            } else if(NetworkUtils.ip2long(o1.getHost()) == NetworkUtils.ip2long(o2.getHost())){
                if(o1.getPort() < o2.getPort()){
                    return 1;
                }else {
                    return -1;
                }
            } else {
                return -1;
            }
        });
        //過濾狀態為UP的實例並按IP排序
        for (ServiceInstance si : serviceList) {
            if (si instanceof EurekaDiscoveryClient.EurekaServiceInstance) {
                EurekaDiscoveryClient.EurekaServiceInstance esi = (EurekaDiscoveryClient.EurekaServiceInstance) si;
                if (InstanceInfo.InstanceStatus.UP == esi.getInstanceInfo().getStatus()) {
                    esiSet.add(esi);
                }
            }
        }
        if (CollectionUtils.isNullOrEmpty(esiSet)) {
            return false;
        }
        int index = 0, esiSize = esiSet.size();
        long theIp = NetworkUtils.ip2long(NetworkUtils.ofInnerIp());
        if (1 == esiSet.size()) {
            return theIp == NetworkUtils.ip2long(esiSet.first().getHost());
        }
        //找到本機實例並且當 taskNum 對所有實例數取余等於本機實例 index 值時處理
        for (EurekaDiscoveryClient.EurekaServiceInstance esi : esiSet) {
            if (NetworkUtils.ip2long(esi.getHost()) == theIp && index == taskNum % esiSize) {
                return true;
            }
            index++;
        }
        return false;
    }
}

3 。生成者 , 快遞員:

    public void producer(Enum topic, Object message) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("rabbit producer topic: {}, message: {}", topic, JsonUtils.toJSONString(message));
        }
        try {
            AMQP.BasicProperties props = propertyOf(message);
            senderChannel.queueDeclare(topic.name(), true, false, false, null);
            this.senderChannel.basicPublish("", topic.name(), props, JsonUtils.toJSONBytes(message));
        } catch (Exception e) {
            throw new RuntimeException("rabbitmq send message error.....", e);
        }
    }

 

4 消費者

public class Consumer {


    public static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    private RabbitOperations rabbitOperations;

    @Autowired
    private RechargeService rechargeService;

    /**
     * 支付成功回調
     */
    public void payConsumer() {
        rabbitOperations.consumer(TradeCallBackTopic.CHARGE, 3, (contentType, body) -> {
            String content = StringUtils.toEncodedString(body, Charset.forName("UTF-8"));
            LOGGER.info("=======消費者:payConsumer start, content:{}============", content);
            CallBackDto dto = JSON.parseObject(content, CallBackDto.class);
            if (dto.getCallBackType() == PaymentConstant.CallBackNotifyType.PAY
                    && dto.getTradeStatus() == PaymentConstant.TradeStatus.SUCCESS) {
            }

        });


    }


}

 

 

 

 

 

 

 

 

         

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM