Flink Async I/O for Accessing External Data (MySQL Edition)


Continued from the previous post: [Translation] Flink Async I/O for Accessing External Data

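Recently I read a blog post by an expert and was reminded that Async I/O is one of the major features Blink contributed back to the Flink community: it lets an operator fetch external data asynchronously. I wanted to implement it myself once, so that I would not have to go hunting for an example when a project needs it.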

At first I wanted to write a Scala demo that reads HBase data, based on the demo in the official documentation:

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

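This failed; I could not get two parts of the code above to work:

1. I could not find a usable implementation class for Future

2. unorderedWait kept reporting errors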
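For the first problem: on the Java side, java.util.concurrent.CompletableFuture plays the role of the Scala Future, and the official Java example wraps the client call in CompletableFuture.supplyAsync and completes Flink's ResultFuture inside thenAccept. The sketch below shows only that callback shape with the JDK, so it runs without Flink. slowQuery is a hypothetical stand-in for client.query(str), and a plain Consumer stands in for ResultFuture.complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class FutureCallbackSketch {

    // Hypothetical blocking lookup standing in for client.query(str)
    static String slowQuery(String key) {
        try {
            Thread.sleep(50); // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key;
    }

    // Issue the request on a pool and forward the result to a callback:
    // the same shape as asyncInvoke + resultFuture.complete(...)
    static CompletableFuture<Void> asyncInvoke(String key, ExecutorService pool,
                                               Consumer<String> onResult) {
        return CompletableFuture
                .supplyAsync(() -> slowQuery(key), pool)                      // like Future { client.query(str) }
                .thenAccept(result -> onResult.accept(key + "=" + result));  // like onSuccess { ... }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // join() is only for this demo; inside a Flink AsyncFunction you must not block
        asyncInvoke("1", pool, System.out::println).join();
        pool.shutdown();
    }
}
```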

The examples in the Flink source code also include a Scala version:

def main(args: Array[String]) {
    val timeout = 10000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val input = env.addSource(new SimpleSource())

    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
      (input, collector: ResultFuture[Int]) =>
        Future {
          collector.complete(Seq(input))
        } (ExecutionContext.global)
    }

    asyncMapped.print()

    env.execute("Async I/O job")
  }

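That is the core part. As a beginner I was stuck here: I wanted to extend RichAsyncFunction instead, so that the connection could be initialized in the open method.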

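I went through quite a few blog posts, but most of them only translate the explanation from the official docs, and none of them has an example that actually runs. Frustrating.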

So the Scala attempt failed.

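I switched to Java. Yesterday I asked in a chat group and an expert sent me a Java version: https://github.com/perkinls/flink-local-train/blob/c8b4efe33620352aea0100adef4fae2a068a3b65/src/main/scala/com/lp/test/asyncio/AsyncIoSideTableJoinMysqlJava.java I have not read it yet, because I can already follow the official Java example.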

Below is the source code of the MySQL version (the HBase version has not been tested yet because the HBase instance on my machine is down):

The business logic:

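Consume data from Kafka, convert each record into a user object, call the async function to look up the phone number for user.id, set it back on the user object, and print the result.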
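The article does not show the User and AsyncUser classes that the code relies on. The sketch below is a guess at their shape, assuming a plain POJO with String fields id, username, password and phone (inferred from the constructor call and the JSON output further down). Flink's POJO serializer expects a public class with a public no-argument constructor and getters/setters; the real classes may differ.

```java
// Hypothetical reconstruction of the user POJO; field names are inferred, not from the original source
public class User {
    private String id;
    private String username;
    private String password;
    private String phone;

    // Public no-arg constructor so Flink can treat this as a POJO type
    public User() {}

    public User(String id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }

    @Override
    public String toString() {
        return "User{id=" + id + ", username=" + username
                + ", password=" + password + ", phone=" + phone + "}";
    }

    public static void main(String[] args) {
        User u = new User("1", "venn", "1561709530935");
        u.setPhone("12345678910"); // what the async lookup fills in
        System.out.println(u);
    }
}
```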

Main class:

import com.alibaba.fastjson.JSON;
import com.venn.common.Common;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.concurrent.TimeUnit;


public class AsyncMysqlRequest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<ObjectNode> source = new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

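        // Consume Kafka data and convert it into AsyncUser objects
        // (the function class below is typed on AsyncUser; this assumes AsyncUser
        // has the same (id, username, password) constructor as User)
        DataStream<AsyncUser> input = env.addSource(source).map(value -> {
            String id = value.get("id").asText();
            String username = value.get("username").asText();
            String password = value.get("password").asText();

            return new AsyncUser(id, username, password);
        });
        // Async I/O lookup against MySQL: timeout 1 s, capacity 10 (beyond 10 in-flight requests, the upstream operator is backpressured)
        DataStream<AsyncUser> async = AsyncDataStream.unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 10);

        async.map(user -> JSON.toJSON(user).toString())
             .print();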

        env.execute("asyncForMysql");

    }
}
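The capacity argument (10 here) caps the number of in-flight requests: once it is reached, the async operator stops accepting new records and backpressure builds up upstream. The sketch below is only a simplified JDK model of that idea, not Flink's implementation: a Semaphore with capacity permits makes the submitting loop wait whenever that many requests are outstanding, even though the pool has 30 threads as in the article. BoundedAsyncModel and run are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncModel {

    // Submits `requests` simulated lookups but never lets more than `capacity` be in flight.
    // Returns the highest number of requests observed in flight at the same time.
    static int run(int requests, int capacity) {
        Semaphore slots = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(30); // more threads than capacity

        for (int i = 0; i < requests; i++) {
            slots.acquireUninterruptibly(); // the "upstream" loop blocks here when capacity is used up
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // simulated query latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    slots.release(); // a finished request frees one slot
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + run(50, 10));
    }
}
```

The thread-pool size alone does not limit concurrency here; the permit count does, which mirrors how the capacity argument, not the 30-thread pool in AsyncFunctionForMysqlJava, bounds the pending requests.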

Function class:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {


    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);
    private transient MysqlClient client;
    private transient ExecutorService executorService;

    /**
     * Initialize the connection in the open method
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        client = new MysqlClient();
        executorService = Executors.newFixedThreadPool(30);
    }

    /**
     * Asynchronously look up the phone of asyncUser by asyncUser.getId()
     *
     * @param asyncUser
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {

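        executorService.submit(() -> {
            try {
                // submit query
                System.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());
                AsyncUser tmp = client.query1(asyncUser);
                // Always hand the result back through resultFuture; otherwise every record ends up as a timeout
                resultFuture.complete(Collections.singletonList(tmp));
            } catch (Exception e) {
                // Report the failure instead of leaving the future incomplete
                resultFuture.completeExceptionally(e);
            }
        });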
    }

    @Override
    public void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {
        logger.warn("Async function for mysql timeout");
        List<AsyncUser> list = new ArrayList<>();
        input.setPhone("timeout");
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
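        logger.info("async function for mysql java close ...");
        // Release the worker threads created in open()
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();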
    }
}
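The key rule in asyncInvoke is that resultFuture must be completed on every path, including failures. Flink's ResultFuture offers complete(Collection) and completeExceptionally(Throwable); the sketch below mirrors those two methods in a hypothetical MiniResultFuture interface so it runs without Flink, and uses CompletableFuture.whenComplete to route success and failure. The lookup function and runOnce helper are illustrative assumptions, not part of the original code.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class CompleteAlwaysSketch {

    // Hypothetical stand-in mirroring Flink's ResultFuture
    interface MiniResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    // Run the blocking lookup on the pool and complete the result future on BOTH paths
    static <I, O> void asyncInvoke(I input, Function<I, O> lookup,
                                   ExecutorService pool, MiniResultFuture<O> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup.apply(input), pool)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singletonList(value));
                    }
                });
    }

    // Demo helper: runs one lookup and reports which completion path was taken
    static String runOnce(String input, Function<String, String> lookup) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        asyncInvoke(input, lookup, pool, new MiniResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                outcome.complete("complete:" + result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                // supplyAsync wraps the thrown exception in a CompletionException
                Throwable cause = error.getCause() != null ? error.getCause() : error;
                outcome.complete("failed:" + cause.getMessage());
            }
        });
        try {
            return outcome.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("526", id -> "phone-" + id));
        System.out.println(runOnce("526", id -> { throw new IllegalStateException("db down"); }));
    }
}
```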

MysqlClient:

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlClient {

    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";
    private static java.sql.Connection conn;
    private static PreparedStatement ps;

    static {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select phone from async.async_test where id = ?");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * execute query
     *
     * @param user
     * @return
     */
    public AsyncUser query1(AsyncUser user) {

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        String phone = "0000";
        try {
            ps.setString(1, user.getId());
            ResultSet rs = ps.executeQuery();
            if (!rs.isClosed() && rs.next()) {
                phone = rs.getString(1);
            }
            System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());
        } catch (SQLException e) {
            e.printStackTrace();
        }
        user.setPhone(phone);
        return user;

    }

    // Test code
    public static void main(String[] args) {
        MysqlClient mysqlClient = new MysqlClient();

        AsyncUser asyncUser = new AsyncUser();
        asyncUser.setId("526");
        long start = System.currentTimeMillis();
        asyncUser = mysqlClient.query1(asyncUser);

        System.out.println("end : " + (System.currentTimeMillis() - start));
        System.out.println(asyncUser.toString());
    }
}

 

Function class (anti-example: asyncInvoke queries the database with a blocking call, so it is effectively synchronous):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {

    // Connection settings
    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";


    java.sql.Connection conn;
    PreparedStatement ps;
    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    /**
     * Initialize the connection in the open method
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        Class.forName(driverName);
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        ps = conn.prepareStatement("select phone from async.async_test where id = ?");
    }

    /**
     * Look up the user's phone by user.getId()
     *
     * @param user
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
        // Query by user id; this call blocks the operator thread, which is the mistake
        ps.setString(1, user.getId());
        ResultSet rs = ps.executeQuery();
        String phone = null;
        if (rs.next()) {
            phone = rs.getString(1);
        }
        rs.close();
        user.setPhone(phone);
        List<User> list = new ArrayList<>();
        list.add(user);
        // Hand the result back to the result queue
        resultFuture.complete(list);
    }

    @Override
    public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
        logger.info("Async function for mysql timeout");
        List<User> list = new ArrayList<>();
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        conn.close();
    }
}
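To make the difference concrete, the sketch below times the two shapes with plain JDK code: blockingMillis calls a simulated query inline, as the anti-example's asyncInvoke does, while pooledMillis only submits the queries to a thread pool, as the working AsyncFunctionForMysqlJava does. The 100 ms latency and the class name BlockingVsPooled are assumptions for illustration; no database or Flink is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingVsPooled {

    // Simulated query with 100 ms of latency
    static void query() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Anti-pattern: the calling thread waits for each query, so records are handled one by one
    static long blockingMillis(int records) {
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            query();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Working pattern: the calling thread only submits; the pool runs queries concurrently
    static long pooledMillis(int records, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            pending.add(pool.submit(BlockingVsPooled::query));
        }
        for (Future<?> f : pending) {
            try {
                f.get(); // demo only: wait so the total time can be measured
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        long elapsed = (System.nanoTime() - start) / 1_000_000;
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + blockingMillis(10) + " ms");
        System.out.println("pooled:   " + pooledMillis(10, 10) + " ms");
    }
}
```

With 10 records the blocking version needs at least about 10 × 100 ms, while the pooled version finishes in roughly one query latency, which is the whole point of Async I/O.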

Test data:

{"id" : 1, "username" : "venn", "password" : 1561709530935}
{"id" : 2, "username" : "venn", "password" : 1561709536029}
{"id" : 3, "username" : "venn", "password" : 1561709541033}
{"id" : 4, "username" : "venn", "password" : 1561709546037}
{"id" : 5, "username" : "venn", "password" : 1561709551040}
{"id" : 6, "username" : "venn", "password" : 1561709556044}
{"id" : 7, "username" : "venn", "password" : 1561709561048}

Output:

submit query : 1-1-1562763486845
submit query : 2-1-1562763486846
submit query : 3-1-1562763486846
submit query : 4-1-1562763486849
submit query : 5-1-1562763486849
submit query : 6-1-1562763486859
submit query : 7-1-1562763486913
submit query : 8-1-1562763486967
submit query : 9-1-1562763487021
execute query : 1-2-phone : 12345678910-1562763487316
1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"}  
submit query : 10-1-1562763487408
submit query : 11-1-1562763487408
execute query : 9-2-phone : 1562661110630-1562763487633
1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"}  # Asynchrony is visible here: queries up to id 11 have been submitted, but only 1 and 9 have executed and been returned (unorderedWait)
submit query : 12-1-1562763487634
execute query : 8-2-phone : 1562661110627-1562763487932
1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"}
submit query : 13-1-1562763487933
execute query : 7-2-phone : 1562661110624-1562763488228
1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"}
submit query : 14-1-1562763488230
execute query : 6-2-phone : 1562661110622-1562763488526
1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"}
submit query : 15-1-1562763488527
execute query : 4-2-phone : 12345678913-1562763488832
1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}
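In the output, queries finish out of submission order, and because the job uses unorderedWait each result is emitted as soon as its query completes. orderedWait would instead hold a result back until every earlier record has completed, so emission follows input order. The deterministic sketch below models just that difference in processing time, using made-up completion times rather than the timings above; EmitOrderModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class EmitOrderModel {

    // unorderedWait-like: results leave in completion order (ties keep input order)
    static List<Integer> unordered(int[] ids, long[] doneAt) {
        List<Integer> idx = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            idx.add(i);
        }
        idx.sort(Comparator.comparingLong(i -> doneAt[i]));
        List<Integer> out = new ArrayList<>();
        for (int i : idx) {
            out.add(ids[i]);
        }
        return out;
    }

    // orderedWait-like: record k leaves only after records 0..k have all completed,
    // so its emission time is the maximum completion time of that prefix
    static List<Long> orderedEmitTimes(long[] doneAt) {
        List<Long> times = new ArrayList<>();
        long latest = Long.MIN_VALUE;
        for (long t : doneAt) {
            latest = Math.max(latest, t);
            times.add(latest);
        }
        return times;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3, 4};
        long[] doneAt = {300, 100, 400, 200}; // hypothetical completion times in ms
        System.out.println("unordered emit order: " + unordered(ids, doneAt));
        System.out.println("ordered emit times:   " + orderedEmitTimes(doneAt));
    }
}
```

Ordered mode trades latency for order: in this model record 2 finishes at 100 ms but cannot leave before record 1 at 300 ms.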

 

HBase, Redis, and other stores can be implemented in a similar way.
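One way to reuse the same async function across MySQL, HBase or Redis is to keep the store-specific code behind a small interface that returns a CompletableFuture, and let asyncInvoke depend only on that interface. The sketch below assumes a hypothetical DimLookup interface with an in-memory implementation for illustration; a real implementation would wrap a JDBC connection pool, an HBase client or a Redis client. The "0000" default mirrors the fallback phone in MysqlClient.query1.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class LookupAbstractionSketch {

    // Hypothetical store-agnostic lookup: one implementation per backend
    interface DimLookup extends AutoCloseable {
        CompletableFuture<String> lookup(String key);

        @Override
        default void close() {}
    }

    // In-memory implementation, for illustration only
    static final class InMemoryLookup implements DimLookup {
        private final Map<String, String> data = new ConcurrentHashMap<>();

        InMemoryLookup put(String key, String value) {
            data.put(key, value);
            return this;
        }

        @Override
        public CompletableFuture<String> lookup(String key) {
            // Missing keys fall back to "0000", like MysqlClient.query1
            return CompletableFuture.completedFuture(data.getOrDefault(key, "0000"));
        }
    }

    public static void main(String[] args) {
        DimLookup lookup = new InMemoryLookup().put("1", "12345678910");
        lookup.lookup("1").thenAccept(phone -> System.out.println("id 1 -> " + phone));
        lookup.lookup("2").thenAccept(phone -> System.out.println("id 2 -> " + phone));
        lookup.close();
    }
}
```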

 

You are welcome to follow the "Flink菜鳥" WeChat official account, which publishes Flink development articles from time to time.

