==背景==
物聯網場景,在設備端寫了一個小的API服務程序,這個程序包括:
1、向平台上報設備數據
2、創建消費者客戶端,用來監聽平台的下行命令
==問題==
平台層需要知道設備的狀態:在線 or 離線。我能想到的解決辦法
1、設備上報心跳數據,平台通過心跳來判斷設備是否在線。
2、rocketmq應該有可以監控消費者狀態的命令,是否可以通過這個命令實現。
方案1肯定是沒有問題的,不過缺點就是需要在平台上寫狀態管理的代碼,麻煩不說,可能還有延遲。
於是想嘗試方法2是否可行。
==踐行過程==
首先,我觀察了rocketmq-console(RocketMQ的Web界面,需要獨立部署),發現可以通過Web界面查看消費者狀態,結果如圖:
通過瀏覽器的控制台日志,可以看到調用的是consumerConnection.query接口。
很好,我是否可以借鑒一下這個思路,去監聽消費者狀態呢。
按照這個思路走,去github上找了源碼:https://github.com/apache/rocketmq-externals
通過查看他們的源碼,才知道RocketMQ已經提供了供查看消費者鏈接信息的API。
==API示例==
需要引入新的pom文件rocketmq-tools、rocketmq-common,增加只有,所有的pom為
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-store</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.0</version> </dependency>
Java代碼示例
package admin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; public class AdminExtSample { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876"); defaultMQAdminExt.start(); ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down"); System.out.println(cc.toString()); defaultMQAdminExt.shutdown(); } }
這樣就可以獲取上面web頁面中的所有信息了。
==實際應用==
在實際產品中,需要監控設備端消費者的狀態,然后形成設備在線離線的結果數據。
打算用Flink寫一個實時流,監控消費者狀態,並將結果寫入到InfluxDB總。
1、設計思路:
→獨立編寫一個實時處理流(與數據處理的流分離開)
→在Source中按照一定頻率查看消費者狀態(ConsumerConnection)
→在Proc中讀取InfluxDB中的狀態(表名:device_status),與Source中產生的數據進行對比產生online、offline的設備狀態變化數據
→在Sink中將設備狀態變化數據寫入到InfluxDB(表名:device_status)
2、實際代碼
類名:SourceConsumerConnection
作用:RocketMQ的Source,用來定時檢查消費者狀態,產生狀態的數據流
Tips:當消費組不在線的時候,會拋出MQBrokerException(206)的異常,這個異常需要人工處理一下。
package com.rexel.stream.flink.source; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rexel.stream.utils.RocketUtils; import java.util.HashSet; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; /** * @ClassName: SourceConsumerConnection * @Description: 無任何處理的Source * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/7/28 */ @Slf4j public class SourceConsumerConnection extends RichParallelSourceFunction<String> { private RocketUtils rocketUtils = RocketUtils.getInstance(); private DefaultMQAdminExt adminExt; private String namesrvAddr; private String consumerGroup; private String accessKey; private String secretKey; public SourceConsumerConnection( String namesrvAddr, String consumerGroup, String accessKey, String secretKey) { this.namesrvAddr = namesrvAddr; this.consumerGroup = consumerGroup; this.accessKey = accessKey; this.secretKey = secretKey; } @Override public void run(SourceContext<String> ctx) { adminExt = rocketUtils.createAdminExt(namesrvAddr, accessKey, secretKey); while (true) { ConsumerConnection cc; try { cc = adminExt.examineConsumerConnectionInfo(consumerGroup); } catch (InterruptedException | RemotingException | MQClientException e) { e.printStackTrace(); return; } catch (MQBrokerException e) { // 消費者不在線 if (e.getResponseCode() == 206) { ctx.collect(new JSONArray().toString()); continue; } else { e.printStackTrace(); return; } } HashSet<Connection> set = cc.getConnectionSet(); JSONArray jsonArray = new JSONArray(); for(Connection connection : set) { JSONObject jsonObject = (JSONObject)JSONObject.toJSON(connection); jsonArray.add(jsonObject); } ctx.collect(jsonArray.toString()); sleep(); } } @Override public void close() { shutdown(); } @Override public void cancel() { shutdown(); } private void sleep() { try { Thread.sleep((long) 3000); } catch (InterruptedException e) { e.printStackTrace(); } } private void shutdown() { if (adminExt != null) { adminExt.shutdown(); } adminExt = null; } }
類名:CheckDeviceStatus
作用:接收Source數據,並讀取InfluxDB數據,對比之后產生設備狀態變化數據
package com.rexel.stream.flink.proc; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rexel.stream.cons.Constants; import com.rexel.stream.utils.CommonUtils; import com.rexel.stream.utils.InfluxUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.influxdb.InfluxDB; /** * @ClassName: CheckDeviceStatus * @Description: 設備狀態檢查 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/7/28 */ @Slf4j public class CheckDeviceStatus extends ProcessFunction<String, String> { private InfluxUtils influxUtils = InfluxUtils.getInstance(); private InfluxDB influxdb = null; private String database; private String url; private String username; private String password; public CheckDeviceStatus(String database, String url, String username, String password) { this.database = database; this.url = url; this.username = username; this.password = password; } @Override public void open(Configuration parameters) throws Exception { this.influxdb = this.influxUtils.connect(url, username, password); this.influxdb.setDatabase(database); } @Override public void processElement(String value, Context ctx, Collector<String> out) { /* * 輸入數據1:消費者連接狀態 * [ * { * "clientAddr":"223.72.217.178:20020", * "clientId":"RexelLabDevice1@4e9c376c-eb28-4086-86e1-be6555d23430", * "language":"JAVA", * "version":313 * } * ] */ JSONArray consumerArray = JSONArray.parseArray(value); /* * 輸入數據2:設備最后一次狀態 * [ * { * "time":"2020-07-21 16:08:14", * "productKey":"a1B6t6ZG6oR", * "deviceName":"QCHTestDevice1", * "status":"offline" * } * ] */ JSONArray deviceArray = influxUtils.queryDeviceStatus(influxdb, database); // 檢查已經登錄過的設備 for (int i = 0; i < deviceArray.size(); i++) { JSONObject deviceStatusJson = deviceArray.getJSONObject(i); String deviceName = deviceStatusJson.getString("deviceName"); String productKey = deviceStatusJson.getString("productKey"); String statusOld = deviceStatusJson.getString("status"); // 獲取設備消費者 JSONObject deviceConsumerJson = getDeviceConsumer(deviceName, consumerArray); // 計算設備狀態 String statusNew = deviceConsumerJson == null ? Constants.OFFLINE : Constants.ONLINE; // 狀態未發生變化 if (statusOld.equals(statusNew)) { continue; } // 生成設備狀態JSON JSONObject resultJson = new JSONObject(); resultJson.put("status", statusNew); resultJson.put("productKey", productKey); resultJson.put("deviceName", deviceName); resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis())); out.collect(resultJson.toString()); } // 檢查新創建的設備 for (int i = 0; i < consumerArray.size(); i++) { JSONObject consumerJson = consumerArray.getJSONObject(i); String clientId = consumerJson.getString("clientId"); String deviceName = clientId.split("@")[0]; // 檢查是否為新設備 if (!isNewDevice(deviceName, deviceArray)) { continue; } // 生成設備狀態JSON JSONObject resultJson = new JSONObject(); resultJson.put("status", Constants.ONLINE); resultJson.put("productKey", "DView"); resultJson.put("deviceName", deviceName); resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis())); out.collect(resultJson.toString()); } } private boolean isNewDevice(String deviceName, JSONArray deviceArray) { for (int i = 0; i < deviceArray.size(); i++) { JSONObject deviceStatusJson = deviceArray.getJSONObject(i); String deviceNameTemp = deviceStatusJson.getString("deviceName"); if (deviceName.equals(deviceNameTemp)) { return false; } } return true; } private JSONObject getDeviceConsumer(String deviceName, JSONArray consumerArray) { for (int i = 0; i < consumerArray.size(); i++) { JSONObject consumerJson = consumerArray.getJSONObject(i); String clientId = consumerJson.getString("clientId"); if (clientId != null && clientId.startsWith(deviceName)) { return consumerJson; } } return null; } }
類名:SinkToInfluxStatus
作用:將數據寫入到InfluxDB中
package com.rexel.stream.flink.sink; import com.alibaba.fastjson.JSON; import com.rexel.stream.pojo.DeviceStatus; import com.rexel.stream.utils.InfluxUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.influxdb.InfluxDB; /** * @ClassName: SinkToInfluxStatus * @Description: 設備狀態數據Sink To InfluxDB * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/7/28 */ @Slf4j public class SinkToInfluxStatus extends RichSinkFunction<String> { private InfluxUtils influxUtils = InfluxUtils.getInstance(); private InfluxDB influxDb = null; private String database; private String url; private String username; private String password; public SinkToInfluxStatus(String url, String username, String password, String database) { this.url = url; this.username = username; this.password = password; this.database = database; } @Override public void open(Configuration parameters) throws Exception { this.influxDb = this.influxUtils.connect(url, username, password); this.influxDb.setDatabase(database); } @Override public void invoke(String value, Context context) { // 解析為DeviceBase對象 DeviceStatus deviceStatus = JSON.parseObject(value, DeviceStatus.class); // 寫入InfluxDB this.influxUtils.write(this.influxDb, this.database, deviceStatus); log.debug("[------]deviceStatus=" + deviceStatus.toString()); } @Override public void close() { if (this.influxDb != null) { this.influxDb.close(); } this.influxDb = null; } }
類名:StreamDeviceStatus
作用:Flink Job啟動類
Tips:啟動參數需要設置一個--input的參數,這個參數將指定一個配置文件
package com.rexel.stream.flink.job; import com.alibaba.fastjson.JSONObject; import com.rexel.stream.common.CheckPoint; import com.rexel.stream.cons.Constants; import com.rexel.stream.flink.proc.CheckDeviceStatus; import com.rexel.stream.flink.sink.SinkToInfluxStatus; import com.rexel.stream.flink.source.SourceConsumerConnection; import com.rexel.stream.utils.CommonUtils; import java.nio.charset.StandardCharsets; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; /** * @ClassName: StreamDeviceStatus * @Description: 監控設備狀態實時流 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/7/28 */ @Slf4j public class StreamDeviceStatus extends AbstractStream { /** * Job啟動 * * @param args 啟動參數 * @throws Exception e */ public static void main(String[] args) throws Exception { // 參數檢查 final ParameterTool params = ParameterTool.fromArgs(args); if (!params.has(Constants.INPUT_CONF)) { log.error("[------]parameter error."); return; } // 讀取配置文件 String confStr = CommonUtils.readFile(params.get(Constants.INPUT_CONF), StandardCharsets.UTF_8); JSONObject confJson = JSONObject.parseObject(confStr); if (confJson == null) { log.error("[------]convert to json error."); return; } // 啟動實時處理流 StreamDeviceStatus streamDeviceStatus = new StreamDeviceStatus(confJson); streamDeviceStatus.execute(); } /** * 構造函數 * * @param confJson 配置文件JSON */ private StreamDeviceStatus(JSONObject confJson) { super(confJson); } /** * 實時流主邏輯 * * @throws Exception e */ private void execute() throws Exception { // 創建執行流上下文 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 檢查是否開啟CheckPoint if (confJson.getBoolean(Constants.JOB_CHECK_POINT)) { CheckPoint.setCheckPoint(env); } // 定義數據源 SingleOutputStreamOperator<String> rocketSource = env.addSource(getRocketSource()).name("rmq_consumer").setParallelism(1); // 定義處理過程 SingleOutputStreamOperator<String> statusStream = rocketSource.process(getCheckDeviceStatus()).name("check_status").setParallelism(1); // 定義輸出 statusStream.addSink(getSinkInfluxStatus()).name("to_influx").setParallelism(1); // 作業流啟動 JobExecutionResult result = env.execute("check_device_status"); log.info("[------]execute. result=" + result.toString()); } /** * 創建RocketSource(設備運行數據) * * @return RichParallelSourceFunction */ private RichParallelSourceFunction<String> getRocketSource() { String namesrvAddr = confJson.getString(Constants.RMQ_NAMESRV_ADDR); String accessKey = confJson.getString(Constants.RMQ_ACCESS_KEY); String secretKey = confJson.getString(Constants.RMQ_SECRET_KEY); String consumerGroup = confJson.getString(Constants.RMQ_CG_DEVICE_NOTICE_DOWN); return new SourceConsumerConnection(namesrvAddr, consumerGroup, accessKey, secretKey); } /** * 創建處理過程(告警檢查) * * @return BroadcastProcessFunction */ private ProcessFunction<String, String> getCheckDeviceStatus() { String url = confJson.getString(Constants.INFLUX_URL); String username = confJson.getString(Constants.INFLUX_USERNAME); String password = confJson.getString(Constants.INFLUX_PASSWORD); String database = confJson.getString(Constants.INFLUX_DATABASE); return new CheckDeviceStatus(database, url, username, password); } /** * 創建SinkToInflux(通用) * * @return RichSinkFunction */ private RichSinkFunction<String> getSinkInfluxStatus() { String url = confJson.getString(Constants.INFLUX_URL); String username = confJson.getString(Constants.INFLUX_USERNAME); String password = confJson.getString(Constants.INFLUX_PASSWORD); String database = confJson.getString(Constants.INFLUX_DATABASE); return new SinkToInfluxStatus(url, username, password, database); } }
類名:DeviceStatus
作用:InfluxDB表名device_status的java映射文件
package com.rexel.stream.pojo; import com.rexel.stream.cons.Constants; import com.rexel.stream.utils.CommonUtils; import lombok.EqualsAndHashCode; import lombok.Setter; import org.influxdb.dto.Point; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * @ClassName: DeviceStatus * @Description: 設備上下線狀態 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/2/27 */ @EqualsAndHashCode(callSuper = true) @Setter public class DeviceStatus extends AbstractDeviceBase implements Serializable { /** * 設備狀態。 * online:上線。 * offline:離線。 */ private String status; /** * 設備所屬產品的唯一標識。 */ private String productKey; /** * 設備名稱。 */ private String deviceName; /** * 發送通知的時間點。 */ private String time; /** * 發送通知的UTC時間點。 */ private String utcTime; /** * 狀態變更前最后一次通信的時間。 * 為避免消息時序紊亂造成影響,建議您根據lastTime來維護最終設備狀態。 */ private String lastTime; /** * 狀態變更前最后一次通信的UTC時間。 */ private String utcLastTime; /** * 設備公網出口IP。 */ private String clientIp; /** * 構造函數,初始化Measurement名稱 */ public DeviceStatus() { setMeasurement(Constants.DEVICE_STATUS); } /** * 生成Point列表 * * @return Point列表 */ @Override public List<Point> getPointList() { List<Point> pointList = new ArrayList<>(); long longTime = CommonUtils.timeStrToLong(String.valueOf(time)); Point.Builder builder = Point.measurement(getMeasurement()); builder.time(longTime, TimeUnit.MILLISECONDS); addTag(builder, "productKey", productKey); addTag(builder, "deviceName", deviceName); addField(builder, "status", status); addField(builder, "utcTime", utcTime); addField(builder, "lastTime", lastTime); addField(builder, "utcLastTime", utcLastTime); addField(builder, "clientIp", clientIp); pointList.add(builder.build()); return pointList; } }
類名:AbstractDeviceBase
作用:DeviceStatus的抽象父類,用來完成以下通用的處理(實際工程中還有好幾個表的映射對象)
package com.rexel.stream.pojo; import com.rexel.stream.cons.Constants; import com.rexel.stream.utils.CommonUtils; import lombok.Data; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import java.io.Serializable; import java.util.List; /** * @ClassName: DeviceBase * @Description: 物聯網平台數據格式父類 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/2/27 * <p> * 以下是InfluxDB的一些使用原則。供參考 * ■建議的原則: * 如果字段是經常被作為檢索條件的元數據,設計為tag; * 如果字段經常要作為group by的參數,設計為tag; * 如果字段需要被用來放在influxQL的函數的參數,設計為field; * 如果出於某些考慮,字段不方便作為string類型保存,則應該使用field,因為tags總是以string類型來存儲。 * ■不建議的原則: * 不要有太多的series * 不要在measurement的名字里攜帶數據 * 不要在一個tag字段里存放多於一條信息 * 不要使用influxQL的關鍵字作為字段名 */ @Data abstract public class AbstractDeviceBase implements Serializable { private String measurement = Constants.DEVICE_OTHER; /** * 獲取Point List * * @return Point List */ abstract public List<Point> getPointList(); /** * 生成BatchPoints * * @param dbName database * @return BatchPoints */ public BatchPoints getBatchPoints(String dbName) { BatchPoints batchPoints = BatchPoints.database( dbName).retentionPolicy(null).consistency(ConsistencyLevel.ALL).build(); List<Point> pointList = getPointList(); if (pointList == null || pointList.size() <= 0) { return null; } pointList.forEach(batchPoints::point); return batchPoints; } /** * 創建Tag * * @param builder Builder * @param name tag名 * @param value tag值 */ void addTag(Point.Builder builder, String name, String value) { if (value != null) { builder.tag(name, value); } } /** * 創建Field * * @param builder Builder * @param name field名 * @param value field值 */ void addField(Point.Builder builder, String name, Object value) { if (value == null) { return; } if (CommonUtils.isNumber(value)) { builder.addField(name, CommonUtils.formatDouble(value)); } else { builder.addField(name, String.valueOf(value)); } } }
類名:InfluxUtils
作用:InfluxDB的通用處理類
package com.rexel.stream.utils; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rexel.stream.pojo.AbstractDeviceBase; import java.util.List; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.BatchPoints; import java.io.Serializable; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult.Result; import org.influxdb.dto.QueryResult.Series; /** * @ClassName: InfluxUtils * @Description: InfluxDB通用類 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/2/27 */ @Slf4j public class InfluxUtils implements Serializable { /** * 構造函數 */ private InfluxUtils() { } /** * 單例模式 */ private static class SingletonInstance { private static final InfluxUtils INSTANCE = new InfluxUtils(); } /** * 獲取實例句柄 */ public static InfluxUtils getInstance() { return SingletonInstance.INSTANCE; } /** * InfluxDB連接 * * @param url URL * @param userName 用戶名 * @param password 密碼 * @return InfluxDB句柄 * @throws Exception e */ public InfluxDB connect(String url, String userName, String password) throws Exception { OkHttpClient.Builder client = new OkHttpClient.Builder().readTimeout(60, TimeUnit.SECONDS); InfluxDB influxDb = InfluxDBFactory.connect(url, userName, password, client); if (influxDb == null) { throw new Exception("[---InfluxDBSink---]influxDB == null"); } return influxDb; } /** * InfluxDB寫入 * * @param influxDb InfluxDB句柄 * @param dbName database * @param abstractDeviceBase 寫入對象 */ public void write(InfluxDB influxDb, String dbName, AbstractDeviceBase abstractDeviceBase) { BatchPoints batchPoints = abstractDeviceBase.getBatchPoints(dbName); if (batchPoints == null) { log.error("[------]batchPoints == null"); return; } if (batchPoints.getPoints().size() <= 0) { log.error("[------]point size == 0"); return; } influxDb.write(batchPoints); log.debug("[------]batchPoints=" + batchPoints.toString()); } /** * 檢索設備狀態 * * @param influxDb InfluxDB句柄 * @param database database * @return JSONArray */ public JSONArray queryDeviceStatus(InfluxDB influxDb, String database) { String sql = " select " + " last(status) as status, " + " productKey, " + " deviceName " + " from device_status " + " group by productKey, deviceName " + " tz('Asia/Shanghai') "; QueryResult queryResult = influxDb.query(new Query(sql, database)); return convert(queryResult); } /** * 轉換QueryResult * * @param queryResult QueryResult * @return JSONArray */ private JSONArray convert(QueryResult queryResult) { JSONArray jsonArray = new JSONArray(); List<Result> results = queryResult.getResults(); for (Result result : results) { List<Series> seriesList = result.getSeries(); if (seriesList == null) { continue; } for (Series series : seriesList) { List<List<Object>> valuesList = series.getValues(); if (valuesList == null) { continue; } for (List<Object> values : valuesList) { List<String> columns = series.getColumns(); JSONObject jsonObject = new JSONObject(); for (int i = 0; i < columns.size(); i++) { String column = columns.get(i); Object value = values.get(i); if (value != null) { jsonObject.put(column, value); } } jsonArray.add(jsonObject); } } } return jsonArray; } }
類名:RocketUtils
作用:RocketMQ的通用處理類
package com.rexel.stream.utils; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; /** * @ClassName: RocketUtils * @Description: RocketMQ通用類 * @Author: chunhui.qu@rexel.com.cn * @Date: 2020/2/27 */ @Slf4j public class RocketUtils implements Serializable { private static Map<String, DefaultMQProducer> producerMap = new HashMap<>(); /** * 構造函數 */ private RocketUtils() { } /** * 單例模式 */ private static class SingletonInstance { private static final RocketUtils INSTANCE = new RocketUtils(); } /** * 獲取實例句柄 */ public static RocketUtils getInstance() { return SingletonInstance.INSTANCE; } /** * 創建DefaultMQAdminExt * * @param namesrvAddr namesrvAddr * @param accessKey accessKey * @param secretKey secretKey * @return DefaultMQAdminExt */ public DefaultMQAdminExt createAdminExt(String namesrvAddr, String accessKey, String secretKey) { RPCHook rpcHook = getAclRpcHook(accessKey, secretKey); DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook, 300000); adminExt.setNamesrvAddr(namesrvAddr); adminExt.setAdminExtGroup(UUID.randomUUID().toString()); try { adminExt.start(); } catch (MQClientException e) { e.printStackTrace(); return null; } return adminExt; } /** * 創建消費者 * * @param namesrvAddr namesrvAddr * @param topic topic * @param group 消費組 * @param accessKey accessKey * @param secretKey secretKey * @return 消費者句柄 */ public DefaultMQPushConsumer createConsumer( String namesrvAddr, String topic, String group, String accessKey, String secretKey) { RPCHook rpcHook = getAclRpcHook(accessKey, secretKey); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr(namesrvAddr); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setVipChannelEnabled(false); consumer.setConsumeTimeout(180000); consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1); consumer.setConsumeMessageBatchMaxSize(1); try { consumer.subscribe(topic, "*"); } catch (MQClientException e) { e.printStackTrace(); return null; } return consumer; } /** * 創建生產者 * * @param nameSrvAddr NamesrvAddr * @param group 生產組 * @param accessKey accessKey * @param secretKey secretKey * @return 生產者句柄 */ public DefaultMQProducer createProducer( String nameSrvAddr, String group, String accessKey, String secretKey) { String key = nameSrvAddr + group; if (producerMap.containsKey(key)) { return producerMap.get(key); } RPCHook rpcHook = getAclRpcHook(accessKey, secretKey); DefaultMQProducer producer = new DefaultMQProducer(group, rpcHook); producer.setNamesrvAddr(nameSrvAddr); producer.setSendMessageWithVIPChannel(false); producer.setSendMsgTimeout(5000); producer.setInstanceName(UUID.randomUUID().toString()); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); return null; } producerMap.put(key, producer); return producer; } /** * 生產數據(異步) * * @param producer 生產者句柄 * @param msg 消息 * @param callback 回調函數 */ public void sendAsync(DefaultMQProducer producer, Message msg, SendCallback callback) { try { producer.send(msg, callback); } catch (MQClientException | RemotingException | InterruptedException e) { e.printStackTrace(); } } /** * 生產數據(同步) * * @param producer 生產者句柄 * @param msg 消息 * @return 發送結果 */ public SendResult send(DefaultMQProducer producer, Message msg) { try { return producer.send(msg); } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) { e.printStackTrace(); return null; } } /** * 設置ACL權限 * * @param accessKey accessKey * @param secretKey secretKey * @return RPCHook */ private RPCHook getAclRpcHook(String accessKey, String secretKey) { return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey)); } }
啟動文件:
{ "job.name": "xxx", "job.checkpoint": false, "influxdb.url": "http://xxx:8100", "influxdb.username": "xxx", "influxdb.password": "xxx", "influxdb.database": "xxx", "rocketmq.namesrvAddr": "xxx", "rocketmq.acl.accessKey": "xxx", "rocketmq.acl.secretKey": "xxx", "rocketmq.device.consumerGroup.notice.down": "xxx" }
3、處理結果
以下是InfluxDB中的結果數據。
--END--