通過監控RockeMQ消費者狀態判斷設備是否在線的樣例


==背景==

 物聯網場景,在設備端寫了一個小的API服務程序,這個程序包括:

1、向平台上報設備數據

2、創建消費者客戶端,用來監聽平台的下行命令

 

==問題==

平台層需要知道設備的狀態:在線  or  離線。我能想到的解決辦法

1、設備上報心跳數據,平台通過心跳來判斷設備是否在線。

2、rocketmq應該有可以監控消費者狀態的命令,是否可以通過這個命令實現。

方案1肯定是沒有問題的,不過缺點就是需要在平台上寫狀態管理的代碼,麻煩不說,可能還有延遲。

於是想嘗試方法2是否可行。

 

==踐行過程==

首先,我觀察了rocketmq-console(RocketMQ的Web界面,需要獨立部署),發現可以通過Web界面查看消費者狀態,結果如圖:

 

通過瀏覽器的控制台日志,可以看到調用的是consumerConnection.query接口。

很好,我是否可以借鑒一下這個思路,去監聽消費者狀態呢。

 

按照這個思路走,去github上找了源碼:https://github.com/apache/rocketmq-externals

通過查看他們的源碼,才知道RocketMQ已經提供了供查看消費者鏈接信息的API。

 

==API示例==

需要引入新的pom文件rocketmq-tools、rocketmq-common,增加只有,所有的pom為

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-store</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.5.0</version>
</dependency>

 

Java代碼示例

package admin;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class AdminExtSample {
    public static void main(String[] args)
        throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876");
        defaultMQAdminExt.start();

        ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down");
        System.out.println(cc.toString());

        defaultMQAdminExt.shutdown();

    }
} 

這樣就可以獲取上面web頁面中的所有信息了。

 

==實際應用==

在實際產品中,需要監控設備端消費者的狀態,然后形成設備在線離線的結果數據。

打算用Flink寫一個實時流,監控消費者狀態,並將結果寫入到InfluxDB總。

 

1、設計思路:

→獨立編寫一個實時處理流(與數據處理的流分離開)

→在Source中按照一定頻率查看消費者狀態(ConsumerConnection)

→在Proc中讀取InfluxDB中的狀態(表名:device_status),與Source中產生的數據進行對比產生online、offline的設備狀態變化數據

→在Sink中將設備狀態變化數據寫入到InfluxDB(表名:device_status)

 

2、實際代碼

類名:SourceConsumerConnection

作用:RocketMQ的Source,用來定時檢查消費者狀態,產生狀態的數據流

Tips:當消費組不在線的時候,會拋出MQBrokerException(206)的異常,這個異常需要人工處理一下。

package com.rexel.stream.flink.source;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.utils.RocketUtils;
import java.util.HashSet;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

/**
 * @ClassName: SourceConsumerConnection
 * @Description: 無任何處理的Source
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/7/28
 */
@Slf4j
public class SourceConsumerConnection extends RichParallelSourceFunction<String> {
    private RocketUtils rocketUtils = RocketUtils.getInstance();
    private DefaultMQAdminExt adminExt;
    private String namesrvAddr;
    private String consumerGroup;
    private String accessKey;
    private String secretKey;

    public SourceConsumerConnection(
        String namesrvAddr, String consumerGroup, String accessKey, String secretKey) {
        this.namesrvAddr = namesrvAddr;
        this.consumerGroup = consumerGroup;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        adminExt = rocketUtils.createAdminExt(namesrvAddr, accessKey, secretKey);

        while (true) {
            ConsumerConnection cc;

            try {
                cc = adminExt.examineConsumerConnectionInfo(consumerGroup);
            } catch (InterruptedException | RemotingException | MQClientException e) {
                e.printStackTrace();
                return;
            } catch (MQBrokerException e) {
                // 消費者不在線
                if (e.getResponseCode() == 206) {
                    ctx.collect(new JSONArray().toString());
                    continue;
                } else {
                    e.printStackTrace();
                    return;
                }
            }

            HashSet<Connection> set = cc.getConnectionSet();
            JSONArray jsonArray = new JSONArray();
            for(Connection connection : set) {
                JSONObject jsonObject = (JSONObject)JSONObject.toJSON(connection);
                jsonArray.add(jsonObject);
            }
            ctx.collect(jsonArray.toString());

            sleep();
        }
    }

    @Override
    public void close() {
        shutdown();
    }

    @Override
    public void cancel() {
        shutdown();
    }

    private void sleep() {
        try {
            Thread.sleep((long) 3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    private void shutdown() {
        if (adminExt != null) {
            adminExt.shutdown();
        }
        adminExt = null;
    }
}

 

類名:CheckDeviceStatus

作用:接收Source數據,並讀取InfluxDB數據,對比之后產生設備狀態變化數據

package com.rexel.stream.flink.proc;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.cons.Constants;
import com.rexel.stream.utils.CommonUtils;
import com.rexel.stream.utils.InfluxUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.influxdb.InfluxDB;

/**
 * @ClassName: CheckDeviceStatus
 * @Description: 設備狀態檢查
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/7/28
 */
@Slf4j
public class CheckDeviceStatus extends ProcessFunction<String, String> {
    private InfluxUtils influxUtils = InfluxUtils.getInstance();
    private InfluxDB influxdb = null;
    private String database;
    private String url;
    private String username;
    private String password;

    public CheckDeviceStatus(String database, String url, String username, String password) {
        this.database = database;
        this.url = url;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.influxdb = this.influxUtils.connect(url, username, password);
        this.influxdb.setDatabase(database);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        /*
         * 輸入數據1:消費者連接狀態
         * [
         *     {
         *         "clientAddr":"223.72.217.178:20020",
         *         "clientId":"RexelLabDevice1@4e9c376c-eb28-4086-86e1-be6555d23430",
         *         "language":"JAVA",
         *         "version":313
         *     }
         * ]
         */
        JSONArray consumerArray = JSONArray.parseArray(value);

        /*
         * 輸入數據2:設備最后一次狀態
         * [
         *     {
         *         "time":"2020-07-21 16:08:14",
         *         "productKey":"a1B6t6ZG6oR",
         *         "deviceName":"QCHTestDevice1",
         *         "status":"offline"
         *     }
         * ]
         */
        JSONArray deviceArray = influxUtils.queryDeviceStatus(influxdb, database);

        // 檢查已經登錄過的設備
        for (int i = 0; i < deviceArray.size(); i++) {
            JSONObject deviceStatusJson = deviceArray.getJSONObject(i);
            String deviceName = deviceStatusJson.getString("deviceName");
            String productKey = deviceStatusJson.getString("productKey");
            String statusOld = deviceStatusJson.getString("status");

            // 獲取設備消費者
            JSONObject deviceConsumerJson = getDeviceConsumer(deviceName, consumerArray);

            // 計算設備狀態
            String statusNew = deviceConsumerJson == null ? Constants.OFFLINE : Constants.ONLINE;

            // 狀態未發生變化
            if (statusOld.equals(statusNew)) {
                continue;
            }

            // 生成設備狀態JSON
            JSONObject resultJson = new JSONObject();
            resultJson.put("status", statusNew);
            resultJson.put("productKey", productKey);
            resultJson.put("deviceName", deviceName);
            resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis()));
            out.collect(resultJson.toString());
        }

        // 檢查新創建的設備
        for (int i = 0; i < consumerArray.size(); i++) {
            JSONObject consumerJson = consumerArray.getJSONObject(i);
            String clientId = consumerJson.getString("clientId");
            String deviceName = clientId.split("@")[0];

            // 檢查是否為新設備
            if (!isNewDevice(deviceName, deviceArray)) {
                continue;
            }

            // 生成設備狀態JSON
            JSONObject resultJson = new JSONObject();
            resultJson.put("status", Constants.ONLINE);
            resultJson.put("productKey", "DView");
            resultJson.put("deviceName", deviceName);
            resultJson.put("time", CommonUtils.timeLongToStr(System.currentTimeMillis()));
            out.collect(resultJson.toString());
        }
    }

    private boolean isNewDevice(String deviceName, JSONArray deviceArray) {
        for (int i = 0; i < deviceArray.size(); i++) {
            JSONObject deviceStatusJson = deviceArray.getJSONObject(i);
            String deviceNameTemp = deviceStatusJson.getString("deviceName");
            if (deviceName.equals(deviceNameTemp)) {
                return false;
            }
        }
        return true;
    }

    private JSONObject getDeviceConsumer(String deviceName, JSONArray consumerArray) {
        for (int i = 0; i < consumerArray.size(); i++) {
            JSONObject consumerJson = consumerArray.getJSONObject(i);
            String clientId = consumerJson.getString("clientId");
            if (clientId != null && clientId.startsWith(deviceName)) {
                return consumerJson;
            }
        }
        return null;
    }
}

 

類名:SinkToInfluxStatus

作用:將數據寫入到InfluxDB中

package com.rexel.stream.flink.sink;

import com.alibaba.fastjson.JSON;
import com.rexel.stream.pojo.DeviceStatus;
import com.rexel.stream.utils.InfluxUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;

/**
 * @ClassName: SinkToInfluxStatus
 * @Description: 設備狀態數據Sink To InfluxDB
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/7/28
 */
@Slf4j
public class SinkToInfluxStatus extends RichSinkFunction<String> {
    private InfluxUtils influxUtils = InfluxUtils.getInstance();
    private InfluxDB influxDb = null;
    private String database;
    private String url;
    private String username;
    private String password;

    public SinkToInfluxStatus(String url, String username, String password, String database) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.database = database;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.influxDb = this.influxUtils.connect(url, username, password);
        this.influxDb.setDatabase(database);
    }

    @Override
    public void invoke(String value, Context context) {
        // 解析為DeviceBase對象
        DeviceStatus deviceStatus = JSON.parseObject(value, DeviceStatus.class);

        // 寫入InfluxDB
        this.influxUtils.write(this.influxDb, this.database, deviceStatus);
        log.debug("[------]deviceStatus=" + deviceStatus.toString());
    }

    @Override
    public void close() {
        if (this.influxDb != null) {
            this.influxDb.close();
        }
        this.influxDb = null;
    }
}

 

類名:StreamDeviceStatus

作用:Flink Job啟動類

Tips:啟動參數需要設置一個--input的參數,這個參數將指定一個配置文件

package com.rexel.stream.flink.job;

import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.common.CheckPoint;
import com.rexel.stream.cons.Constants;
import com.rexel.stream.flink.proc.CheckDeviceStatus;
import com.rexel.stream.flink.sink.SinkToInfluxStatus;
import com.rexel.stream.flink.source.SourceConsumerConnection;
import com.rexel.stream.utils.CommonUtils;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * @ClassName: StreamDeviceStatus
 * @Description: 監控設備狀態實時流
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/7/28
 */
@Slf4j
public class StreamDeviceStatus extends AbstractStream {

    /**
     * Job啟動
     *
     * @param args 啟動參數
     * @throws Exception e
     */
    public static void main(String[] args) throws Exception {
        // 參數檢查
        final ParameterTool params = ParameterTool.fromArgs(args);
        if (!params.has(Constants.INPUT_CONF)) {
            log.error("[------]parameter error.");
            return;
        }

        // 讀取配置文件
        String confStr = CommonUtils.readFile(params.get(Constants.INPUT_CONF), StandardCharsets.UTF_8);
        JSONObject confJson = JSONObject.parseObject(confStr);
        if (confJson == null) {
            log.error("[------]convert to json error.");
            return;
        }

        // 啟動實時處理流
        StreamDeviceStatus streamDeviceStatus = new StreamDeviceStatus(confJson);
        streamDeviceStatus.execute();
    }

    /**
     * 構造函數
     *
     * @param confJson 配置文件JSON
     */
    private StreamDeviceStatus(JSONObject confJson) {
        super(confJson);
    }

    /**
     * 實時流主邏輯
     *
     * @throws Exception e
     */
    private void execute() throws Exception {
        // 創建執行流上下文
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 檢查是否開啟CheckPoint
        if (confJson.getBoolean(Constants.JOB_CHECK_POINT)) {
            CheckPoint.setCheckPoint(env);
        }

        // 定義數據源
        SingleOutputStreamOperator<String> rocketSource =
            env.addSource(getRocketSource()).name("rmq_consumer").setParallelism(1);

        // 定義處理過程
        SingleOutputStreamOperator<String> statusStream =
            rocketSource.process(getCheckDeviceStatus()).name("check_status").setParallelism(1);

        // 定義輸出
        statusStream.addSink(getSinkInfluxStatus()).name("to_influx").setParallelism(1);

        // 作業流啟動
        JobExecutionResult result = env.execute("check_device_status");
        log.info("[------]execute. result=" + result.toString());
    }

    /**
     * 創建RocketSource(設備運行數據)
     *
     * @return RichParallelSourceFunction
     */
    private RichParallelSourceFunction<String> getRocketSource() {
        String namesrvAddr = confJson.getString(Constants.RMQ_NAMESRV_ADDR);
        String accessKey = confJson.getString(Constants.RMQ_ACCESS_KEY);
        String secretKey = confJson.getString(Constants.RMQ_SECRET_KEY);

        String consumerGroup = confJson.getString(Constants.RMQ_CG_DEVICE_NOTICE_DOWN);
        return new SourceConsumerConnection(namesrvAddr, consumerGroup, accessKey, secretKey);
    }

    /**
     * 創建處理過程(告警檢查)
     *
     * @return BroadcastProcessFunction
     */
    private ProcessFunction<String, String> getCheckDeviceStatus() {
        String url = confJson.getString(Constants.INFLUX_URL);
        String username = confJson.getString(Constants.INFLUX_USERNAME);
        String password = confJson.getString(Constants.INFLUX_PASSWORD);
        String database = confJson.getString(Constants.INFLUX_DATABASE);
        return new CheckDeviceStatus(database, url, username, password);
    }

    /**
     * 創建SinkToInflux(通用)
     *
     * @return RichSinkFunction
     */
    private RichSinkFunction<String> getSinkInfluxStatus() {
        String url = confJson.getString(Constants.INFLUX_URL);
        String username = confJson.getString(Constants.INFLUX_USERNAME);
        String password = confJson.getString(Constants.INFLUX_PASSWORD);
        String database = confJson.getString(Constants.INFLUX_DATABASE);
        return new SinkToInfluxStatus(url, username, password, database);
    }
}

 

類名:DeviceStatus

作用:InfluxDB表名device_status的java映射文件

package com.rexel.stream.pojo;

import com.rexel.stream.cons.Constants;
import com.rexel.stream.utils.CommonUtils;
import lombok.EqualsAndHashCode;
import lombok.Setter;
import org.influxdb.dto.Point;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: DeviceStatus
 * @Description: 設備上下線狀態
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/2/27
 */
@EqualsAndHashCode(callSuper = true)
@Setter
public class DeviceStatus extends AbstractDeviceBase implements Serializable {

    /**
     * 設備狀態。
     * online:上線。
     * offline:離線。
     */
    private String status;
    /**
     * 設備所屬產品的唯一標識。
     */
    private String productKey;
    /**
     * 設備名稱。
     */
    private String deviceName;
    /**
     * 發送通知的時間點。
     */
    private String time;
    /**
     * 發送通知的UTC時間點。
     */
    private String utcTime;
    /**
     * 狀態變更前最后一次通信的時間。
     * 為避免消息時序紊亂造成影響,建議您根據lastTime來維護最終設備狀態。
     */
    private String lastTime;
    /**
     * 狀態變更前最后一次通信的UTC時間。
     */
    private String utcLastTime;
    /**
     * 設備公網出口IP。
     */
    private String clientIp;

    /**
     * 構造函數,初始化Measurement名稱
     */
    public DeviceStatus() {
        setMeasurement(Constants.DEVICE_STATUS);
    }

    /**
     * 生成Point列表
     *
     * @return Point列表
     */
    @Override
    public List<Point> getPointList() {
        List<Point> pointList = new ArrayList<>();

        long longTime = CommonUtils.timeStrToLong(String.valueOf(time));
        Point.Builder builder = Point.measurement(getMeasurement());
        builder.time(longTime, TimeUnit.MILLISECONDS);
        addTag(builder, "productKey", productKey);
        addTag(builder, "deviceName", deviceName);
        addField(builder, "status", status);
        addField(builder, "utcTime", utcTime);
        addField(builder, "lastTime", lastTime);
        addField(builder, "utcLastTime", utcLastTime);
        addField(builder, "clientIp", clientIp);
        pointList.add(builder.build());

        return pointList;
    }
}

 

類名:AbstractDeviceBase

作用:DeviceStatus的抽象父類,用來完成以下通用的處理(實際工程中還有好幾個表的映射對象)

package com.rexel.stream.pojo;

import com.rexel.stream.cons.Constants;
import com.rexel.stream.utils.CommonUtils;
import lombok.Data;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.io.Serializable;
import java.util.List;

/**
 * @ClassName: DeviceBase
 * @Description: 物聯網平台數據格式父類
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/2/27
 * <p>
 * 以下是InfluxDB的一些使用原則。供參考
 * ■建議的原則:
 * 如果字段是經常被作為檢索條件的元數據,設計為tag;
 * 如果字段經常要作為group by的參數,設計為tag;
 * 如果字段需要被用來放在influxQL的函數的參數,設計為field;
 * 如果出於某些考慮,字段不方便作為string類型保存,則應該使用field,因為tags總是以string類型來存儲。
 * ■不建議的原則:
 * 不要有太多的series
 * 不要在measurement的名字里攜帶數據
 * 不要在一個tag字段里存放多於一條信息
 * 不要使用influxQL的關鍵字作為字段名
 */
@Data
abstract public class AbstractDeviceBase implements Serializable {
    private String measurement = Constants.DEVICE_OTHER;

    /**
     * 獲取Point List
     *
     * @return Point List
     */
    abstract public List<Point> getPointList();

    /**
     * 生成BatchPoints
     *
     * @param dbName database
     * @return BatchPoints
     */
    public BatchPoints getBatchPoints(String dbName) {
        BatchPoints batchPoints = BatchPoints.database(
                dbName).retentionPolicy(null).consistency(ConsistencyLevel.ALL).build();

        List<Point> pointList = getPointList();
        if (pointList == null || pointList.size() <= 0) {
            return null;
        }
        pointList.forEach(batchPoints::point);

        return batchPoints;
    }

    /**
     * 創建Tag
     *
     * @param builder Builder
     * @param name tag名
     * @param value tag值
     */
    void addTag(Point.Builder builder, String name, String value) {
        if (value != null) {
            builder.tag(name, value);
        }
    }

    /**
     * 創建Field
     *
     * @param builder Builder
     * @param name field名
     * @param value field值
     */
    void addField(Point.Builder builder, String name, Object value) {
        if (value == null) {
            return;
        }

        if (CommonUtils.isNumber(value)) {
            builder.addField(name, CommonUtils.formatDouble(value));
        } else {
            builder.addField(name, String.valueOf(value));
        }
    }
}

 

類名:InfluxUtils

作用:InfluxDB的通用處理類

package com.rexel.stream.utils;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.pojo.AbstractDeviceBase;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;

import java.io.Serializable;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Result;
import org.influxdb.dto.QueryResult.Series;

/**
 * @ClassName: InfluxUtils
 * @Description: InfluxDB通用類
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/2/27
 */
@Slf4j
public class InfluxUtils implements Serializable {
    /**
     * 構造函數
     */
    private InfluxUtils() {
    }

    /**
     * 單例模式
     */
    private static class SingletonInstance {
        private static final InfluxUtils INSTANCE = new InfluxUtils();
    }

    /**
     * 獲取實例句柄
     */
    public static InfluxUtils getInstance() {
        return SingletonInstance.INSTANCE;
    }

    /**
     * InfluxDB連接
     *
     * @param url URL
     * @param userName 用戶名
     * @param password 密碼
     * @return InfluxDB句柄
     * @throws Exception e
     */
    public InfluxDB connect(String url, String userName, String password) throws Exception {
        OkHttpClient.Builder client =
            new OkHttpClient.Builder().readTimeout(60, TimeUnit.SECONDS);
        InfluxDB influxDb = InfluxDBFactory.connect(url, userName, password, client);
        if (influxDb == null) {
            throw new Exception("[---InfluxDBSink---]influxDB == null");
        }
        return influxDb;
    }

    /**
     * InfluxDB寫入
     *
     * @param influxDb InfluxDB句柄
     * @param dbName database
     * @param abstractDeviceBase 寫入對象
     */
    public void write(InfluxDB influxDb, String dbName, AbstractDeviceBase abstractDeviceBase) {
        BatchPoints batchPoints = abstractDeviceBase.getBatchPoints(dbName);
        if (batchPoints == null) {
            log.error("[------]batchPoints == null");
            return;
        }

        if (batchPoints.getPoints().size() <= 0) {
            log.error("[------]point size == 0");
            return;
        }

        influxDb.write(batchPoints);
        log.debug("[------]batchPoints=" + batchPoints.toString());
    }

    /**
     * 檢索設備狀態
     *
     * @param influxDb InfluxDB句柄
     * @param database database
     * @return JSONArray
     */
    public JSONArray queryDeviceStatus(InfluxDB influxDb, String database) {
        String sql = " select "
            + " last(status) as status, "
            + " productKey, "
            + " deviceName "
            + " from device_status "
            + " group by productKey, deviceName "
            + " tz('Asia/Shanghai') ";

        QueryResult queryResult = influxDb.query(new Query(sql, database));
        return convert(queryResult);
    }

    /**
     * 轉換QueryResult
     *
     * @param queryResult QueryResult
     * @return JSONArray
     */
    private JSONArray convert(QueryResult queryResult) {
        JSONArray jsonArray = new JSONArray();
        List<Result> results = queryResult.getResults();
        for (Result result : results) {
            List<Series> seriesList = result.getSeries();
            if (seriesList == null) {
                continue;
            }
            for (Series series : seriesList) {
                List<List<Object>> valuesList = series.getValues();
                if (valuesList == null) {
                    continue;
                }
                for (List<Object> values : valuesList) {
                    List<String> columns = series.getColumns();
                    JSONObject jsonObject = new JSONObject();
                    for (int i = 0; i < columns.size(); i++) {
                        String column = columns.get(i);
                        Object value = values.get(i);
                        if (value != null) {
                            jsonObject.put(column, value);
                        }
                    }
                    jsonArray.add(jsonObject);
                }
            }
        }
        return jsonArray;
    }
}

 

類名:RocketUtils

作用:RocketMQ的通用處理類

package com.rexel.stream.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

/**
 * @ClassName: RocketUtils
 * @Description: RocketMQ通用類
 * @Author: chunhui.qu@rexel.com.cn
 * @Date: 2020/2/27
 */
@Slf4j
public class RocketUtils implements Serializable {
    private static Map<String, DefaultMQProducer> producerMap = new HashMap<>();

    /**
     * 構造函數
     */
    private RocketUtils() {
    }

    /**
     * 單例模式
     */
    private static class SingletonInstance {
        private static final RocketUtils INSTANCE = new RocketUtils();
    }

    /**
     * 獲取實例句柄
     */
    public static RocketUtils getInstance() {
        return SingletonInstance.INSTANCE;
    }

    /**
     * 創建DefaultMQAdminExt
     *
     * @param namesrvAddr namesrvAddr
     * @param accessKey accessKey
     * @param secretKey secretKey
     * @return DefaultMQAdminExt
     */
    public DefaultMQAdminExt createAdminExt(String namesrvAddr, String accessKey, String secretKey) {
        RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook, 300000);
        adminExt.setNamesrvAddr(namesrvAddr);
        adminExt.setAdminExtGroup(UUID.randomUUID().toString());
        try {
            adminExt.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        return adminExt;
    }

    /**
     * 創建消費者
     *
     * @param namesrvAddr namesrvAddr
     * @param topic topic
     * @param group 消費組
     * @param accessKey accessKey
     * @param secretKey secretKey
     * @return 消費者句柄
     */
    public DefaultMQPushConsumer createConsumer(
        String namesrvAddr, String topic, String group, String accessKey, String secretKey) {
        RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
        DefaultMQPushConsumer consumer =
            new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        consumer.setConsumeTimeout(180000);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        try {
            consumer.subscribe(topic, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        return consumer;
    }

    /**
     * 創建生產者
     *
     * @param nameSrvAddr NamesrvAddr
     * @param group 生產組
     * @param accessKey accessKey
     * @param secretKey secretKey
     * @return 生產者句柄
     */
    public DefaultMQProducer createProducer(
        String nameSrvAddr, String group, String accessKey, String secretKey) {
        String key = nameSrvAddr + group;
        if (producerMap.containsKey(key)) {
            return producerMap.get(key);
        }

        RPCHook rpcHook = getAclRpcHook(accessKey, secretKey);
        DefaultMQProducer producer = new DefaultMQProducer(group, rpcHook);
        producer.setNamesrvAddr(nameSrvAddr);
        producer.setSendMessageWithVIPChannel(false);
        producer.setSendMsgTimeout(5000);
        producer.setInstanceName(UUID.randomUUID().toString());
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }

        producerMap.put(key, producer);
        return producer;
    }

    /**
     * 生產數據(異步)
     *
     * @param producer 生產者句柄
     * @param msg 消息
     * @param callback 回調函數
     */
    public void sendAsync(DefaultMQProducer producer, Message msg, SendCallback callback) {
        try {
            producer.send(msg, callback);
        } catch (MQClientException | RemotingException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 生產數據(同步)
     *
     * @param producer 生產者句柄
     * @param msg 消息
     * @return 發送結果
     */
    public SendResult send(DefaultMQProducer producer, Message msg) {
        try {
            return producer.send(msg);
        } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 設置ACL權限
     *
     * @param accessKey accessKey
     * @param secretKey secretKey
     * @return RPCHook
     */
    private RPCHook getAclRpcHook(String accessKey, String secretKey) {
        return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
    }
}

 

啟動文件:

{
  "job.name": "xxx",
  "job.checkpoint": false,
  "influxdb.url": "http://xxx:8100",
  "influxdb.username": "xxx",
  "influxdb.password": "xxx",
  "influxdb.database": "xxx",
  "rocketmq.namesrvAddr": "xxx",
  "rocketmq.acl.accessKey": "xxx",
  "rocketmq.acl.secretKey": "xxx",
  "rocketmq.device.consumerGroup.notice.down": "xxx"
}

 

3、處理結果

以下是InfluxDB中的結果數據。

 

--END--

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM