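I recently read an expert's blog post and remembered that Async I/O is one of the major features Blink contributed to the community: it lets you fetch external data asynchronously. I decided to implement it myself, so that I won't have to go hunting for an example when I need it on a project.
At first I wanted to write a Scala demo that reads data from HBase, following the demo on the official site: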
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
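It failed. I could not get the parts marked in red above to work:
1. I couldn't find a usable implementation class for Future
2. unorderedWait kept reporting errors
The examples in the Flink source code also include a Scala case: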
def main(args: Array[String]) {
    val timeout = 10000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val input = env.addSource(new SimpleSource())

    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
        (input, collector: ResultFuture[Int]) =>
            Future {
                collector.complete(Seq(input))
            } (ExecutionContext.global)
    }

    asyncMapped.print()

    env.execute("Async I/O job")
}
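That is the main part. As a beginner I couldn't get any further with it; what I actually wanted was to extend RichAsyncFunction, so that the connection can be initialized in the open method.
I went through quite a few blog posts online. Most of them just translate the principles from the official docs, and none of their examples actually run. Frustrating.
So the Scala attempt failed.
I switched to a Java version. I asked in a chat group yesterday, and someone kindly gave me a Java version: https://github.com/perkinls/flink-local-train/blob/c8b4efe33620352aea0100adef4fae2a068a3b65/src/main/scala/com/lp/test/asyncio/AsyncIoSideTableJoinMysqlJava.java I haven't read it yet, because the Java example on the official site is something I can understand.
Below is the source for the MySQL version (the HBase one hasn't been tested yet, because my local HBase is down):
The business logic is as follows:
Receive Kafka data, convert it to a user object, call the async function, look up the matching phone by user.id, put it back into the user object, and output the result.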
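The user object itself is not shown in this post. A minimal sketch of what it could look like is given here, inferred only from the calls the code makes on it (a three-argument constructor, a no-argument constructor, getId, setId, setPhone, toString). The field list and everything else is an assumption, not the original class. For Flink to treat it as a POJO, the class would also need to be declared public in its own file.

```java
// Hypothetical sketch of the user POJO; the real class is not shown in this post.
// In a Flink job, declare it `public` in its own file so Flink recognizes it as a POJO.
class AsyncUser {
    private String id;
    private String username;
    private String password;
    private String phone; // filled in later by the async lookup

    // Flink POJOs need a public no-argument constructor
    public AsyncUser() {
    }

    public AsyncUser(String id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }

    @Override
    public String toString() {
        return "AsyncUser{id=" + id + ", username=" + username
                + ", password=" + password + ", phone=" + phone + "}";
    }
}
```

The getters and setters are also what lets fastjson serialize the object in the final map step.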
Main class:
import com.alibaba.fastjson.JSON;
import com.venn.common.Common;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.concurrent.TimeUnit;

public class AsyncMysqlRequest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<ObjectNode> source =
                new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

        // Receive Kafka data and convert it to a User object.
        // Note: the element type here must match the input type of the async function;
        // the function class below is declared with AsyncUser, so use the same POJO in both places.
        DataStream<User> input = env.addSource(source).map(value -> {
            String id = value.get("id").asText();
            String username = value.get("username").asText();
            String password = value.get("password").asText();

            return new User(id, username, password);
        });

        // Async I/O lookup against MySQL: timeout 1 s, capacity 10
        // (more than 10 in-flight requests backpressures the upstream operator).
        // The original passed TimeUnit.MICROSECONDS, which makes the timeout 1 ms, not 1 s.
        DataStream async = AsyncDataStream.unorderedWait(
                input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 10);

        async.map(user -> {
            return JSON.toJSON(user).toString();
        })
        .print();

        env.execute("asyncForMysql");
    }
}
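The capacity argument (10 in the call above) caps how many requests may be in flight at once; when the limit is reached, the producer has to wait. The following is a conceptual analogy using only the JDK, not Flink's actual implementation: a Semaphore plays the role of the capacity, and the class name, the 20 ms simulated latency, and the request counts are all made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Conceptual analogy only: a semaphore standing in for the async operator's capacity.
class CapacitySketch {

    /** Issues `total` simulated requests with at most `capacity` in flight; returns the peak in-flight count. */
    static int run(int total, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(30);
        Semaphore slots = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(total);
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producer while `capacity` requests are pending (the backpressure effect)
                slots.acquire();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                pool.submit(() -> {
                    try {
                        Thread.sleep(20); // simulated lookup latency (assumption)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        slots.release(); // frees a slot so the producer can continue
                        done.countDown();
                    }
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight requests: " + run(50, 10));
    }
}
```

Even though the pool has 30 threads, the peak never exceeds the capacity, which is why a small capacity combined with slow lookups limits throughput.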
Function class:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {

    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);
    private transient MysqlClient client;
    private transient ExecutorService executorService;

    /**
     * Initialize the connection and the thread pool in open
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        client = new MysqlClient();
        executorService = Executors.newFixedThreadPool(30);
    }

    /**
     * use asyncUser.getId async get asyncUser phone
     *
     * @param asyncUser
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {

        // Hand the blocking query to the thread pool so asyncInvoke returns immediately
        executorService.submit(() -> {
            // submit query
            System.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());
            AsyncUser tmp = client.query1(asyncUser);
            // Always complete the resultFuture, otherwise every record ends up as a timeout
            resultFuture.complete(Collections.singletonList(tmp));
        });
    }

    @Override
    public void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {
        logger.warn("Async function for mysql timeout");
        // Emit the record with a marker instead of failing the job
        List<AsyncUser> list = new ArrayList<>();
        input.setPhone("timeout");
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        // Release the pool threads created in open
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }
}
MysqlClient:
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlClient {

    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";
    // Note: one static connection and statement are shared by every caller.
    // JDBC statements are not designed for concurrent use from multiple threads,
    // so a connection pool would be the safer choice outside of a demo.
    private static java.sql.Connection conn;
    private static PreparedStatement ps;

    static {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select phone from async.async_test where id = ?");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * execute query
     *
     * @param user
     * @return
     */
    public AsyncUser query1(AsyncUser user) {

        // Simulate extra lookup latency
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Default value when no row is found
        String phone = "0000";
        try {
            ps.setString(1, user.getId());
            ResultSet rs = ps.executeQuery();
            if (!rs.isClosed() && rs.next()) {
                phone = rs.getString(1);
            }
            System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());
        } catch (SQLException e) {
            e.printStackTrace();
        }
        user.setPhone(phone);
        return user;
    }

    // Test code
    public static void main(String[] args) {
        MysqlClient mysqlClient = new MysqlClient();

        AsyncUser asyncUser = new AsyncUser();
        asyncUser.setId("526");
        long start = System.currentTimeMillis();
        asyncUser = mysqlClient.query1(asyncUser);

        System.out.println("end : " + (System.currentTimeMillis() - start));
        System.out.println(asyncUser.toString());
    }
}
Function class (bad example: asyncInvoke queries the database with a blocking call, so it is effectively synchronous):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {

    // Connection settings
    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";

    java.sql.Connection conn;
    PreparedStatement ps;
    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    /**
     * Initialize the connection in open
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        Class.forName(driverName);
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        ps = conn.prepareStatement("select phone from async.async_test where id = ?");
    }

    /**
     * use user.getId async get user phone
     *
     * @param user
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
        // Query by user id. This call blocks the operator thread until MySQL answers,
        // which is what makes this version synchronous.
        ps.setString(1, user.getId());
        ResultSet rs = ps.executeQuery();
        String phone = null;
        if (rs.next()) {
            phone = rs.getString(1);
        }
        user.setPhone(phone);
        List<User> list = new ArrayList<>();
        list.add(user);
        // Hand the result back to the result queue
        resultFuture.complete(list);
    }

    @Override
    public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
        logger.info("Async function for mysql timeout");
        List<User> list = new ArrayList<>();
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        conn.close();
    }
}
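To see why the blocking version above is effectively synchronous, here is a small JDK-only sketch that contrasts the two styles without Flink or MySQL. Everything in it is a stand-in: slowLookup simulates the JDBC query with a 100 ms sleep, and a Consumer<String> plays the role of ResultFuture. It measures how long the calling thread needs just to issue the requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Stand-in sketch: no Flink or JDBC involved, all names are hypothetical.
class AsyncOffloadSketch {

    // Simulated slow database lookup (100 ms latency is an assumption)
    static String slowLookup(String id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "phone-" + id;
    }

    // Blocking style: the caller waits for the lookup before it can return
    static void blockingInvoke(String id, Consumer<String> callback) {
        callback.accept(slowLookup(id));
    }

    // Offloaded style: the lookup runs on the pool and the caller returns at once
    static void offloadedInvoke(String id, Consumer<String> callback, ExecutorService pool) {
        CompletableFuture.supplyAsync(() -> slowLookup(id), pool).thenAccept(callback);
    }

    /** Returns how many milliseconds the calling thread spent issuing n requests. */
    static long timeIssuing(int n, boolean offload) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        CountDownLatch done = new CountDownLatch(n);
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            String id = String.valueOf(i);
            if (offload) {
                offloadedInvoke(id, result -> done.countDown(), pool);
            } else {
                blockingInvoke(id, result -> done.countDown());
            }
        }
        long issuedMs = (System.nanoTime() - start) / 1_000_000;
        try {
            done.await(); // wait for all results before shutting the pool down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return issuedMs;
    }

    public static void main(String[] args) {
        System.out.println("blocking, time to issue 5 requests (ms): " + timeIssuing(5, false));
        System.out.println("offloaded, time to issue 5 requests (ms): " + timeIssuing(5, true));
    }
}
```

In the blocking style the caller pays the full latency for every request, one after another; in the offloaded style the caller only pays the cost of handing the work to the pool, which is the behavior the corrected function class relies on.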
The test data is as follows:
{"id" : 1, "username" : "venn", "password" : 1561709530935}
{"id" : 2, "username" : "venn", "password" : 1561709536029}
{"id" : 3, "username" : "venn", "password" : 1561709541033}
{"id" : 4, "username" : "venn", "password" : 1561709546037}
{"id" : 5, "username" : "venn", "password" : 1561709551040}
{"id" : 6, "username" : "venn", "password" : 1561709556044}
{"id" : 7, "username" : "venn", "password" : 1561709561048}
The execution result is as follows:
submit query : 1-1-1562763486845
submit query : 2-1-1562763486846
submit query : 3-1-1562763486846
submit query : 4-1-1562763486849
submit query : 5-1-1562763486849
submit query : 6-1-1562763486859
submit query : 7-1-1562763486913
submit query : 8-1-1562763486967
submit query : 9-1-1562763487021
execute query : 1-2-phone : 12345678910-1562763487316
1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"}
submit query : 10-1-1562763487408
submit query : 11-1-1562763487408
execute query : 9-2-phone : 1562661110630-1562763487633
1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"}
# The asynchrony is visible here: queries up to id 11 have been submitted, but only ids 1 and 9 have executed and been returned (this is the unorderedWait call)
submit query : 12-1-1562763487634
execute query : 8-2-phone : 1562661110627-1562763487932
1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"}
submit query : 13-1-1562763487933
execute query : 7-2-phone : 1562661110624-1562763488228
1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"}
submit query : 14-1-1562763488230
execute query : 6-2-phone : 1562661110622-1562763488526
1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"}
submit query : 15-1-1562763488527
execute query : 4-2-phone : 12345678913-1562763488832
1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}
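The results above come back as 1, 9, 8, 7, 6, 4 rather than in input order, which is what unorderedWait allows. The difference between emitting in completion order and emitting in input order can be sketched with plain CompletableFuture, as below. This is an illustration of the idea only, not how Flink implements either mode; the class name, ids, and delays are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: completion-order vs input-order emission with made-up delays.
class EmitOrderSketch {
    private static final String[] IDS = {"1", "2", "3"};
    private static final long[] DELAYS_MS = {500, 100, 300}; // id 1 is the slowest lookup

    // Simulated lookup that answers with the id after the given delay
    private static CompletableFuture<String> lookup(String id, long delayMs, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return id;
        }, pool);
    }

    /** Runs all lookups concurrently and returns the ids in the order they were emitted. */
    static List<String> emit(boolean ordered) {
        ExecutorService pool = Executors.newFixedThreadPool(IDS.length);
        try {
            List<String> out = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<String>> requests = new ArrayList<>();
            for (int i = 0; i < IDS.length; i++) {
                requests.add(lookup(IDS[i], DELAYS_MS[i], pool));
            }
            if (ordered) {
                // Input order: a fast result waits until every earlier request has finished
                for (CompletableFuture<String> request : requests) {
                    out.add(request.join());
                }
            } else {
                // Completion order: each result is emitted as soon as it arrives
                List<CompletableFuture<Void>> emitted = new ArrayList<>();
                for (CompletableFuture<String> request : requests) {
                    emitted.add(request.thenAccept(out::add));
                }
                CompletableFuture.allOf(emitted.toArray(new CompletableFuture[0])).join();
            }
            return new ArrayList<>(out);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unordered: " + emit(false));
        System.out.println("ordered:   " + emit(true));
    }
}
```

With these delays the unordered run emits 2, 3, 1 and the ordered run emits 1, 2, 3. The ordered mode keeps the sequence but holds fast results back behind slow ones, so unordered generally gives lower latency when order does not matter.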
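Implementations for HBase, Redis, or other stores follow the same pattern.
Feel free to follow the Flink菜鳥 WeChat official account, which posts articles about Flink development from time to time.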