將Flink計算完畢后的數據Sink到Nebula


Flink是目前流計算的隱形王者,在國際國內有龐大的擁躉。

Nebula是國產圖數據庫的后起之秀,在DBEngines中排名也逐年上升。

將兩者進行結合,可以產生很多應用場景:比如實時計算服務鏈路調用關系並將結果存到Nebula中、實時計算業務訪問風控情況並將結果存到Nebula中、實時計算預警發生情況並將結果存到Nebula中等。

將Flink計算完畢后的結果,Sink到Nebula,Nebula官方提供了一個Flink Connector,但是很不易用。

筆者根據項目實際應用情況,寫了一個更簡潔直接的Sink,作為拋磚引玉,希望與各位Flink及Nebula愛好者共同交流。

一、NebulaUtil

由於Nebula提供的Java Client是非線程安全的,所以我們首先封裝一個單例的NebulaUtil,主要代碼如下:

import lombok.val;
import lombok.var;
/**
 * Nebula工具類
 */
public class NebulaUtil {
    // Nebula會話
    private Session session = null;
    // Nebula連接池
    private NebulaPool pool = new NebulaPool();/**
     * 獲得Nebula工具類單例
     *
     * @return NebulaUtil
     */
    public static NebulaUtil getInstance() {
        return NebulaUtilHolder.instance;
    }

    /**
     * 執行NGQL
     *
     * @param nGQL NGQL
     * @return 返回執行結果
     */
    public ResultSet execute(String nGQL) {
        try {
            if (session != null) {
                return session.execute(nGQL);
            }
        } catch (IOErrorException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return null;
    }
   /**
     * 釋放會話
     */
    public void releaseSession() {
        // 釋放連接
        if (session != null) {
            session.release();
        }

        // 關閉連接池
        pool.close();
    }

    private static class NebulaUtilHolder {
        private static final NebulaUtil instance = new NebulaUtil();
    }

    private NebulaUtil() {
        initSession();
    }

    /**
     * 初始化會話
     */
    private void initSession() {// 連接地址,多個間用逗號“,”隔開
        val host = "127.0.0.1";
        val port = 9669;
        val user = "user";
        val password = "password";
        val space = "MySpace";

        var nebulaPoolConfig = new NebulaPoolConfig();
        nebulaPoolConfig.setMaxConnSize(100);

        var hostAddressList = new ArrayList<HostAddress>();

        val hostArray = host.split(",");

        for (val hostAddress : hostArray) {
            hostAddressList.add(new HostAddress(hostAddress, port));
        }

        try {
            pool.init(hostAddressList, nebulaPoolConfig);
        } catch (UnknownHostException e) {
           e.printStackTrace();
        }

        try {
            session = pool.getSession(user, password, false);
        } catch (NotValidConnectionException e) {
            e.printStackTrace();
        } catch (IOErrorException e) {
           e.printStackTrace();
        } catch (AuthFailedException e) {
            e.printStackTrace();
        }

        // 切換圖空間
        val resp = execute(String.format("USE %s;", space));

        if (resp == null || !resp.isSucceeded()) {
            System.out.println("切換圖空間失敗!" + space);
        }
    }
}

 

二、NebulaSink

有了NebulaUtil,實現NebulaSink就非常簡單了,每個方法里只有幾行代碼:

import lombok.val;

/** * Sink到Nebula數據庫 */ public class NebulaSink extends RichSinkFunction<List<String>> { /** * 打開連接 * * @param parameters 配置參數 */ @Override public void open(Configuration parameters) { } /** * 調用 * * @param nGQLList NGQL列表 * @param context 上下文 */ @Override public void invoke(List<String> nGQLList, Context context) { for (val nGQL : nGQLList) { NebulaUtil.getInstance().execute(nGQL); } } /** * 關閉連接 */ @Override public void close() throws Exception { super.close(); NebulaUtil.getInstance().releaseSession(); } }

 

三、將Vertex及Edge數據組裝成NGQL語句

有了NebulaUtil以及NebulaSink后,Sink到Nebula之前,我們主要的工作就是將Vertex及Edge數據,組裝對應的NGQL語句即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM