Flink ETL: Implementing Data Cleaning
I. Requirements (clean and split the log data produced by the algorithm)
1. The log data produced by the algorithm is nested JSON and needs to be split apart
2. The country field in each log must be converted to its region (area)
3. Finally, the different types of log data are stored separately
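For example, a single nested input record like the following (this sample also appears as a comment in the DataClean code below):

{"dt":"2019-06-10","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.1,"level":"B"},{"type":"s3","score":0.2,"level":"C"}]}

is split into one flat record per element of its data array, each enriched with the date and the region (assuming US maps to AREA_US in Redis):

{"type":"s1","score":0.3,"level":"A","area":"AREA_US","dt":"2019-06-10"}
{"type":"s2","score":0.1,"level":"B","area":"AREA_US","dt":"2019-06-10"}
{"type":"s3","score":0.2,"level":"C","area":"AREA_US","dt":"2019-06-10"}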
II. Overall Architecture
This demo consumes data from RabbitMQ, processes it, and writes the result back to RabbitMQ.
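The end-to-end flow implemented by the code below: RabbitMQProducerUtil publishes raw nested logs to the queue two.aa.in; an RMQSource reads them; MyRedisSource periodically emits the country-to-area mapping from Redis; a CoFlatMapFunction connects the two streams, splits each nested record and enriches it with the area; an RMQSink then publishes the flat records to the exchange test.flink.output with routing key CC.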
Custom Redis source: MyRedisSource (Flink does not ship a built-in Redis source)
package com.yw.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;

/**
 * Loads mapping data from Redis.
 * <p>
 * The country-to-area relation is kept in a Redis hash:
 *   hset areas AREA_IN IN
 *   hset areas AREA_US US
 *   hset areas AREA_CT TW,HK
 *   hset areas AREA_AR PK,KW,SA
 *
 * @Auther: YW
 * @Date: 2019/6/15 10:23
 * @Description:
 */
public class MyRedisSource implements SourceFunction<HashMap<String, String>> {

    private final Logger LOG = LoggerFactory.getLogger(MyRedisSource.class);

    private boolean isRunning = true;
    private Jedis jedis = null;
    private final long SLEEP = 60000;

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        this.jedis = new Jedis("localhost", 6379);
        // Holds the country -> area mapping.
        HashMap<String, String> map = new HashMap<>();
        while (isRunning) {
            try {
                map.clear(); // drop stale data
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    String key = entry.getKey();     // area, e.g. AREA_CT
                    String value = entry.getValue(); // country codes, e.g. TW,HK
                    String[] splits = value.split(",");
                    for (String split : splits) {
                        // Invert the relation: country code -> area.
                        map.put(split, key);
                    }
                }
                if (map.size() > 0) {
                    // Emit the full mapping downstream.
                    ctx.collect(map);
                } else {
                    LOG.warn("Fetched no data from Redis!");
                }
                // Refresh every 60 seconds.
                Thread.sleep(SLEEP);
            } catch (JedisConnectionException e) {
                LOG.error("Redis connection lost, reconnecting", e);
                jedis = new Jedis("localhost", 6379);
            } catch (Exception e) {
                LOG.error("Unexpected error in the Redis source", e);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}
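Note the inversion: Redis stores area -> comma-separated country codes (e.g. AREA_CT -> TW,HK), while the source emits the flipped map, country code -> area ({TW=AREA_CT, HK=AREA_CT, ...}), because that is the lookup direction the enrichment step needs. Each emitted HashMap replaces the previous mapping wholesale in the downstream CoFlatMapFunction.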
DataClean: the processing job
package com.yw;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.yw.source.MyRedisSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.HashMap;

/**
 * @Auther: YW
 * @Date: 2019/6/15 10:09
 * @Description:
 */
public class DataClean {

    // Input queue name.
    public final static String QUEUE_NAME = "two.aa.in";

    public static void main(String[] args) throws Exception {
        // Get the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint once a minute.
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between checkpoints (half the checkpoint interval).
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        // Checkpoint timeout.
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        final RMQConnectionConfig rmqConf = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setVirtualHost("/")
                .setUserName("guest")
                .setPassword("guest")
                .build();

        // Consume the raw logs from RabbitMQ, e.g.:
        // {"dt":"2019-06-10","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.1,"level":"B"},{"type":"s3","score":0.2,"level":"C"}]}
        DataStream<String> data1 = env
                .addSource(new RMQSource<String>(rmqConf, QUEUE_NAME, false, new SimpleStringSchema()))
                .setParallelism(1);

        // The country -> area mapping stream from Redis.
        DataStreamSource<HashMap<String, String>> mapData = env.addSource(new MyRedisSource());

        // connect() joins the two streams.
        DataStream<String> streamOperator = data1.connect(mapData)
                .flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {

            // Latest country -> area mapping received from the Redis source.
            private HashMap<String, String> allMap = new HashMap<String, String>();

            // flatMap1 handles the RabbitMQ data.
            @Override
            public void flatMap1(String value, Collector<String> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String countryCode = jsonObject.getString("countryCode");
                String dt = jsonObject.getString("dt");
                // Look up the area for this country.
                String area = allMap.get(countryCode);
                JSONArray jsonArray = jsonObject.getJSONArray("data");
                for (int i = 0; i < jsonArray.size(); i++) {
                    JSONObject jsonObject1 = jsonArray.getJSONObject(i);
                    jsonObject1.put("area", area);
                    jsonObject1.put("dt", dt);
                    out.collect(jsonObject1.toJSONString());
                }
            }

            // flatMap2 handles the map data coming from Redis.
            @Override
            public void flatMap2(HashMap<String, String> value, Collector<String> out) throws Exception {
                this.allMap = value;
            }
        });

        streamOperator.addSink(new RMQSink<String>(rmqConf, new SimpleStringSchema(),
                new RMQSinkPublishOptions<String>() {
            @Override
            public String computeRoutingKey(String s) {
                return "CC";
            }

            @Override
            public AMQP.BasicProperties computeProperties(String s) {
                return null;
            }

            @Override
            public String computeExchange(String s) {
                return "test.flink.output";
            }
        }));

        data1.print();

        env.execute("etl");
    }
}
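One caveat in the above: MyRedisSource emits the mapping at most once a minute, so any messages consumed before its first emission are enriched with area = null. A minimal sketch of one possible mitigation (not part of the original code) is to pre-load the mapping in open() via a RichCoFlatMapFunction; it assumes the same local Redis instance and areas hash that MyRedisSource uses, and flatMap1/flatMap2 would be implemented exactly as above:

package com.yw;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

// Sketch only: pre-load the country -> area mapping before any element is
// processed, so early RabbitMQ messages are not enriched with a null area.
public abstract class PreloadedAreaMapper
        extends RichCoFlatMapFunction<String, HashMap<String, String>, String> {

    protected HashMap<String, String> allMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumed host/port, mirroring MyRedisSource.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            for (Map.Entry<String, String> entry : jedis.hgetAll("areas").entrySet()) {
                for (String country : entry.getValue().split(",")) {
                    allMap.put(country, entry.getKey()); // country code -> area
                }
            }
        }
    }
}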
RabbitMQ mock data producer
package com.yw;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * @Auther: YW
 * @Date: 2019/6/5 14:57
 * @Description:
 */
public class RabbitMQProducerUtil {

    public final static String QUEUE_NAME = "two.aa.in";

    public static void main(String[] args) throws Exception {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Configure the RabbitMQ connection.
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // Open a new connection.
        Connection connection = factory.newConnection();
        // Open a channel.
        Channel channel = connection.createChannel();
        // Declare the queue (uncomment if it does not exist yet).
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Send 20 sample messages to the queue.
        for (int i = 0; i < 20; i++) {
            // Build a fresh, valid JSON message matching the format DataClean expects.
            String message = "{\"dt\":\"" + getCurrentTime() + "\",\"countryCode\":\"" + getCountryCode() + "\",\"data\":["
                    + "{\"type\":\"" + getType() + "\",\"score\":" + getScore() + ",\"level\":\"" + getLevel() + "\"},"
                    + "{\"type\":\"" + getType() + "\",\"score\":" + getScore() + ",\"level\":\"" + getLevel() + "\"},"
                    + "{\"type\":\"" + getType() + "\",\"score\":" + getScore() + ",\"level\":\"" + getLevel() + "\"}]}";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Producer Send: " + message);
        }

        // Close the channel and the connection.
        channel.close();
        connection.close();
    }

    public static String getCurrentTime() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }

    public static String getCountryCode() {
        String[] types = {"US", "TN", "HK", "PK", "KW", "SA", "IN"};
        Random random = new Random();
        return types[random.nextInt(types.length)];
    }

    public static String getType() {
        String[] types = {"s1", "s2", "s3", "s4", "s5"};
        Random random = new Random();
        return types[random.nextInt(types.length)];
    }

    public static String getScore() {
        String[] types = {"0.1", "0.2", "0.3", "0.4", "0.5"};
        Random random = new Random();
        return types[random.nextInt(types.length)];
    }

    public static String getLevel() {
        String[] types = {"A", "B", "C", "D", "E"};
        Random random = new Random();
        return types[random.nextInt(types.length)];
    }
}
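With the message format fixed to valid JSON (the original concatenation was missing the "data":[ key and the commas before "level"), each published message looks like the following; the field values are random and purely illustrative:

{"dt":"2019-06-15 10:30:00","countryCode":"HK","data":[{"type":"s2","score":0.1,"level":"B"},{"type":"s5","score":0.4,"level":"A"},{"type":"s1","score":0.3,"level":"D"}]}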
Redis initialization data
hset areas AREA_IN IN
hset areas AREA_US US
hset areas AREA_CT TW,HK
hset areas AREA_AR PK,KW,SA
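The same initialization can also be done from Java; a minimal sketch with Jedis, assuming Redis on localhost:6379 (the address MyRedisSource connects to):

import redis.clients.jedis.Jedis;

public class RedisInit {
    public static void main(String[] args) {
        // Seed the areas hash with the area -> country-code mapping.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.hset("areas", "AREA_IN", "IN");
            jedis.hset("areas", "AREA_US", "US");
            jedis.hset("areas", "AREA_CT", "TW,HK");
            jedis.hset("areas", "AREA_AR", "PK,KW,SA");
        }
    }
}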
------------ Finally, run DataClean ------------