1 Exercise walkthrough (I did not get this running myself; debug it later)
For the problem statement, see flink---實時項目---day02
For the data in Kafka, see the day02 document
GeoUtils

package cn._51doit.flink.day07;

import ch.hsr.geohash.GeoHash;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;

public class GeoUtils {

    public static final String key = "9833dc1de6e628e1b8e244710783a31d";

    public static String[] getAreaInfoByLongitudeAndLatitude(HttpClient httpClient, Jedis jedis, double longitude, double latitude) {
        String province = "";
        String city = "";
        String district = "";
        String bizNames = "";
        // encode the longitude/latitude with GeoHash
        try {
            GeoHash geoHash = GeoHash.withCharacterPrecision(latitude, longitude, 8);
            String base32Code = geoHash.toBase32();
            // look the GeoHash code up in Redis
            String areaInfo = jedis.get(base32Code); // e.g. wx4sqk42 -> province,city,district|biz1,biz2
            // if found, use the cached area and business-district info
            if (areaInfo != null) {
                String[] fields = areaInfo.split("[|]");
                String area = fields[0];
                // check whether business-district info is present
                if (fields.length > 1) {
                    bizNames = fields[1];
                }
                String[] pcd = area.split(",");
                province = pcd[0];
                city = pcd[1];
                district = pcd[2];
            } else {
                // not cached: call the AMap (Gaode) reverse-geocoding API over HTTP
                // build a GET request
                GetMethod getMethod = new GetMethod("https://restapi.amap.com/v3/geocode/regeo?key=" + key + "&location=" + longitude + "," + latitude);
                // send the request
                int status = httpClient.executeMethod(getMethod);
                if (status == 200) {
                    // read the response body as a JSON string
                    String jsonStr = getMethod.getResponseBodyAsString();
                    // parse it into a JSON object
                    JSONObject jsonObj = JSON.parseObject(jsonStr);
                    // extract the location info
                    JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                    if (regeocode != null && !regeocode.isEmpty()) {
                        JSONObject address = regeocode.getJSONObject("addressComponent");
                        // extract province / city / district and the business districts
                        province = address.getString("province");
                        city = address.getString("city");
                        district = address.getString("district");
                        ArrayList<String> lb = new ArrayList<>();
                        // the business districts come back as an array (possibly several)
                        JSONArray businessAreas = address.getJSONArray("businessAreas");
                        for (int i = 0; businessAreas != null && i < businessAreas.size(); i++) {
                            JSONObject businessArea = null;
                            try {
                                businessArea = businessAreas.getJSONObject(i);
                            } catch (Exception e) {
                                //e.printStackTrace();
                            }
                            if (businessArea != null) {
                                String businessName = businessArea.getString("name");
                                String longitudeAndLatitude = businessArea.getString("location");
                                String[] fds = longitudeAndLatitude.split(",");
                                lb.add(businessName);
                                // encode the business district's own coordinates with GeoHash
                                GeoHash geohash = GeoHash.withCharacterPrecision(Double.parseDouble(fds[1]), Double.parseDouble(fds[0]), 8);
                                // write the area + business-district info back to Redis,
                                // growing the local knowledge base for later lookups
                                jedis.set(geohash.toBase32(), province + "," + city + "," + district + "|" + businessName);
                            }
                        }
                        bizNames = StringUtils.join(lb.toArray(), ",");
                        jedis.set(base32Code, province + "," + city + "," + district + "|" + bizNames);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new String[]{province, city, district, bizNames};
    }
}
ToJSONMapFunction

package cn._51doit.flink.day06;

import cn._51doit.flink.day07.GeoUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.HttpClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

import java.util.HashMap;

public class ToJSONMapFunction extends RichMapFunction<String, LogBean> {

    private transient Jedis jedis = null;
    private transient HttpClient httpClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // set up the HTTP connection and the Redis connection
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters(); // fetch the global job parameters
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        jedis.auth(password);
        jedis.select(db);
        // HTTP client used to call the AMap API
        httpClient = new HttpClient();
    }

    @Override
    public LogBean map(String line) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        LogBean logBean = null;
        try {
            String[] fields = line.split("-->");
            String dateTime = fields[0];
            String dt = dateTime.split(" ")[0];
            String json = fields[1];
            // parse with FastJSON
            JSONObject jsonObj = JSON.parseObject(json);
            JSONObject uObj = jsonObj.getJSONObject("u");
            JSONObject phoneObj = uObj.getJSONObject("phone");
            JSONObject locObj = uObj.getJSONObject("loc");
            JSONObject appObj = uObj.getJSONObject("app");

            // flat fields of the user object
            String id = uObj.getString("id");
            String account = uObj.getString("account");
            String sessionId = uObj.getString("sessionId");

            // phone/device info
            String imei = phoneObj.getString("imei");
            String osName = phoneObj.getString("osName");
            String osVer = phoneObj.getString("osVer");
            String resolution = phoneObj.getString("resolution");
            String androidId = phoneObj.getString("androidId");
            String manufacture = phoneObj.getString("manufacture");
            String deviceId = phoneObj.getString("deviceId");

            // loc (location) info
            String areacode = locObj.getString("areacode");
            double longtitude = locObj.getDouble("longtitude");
            double latitude = locObj.getDouble("latitude");
            // look up province, city, district and business districts by coordinates
            String[] areaInfo = GeoUtils.getAreaInfoByLongitudeAndLatitude(httpClient, jedis, longtitude, latitude);
            String province = areaInfo[0];
            String city = areaInfo[1];
            String district = areaInfo[2];
            String bizNames = areaInfo[3];

            String carrier = locObj.getString("carrier");
            String netType = locObj.getString("netType");
            String cid_sn = locObj.getString("cid_sn");
            String ip = locObj.getString("ip");

            // app fields
            String appid = appObj.getString("appid");
            String appVer = appObj.getString("appVer");
            String release_ch = appObj.getString("release_ch");
            String promotion_ch = appObj.getString("promotion_ch");

            // event type
            String logType = jsonObj.getString("logType");
            // commit time
            long commit_time = jsonObj.getLong("commit_time");

            JSONObject eventObj = jsonObj.getJSONObject("event");
            // hashmap holding the event key/value pairs
            HashMap<String, String> eventMap = new HashMap<>();
            // iterate over every kv pair of the event object
            for (String k : eventObj.keySet()) {
                String v = eventObj.getString(k);
                eventMap.put(k, v);
            }

            // assemble the bean and return it
            logBean = new LogBean(id, account, sessionId, imei, osName, osVer, resolution, androidId,
                    manufacture, deviceId, areacode, longtitude, latitude, province, city, district,
                    bizNames, carrier, netType, cid_sn, ip, appid, appVer, release_ch, promotion_ch,
                    logType, commit_time, dt, eventMap);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logBean;
    }

    @Override
    public void close() throws Exception {
        // release the HTTP and Redis connections
        jedis.close();
        httpClient = null;
    }
}
PreETLAndTopicSplit

package cn._51doit.flink.day07;

import cn._51doit.flink.Utils.FlinkUtils;
import cn._51doit.flink.day06.KafkaStringSerializationSchema;
import cn._51doit.flink.day06.LogBean;
import cn._51doit.flink.day06.ToJSONMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.ZoneId;
import java.util.Properties;

public class PreETLAndTopicSplit {

    public static void main(String[] args) throws Exception {
        // 1. read the data from Kafka
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);

        // register the parameters globally so that every Task instance can read them
        FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);

        // parse and filter the data
        SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());
        SingleOutputStreamOperator<LogBean> filtered = beanDataStream.filter(new FilterFunction<LogBean>() {
            @Override
            public boolean filter(LogBean bean) {
                return bean != null;
            }
        });

        // split the stream ===> use side outputs
        // (the old split/select API is replaced by side outputs here)
        // tag for traffic (flow) events
        OutputTag<LogBean> flowOutputTag = new OutputTag<LogBean>("flow-output") {};
        // tag for activity events
        OutputTag<LogBean> activityOutputTag = new OutputTag<LogBean>("activity-output") {};

        SingleOutputStreamOperator<LogBean> mainStream = filtered.process(new ProcessFunction<LogBean, LogBean>() {
            @Override
            public void processElement(LogBean bean, Context ctx, Collector<LogBean> out) {
                // route by the event type carried in the record
                String logType = bean.getLogType();
                if (logType.startsWith("act")) {
                    ctx.output(activityOutputTag, bean);
                } else {
                    ctx.output(flowOutputTag, bean);
                }
                // also emit the record to the main stream
                out.collect(bean);
            }
        });

        DataStream<LogBean> activityStream = mainStream.getSideOutput(activityOutputTag);
        DataStream<LogBean> flowStream = mainStream.getSideOutput(flowOutputTag);

        Properties properties = parameters.getProperties();

        // write the flowStream side output to Kafka
        String flowTopic = parameters.getRequired("flow.topic");
        // create a KafkaProducer, i.e. the Flink sink
        FlinkKafkaProducer<String> kafkaProducer1 = new FlinkKafkaProducer<>(
                flowTopic,
                new KafkaStringSerializationSchema(flowTopic),
                properties,                                  // Kafka producer properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE     // write to Kafka with EXACTLY_ONCE semantics
        );
        flowStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer1);

        // write the activityStream side output to Kafka
        String activityTopic = parameters.getRequired("activity.topic");
        FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(
                activityTopic,
                new KafkaStringSerializationSchema(activityTopic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        activityStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer2);

        // write the main stream to HDFS in Parquet format
        String path = parameters.getRequired("mainstream.hdfs.out.path");
        // bucket (directory) naming pattern
        DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
                "yyyy-MM-dd--HH-mm",
                ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
                .forBulkFormat(
                        new Path(path),
                        ParquetAvroWriters.forReflectRecord(LogBean.class)
                )
                .withBucketAssigner(bucketAssigner)
                .build();
        // sink to HDFS
        mainStream.addSink(streamingFileSink);

        FlinkUtils.env.execute();
    }
}
conf.properties

redis.host=feng05
redis.password=feng
redis.db=6
bootstrap.servers=feng05:9092,feng06:9092,feng07:9092
group.id=f001
auto.offset.reset=earliest
kafka.topics=flink-read
flow.topic=flow.topic
activity.topic=activity.topic
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
transaction.timeout.ms=300000
checkpoint.interval=30000
checkpoint.path=file:///E:/flink/ckt
mainstream.hdfs.out.path=hdfs://feng05:9000/flink_data
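Note that FlinkUtils.createKafkaStream is reused from an earlier day's notes and is not reproduced here. For orientation only, below is a minimal hypothetical sketch of what such a helper could look like, built from the property keys shown in conf.properties above (checkpoint.interval, checkpoint.path, kafka.topics, plus the Kafka consumer properties); the real implementation in the earlier notes may differ.

package cn._51doit.flink.Utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// hypothetical sketch of the helper used above, not the original implementation
public class FlinkUtils {

    public static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(
            ParameterTool parameters,
            Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        // enable checkpointing so the Kafka offsets take part in checkpoints
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 30000L),
                CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend(parameters.getRequired("checkpoint.path")));

        // kafka.topics may be a comma-separated list of topics
        List<String> topics = Arrays.asList(parameters.getRequired("kafka.topics").split(","));
        Properties props = parameters.getProperties();

        FlinkKafkaConsumer<T> consumer =
                new FlinkKafkaConsumer<>(topics, clazz.newInstance(), props);
        // let Flink track offsets in its checkpoints instead of committing them eagerly
        consumer.setCommitOffsetsOnCheckpoints(false);

        return env.addSource(consumer);
    }
}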
Key points here:
- Global parameters
// register the parameters globally so that the main-method parameters can be used inside the Task instances
FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);
Using the global parameters:
SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());
Part of the ToJSONMapFunction class:
ParameterTool params = (ParameterTool) getRuntimeContext()
        .getExecutionConfig()
        .getGlobalJobParameters(); // fetch the global parameters
Setting the main-method parameters as global parameters in Flink effectively broadcasts them to every slot of every TaskManager so that the subtasks can use them. If they are not made global, a reference to the parameters is still shipped to the slots along with the subtasks and can be used there, but every access then requires network IO to fetch them, which is inefficient.
- Writing data to HDFS in Parquet format (see the official documentation)
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
This still uses the StreamingFileSink.

// write the main stream to HDFS in Parquet format
String path = parameters.getRequired("mainstream.hdfs.out.path");
// bucket (directory) naming pattern
DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
        "yyyy-MM-dd--HH-mm",
        ZoneId.of("Asia/Shanghai"));
StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
        .forBulkFormat(
                new Path(path),
                ParquetAvroWriters.forReflectRecord(LogBean.class)
        )
        .withBucketAssigner(bucketAssigner)
        .build();
// sink to HDFS
mainStream.addSink(streamingFileSink);
2 Asynchronous queries
Multiple threads handle tasks at the same time; the advantage is speed, the drawback is resource consumption (in a production environment the compute resources usually allow for this).
ThreadPoolTest

package cn._51doit.flink.day07;

import java.util.concurrent.*;

public class ThreadPoolTest {

    public static void main(String[] args) throws Exception {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        Future<String> future1 = fixedThreadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("query started");
                Thread.sleep(15000);
                System.out.println("query finished");
                return "res666";
            }
        });
        System.out.println("main thread keeps running...");
        System.out.println("result ready? " + future1.isDone());
        //String res = future1.get(10, TimeUnit.SECONDS);
        String res = future1.get(); // blocks until the result is available
        System.out.println("returned result: " + res);
    }
}
- An asynchronous call is a way to let the caller keep running without waiting for the called function's return value. In Java this simply means starting another thread to carry out part of the computation so that the call can continue or return immediately; the caller still has to fetch that thread's result later.
- JDK 5 added the Future interface to describe the result of an asynchronous computation. Although Future and its related methods provide the ability to run tasks asynchronously, getting the result is inconvenient: it can only be obtained by blocking or by polling. Blocking clearly contradicts the point of asynchronous programming, while polling wastes CPU and still does not deliver the result promptly.
https://www.cnblogs.com/cjsblog/p/9267163.html (a blog post on asynchronous calls)
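To make the "block or poll" point above concrete, here is a minimal standalone sketch of the polling style (illustration only, not project code; the class name and values are made up for the example):

import java.util.concurrent.*;

public class FuturePollingDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            TimeUnit.SECONDS.sleep(2); // simulate a slow query
            return "res666";
        });

        // polling: repeatedly checking isDone() burns CPU and adds latency
        // instead of the caller being notified when the result is ready
        while (!future.isDone()) {
            System.out.println("not ready yet, doing something else briefly...");
            TimeUnit.MILLISECONDS.sleep(200);
        }
        System.out.println("result: " + future.get()); // returns immediately now

        pool.shutdown();
    }
}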
2.1 Querying by longitude and latitude with the asynchronous HttpClient
http://hc.apache.org/httpcomponents-asyncclient-4.1.x/examples.html (the HttpClient-related APIs can be looked up on this site)
Data

{"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072}
HttpAsyncQueryDemo

package cn._51doit.flink.day07;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class HttpAsyncQueryDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);

        // unorderedWait: results may be emitted out of order; timeout 2000 ms, at most 10 in-flight requests
        SingleOutputStreamOperator<OrderBean> result = AsyncDataStream.unorderedWait(
                lines, new AsyncHttpQueryFunction(), 2000, TimeUnit.MILLISECONDS, 10);

        result.print();
        env.execute();
    }
}
AsyncHttpQueryFunction

package cn._51doit.flink.day07;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class AsyncHttpQueryFunction extends RichAsyncFunction<String, OrderBean> {

    private String key = "4924f7ef5c86a278f5500851541cdcff";

    private transient CloseableHttpAsyncClient httpclient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // initialize the asynchronous HttpClient (an async connection pool)
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectionRequestTimeout(3000)
                .build();
        httpclient = HttpAsyncClients.custom()
                .setMaxConnTotal(20)
                .setDefaultRequestConfig(requestConfig)
                .build();
        httpclient.start();
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<OrderBean> resultFuture) throws Exception {
        try {
            OrderBean orderBean = JSON.parseObject(line, OrderBean.class);
            // asynchronous reverse-geocoding query
            double longitude = orderBean.longitude;
            double latitude = orderBean.latitude;
            HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?key=" + key
                    + "&location=" + longitude + "," + latitude);
            // the query returns a Future immediately
            Future<HttpResponse> future = httpclient.execute(httpGet, null);

            // fetch the data from the Future on another thread
            CompletableFuture.supplyAsync(new Supplier<OrderBean>() {
                @Override
                public OrderBean get() {
                    try {
                        HttpResponse response = future.get();
                        String province = null;
                        String city = null;
                        if (response.getStatusLine().getStatusCode() == 200) {
                            // read the JSON response
                            String result = EntityUtils.toString(response.getEntity());
                            //System.out.println(result);
                            JSONObject jsonObj = JSON.parseObject(result);
                            // extract the location info
                            JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                            if (regeocode != null && !regeocode.isEmpty()) {
                                JSONObject address = regeocode.getJSONObject("addressComponent");
                                // extract province and city
                                province = address.getString("province");
                                city = address.getString("city");
                                //String businessAreas = address.getString("businessAreas");
                            }
                        }
                        orderBean.province = province;
                        orderBean.city = city;
                        return orderBean;
                    } catch (Exception e) {
                        // on any failure, complete the element with null
                        return null;
                    }
                }
            }).thenAccept((OrderBean result) -> {
                // hand the completed element back to Flink
                resultFuture.complete(Collections.singleton(result));
            });
        } catch (Exception e) {
            resultFuture.complete(Collections.singleton(null));
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the async HTTP client
        httpclient.close();
    }
}
OrderBean

package cn._51doit.flink.day07;

public class OrderBean {

    public String oid;
    public String cid;
    public Double money;
    public Double longitude;
    public Double latitude;
    public String province;
    public String city;

    public OrderBean() {}

    public OrderBean(String oid, String cid, Double money, Double longitude, Double latitude) {
        this.oid = oid;
        this.cid = cid;
        this.money = money;
        this.longitude = longitude;
        this.latitude = latitude;
    }

    public static OrderBean of(String oid, String cid, Double money, Double longitude, Double latitude) {
        return new OrderBean(oid, cid, money, longitude, latitude);
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "oid='" + oid + '\'' +
                ", cid='" + cid + '\'' +
                ", money=" + money +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}
Pay attention to the CompletableFuture class here (added in JDK 1.8); it addresses Future's drawback that the asynchronous result is awkward to obtain.
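As a quick illustration (not project code; the class name and values are made up), a minimal sketch of the supplyAsync + thenAccept pattern that the async functions in these notes rely on; the result is handed to a callback instead of being fetched with a blocking get():

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        // supplyAsync runs the "query" on the common ForkJoinPool
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2); // simulate a slow external lookup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "res666";
        });

        // thenAccept registers a callback; no blocking get() or polling is needed
        CompletableFuture<Void> done =
                cf.thenAccept(result -> System.out.println("callback got: " + result));

        System.out.println("main thread keeps running...");
        // demo only: keep the JVM alive until the callback has run
        done.join();
    }
}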
2.2 Asynchronously querying a MySQL database with a thread pool
MySQL does not support asynchronous queries, i.e. a single MySQL connection cannot be used by several threads to query at the same time ===> workaround: create a connection pool with multiple connections (combined with a thread pool), so that concurrent queries each get their own connection.
AsyncQueryFromMySQL

package cn._51doit.flink.day07;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class AsyncQueryFromMySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        int capacity = 20;
        DataStream<Tuple2<String, String>> result = AsyncDataStream.orderedWait(
                lines,                              // input stream
                new MySQLAsyncFunction(capacity),   // instance of the async query function
                3000,                               // timeout
                TimeUnit.MILLISECONDS,              // time unit
                capacity);                          // max number of concurrent async requests

        result.print();
        env.execute();
    }
}
MySQLAsyncFunction

package cn._51doit.flink.day07;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient DruidDataSource dataSource;
    private transient ExecutorService executorService;

    // must NOT be transient: it is set in the constructor and shipped with the function
    private int maxConnTotal;

    public MySQLAsyncFunction(int maxConnTotal) {
        this.maxConnTotal = maxConnTotal;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // thread pool that carries out the concurrent requests
        executorService = Executors.newFixedThreadPool(maxConnTotal);

        // connection pool (async IO: one request = one thread = one connection)
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }

    @Override
    public void close() throws Exception {
        dataSource.close();
        executorService.shutdown();
    }

    @Override
    public void asyncInvoke(String id, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // hand the query over to the thread pool
        // Future<String> future = executorService.submit(() -> queryFromMySql(id));
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return queryFromMySql(id);
            }
        });

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get();
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(Tuple2.of(id, result)));
        });
    }

    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, name FROM t_data WHERE id = ?";
        String result = null;
        Connection connection = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement(sql);
            stmt.setString(1, param);
            rs = stmt.executeQuery();
            while (rs.next()) {
                result = rs.getString("name");
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return result;
    }
}
3 BroadcastState
It is somewhat similar to the broadcast variables covered earlier, but BroadcastState (a special kind of operator state) can change dynamically at runtime.
ActivityCountWithBroadcastState

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ActivityCountWithBroadcastState {

    public static void main(String[] args) throws Exception {
        // BroadcastState is used here to join dictionary data efficiently (a map-side join)
        // create two streams: one broadcast stream and one non-broadcast stream
        // the broadcast stream is sent over the network to every TaskSlot of every TaskManager
        // the non-broadcast stream connects to the broadcast stream so the join can happen locally
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // broadcast stream; its events look like:
        // INSERT,1,新人禮包
        // INSERT,2,女神節促銷
        // INSERT,3,周末活動
        // UPDATE,3,周末抽獎
        // DELETE,3,周末抽獎
        DataStreamSource<String> stream1 = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, String>> dicStream = stream1.map(
                new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        String type = fields[0];
                        String id = fields[1];
                        String name = fields[2];
                        return Tuple3.of(type, id, name);
                    }
                });

        // state descriptor for the broadcast data
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<String, String>(
                "broadcast-state",
                String.class,
                String.class
        );

        // broadcast the dictionary data
        BroadcastStream<Tuple3<String, String, String>> broadcastStream = dicStream.broadcast(mapStateDescriptor);

        // the non-broadcast stream, i.e. the data we actually want to process, e.g.:
        // uid01,2020-03-08 11:11:11,2
        // uid01,2020-03-08 11:11:11,1
        // uid01,2020-03-08 11:11:11,3
        // uid01,2020-03-08 11:11:11,3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple3<String, String, String>> activityStream = lines.map(
                new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        String uid = fields[0];
                        String time = fields[1];
                        String aid = fields[2];
                        return Tuple3.of(uid, time, aid);
                    }
                });

        // connect the non-broadcast stream with the broadcast stream
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected =
                activityStream.connect(broadcastStream);

        // use the low-level process API to do the join
        SingleOutputStreamOperator<Tuple4<String, String, String, String>> result = connected.process(
                new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple4<String, String, String, String>>() {

                    // handle every element of the broadcast stream
                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> input, Context ctx,
                                                        Collector<Tuple4<String, String, String, String>> out) throws Exception {
                        String type = input.f0;
                        String aid = input.f1;
                        String aname = input.f2;
                        // store the rule data in the broadcast state held in each TaskSlot
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        if (type.equals("DELETE")) {
                            broadcastState.remove(aid);
                        } else {
                            broadcastState.put(aid, aname);
                        }
                        System.out.println("subtask : " + getRuntimeContext().getIndexOfThisSubtask() + " rules changed!");
                    }

                    // handle the regular data (the non-broadcast stream)
                    @Override
                    public void processElement(Tuple3<String, String, String> input, ReadOnlyContext ctx,
                                               Collector<Tuple4<String, String, String, String>> out) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        String uid = input.f0;
                        String time = input.f1;
                        String aid = input.f2;
                        // look up the activity name by aid in the broadcast state
                        String name = broadcastState.get(aid);
                        out.collect(Tuple4.of(uid, time, aid, name));
                    }
                });

        result.print();
        env.execute();
    }
}
Related blog posts worth reading