1 Exercise walkthrough (I did not get this running myself; debug later)
For the exercise statement, see flink -- real-time project -- day02
For the data in Kafka, see the day02 notes
GeoUtils

package cn._51doit.flink.day07;

import ch.hsr.geohash.GeoHash;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;

public class GeoUtils {

    public static final String key = "9833dc1de6e628e1b8e244710783a31d";

    public static String[] getAreaInfoByLongitudeAndLatitude(HttpClient httpClient, Jedis jedis,
                                                             double longitude, double latitude) {
        String province = "";
        String city = "";
        String district = "";
        String bizNames = "";
        // Encode the longitude/latitude with GeoHash
        try {
            GeoHash geoHash = GeoHash.withCharacterPrecision(latitude, longitude, 8);
            String base32Code = geoHash.toBase32();
            // Look up the GeoHash code in Redis
            String areaInfo = jedis.get(base32Code); // {wx4sqk42 -> province,city,district|biz1,biz2}
            // If found, use the cached area and business-district info
            if (areaInfo != null) {
                String[] fields = areaInfo.split("[|]");
                String area = fields[0];
                // Check whether business-district info exists
                if (fields.length > 1) {
                    bizNames = fields[1];
                }
                String[] pcd = area.split(",");
                province = pcd[0];
                city = pcd[1];
                district = pcd[2];
            } else {
                // Not cached: call the AMap (Gaode) reverse-geocoding API over HTTP
                GetMethod getMethod = new GetMethod(
                        "https://restapi.amap.com/v3/geocode/regeo?key=" + key
                                + "&location=" + longitude + "," + latitude);
                // Send the request
                int status = httpClient.executeMethod(getMethod);
                if (status == 200) {
                    // Get the JSON response body
                    String jsonStr = getMethod.getResponseBodyAsString();
                    JSONObject jsonObj = JSON.parseObject(jsonStr);
                    // Extract the location info
                    JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                    if (regeocode != null && !regeocode.isEmpty()) {
                        JSONObject address = regeocode.getJSONObject("addressComponent");
                        // Province / city / district
                        province = address.getString("province");
                        city = address.getString("city");
                        district = address.getString("district");
                        ArrayList<String> lb = new ArrayList<>();
                        // Business districts (possibly more than one)
                        JSONArray businessAreas = address.getJSONArray("businessAreas");
                        for (int i = 0; i < businessAreas.size(); i++) {
                            JSONObject businessArea = null;
                            try {
                                // bug fix: the original never assigned the result
                                businessArea = businessAreas.getJSONObject(i);
                            } catch (Exception e) {
                                //e.printStackTrace();
                            }
                            if (businessArea != null) {
                                String businessName = businessArea.getString("name");
                                String longitudeAndLatitude = businessArea.getString("location");
                                String[] fds = longitudeAndLatitude.split(",");
                                lb.add(businessName);
                                // GeoHash-encode the business district's own coordinates
                                GeoHash geohash = GeoHash.withCharacterPrecision(
                                        Double.parseDouble(fds[1]), Double.parseDouble(fds[0]), 8);
                                // Write the area and business-district info back to Redis,
                                // enriching the local knowledge base
                                jedis.set(geohash.toBase32(),
                                        province + "," + city + "," + district + "|" + businessName);
                            }
                        }
                        bizNames = StringUtils.join(lb.toArray(), ",");
                        jedis.set(base32Code,
                                province + "," + city + "," + district + "|" + bizNames);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new String[]{province, city, district, bizNames};
    }
}
ToJSONMapFunction

package cn._51doit.flink.day06;

import cn._51doit.flink.Utils.GeoUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.HttpClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

import java.util.HashMap;

public class ToJSONMapFunction extends RichMapFunction<String, LogBean> {

    private transient Jedis jedis = null;
    private transient HttpClient httpClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create the HTTP connection and the Redis connection
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters(); // fetch the global job parameters
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        jedis.auth(password);
        jedis.select(db);
        // Client for calling the AMap API
        httpClient = new HttpClient();
    }

    @Override
    public LogBean map(String line) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        LogBean logBean = null;
        try {
            String[] fields = line.split("-->");
            String dateTime = fields[0];
            String dt = dateTime.split(" ")[0];
            String json = fields[1];
            // Parse with FastJSON
            JSONObject jsonObj = JSON.parseObject(json);
            JSONObject uObj = jsonObj.getJSONObject("u");
            JSONObject phoneObj = uObj.getJSONObject("phone");
            JSONObject locObj = uObj.getJSONObject("loc");
            JSONObject appObj = uObj.getJSONObject("app");

            // Flat fields of the user object
            String id = uObj.getString("id");
            String account = uObj.getString("account");
            String sessionId = uObj.getString("sessionId");

            // Phone/device info
            String imei = phoneObj.getString("imei");
            String osName = phoneObj.getString("osName");
            String osVer = phoneObj.getString("osVer");
            String resolution = phoneObj.getString("resolution");
            String androidId = phoneObj.getString("androidId");
            String manufacture = phoneObj.getString("manufacture");
            String deviceId = phoneObj.getString("deviceId");

            // Location (loc) info
            String areacode = locObj.getString("areacode");
            double longtitude = locObj.getDouble("longtitude");
            double latitude = locObj.getDouble("latitude");

            // Look up province, city and business district by the coordinates
            String[] areaInfo = GeoUtils.getAreaInfoByLongitudeAndLatitude(httpClient, jedis, longtitude, latitude);
            String province = areaInfo[0];
            String city = areaInfo[1];
            String district = areaInfo[2];
            String bizNames = areaInfo[3];

            String carrier = locObj.getString("carrier");
            String netType = locObj.getString("netType");
            String cid_sn = locObj.getString("cid_sn");
            String ip = locObj.getString("ip");

            // App fields
            String appid = appObj.getString("appid");
            String appVer = appObj.getString("appVer");
            String release_ch = appObj.getString("release_ch");
            String promotion_ch = appObj.getString("promotion_ch");

            // Event type
            String logType = jsonObj.getString("logType");
            // Commit time
            long commit_time = jsonObj.getLong("commit_time");

            JSONObject eventObj = jsonObj.getJSONObject("event");
            // HashMap holding the event data
            HashMap<String, String> eventMap = new HashMap<>();
            // Copy every key/value pair of the event object
            for (String k : eventObj.keySet()) {
                String v = eventObj.getString(k);
                eventMap.put(k, v);
            }

            // Assemble the bean and return it
            logBean = new LogBean(id, account, sessionId, imei, osName, osVer, resolution, androidId,
                    manufacture, deviceId, areacode, longtitude, latitude, province, city, district,
                    bizNames, carrier, netType, cid_sn, ip, appid, appVer, release_ch, promotion_ch,
                    logType, commit_time, dt, eventMap);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logBean;
    }

    @Override
    public void close() throws Exception {
        // Release the HTTP and Redis connections
        jedis.close();
        httpClient = null;
    }
}
PreETLAndTopicSplit

package cn._51doit.flink.day07;

import cn._51doit.flink.Utils.FlinkUtils;
import cn._51doit.flink.day06.KafkaStringSerializationSchema;
import cn._51doit.flink.day06.LogBean;
import cn._51doit.flink.day06.ToJSONMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.ZoneId;
import java.util.Properties;

public class PreETLAndTopicSplit {

    public static void main(String[] args) throws Exception {
        // 1. Read the data from Kafka
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);

        // Register the parameters globally so every Task instance can use the main method's parameters
        FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);

        // Parse and filter the data
        SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());
        SingleOutputStreamOperator<LogBean> filtered = beanDataStream.filter(new FilterFunction<LogBean>() {
            @Override
            public boolean filter(LogBean bean) {
                return bean != null;
            }
        });

        // Split the stream using side outputs
        // (the old split/select API is replaced by side outputs here)
        // Tag for traffic (flow) data
        OutputTag<LogBean> flowOutputTag = new OutputTag<LogBean>("flow-output") {};
        // Tag for activity data
        OutputTag<LogBean> activityOutputTag = new OutputTag<LogBean>("activity-output") {};

        SingleOutputStreamOperator<LogBean> mainStream = filtered.process(new ProcessFunction<LogBean, LogBean>() {
            @Override
            public void processElement(LogBean bean, Context ctx, Collector<LogBean> out) {
                // Route by the log type carried in the data
                String logType = bean.getLogType();
                if (logType.startsWith("act")) {
                    ctx.output(activityOutputTag, bean);
                } else {
                    ctx.output(flowOutputTag, bean);
                }
                // Emit to the main stream as well
                out.collect(bean);
            }
        });

        DataStream<LogBean> activityStream = mainStream.getSideOutput(activityOutputTag);
        DataStream<LogBean> flowStream = mainStream.getSideOutput(flowOutputTag);

        Properties properties = parameters.getProperties();

        // Write the flow side output to Kafka
        String flowTopic = parameters.getRequired("flow.topic");
        // Kafka producer, i.e. the Flink sink
        FlinkKafkaProducer<String> kafkaProducer1 = new FlinkKafkaProducer<>(
                flowTopic,
                new KafkaStringSerializationSchema(flowTopic),
                properties,                                  // Kafka connection properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE     // exactly-once writes into Kafka
        );
        flowStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer1);

        // Write the activity side output to Kafka
        String activityTopic = parameters.getRequired("activity.topic");
        FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(
                activityTopic,   // bug fix: the original passed flowTopic here
                new KafkaStringSerializationSchema(activityTopic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        activityStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer2);

        // Write the main stream to HDFS as Parquet
        String path = parameters.getRequired("mainstream.hdfs.out.path");
        // Bucket (directory) layout
        DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
                "yyyy-MM-dd--HH-mm", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
                .forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(LogBean.class))
                .withBucketAssigner(bucketAssigner)
                .build();
        // Store into HDFS
        mainStream.addSink(streamingFileSink);

        FlinkUtils.env.execute();
    }
}
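The KafkaStringSerializationSchema used above comes from the day06 package and is not reproduced in these notes. Below is a minimal sketch of what it presumably looks like, assuming it simply serializes each String record into the topic passed to its constructor (the field and parameter names are my guesses, not the course's actual code):

package cn._51doit.flink.day06;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Hypothetical reconstruction: turns every String element into a ProducerRecord
// targeting the topic given at construction time.
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public KafkaStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}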
conf.properties

redis.host=feng05
redis.password=feng
redis.db=6
bootstrap.servers=feng05:9092,feng06:9092,feng07:9092
group.id=f001
auto.offset.reset=earliest
kafka.topics=flink-read
flow.topic=flow.topic
activity.topic=activity.topic
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
transaction.timeout.ms=300000
checkpoint.interval=30000
checkpoint.path=file:///E:/flink/ckt
mainstream.hdfs.out.path=hdfs://feng05:9000/flink_data
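FlinkUtils.createKafkaStream and FlinkUtils.env are referenced above but the helper class itself is not shown in these notes. Here is a rough sketch of what such a helper might look like on Flink 1.10, assuming it wires the checkpoint.interval, checkpoint.path and kafka.topics keys from this properties file into checkpointing and a FlinkKafkaConsumer; treat the exact method bodies and defaults as assumptions, not the course's actual code:

package cn._51doit.flink.Utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.List;

// Hypothetical reconstruction of the FlinkUtils helper referenced in PreETLAndTopicSplit.
public class FlinkUtils {

    public static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(
            ParameterTool parameters,
            Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        // Enable checkpointing so Kafka offsets are committed on checkpoints
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 30000L),
                CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend(parameters.getRequired("checkpoint.path")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Topics to subscribe to
        List<String> topics = Arrays.asList(parameters.getRequired("kafka.topics").split(","));

        // The ParameterTool properties double as the Kafka consumer config
        // (bootstrap.servers, group.id, auto.offset.reset, ...)
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(
                topics,
                clazz.newInstance(),
                parameters.getProperties());

        return env.addSource(kafkaConsumer);
    }
}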
Key points here:
- Global parameters
//Register the parameters globally so the main method's parameters can be used inside every Task instance
FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);
Using the global parameters:
SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());
Part of the ToJSONMapFunction class:
ParameterTool params = (ParameterTool) getRuntimeContext()
        .getExecutionConfig()
        .getGlobalJobParameters(); // fetch the global parameters
Setting the main-method parameters as global job parameters in Flink is equivalent to broadcasting them to every slot of every TaskManager so that the subTasks can use them locally. If they are not made global, references to these parameters are still shipped to the slots together with the subTasks, but every access then needs network IO to fetch them, which is less efficient.
- Writing data to HDFS in Parquet format (see the official docs)
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
StreamingFileSink is still the sink used here.

// Write the main stream to HDFS as Parquet
String path = parameters.getRequired("mainstream.hdfs.out.path");
// Bucket (directory) layout
DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
        "yyyy-MM-dd--HH-mm", ZoneId.of("Asia/Shanghai"));
StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
        .forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(LogBean.class))
        .withBucketAssigner(bucketAssigner)
        .build();
// Store into HDFS
mainStream.addSink(streamingFileSink);
2 Asynchronous queries
Multiple threads handle requests at the same time; the advantage is speed, the drawback is higher resource usage (in production the extra compute resources are usually acceptable).
ThreadPoolTest

package cn._51doit.flink.day07;

import java.util.concurrent.*;

public class ThreadPoolTest {

    public static void main(String[] args) throws Exception {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

        Future<String> future1 = fixedThreadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("query started");
                Thread.sleep(15000);
                System.out.println("query finished");
                return "res666";
            }
        });

        System.out.println("main thread keeps running...");
        System.out.println("result ready yet? " + future1.isDone());

        //String res = future1.get(10, TimeUnit.SECONDS);
        String res = future1.get();
        System.out.println("returned result: " + res);
    }
}
- An asynchronous call simply lets the caller keep running without waiting for the callee's return value. In Java terms, another thread is started to carry out part of the computation so that the call can continue or return immediately; the caller still has to collect that thread's result at some point.
- JDK 5 added the Future interface to describe the result of an asynchronous computation. Future and its related methods do provide the ability to run tasks asynchronously, but retrieving the result is awkward: it can only be obtained by blocking or by polling. Blocking contradicts the whole point of asynchronous programming, and polling wastes CPU cycles without delivering the result promptly.
https://www.cnblogs.com/cjsblog/p/9267163.html (blog post on asynchronous calls)
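CompletableFuture (JDK 8) is the usual answer to this problem: instead of blocking on get() or polling isDone(), a callback is attached to the result. A small self-contained sketch (not part of the course code) illustrating the style that the Flink async functions below also use:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Run the "query" asynchronously and register a callback for its result:
        // no blocking get() and no polling of isDone().
        CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(1000); // simulate a slow lookup
                    } catch (InterruptedException ignored) {
                    }
                    return "res666";
                }, pool)
                .thenAccept(result -> System.out.println("callback got: " + result));

        System.out.println("main thread keeps running...");
        Thread.sleep(2000); // keep the JVM alive long enough to see the callback
        pool.shutdown();
    }
}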
2.1 Querying coordinates with the asynchronous HttpClient
http://hc.apache.org/httpcomponents-asyncclient-4.1.x/examples.html (the async HttpClient API examples can be found here)
Sample data

{"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1000", "cid": "c10", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072} {"oid": "o1001", "cid": "c11", "money": 99.99, "longitude": 116.413467, "latitude": 39.908072}
HttpAsyncQueryDemo

package cn._51doit.flink.day07;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class HttpAsyncQueryDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);

        SingleOutputStreamOperator<OrderBean> result = AsyncDataStream.unorderedWait(
                lines, new AsyncHttpQueryFunction(), 2000, TimeUnit.MILLISECONDS, 10);

        result.print();
        env.execute();
    }
}
AsyncHttpQueryFunction

package cn._51doit.flink.day07;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class AsyncHttpQueryFunction extends RichAsyncFunction<String, OrderBean> {

    private String key = "4924f7ef5c86a278f5500851541cdcff";

    private transient CloseableHttpAsyncClient httpclient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the asynchronous HttpClient (an async connection pool)
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectionRequestTimeout(3000)
                .build();
        httpclient = HttpAsyncClients.custom()
                .setMaxConnTotal(20)
                .setDefaultRequestConfig(requestConfig)
                .build();
        httpclient.start();
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<OrderBean> resultFuture) throws Exception {
        try {
            OrderBean orderBean = JSON.parseObject(line, OrderBean.class);
            // Fire the asynchronous request
            double longitude = orderBean.longitude;
            double latitude = orderBean.latitude;
            HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location="
                    + longitude + "," + latitude + "&key=" + key);
            // The query returns a Future
            Future<HttpResponse> future = httpclient.execute(httpGet, null);

            // Take the result out of the Future without blocking the async operator
            CompletableFuture.supplyAsync(new Supplier<OrderBean>() {
                @Override
                public OrderBean get() {
                    try {
                        HttpResponse response = future.get();
                        String province = null;
                        String city = null;
                        if (response.getStatusLine().getStatusCode() == 200) {
                            // Response body as a JSON string
                            String result = EntityUtils.toString(response.getEntity());
                            //System.out.println(result);
                            JSONObject jsonObj = JSON.parseObject(result);
                            // Extract the location info
                            JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                            if (regeocode != null && !regeocode.isEmpty()) {
                                JSONObject address = regeocode.getJSONObject("addressComponent");
                                // Province and city
                                province = address.getString("province");
                                city = address.getString("city");
                                //String businessAreas = address.getString("businessAreas");
                            }
                        }
                        orderBean.province = province;
                        orderBean.city = city;
                        return orderBean;
                    } catch (Exception e) {
                        // Normally handled explicitly
                        return null;
                    }
                }
            }).thenAccept((OrderBean result) -> {
                resultFuture.complete(Collections.singleton(result));
            });
        } catch (Exception e) {
            resultFuture.complete(Collections.singleton(null));
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
OrderBean

package cn._51doit.flink.day07;

public class OrderBean {

    public String oid;
    public String cid;
    public Double money;
    public Double longitude;
    public Double latitude;
    public String province;
    public String city;

    public OrderBean() {}

    public OrderBean(String oid, String cid, Double money, Double longitude, Double latitude) {
        this.oid = oid;
        this.cid = cid;
        this.money = money;
        this.longitude = longitude;
        this.latitude = latitude;
    }

    public static OrderBean of(String oid, String cid, Double money, Double longitude, Double latitude) {
        return new OrderBean(oid, cid, money, longitude, latitude);
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "oid='" + oid + '\'' +
                ", cid='" + cid + '\'' +
                ", money=" + money +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}
Note the CompletableFuture class used here (added in JDK 8): it removes the awkwardness of getting asynchronous results out of a plain Future.
2.2 Asynchronous MySQL queries with a thread pool
MySQL does not support asynchronous queries, i.e. a single MySQL connection cannot serve multiple threads querying at the same time. The workaround: create a connection pool with multiple connections, one per concurrent request.
AsyncQueryFromMySQL

package cn._51doit.flink.day07;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class AsyncQueryFromMySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        int capacity = 20;
        DataStream<Tuple2<String, String>> result = AsyncDataStream.orderedWait(
                lines,                            // input stream
                new MySQLAsyncFunction(capacity), // the async query function
                3000,                             // timeout
                TimeUnit.MILLISECONDS,            // time unit
                capacity);                        // max number of in-flight async requests

        result.print();
        env.execute();
    }
}
MySQLAsyncFunction

package cn._51doit.flink.day07;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient DruidDataSource dataSource;
    private transient ExecutorService executorService;

    // Must NOT be transient: it is set in the constructor and shipped with the function
    private int maxConnTotal;

    public MySQLAsyncFunction(int maxConnTotal) {
        this.maxConnTotal = maxConnTotal;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Thread pool that provides the concurrency for the requests
        executorService = Executors.newFixedThreadPool(maxConnTotal);

        // Connection pool (with async IO every in-flight request is a thread,
        // and each request needs its own connection)
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }

    @Override
    public void close() throws Exception {
        dataSource.close();
        executorService.shutdown();
    }

    @Override
    public void asyncInvoke(String id, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // Hand the query over to the thread pool
        // Future<String> future = executorService.submit(() -> queryFromMySql(id));
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return queryFromMySql(id);
            }
        });

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get();
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(Tuple2.of(id, result)));
        });
    }

    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, name FROM t_data WHERE id = ?";
        String result = null;
        Connection connection = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement(sql);
            stmt.setString(1, param);
            rs = stmt.executeQuery();
            while (rs.next()) {
                result = rs.getString("name");
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return result;
    }
}
3 BroadcastState
It is somewhat similar to the broadcast variables covered earlier, but BroadcastState (a special kind of operator state) can be updated dynamically at runtime.
ActivityCountWithBroadcastState

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ActivityCountWithBroadcastState {

    public static void main(String[] args) throws Exception {
        // The purpose of BroadcastState is an efficient join against dictionary data (a map-side join):
        // create two streams, one broadcast and one non-broadcast;
        // the broadcast stream is sent over the network to every TaskSlot of every TaskManager;
        // the non-broadcast stream connects to the broadcast stream and can then be joined with it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The broadcast stream; events look like:
        //INSERT,1,新人礼包
        //INSERT,2,女神节促销
        //INSERT,3,周末活动
        //UPDATE,3,周末抽奖
        //DELETE,3,周末抽奖
        DataStreamSource<String> stream1 = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple3<String, String, String>> dicStream =
                stream1.map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        String type = fields[0];
                        String id = fields[1];
                        String name = fields[2];
                        return Tuple3.of(type, id, name);
                    }
                });

        // State descriptor for the broadcast data
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>(
                "broadcast-state", String.class, String.class);

        // Broadcast the dictionary data
        BroadcastStream<Tuple3<String, String, String>> broadcastStream =
                dicStream.broadcast(mapStateDescriptor);

        // The non-broadcast stream, i.e. the data we actually process, e.g.:
        //uid01,2020-03-08 11:11:11,2
        //uid01,2020-03-08 11:11:11,1
        //uid01,2020-03-08 11:11:11,3
        //uid01,2020-03-08 11:11:11,3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, String>> activityStream =
                lines.map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        String uid = fields[0];
                        String time = fields[1];
                        String aid = fields[2];
                        return Tuple3.of(uid, time, aid);
                    }
                });

        // Connect the non-broadcast stream with the broadcast stream
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected =
                activityStream.connect(broadcastStream);

        // Use the low-level process API to do the join
        SingleOutputStreamOperator<Tuple4<String, String, String, String>> result = connected.process(
                new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple4<String, String, String, String>>() {

                    // Handle each broadcast record
                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> input, Context ctx,
                                                        Collector<Tuple4<String, String, String, String>> out) throws Exception {
                        String type = input.f0;
                        String aid = input.f1;
                        String aname = input.f2;
                        // The state lives in the memory of every TaskSlot
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        // Apply the rule change to the broadcast state
                        if (type.equals("DELETE")) {
                            broadcastState.remove(aid);
                        } else {
                            broadcastState.put(aid, aname);
                        }
                        System.out.println("subtask : " + getRuntimeContext().getIndexOfThisSubtask()
                                + " rules changed!");
                    }

                    // Handle the regular (non-broadcast) records
                    @Override
                    public void processElement(Tuple3<String, String, String> input, ReadOnlyContext ctx,
                                               Collector<Tuple4<String, String, String, String>> out) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        String uid = input.f0;
                        String time = input.f1;
                        String aid = input.f2;
                        // Look up the activity name by aid in the broadcast state
                        String name = broadcastState.get(aid);
                        out.collect(Tuple4.of(uid, time, aid, name));
                    }
                });

        result.print();
        env.execute();
    }
}
Blog posts worth reading