前言篇:
為了節約成本,決定通過自研來改造rocketmq,添加任意時間延遲的延時隊列,開源版本的rocketmq只有支持18個等級的延遲時間,
其實對於大部分的功能是夠用了的,但是以前的項目,全部都是使用了阿里雲的rocketmq,原因是不同的供應商的訂單的延時時間是不同的
(部分供應商的訂單未支付30分鍾取消,有些1個半小時取消,各種時間都有),
所以使用了大量的延時隊列,但是開源版本不支持任意時間延時(希望官方支持這個功能)
為了實現這個功能,網上查詢了不少資料,查詢到不少相關的文章,主要實現都是基於時間輪來實現的,
但是比較少開源的代碼實現(也許大家都沒有這個需求吧)
debug實踐篇:
1. 擼起袖子加油干,首先,下載源代碼 https://github.com/apache/rocketmq.git,導入ide
運行mvn package 生成jar包,如果成功的話,會生成到distribution目錄下面
2. 查看文檔,發現要運行namesvr 和 broker
找到 src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java ,開心的執行main方法,
哦哦哦哦哦,果然報錯了,提示 rocketmq.home.dir 目錄不存在,查看源碼, 原來是從system.propeties讀取的,
為了調試,我毫不猶豫的加上了配置文件,
再次運行,不報錯了,控制台顯示,成功啦(生活是多么美好,空氣是多么清晰!)
3.運行 broker ,打開 src\main\java\org\apache\rocketmq\broker\BrokerStartup.java,執行main方法,
添加 配置文件 (D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路徑,你要修改成自己的)
1 System.setProperty("rocketmq.home.dir", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb"); 2 System.setProperty("user.home", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");
運行一下,成功了,開心的發一條消息,試試,哦哦哦哦哦。發不出去哦(人生最痛苦的事情是,快要成功了,卻沒有成功)。
原來還要配置namesvr地址,在啟動命令,添加 -n localhost:9876 ( 上面的namesvr 啟動的ip和端口)
4.漫長的改造之路 (我們是勇敢的斯巴達勇士,一直勇往直前)
用了阿里雲的延時隊列,發現它的message 可以傳一個時間過來(任意的延時時間)
來來來,我們復制一下(不要告訴別人,我們一直是復制,粘貼的,沒有原創, 噓 ......)
1 /** 2 * 該類預定義一些系統鍵. 3 */ 4 static public class SystemPropKey { 5 public static final String TAG = "__TAG"; 6 public static final String KEY = "__KEY"; 7 public static final String MSGID = "__MSGID"; 8 public static final String SHARDINGKEY = "__SHARDINGKEY"; 9 public static final String RECONSUMETIMES = "__RECONSUMETIMES"; 10 public static final String BORNTIMESTAMP = "__BORNTIMESTAMP"; 11 public static final String BORNHOST = "__BORNHOST"; 12 /** 13 * 設置消息的定時投遞時間(絕對時間). <p>例1: 延遲投遞, 延遲3s投遞, 設置為: System.currentTimeMillis() + 3000; <p>例2: 定時投遞, 14 * 2016-02-01 11:30:00投遞, 設置為: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 15 * 11:30:00").getTime() 16 */ 17 public static final String STARTDELIVERTIME = "__STARTDELIVERTIME"; 18 }
/** * <p> 設置消息的定時投遞時間(絕對時間),最大延遲時間為7天. </p> <ol> <li>延遲投遞: 延遲3s投遞, 設置為: System.currentTimeMillis() + 3000;</li> * <li>定時投遞: 2016-02-01 11:30:00投遞, 設置為: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 * 11:30:00").getTime()</li> </ol> */ public void setStartDeliverTime(final long value) { putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value)); }
5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不負有心人,找到啦,
找到 \src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java, 發現
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response; } }
繼續debug,發現 sendMessage 就是處理發送消息的,
如果我們在這里判斷是否延時消息就寫入文件,然后返回成功到客戶端,等到了時間就發送延遲消息,不就搞定了嗎?
oh,yes,就是這么干的
//處理延遲消息 delay message String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME); boolean isDelayMsg = false; long nextStartTime = 0; if (startTime != null && msgInner.getDelayTimeLevel() <= 0) { nextStartTime = Long.parseLong(startTime); if (nextStartTime >= System.currentTimeMillis()) { isDelayMsg = true; } } if (isDelayMsg) { return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime); } else { if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); } }
其中 delayProcessor.handlePutMessageResultFuture 是我們用來處理延遲消息的地方
我們按照每個時間一個文件夾來保存延時消息,等延時消息到達后,定時的寫入延時隊列里面。
詳細原理,請查考 rocketmq 原理實現篇 https://www.cnblogs.com/tomj2ee/p/15815186.html
package org.apache.rocketmq.broker.delay; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import java.io.*; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; public class DelayProcessor implements Runnable { protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName()); protected final BrokerController brokerController; protected final SocketAddress storeHost; private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16); public DelayProcessor(final BrokerController brokerController) { this.brokerController = brokerController; this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController .getNettyServerConfig().getListenPort()); Thread thread = new Thread(this); thread.setName("delayProcessor-run---thread"); thread.setDaemon(true); new File(getDelayPath()).mkdirs(); thread.start(); Thread missCallThread = new Thread(() -> { try { for(;;) { Thread.sleep(10 * 1000); sendMissCallMsg(); } } catch (InterruptedException e) { e.printStackTrace(); } }); missCallThread.setName("delayProcessor-callback-thread"); missCallThread.start(); System.out.println("init delay success " +getDelayPath()); } public RemotingCommand handlePutMessageResultFuture(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msgInner, ChannelHandlerContext ctx, int queueIdInt, long nextStartTime) { return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime); } private RemotingCommand handlePutMessageResult(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msg, ChannelHandlerContext ctx, int queueIdInt, long nextStartTime) { boolean svOk = saveMsgFile(nextStartTime, msg); SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader(); sendMessageResponseHeader.setQueueId(1); sendMessageResponseHeader.setMsgId("0"); sendMessageResponseHeader.setQueueOffset(0l); sendMessageResponseHeader.setTransactionId(""); RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader); if (svOk) { newCommand.setCode(ResponseCode.SUCCESS); } else { newCommand.setCode(ResponseCode.SYSTEM_ERROR); newCommand.setRemark("發送消息延遲失敗!"); } newCommand.setExtFields(request.getExtFields()); newCommand.setVersion(response.getVersion()); newCommand.setOpaque(response.getOpaque()); newCommand.setLanguage(response.getLanguage()); newCommand.setBody(request.getBody()); if (!request.isOnewayRPC()) { try { ctx.writeAndFlush(newCommand); } catch (Throwable e) { log.error("DelayProcessor process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); } } return newCommand; } public void putMessage(MessageExtBrokerInner msgInner) { this.brokerController.getMessageStore().putMessage(msgInner); } @Override public void run() { for (; ; ) { long curTime = System.currentTimeMillis() / 1000; jobTaskExecute.submit(() -> sendMsg(curTime)); try { Thread.sleep(1000); } catch (InterruptedException e) { } } } private String getDelayPath() { String delayPath = "./delay-store"+ File.separator + "delay"; return delayPath; } private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) { ObjectOutputStream objectOutputStream = null; try { String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999); System.out.println( getCurrentTime()+"寫入延遲消息 >>" + msgId); String parentDir = getDelayPath() + File.separator + startTime / 1000; File parentFile = new File(parentDir); if (!parentFile.exists()) { parentFile.mkdirs(); } String fileName = parentDir + File.separator + msgId; FileOutputStream fos = new FileOutputStream(fileName); BufferedOutputStream bos = new BufferedOutputStream(fos); objectOutputStream = new ObjectOutputStream(bos); objectOutputStream.writeObject(msgInner); return true; } catch (Exception ex) { log.error("saveMsgFile ex:", ex); return false; } finally { try { if (objectOutputStream != null) { objectOutputStream.close(); } } catch (Exception ex) { log.error("saveMsgFile ex:", ex); } } } private MessageExtBrokerInner readFile(File f) { ObjectInputStream ois = null; try { ois = new ObjectInputStream(new FileInputStream(f)); return (MessageExtBrokerInner) ois.readObject(); } catch (Exception ex) { return null; } finally { if (ois != null) { try { ois.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void sendMissCallMsg() { File lst = new File(getDelayPath()); File[] files = lst.listFiles(); long startTime = System.currentTimeMillis() / 1000 - 10 * 1000; for (File f : files) { String name = f.getName(); if (f.isDirectory() && !name.equals(".") && !name.equals("..")) { try { Long fileTime = Long.parseLong(name); if (fileTime <= startTime) { sendMsg(fileTime); } } catch (Exception ex) { } } } } private String getCurrentTime(){ return Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] "; } private void sendMsg(long startTime) { File lst = new File(getDelayPath() + File.separator + startTime); File[] files = lst.listFiles(); if (files != null) { for (File f : files) { System.out.println( getCurrentTime()+"時間到發送>> "+ startTime+" to commitLog " + f.getName()); MessageExtBrokerInner msgInner = readFile(f); if (msgInner != null) { putMessage(msgInner); System.out.println( getCurrentTime()+"寫入log >> "+ startTime+" to commitLog " + f.getName()+" success"); f.delete(); } } lst.delete(); } } }
總結:rocketmq延遲隊列實現主要是通過時間輪和文件來保存延時消息,等到了時間后,再寫入延時隊列,來達到延時的目的。
總共有4種方式來實現延時隊列,可以參考延時隊列的實現原理篇
https://www.cnblogs.com/tomj2ee/p/15815157.html
開源rocketmq延遲隊列實現:
https://gitee.com/venus-suite/rocketmq-with-delivery-time.git