Flink Real-Time Project - Day 02: 1. Parameter parsing utility 2. Flink utility class 3. Log collection architecture 4. Side output 5. Writing Kafka data to HDFS 6. KafkaProducer usage 7. Exercises


1. Parameter parsing utility (ParameterTool)

  This class provides simple utility methods for reading and parsing program parameters from different sources. When parsing args, it only supports simple single-value parameters (one value per key).

  • Parsing the arguments passed to the main method
import org.apache.flink.api.java.utils.ParameterTool;

public class ParseArgsKit {
    public static void main(String[] args) {
        // parse command-line arguments of the form --key value
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String host = parameters.getRequired("redis.host");
        String port = parameters.getRequired("redis.port");
        System.out.println(host);
        System.out.println(port);
    }
}

The arguments are passed on the command line in the following format:
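The original screenshot of the program arguments is gone; a typical invocation (values taken from the conf.properties below) would pass the arguments in the --key value style that ParameterTool.fromArgs expects:

--redis.host feng05 --redis.port 4444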

 

 The advantage of this style is that the arguments do not have to be given in any particular order. However, when there are many parameters it becomes tedious to type them all, in which case we can use the utility method that parses a configuration file instead.

  • Parsing a configuration file, whose path you specify yourself
import java.io.IOException;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParseArgsKit {
    public static void main(String[] args) throws IOException {
        // parse key=value pairs from a properties file at the given path
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        String host = parameters.getRequired("redis.host");
        String port = parameters.getRequired("redis.port");
        System.out.println(host);
        System.out.println(port);
    }
}

Configuration file conf.properties:

redis.host=feng05
redis.port=4444 

 

2. Flink utility class (creating a Kafka source)

RealtimeETL

package cn._51doit.flink.day06;

import cn._51doit.flink.Utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;


public class RealtimeETL {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        //use Flink to pull data from Kafka, then clean, filter and reorganize it
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();
        FlinkUtils.env.execute();
    }
}

FlinkUtils

package cn._51doit.flink.Utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtils {
    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws IOException, IllegalAccessException, InstantiationException {
        // enable checkpointing and set the checkpoint interval (default here: 300000 ms)
        env.enableCheckpointing(parameters.getLong("checkpoint.interval",300000));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        //retain the checkpoint data even after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        String checkPointPath = parameters.get("checkpoint.path");
        if(checkPointPath != null){
            env.setStateBackend(new FsStateBackend(checkPointPath));
        }
        int restartAttempts = parameters.getInt("restart.attempts", 30);
        int delayBetweenAttempts = parameters.getInt("delay.between.attempts", 30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenAttempts));
        Properties properties = parameters.getProperties();
        String topics = parameters.getRequired("kafka.topics");
        List<String> topicList = Arrays.asList(topics.split(","));

        FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<T>(topicList, clazz.newInstance(), properties);
        //do not commit the Kafka offsets back to Kafka's internal topic on checkpoint (the default is true)
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        return env.addSource(flinkKafkaConsumer);
    }
}

The key point here is how the FlinkKafkaConsumer class is used. One of its constructors is illustrated below.
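Since the original figure did not survive, here is a rough sketch of that constructor; the broker address, group id and topic names below are made up:

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerConstructorSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node-1.51doit.cn:9092"); //Kafka broker list
        props.setProperty("group.id", "etl-demo");                       //consumer group id

        //FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> schema, Properties props)
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Arrays.asList("order", "activity"), //parameter 1: topic name(s)
                new SimpleStringSchema(),           //parameter 2: deserialization schema
                props);                             //parameter 3: Kafka consumer properties
    }
}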

 

 Parameter 1: a topic name, or a list of topic names

  The Flink Kafka consumer needs to know how to turn the binary data coming from Kafka into Java/Scala objects. The DeserializationSchema interface lets the programmer supply this deserialization logic; its T deserialize(byte[] message) method is called for every Kafka message received. It is usually convenient to extend AbstractDeserializationSchema, which maps the deserialized Java/Scala type to Flink's type system (TypeInformation). If you implement DeserializationSchema directly, you also have to implement the getProducedType(...) method yourself.
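As a minimal sketch (the class name and the upper-casing logic are invented for illustration), a custom schema that extends AbstractDeserializationSchema could look like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class UpperCaseStringSchema extends AbstractDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        //called once per Kafka record; the String TypeInformation is supplied by the base class
        return new String(message, StandardCharsets.UTF_8).toUpperCase();
    }
}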

For convenience, Flink ships with several ready-made schemas:

(1) TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) build a schema from Flink's TypeInformation. This is useful for data that is both written and read by Flink, and this Flink-specific deserialization performs better than general-purpose serialization formats.

(2) JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) deserialize the serialized JSON into an ObjectNode, whose fields can then be accessed with objectNode.get("field").as(Int/String/…)().

(3) SimpleStringSchema deserializes each message into a String. When a message is received and deserialization fails, there are two options: 1) throw an exception from deserialize(..), which makes the job fail and then restart; 2) return null from deserialize(..), which makes the Flink Kafka consumer silently skip the message. Note that with checkpointing enabled, the consumer's fault-tolerance mechanism means the failing message will be consumed again and fail again, so the job keeps restarting automatically.

Parameter 2:

   The deserialization schema, which tells Flink how to deserialize the data obtained from Kafka.

 

Parameter 3:

  The Kafka consumer properties; the following three properties are required:
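The original list did not survive. Going by the Flink Kafka connector documentation, they are bootstrap.servers, group.id, and zookeeper.connect (the last one only for the legacy Kafka 0.8 connector). Since FlinkUtils passes the whole ParameterTool on as the consumer properties, and also reads kafka.topics itself, a conf.properties along these lines should work (host names and group id are made up):

bootstrap.servers=node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092
group.id=realtime-etl
kafka.topics=order,activity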

3. Log collection architecture

(1) Back when we studied the offline data warehouse, data was collected with cascaded Flume agents. The intermediate tier existed to increase throughput (load balancing) and to provide fault tolerance (failover); with multiple sinks both can be achieved at the same time.

 

 

 This cascaded-agent setup is an outdated approach. It was commonly used before Flume 1.7; since Flume 1.7 the KafkaChannel is available and has replaced it, and in the cascaded setup the first-tier agent cannot achieve fault tolerance anyway. A better approach is the following.

(2) Use a source + KafkaChannel directly. Kafka itself takes care of high throughput and fault tolerance, and the first-tier agent becomes fault tolerant as well; a configuration sketch follows.
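A minimal Flume agent configuration sketch for this layout; the file paths, topic and broker addresses are made up and need to be adjusted to the actual environment:

a1.sources = r1
a1.channels = c1

# tail the application log files
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/access.*.log
a1.sources.r1.channels = c1

# write events straight into a Kafka topic, no separate sink tier needed
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092
a1.channels.c1.kafka.topic = app-logs
a1.channels.c1.parseAsFlumeEvent = false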

 

4. Side output

  Side outputs are similar to split + select. If you simply need to filter out one category of data, filter is slightly more efficient; but to classify data into several categories, filter would have to scan the stream once per category, i.e. run several filter operations, which is inefficient. With side outputs a single pass is enough.

SideOutPutDemo
package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutPutDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, String>> tpData = lines.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String[] fields = value.split(" ");
                String event = fields[0];
                String guid = fields[1];
                String timestamp = fields[2];
                return Tuple3.of(event, guid, timestamp);
            }
        });
        OutputTag<Tuple3<String, String, String>> viewTag = new OutputTag<Tuple3<String, String, String>>("view-tag"){};
        OutputTag<Tuple3<String, String, String>> activityTag = new OutputTag<Tuple3<String, String, String>>("activity-tag"){};
        OutputTag<Tuple3<String, String, String>> orderTag = new OutputTag<Tuple3<String, String, String>>("order-tag"){};

        SingleOutputStreamOperator<Tuple3<String, String, String>> outDataStream = tpData.process(new ProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
            @Override
            public void processElement(Tuple3<String, String, String> input, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                // tag the record and route it to the matching side output
                String type = input.f0;
                if (type.equals("pgview")) {
                    ctx.output(viewTag, input);
                } else if (type.equals("activity")) {
                    ctx.output(activityTag, input);
                } else {
                    ctx.output(orderTag, input);
                }
                // emit to the main stream; without this, no data would be available on the main stream downstream
                out.collect(input);
            }
        });
        // a side-output stream can only be obtained via getSideOutput
//        DataStream<Tuple3<String, String, String>> viewDataStream = outDataStream.getSideOutput(viewTag);
//        viewDataStream.print();
        outDataStream.print();
        env.execute();
    }
}

Improved version: do the parsing directly inside processElement

package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 1. Parse each line into a Tuple3
 * 2. Use side outputs to classify the records
 */
public class SideOutputsDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        view,pid,2020-03-09 11:42:30
//        activity,a10,2020-03-09 11:42:38
//        order,o345,2020-03-09 11:42:38
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        OutputTag<Tuple3<String, String, String>> viewTag = new OutputTag<Tuple3<String, String, String>>("view-tag") {
        };
        OutputTag<Tuple3<String, String, String>> activityTag = new OutputTag<Tuple3<String, String, String>>("activity-tag") {
        };
        OutputTag<Tuple3<String, String, String>> orderTag = new OutputTag<Tuple3<String, String, String>>("order-tag") {
        };

        //call process directly on the raw input lines
        SingleOutputStreamOperator<Tuple3<String, String, String>> tpDataStream = lines.process(new ProcessFunction<String, Tuple3<String, String, String>>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void processElement(String input, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {

                //1. parse the string into a Tuple3
                String[] fields = input.split(",");
                String type = fields[0];
                String id = fields[1];
                String time = fields[2];
                Tuple3<String, String, String> tp = Tuple3.of(type, id, time);

                //2. tag the record
                if (type.equals("view")) {
                    //emit the record to the side output associated with this tag
                    ctx.output(viewTag, tp);  //ctx.output emits to a side-output stream
                } else if (type.equals("activity")) {
                    ctx.output(activityTag, tp);
                } else {
                    ctx.output(orderTag, tp);
                }
                //emit to the main stream
                out.collect(tp);
            }
        });


        //a side-output stream can only be obtained via getSideOutput
        DataStream<Tuple3<String, String, String>> viewDataStream = tpDataStream.getSideOutput(viewTag);

        //process each category of data separately
        viewDataStream.print();

        env.execute();

    }
}

 

5. Writing Kafka data to HDFS

  • Option 1: use Flume, as shown in the figure below.

 

  •  Option 2: use StreamingFileSink. This approach is better because it can roll output files as required (by time or size); see the sketch after this list.
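A minimal sketch of option 2, assuming roughly the Flink 1.10 StreamingFileSink API (the rolling-policy builder method names differ slightly between Flink versions); the HDFS path, host name and rolling thresholds are made up:

package cn._51doit.flink.day06; // hypothetical placement alongside the other demos

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class HdfsSinkDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //checkpointing must be enabled, otherwise part files are never finalized
        env.enableCheckpointing(30000);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //row format: write every record as a UTF-8 line below the given base path
        StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs://feng05:9000/flink/etl"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  //roll at least every 15 minutes
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) //roll after 5 minutes without new data
                        .withMaxPartSize(128L * 1024 * 1024)                  //roll when a part file reaches 128 MB
                        .build())
                .build();

        lines.addSink(hdfsSink);

        env.execute();
    }
}

Note that checkpointing has to be enabled here, because StreamingFileSink only finalizes part files when a checkpoint completes.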

6. KafkaProducer usage

  The requirement now is to process the data in Kafka (split it by topic, etc.) and then write it back to Kafka, as shown below.

 

 

 In this case we can use a Flink Kafka sink (FlinkKafkaProducer) to write the data to Kafka; the code is as follows.

KafkaSinkDemo (old-style API, before Flink 1.9)

package cn._51doit.flink.day06;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);


        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",            // broker list
                "etl-test",                  // target topic
                new SimpleStringSchema());   // serialization schema

        myProducer.setWriteTimestampToKafka(true);

        //write the data to Kafka
        lines.addSink(myProducer);

        env.execute();

    }
}

KafkaSinkDemo2 (Flink 1.9 and later)

package cn._51doit.flink.day06;

import cn._51doit.flink.Utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Uses the newer Kafka sink API (FlinkKafkaProducer with a KafkaSerializationSchema)
 */
public class KafkaSinkDemo2 {

    public static void main(String[] args) throws Exception {

        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        //the Kafka topic to write to
        String topic = "etl-test";
        //set the Kafka producer properties
        Properties properties = new Properties();
        properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");
        properties.setProperty("bootstrap.servers",
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        //create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic, //target topic
                new KafkaStringSerializationSchema(topic), //serialization schema used when writing to Kafka
                properties, //Kafka producer properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE //write to Kafka with EXACTLY_ONCE semantics
        );
        //add the Kafka sink
        lines.addSink(kafkaProducer);
        //execute the job
        FlinkUtils.env.execute();

    }
}
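KafkaSinkDemo2 references a KafkaStringSerializationSchema class that is not shown in this post. A minimal sketch of what it presumably looks like, implementing Flink's KafkaSerializationSchema (package placement assumed):

package cn._51doit.flink.day06; // assumed to sit next to KafkaSinkDemo2

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public KafkaStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        //write the record value as UTF-8 bytes into the configured topic (no key)
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}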

One point to note here: the following property must be set:

properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");

  Kafka brokers have a default maximum transaction timeout (transaction.max.timeout.ms) of 15 minutes, and a producer is not allowed to set a transaction timeout larger than that. By default, however, FlinkKafkaProducer sets its transaction timeout to 1 hour, which exceeds the broker's 15-minute limit. The fix is simply to set the producer's transaction.timeout.ms to a value no greater than 15 minutes.

 

7. Exercises (not done yet)
