Flink ETL 數據處理


                      Flink ETL 實現數據清洗 

  

 

 

 

 一：需求（針對算法產生的日誌數據進行清洗、拆分）

  1. 算法產生的日誌數據是嵌套的 JSON 格式，需要拆分；

  2. 將日誌中的國家字段轉換為對應的大區；

  3. 最後把不同類型的日誌數據分別進行存儲。

二:整體架構 

 

 

 

 

      這裡演示：從 RabbitMQ 讀取數據，進行清洗處理，然後再發送回 RabbitMQ。

 

 

 自定義 RedisSource（Flink 沒有內建的 Redis source）

package com.yw.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;

/**
 * Flink source that periodically loads the country-to-area mapping from Redis
 * (Flink provides no built-in Redis source).
 * <p>
 * The mapping is kept in the Redis hash {@code areas}: each field is an area code and each
 * value is a comma-separated list of country codes, initialised for example with:
 * <pre>
 * hset areas AREA_IN IN
 * hset areas AREA_US US
 * hset areas AREA_CT TW,HK
 * hset areas AREA_AR PK,KW,SA
 * </pre>
 * Every {@link #SLEEP} ms the hash is read, inverted into a {@code countryCode -> areaCode}
 * map and emitted as one record, until {@link #cancel()} is called.
 *
 * @Auther: YW
 * @Date: 2019/6/15 10:23
 * @Description:
 */
public class MyRedisSource implements SourceFunction<HashMap<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSource.class);

    private static final String REDIS_HOST = "localhost";
    /** Default Redis port (6379). */
    private static final int REDIS_PORT = 6379;
    /** Redis hash holding area code -> comma-separated country codes. */
    private static final String AREAS_KEY = "areas";
    /** Poll interval in milliseconds (60 seconds). */
    private static final long SLEEP = 60000;

    // volatile: cancel() is called from a different thread than run().
    private volatile boolean isRunning = true;
    // Only touched by the thread executing run(); never serialized with the function.
    private transient Jedis jedis = null;

    /**
     * Polls Redis until cancelled, emitting a new country -> area map after each successful read.
     *
     * @param ctx Flink source context used to emit the mapping snapshots
     */
    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
        try {
            while (isRunning) {
                try {
                    // A fresh map per poll: downstream operators may keep a reference to the
                    // emitted instance, so it must not be cleared and refilled afterwards.
                    HashMap<String, String> countryToArea = loadCountryToArea();
                    if (!countryToArea.isEmpty()) {
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(countryToArea);
                        }
                    } else {
                        LOG.warn("獲取數據為空!");
                    }
                } catch (JedisConnectionException e) {
                    LOG.error("redis連接異常 重新連接", e);
                    if (isRunning) {
                        reconnect();
                    }
                } catch (Exception e) {
                    LOG.error("redis Source其他異常", e);
                }
                // Wait before the next poll, also after a failure, so errors cannot cause a busy loop.
                try {
                    Thread.sleep(SLEEP);
                } catch (InterruptedException e) {
                    // Interruption is treated as a stop request (e.g. during task cancellation).
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            closeJedis();
        }
    }

    /**
     * Stops the polling loop. The Redis connection is closed by {@code run()} when it exits.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Reads the {@code areas} hash and inverts it into a countryCode -> areaCode map. */
    private HashMap<String, String> loadCountryToArea() {
        HashMap<String, String> countryToArea = new HashMap<>();
        for (Map.Entry<String, String> entry : jedis.hgetAll(AREAS_KEY).entrySet()) {
            String area = entry.getKey();
            for (String country : entry.getValue().split(",")) {
                String code = country.trim();
                if (!code.isEmpty()) {
                    countryToArea.put(code, area);
                }
            }
        }
        return countryToArea;
    }

    /** Replaces the current connection with a new one, closing the old one first. */
    private void reconnect() {
        closeJedis();
        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
    }

    /** Closes the current connection, if any, logging (not propagating) close failures. */
    private void closeJedis() {
        Jedis current = jedis;
        jedis = null;
        if (current != null) {
            try {
                current.close();
            } catch (Exception e) {
                LOG.warn("closing redis connection failed", e);
            }
        }
    }
}

 

DataClean 數據處理
package com.yw;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.yw.source.MyRedisSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.HashMap;

/**
 * @Auther: YW
 * @Date: 2019/6/15 10:09
 * @Description:
 */
public class DataClean {
    // 隊列名
    public final static String QUEUE_NAME = "two.aa.in";

    public static void main(String[] args) throws Exception {
        // 獲取環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 一分鍾 checkpoint
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // enableCheckpointing最小間隔時間(一半)
        env.getCheckpointConfig().setCheckpointTimeout(10000);// 超時時間
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        final RMQConnectionConfig rmqConf = new RMQConnectionConfig.Builder().setHost("127.0.0.1").setPort(5672).setVirtualHost("/").setUserName("guest").setPassword("guest").build();
        // 獲取mq數據
        DataStream<String> data1 = env.addSource(new RMQSource<String>(rmqConf, QUEUE_NAME, false, new SimpleStringSchema())).setParallelism(1);
        //{"dt":"2019-06-10","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.1,"level":"B"},{"type":"s3","score":0.2,"level":"C"}]}
        DataStreamSource<HashMap<String, String>> mapData = env.addSource(new MyRedisSource());
        // connect可以連接兩個流
        DataStream<String> streamOperator = data1.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
            // 保存 redis返回數據  國家和大區的映射關系
            private HashMap<String, String> allMap = new HashMap<String, String>();

            // flatMap1 處理rabbitmq的數據
            @Override
            public void flatMap1(String value, Collector<String> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String countryCode = jsonObject.getString("countryCode");
                String dt = jsonObject.getString("dt");
                // 獲取大區
                String area = allMap.get(countryCode);
                JSONArray jsonArray = jsonObject.getJSONArray("data");
                for (int i = 0; i < jsonArray.size(); i++) {
                    JSONObject jsonObject1 = jsonArray.getJSONObject(i);
                    jsonObject1.put("area", area);
                    jsonObject1.put("dt", dt);
                    out.collect(jsonObject1.toJSONString());
                }
            }

            // 處理redis的返回的map類型的數據
            @Override
            public void flatMap2(HashMap<String, String> value, Collector<String> out) throws Exception {
                this.allMap = value;
            }
        });
        streamOperator.addSink(new RMQSink<String>(rmqConf, new SimpleStringSchema(), new RMQSinkPublishOptions<String>() {
            @Override
            public String computeRoutingKey(String s) {
                return "CC";
            }

            @Override
            public AMQP.BasicProperties computeProperties(String s) {
                return null;
            }

            @Override
            public String computeExchange(String s) {
                return "test.flink.output";
            }
        }));
        data1.print();
        env.execute("etl");
    }
}

 

RabbitMQ 模擬數據

package com.yw;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * Test-data producer that publishes randomly generated nested JSON log records to the RabbitMQ
 * queue {@link #QUEUE_NAME}. Each record has the shape consumed by DataClean, e.g.
 * {"dt":"2019-06-15 10:00:00","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},...]}
 *
 * @Auther: YW
 * @Date: 2019/6/5 14:57
 * @Description:
 */
public class RabbitMQProducerUtil {
    public final static String QUEUE_NAME = "two.aa.in";

    /** Number of messages published per run. */
    private static final int MESSAGE_COUNT = 20;
    /** Number of entries in each record's "data" array. */
    private static final int ITEMS_PER_MESSAGE = 3;
    // One shared generator instead of a new Random per call.
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // The queue must already exist: via the default exchange, messages for a
                // non-existent queue are unroutable and dropped. Uncomment to declare it here.
//                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    // Build a fresh, valid JSON record per message so each carries new random values.
                    String message = buildMessage();
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                    System.out.println("Producer sent: " + message);
                }
            } finally {
                channel.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Builds one random log record as a JSON string.
     *
     * @return a JSON object with "dt", "countryCode" and a "data" array of
     *         {@value #ITEMS_PER_MESSAGE} {type, score, level} objects
     */
    public static String buildMessage() {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"dt\":\"").append(getCurrentTime())
                .append("\",\"countryCode\":\"").append(getCountryCode())
                .append("\",\"data\":[");
        for (int j = 0; j < ITEMS_PER_MESSAGE; j++) {
            if (j > 0) {
                sb.append(',');
            }
            sb.append("{\"type\":\"").append(getType())
                    .append("\",\"score\":").append(getScore())
                    .append(",\"level\":\"").append(getLevel())
                    .append("\"}");
        }
        return sb.append("]}").toString();
    }

    public static String getCurrentTime() {
        // A local SimpleDateFormat per call: the class is not thread-safe, so it is not shared.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }

    public static String getCountryCode() {
        return pick(new String[]{"US", "TN", "HK", "PK", "KW", "SA", "IN"});
    }

    public static String getType() {
        return pick(new String[]{"s1", "s2", "s3", "s4", "s5"});
    }

    /** Returns a score literal; it is inserted unquoted, so it is a JSON number. */
    public static String getScore() {
        return pick(new String[]{"0.1", "0.2", "0.3", "0.4", "0.5"});
    }

    public static String getLevel() {
        return pick(new String[]{"A", "B", "C", "D", "E"});
    }

    /** Picks a uniformly random element of {@code options}. */
    private static String pick(String[] options) {
        return options[RANDOM.nextInt(options.length)];
    }
}

Redis 初始化數據

* hset areas AREA_IN IN
* hset areas AREA_US US
* hset areas AREA_CT TW,HK
* hset areas AREA_AR PK,KW,SA

 

 

------------最後運行 DataClean------------

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM