QMQ去哪兒網-mq中間件(啟動失敗)


簡介

  • 去哪兒網近日宣布開源其內部廣泛使用的消息中間件 QMQ 。QMQ 自 2012 年誕生以來在去哪兒網所有業務場景中廣泛的應用,包括跟交易息息相關的訂單場景; 也包括報價搜索等高吞吐量場景。目前在公司內部日常消息 qps 在 60W 左右,生產上承載將近 4W+ 消息 topic ,消息的端到端延遲可以控制在 10ms 以內。
  • 主要提供以下特性:
    • 異步實時消息
    • 延遲/定時消息
    • 基於Tag的服務端過濾
    • Consumer端冪等處理支持
    • Consumer端filter
    • 死信消息
    • 結合Spring annotation使用的簡單API
    • 提供豐富的監控指標
    • 接入OpenTracing
    • 分布式事務(即將開源)
    • 消息投遞軌跡(即將開源)
    • 歷史消息的自動備份(即將開源)

demo實踐

配置MetaServer

  1. https://github.com/qunarcorp/qmq/releases 下載安裝包,目前最新版qmq-dist-1.0.0-bin.tar.gz
  2. 解壓后目錄
  3. 在sql目錄有個init.sql,在本地的mysql數據庫中,隨便建個庫,運行該sql,會創建幾張表
  4. 在conf中,有很多配置文件需要配置
  5. 首先是datasource.properties配置數據庫地址
# 可選,MySQL驅動類
jdbc.driverClassName=com.mysql.jdbc.Driver
# 必填,MySQL數據庫連接地址
jdbc.url=jdbc:mysql://<address>:<port>/<db>?<params>
# 必填,MySQL數據庫用戶名
jdbc.username=
# 必填,MySQL數據庫密碼
jdbc.password=
# 可選,連接池最大連接數
pool.size.max=10
  1. metaserver.properties
#可選,提供http服務,用於meta server的服務發現
meta.server.discover.port=8080
#可選,以tcp的方式監聽,供client和server訪問
meta.server.port=20880
# 可選,內部數據緩存刷新間隔
refresh.period.seconds=5
# 可選,動態生效,broker心跳超時時間
heartbeat.timeout.ms=30000
# 可選,動態生效,每個主題至少分配多少broker group
min.group.num=2
  1. valid-api-tokens.properties(一行一條白名單,在加入集群服務時配置需要用到,設置=前面的參數即可)
123=default-pass
  1. client_log_switch.properties
# 是否輸出所有主題的詳細請求信息
default=false

# 可以控制單個主題是否輸出詳細請求信息
<subject a>=true
<subject b>=false
  1. 使用bin目錄的metaserver.sh(windows平台上請使用metaserver.cmd)啟動

配置Broker

# 必填,metaserver地址,即你第一步安裝的meta server的ip地址,注意這里的地址的端口是meta.server.discover.port指定的端口,默認是8080
meta.server.endpoint=http://<metaserver address>/meta/address
# 可選,broker服務端口
broker.port=20881
# 可選,同步數據端口
sync.port=20882
# 可選,動態生效,從機同步請求超時時間
slave.sync.timeout=3000
# 必填,數據存放目錄
store.root=/data
# 可選,動態生效,主是否等待從寫入完成再返回寫入結果
wait.slave.wrote=false
# 可選,動態生效,重試消息延遲派發時間
message.retry.delay.seconds=5
# 可選,動態生效,messagelog過期時間
messagelog.retention.hours=72
# 可選,動態生效,consumerlog過期時間
consumerlog.retention.hours=72
# 可選,動態生效,pulllog過期時間
pulllog.retention.hours
# 可選,動態生效,數據文件過期檢查周期
log.retention.check.interval.seconds
# 可選,動態生效,是否刪除過期數據
log.expired.delete.enable=true
# 可選,動態生效,checkpoint文件保留數量
checkpoint.retain.count=5
# 可選,動態生效,action checkpoint強制寫入周期,單位為日志數量
action.checkpoint.interval=100000
# 可選,動態生效,message checkpoint強制寫入周期,單位為日志數量
message.checkpoint.interval=100000
# 可選,動態生效,重試消息寫入QPS限制
put_need_retry_message.limiter=50
# 可選,動態生效,從機一次最多拉取多少數據
sync.batch.size=100000
# 可選,動態生效,從機同步數據超時時間
message.sync.timeout.ms=10
  1. 運行,liunx使用tools.sh
tools.cmd AddBroker --metaserver=localhost:8080 --token=123 --brokerGroup=group1 --role=0 --hostname=localhost --ip=127.0.0.1 --servePort=20881 --syncPort=20882
  • metaserver address指的是ip:port,port默認是8080
  • token即metaserver的配置valid-api-tokens.properties里任何一項
  • brokerGroup 這一組的名字,每一組分為主從
  • role 角色 0 - master, 1 - slave, 5 - delay master, 6 - delay slave
  • hostname broker的主機名
  • ip broker的ip地址
  • servePort broker接收消息的端口
  • syncPort 主從同步端口

配置Delay Server

# 必填,metaserver地址,即你第一步安裝的meta server的ip地址,注意這里的地址的端口是meta.server.discover.port指定的端口,默認是8080
meta.server.endpoint=http://<metaserver address>/meta/address
# 可選,broker服務端口
broker.port=20881
# 可選,同步數據端口
sync.port=20882
# 可選,動態生效,從機同步請求超時時間
slave.sync.timeout=3000
# 必填,數據存放目錄
store.root=/data
# 可選,動態生效,主是否等待從寫入完成再返回寫入結果
wait.slave.wrote=true
# 可選,動態生效,從機一次最多拉取多少數據
sync.batch.size=100000
# 可選,動態生效,從機同步dispatch數據超時時間
dispatch.sync.timeout.ms=10
# 可選,動態生效,從機同步message數據超時時間
message.sync.timeout.ms=10
# 可選,動態生效,是否刪除過期數據
log.expired.delete.enable=true
# 可選,動態生效,數據文件過期檢查周期
log.retention.check.interval.seconds=60
# 可選,動態生效,dispatch文件過期時間
dispatch.log.keep.hour=72
# 可選,動態生效,messagelog過期時間
messagelog.retention.hours=72
  1. 運行
tools.cmd AddBroker --metaserver=localhost:8080 --token=123 --brokerGroup=group1 --role=5 --hostname=localhost --ip=127.0.0.1 --servePort=20881 --syncPort=20882

運行失敗~

  1. broker.cmd 啟動並報錯后,metaserver也報錯
  2. broker報錯
[2018-12-17 16:33:16 ERROR qunar.tc.qmq.meta.BrokerRegisterService] Send acquire meta message to meta server failed
java.lang.IndexOutOfBoundsException: null
        at io.netty.buffer.EmptyByteBuf.readShort(EmptyByteBuf.java:450)
        at qunar.tc.qmq.utils.PayloadHolderUtils.readString(PayloadHolderUtils.java:37)
        at qunar.tc.qmq.meta.BrokerAcquireMetaResponseSerializer.deSerialize(BrokerAcquireMetaResponseSerializer.java:31)
        at qunar.tc.qmq.meta.BrokerRegisterService.acquireMeta(BrokerRegisterService.java:76)
        at qunar.tc.qmq.meta.BrokerRegisterService.start(BrokerRegisterService.java:68)
        at qunar.tc.qmq.startup.ServerWrapper.register(ServerWrapper.java:103)
        at qunar.tc.qmq.startup.ServerWrapper.start(ServerWrapper.java:86)
        at qunar.tc.qmq.container.Bootstrap.main(Bootstrap.java:29)
Exception in thread "main" java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
        at qunar.tc.qmq.meta.BrokerRegisterService.acquireMeta(BrokerRegisterService.java:80)
        at qunar.tc.qmq.meta.BrokerRegisterService.start(BrokerRegisterService.java:68)
        at qunar.tc.qmq.startup.ServerWrapper.register(ServerWrapper.java:103)
        at qunar.tc.qmq.startup.ServerWrapper.start(ServerWrapper.java:86)
        at qunar.tc.qmq.container.Bootstrap.main(Bootstrap.java:29)
Caused by: java.lang.IndexOutOfBoundsException
        at io.netty.buffer.EmptyByteBuf.readShort(EmptyByteBuf.java:450)
        at qunar.tc.qmq.utils.PayloadHolderUtils.readString(PayloadHolderUtils.java:37)
        at qunar.tc.qmq.meta.BrokerAcquireMetaResponseSerializer.deSerialize(BrokerAcquireMetaResponseSerializer.java:31)
        at qunar.tc.qmq.meta.BrokerRegisterService.acquireMeta(BrokerRegisterService.java:76)
  1. metaServer報錯
[2018-12-17 16:33:16 ERROR qunar.tc.qmq.netty.NettyRequestExecutor] doExecute request exception, channel:[id: 0x88b9cfb5, L:/172.16.100.141:20880 - R:/172.16.100.141:59614], cmd:Datagram{header=RemotingHeader{magicCode=-557774114, code=40, version=8, opaque=0, flag=0, requestCode=-1}}
java.lang.RuntimeException: cannot find broker meta for DESKTOP-BCBMBTF:20881
        at qunar.tc.qmq.meta.processor.BrokerAcquireMetaProcessor.lambda$createResponse$1(BrokerAcquireMetaProcessor.java:68)
        at java.util.Optional.orElseThrow(Optional.java:290)
        at qunar.tc.qmq.meta.processor.BrokerAcquireMetaProcessor.createResponse(BrokerAcquireMetaProcessor.java:68)
        at qunar.tc.qmq.meta.processor.BrokerAcquireMetaProcessor.processRequest(BrokerAcquireMetaProcessor.java:58)
        at qunar.tc.qmq.netty.NettyRequestExecutor.doExecute(NettyRequestExecutor.java:86)
        at qunar.tc.qmq.netty.NettyRequestExecutor.executeWithMonitor(NettyRequestExecutor.java:70)
        at qunar.tc.qmq.netty.NettyRequestExecutor.execute(NettyRequestExecutor.java:56)
        at qunar.tc.qmq.netty.NettyServerHandler.processRequestCommand(NettyServerHandler.java:79)
        at qunar.tc.qmq.netty.NettyServerHandler.processMessageReceived(NettyServerHandler.java:57)
        at qunar.tc.qmq.netty.NettyServerHandler.channelRead0(NettyServerHandler.java:50)
        at qunar.tc.qmq.netty.NettyServerHandler.channelRead0(NettyServerHandler.java:37)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)

總結

  1. qmq剛開源出來,很多東西有待實踐,我碰到的問題目前網上很難搜到答案
  2. 若有大神運行成功官方demo的,還請寫篇博客,必拜訪推薦,請在評論區留下地址

官方地址
https://github.com/qunarcorp/qmq


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM