RocketMQ消息發布報錯:No accessKey is configured


No accessKey is configured 本來這個錯誤是很簡單個錯誤,本意是mq上邊開啟了acl驗證機制,而客戶端沒有配置accessKey。

但是筆者這次遇到的這個問題比較有意思,廢了我半天時間去研究。在這里記錄一下。


當時筆者是先行自己封裝好了一個rocket的工具庫,考慮到安全問題,決定引入acl機制,然后主要是參照了網上的兩個文章。
配置mq的broker和plain_acl兩個配置文件,客戶端加入acl相關代碼,主要是加入了AclRPCHook,在發送消息之前插入accessKey和簽名供mq進行校驗。
之后用測試工程引用本地maven庫上安裝的rocket工具庫進行測試,結果之前能用的代碼在用了acl之后一直報錯:提示accessKey沒配置。

然后發現直接弄個帶main方法的類運行例子里的代碼是可以的,一樣的代碼copy到測試工程里就不行了。
分析一下應該是客戶端問題,mq的acl配置應該是對的。
接下來1天各種實驗無果,開始回到起點,潛心來看源代碼,搞清楚客戶端的mq發送消息以及acl的源碼是怎么寫的。

當調用DefaultMQProducer的send方法,同步的向mq投遞消息的時候,實際上是defaultMQProducerImpl.send(msg),
也即DefaultMQProducerImpl的sendKernelImpl方法,關鍵代碼:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage,里邊其實是MQClientAPIImpl的如下方法:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response,addr);
    }

其中remotingClient是接口,其實現是netty NettyRemotingClient invokeSync(),后者會doBeforeRpcHooks(addr, request);

if (rpcHooks.size() > 0) {
            for (RPCHook rpcHook: rpcHooks) {
                rpcHook.doBeforeRequest(addr, request);
            }
        }

rpcHooks這里相當於是NettyRemotingClient的成員變量。

對於acl的DefaultMQProducer
DefaultMQProducer實例化的時候,是把rpcHook都給到了自己的defaultMQProducerImpl。

factory是在defaultMQProducerImpl.start()也就是DefaultMQProducer.start()的時候實例化的。

MQClientInstance是factory的實現,實例化的時候內部

this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
然后factory提供這個mQClientAPIImpl實例。


defaultMQProducerImpl的start方法中實例化factory時,會判斷如果生成過,就不再生成了。也就是說整個進程只會有一個factory實例。
關鍵代碼:

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);


剛好工程里有另外一個transactionProducer的代碼,是筆者用來測試事務消息的,沒加acl代碼,所以一旦先transactionProducer.start()創建了factory,
那之后一直用這個無acl的,所以發送消息到mq的時候一直提示沒有accessKey就是這個原因。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM