rocketmq延遲隊列(延時消息)精確到秒級實現(實現總結編)


    前言篇:

  為了節約成本,決定通過自研來改造rocketmq,添加任意時間延遲的延時隊列,開源版本的rocketmq只有支持18個等級的延遲時間,

其實對於大部分的功能是夠用了的,但是以前的項目,全部都是使用了阿里雲的rocketmq,原因是不同的供應商的訂單的延時時間是不同的

(部分供應商的訂單未支付30分鍾取消,有些1個半小時取消,各種時間都有),

所以使用了大量的延時隊列,但是開源版本不支持任意時間延時(希望官方支持這個功能)

   為了實現這個功能,網上查詢了不少資料,查詢到不少相關的文章,主要實現都是基於時間輪來實現的,

但是比較少開源的代碼實現(也許大家都沒有這個需求吧)

 

debug實踐篇:

1. 擼起袖子加油干,首先,下載源代碼 https://github.com/apache/rocketmq.git,導入ide

運行mvn  package 生成jar包,如果成功的話,會生成到distribution目錄下面

 

 

2. 查看文檔,發現要運行namesvr 和 broker 

找到 src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java ,開心的執行main方法,

哦哦哦哦哦,果然報錯了,提示 rocketmq.home.dir 目錄不存在,查看源碼, 原來是從system.propeties讀取的,

為了調試,我毫不猶豫的加上了配置文件,

 

再次運行,不報錯了,控制台顯示,成功啦(生活是多么美好,空氣是多么清晰!

 

 

 

3.運行 broker ,打開 src\main\java\org\apache\rocketmq\broker\BrokerStartup.java,執行main方法,

添加 配置文件 (D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路徑,你要修改成自己的

1 System.setProperty("rocketmq.home.dir", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb");
2 System.setProperty("user.home", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");


 

運行一下,成功了,開心的發一條消息,試試,哦哦哦哦哦。發不出去哦(人生最痛苦的事情是,快要成功了,卻沒有成功

原來還要配置namesvr地址,在啟動命令,添加 -n   localhost:9876   ( 上面的namesvr 啟動的ip和端口)

 

 

 

4.漫長的改造之路 (我們是勇敢的斯巴達勇士,一直勇往直前)

用了阿里雲的延時隊列,發現它的message 可以傳一個時間過來(任意的延時時間)

來來來,我們復制一下(不要告訴別人,我們一直是復制,粘貼的,沒有原創, 噓 ......

 

 1     /**
 2      * 該類預定義一些系統鍵.
 3      */
 4     static public class SystemPropKey {
 5         public static final String TAG = "__TAG";
 6         public static final String KEY = "__KEY";
 7         public static final String MSGID = "__MSGID";
 8         public static final String SHARDINGKEY = "__SHARDINGKEY";
 9         public static final String RECONSUMETIMES = "__RECONSUMETIMES";
10         public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";
11         public static final String BORNHOST = "__BORNHOST";
12         /**
13          * 設置消息的定時投遞時間(絕對時間). <p>例1: 延遲投遞, 延遲3s投遞, 設置為: System.currentTimeMillis() + 3000; <p>例2: 定時投遞,
14          * 2016-02-01 11:30:00投遞, 設置為: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01
15          * 11:30:00").getTime()
16          */
17         public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";
18     }

 

/**
     * <p> 設置消息的定時投遞時間(絕對時間),最大延遲時間為7天. </p> <ol> <li>延遲投遞: 延遲3s投遞, 設置為: System.currentTimeMillis() + 3000;</li>
     * <li>定時投遞: 2016-02-01 11:30:00投遞, 設置為: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01
     * 11:30:00").getTime()</li> </ol>
     */
    public void setStartDeliverTime(final long value) {
        putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value));
    }

 

5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不負有心人,找到啦,

找到  \src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java, 發現 

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }


繼續debug,發現 sendMessage 就是處理發送消息的,

如果我們在這里判斷是否延時消息就寫入文件,然后返回成功到客戶端,等到了時間就發送延遲消息,不就搞定了嗎?

oh,yes,就是這么干的

        //處理延遲消息 delay message

        String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME);
        boolean isDelayMsg = false;
        long nextStartTime = 0;
        if (startTime != null && msgInner.getDelayTimeLevel() <= 0) {
            nextStartTime = Long.parseLong(startTime);
            if (nextStartTime >= System.currentTimeMillis()) {
                isDelayMsg = true;
            }
        }
        if (isDelayMsg) {
            return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime);
        } else {
            if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                    + "] sending transaction message is forbidden");
                    return response;
                }
                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
            } else {
                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            }

            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

        }
    }

 

其中 delayProcessor.handlePutMessageResultFuture 是我們用來處理延遲消息的地方

我們按照每個時間一個文件夾來保存延時消息,等延時消息到達后,定時的寫入延時隊列里面。

詳細原理,請查考 rocketmq 原理實現篇  https://www.cnblogs.com/tomj2ee/p/15815186.html

 
package org.apache.rocketmq.broker.delay;

import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

public class DelayProcessor implements  Runnable {

    protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName());

    protected final BrokerController brokerController;
    protected final SocketAddress storeHost;

    private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16);


    public DelayProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
        this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
                .getNettyServerConfig().getListenPort());
        Thread thread = new Thread(this);
        thread.setName("delayProcessor-run---thread");

        thread.setDaemon(true);
        new File(getDelayPath()).mkdirs();
        thread.start();
        Thread missCallThread = new Thread(() -> {
            try {
                for(;;) {
                    Thread.sleep(10 * 1000);
                    sendMissCallMsg();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        missCallThread.setName("delayProcessor-callback-thread");
        missCallThread.start();
        System.out.println("init delay success " +getDelayPath());

    }

    public RemotingCommand handlePutMessageResultFuture(RemotingCommand response,
                                                        RemotingCommand request,
                                                        MessageExtBrokerInner msgInner,
                                                        ChannelHandlerContext ctx,
                                                        int queueIdInt, long nextStartTime) {
        return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime);

    }

    private RemotingCommand handlePutMessageResult(RemotingCommand response,
                                                   RemotingCommand request, MessageExtBrokerInner msg,
                                                   ChannelHandlerContext ctx,
                                                   int queueIdInt, long nextStartTime) {
        boolean svOk = saveMsgFile(nextStartTime, msg);
        SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();
        sendMessageResponseHeader.setQueueId(1);
        sendMessageResponseHeader.setMsgId("0");
        sendMessageResponseHeader.setQueueOffset(0l);
        sendMessageResponseHeader.setTransactionId("");
        RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader);

        if (svOk) {
            newCommand.setCode(ResponseCode.SUCCESS);
        } else {
            newCommand.setCode(ResponseCode.SYSTEM_ERROR);
            newCommand.setRemark("發送消息延遲失敗!");
        }
        newCommand.setExtFields(request.getExtFields());
        newCommand.setVersion(response.getVersion());
        newCommand.setOpaque(response.getOpaque());
        newCommand.setLanguage(response.getLanguage());
        newCommand.setBody(request.getBody());

        if (!request.isOnewayRPC()) {
            try {
                ctx.writeAndFlush(newCommand);
            } catch (Throwable e) {
                log.error("DelayProcessor process request over, but response failed", e);
                log.error(request.toString());
                log.error(response.toString());
            }
        }
        return newCommand;
    }


    public void putMessage(MessageExtBrokerInner msgInner) {
        this.brokerController.getMessageStore().putMessage(msgInner);
    }

    @Override
    public void run() {
        for (; ; ) {
            long curTime = System.currentTimeMillis() / 1000;
            jobTaskExecute.submit(() -> sendMsg(curTime));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }

    private String getDelayPath() {
        String delayPath = "./delay-store"+ File.separator + "delay";
        return delayPath;
    }

    private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) {

        ObjectOutputStream objectOutputStream = null;
        try {
            String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999);
            System.out.println( getCurrentTime()+"寫入延遲消息 >>" + msgId);
            String parentDir = getDelayPath() + File.separator + startTime / 1000;
            File parentFile = new File(parentDir);
            if (!parentFile.exists()) {
                parentFile.mkdirs();
            }
            String fileName = parentDir + File.separator + msgId;

            FileOutputStream fos = new FileOutputStream(fileName);
            BufferedOutputStream bos = new BufferedOutputStream(fos);
            objectOutputStream = new ObjectOutputStream(bos);
            objectOutputStream.writeObject(msgInner);
            return true;
        } catch (Exception ex) {
            log.error("saveMsgFile ex:", ex);
            return false;
        } finally {
            try {
                if (objectOutputStream != null) {
                    objectOutputStream.close();
                }
            } catch (Exception ex) {
                log.error("saveMsgFile ex:", ex);
            }
        }

    }


    private MessageExtBrokerInner readFile(File f) {
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new FileInputStream(f));
            return (MessageExtBrokerInner) ois.readObject();
        } catch (Exception ex) {
            return null;
        } finally {
            if (ois != null) {
                try {
                    ois.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    private  void sendMissCallMsg() {
        File lst = new File(getDelayPath());
        File[] files = lst.listFiles();
        long startTime = System.currentTimeMillis() / 1000 - 10 * 1000;
        for (File f : files) {
            String name = f.getName();
            if (f.isDirectory() && !name.equals(".") && !name.equals("..")) {
                try {
                    Long fileTime = Long.parseLong(name);
                    if (fileTime <= startTime) {
                        sendMsg(fileTime);
                    }
                } catch (Exception ex) {
                }
            }
        }

    }

    private String  getCurrentTime(){
       return  Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] ";
    }
    private void sendMsg(long startTime) {
        File lst = new File(getDelayPath() + File.separator + startTime);
        File[] files = lst.listFiles();
        if (files != null) {
            for (File f : files) {
                System.out.println( getCurrentTime()+"時間到發送>> "+ startTime+" to commitLog " + f.getName());
                MessageExtBrokerInner msgInner = readFile(f);
                if (msgInner != null) {
                    putMessage(msgInner);
                    System.out.println( getCurrentTime()+"寫入log >> "+ startTime+" to commitLog " + f.getName()+" success");
                    f.delete();
                }
            }
            lst.delete();
        }
    }
}

 

 

總結:rocketmq延遲隊列實現主要是通過時間輪和文件來保存延時消息,等到了時間后,再寫入延時隊列,來達到延時的目的。

 

總共有4種方式來實現延時隊列,可以參考延時隊列的實現原理篇

https://www.cnblogs.com/tomj2ee/p/15815157.html

 

開源rocketmq延遲隊列實現: 

https://gitee.com/venus-suite/rocketmq-with-delivery-time.git

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM