1. The parameter-parsing utility class (ParameterTool)
This class provides simple utility methods for reading and parsing program parameters from different data sources. When parsing command-line args, it only supports parameters passed one key/value pair at a time (e.g. --key value).
- Utility for parsing the arguments passed to the main method
public class ParseArgsKit {
    public static void main(String[] args) {
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String host = parameters.getRequired("redis.host");
        String port = parameters.getRequired("redis.port");
        System.out.println(host);
        System.out.println(port);
    }
}
The parameters are passed in the following format:
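For example, matching the two keys the code above reads (the values are illustrative and reused in the conf.properties example further below):

--redis.host feng05 --redis.port 4444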
The advantage of parsing program arguments this way is that they do not have to be given in any particular order. With many parameters, however, the command line becomes unwieldy, in which case we can use the utility method that parses a configuration file instead.
- Utility for parsing a configuration file; you specify the path to the file yourself
public class ParseArgsKit {
    public static void main(String[] args) throws IOException {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        String host = parameters.getRequired("redis.host");
        String port = parameters.getRequired("redis.port");
        System.out.println(host);
        System.out.println(port);
    }
}
The configuration file conf.properties:
redis.host=feng05
redis.port=4444
2. Wrapping Flink utilities (creating a Kafka source)
RealtimeETL

package cn._51doit.flink.day06;

import cn._51doit.flink.Utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;

public class RealtimeETL {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("E:\\flink\\conf.properties");
        // Pull data from Kafka with Flink, then clean, filter and reorganize it
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();
        FlinkUtils.env.execute();
    }
}
FlinkUtils

package cn._51doit.flink.Utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtils {

    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz)
            throws IOException, IllegalAccessException, InstantiationException {
        // Set the checkpoint interval
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 300000));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        // Retain the checkpoint data even after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        String checkPointPath = parameters.get("checkpoint.path");
        if (checkPointPath != null) {
            env.setStateBackend(new FsStateBackend(checkPointPath));
        }
        int restartAttempts = parameters.getInt("restart.attempts", 30);
        int delayBetweenAttempts = parameters.getInt("delay.between.attempts", 30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenAttempts));

        Properties properties = parameters.getProperties();
        String topics = parameters.getRequired("kafka.topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<T>(topicList, clazz.newInstance(), properties);
        // Do not commit the Kafka offsets back to Kafka's special offsets topic on checkpoint (the default is true)
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        return env.addSource(flinkKafkaConsumer);
    }
}
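For reference, a conf.properties that satisfies the code above might look like the sketch below. The broker list and topic reuse values that appear elsewhere in this post; the group id, checkpoint path and intervals are illustrative. kafka.topics is the only required key, and all entries are also forwarded to the Kafka consumer via parameters.getProperties():

# forwarded to the Kafka consumer
bootstrap.servers=node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092
group.id=g1
auto.offset.reset=earliest
# read by FlinkUtils.createKafkaStream
kafka.topics=etl-test
checkpoint.interval=30000
checkpoint.path=hdfs://node-1.51doit.cn:9000/flink-checkpoints
restart.attempts=30
delay.between.attempts=30000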
The key point here is the use of the FlinkKafkaConsumer class. One of its constructors, the one used in FlinkUtils above, takes three arguments:
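As a sketch of how the three arguments line up (this fragment belongs inside a streaming job's main method; the topic, group id and broker address are illustrative):

// inside a streaming job's main method:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node-1.51doit.cn:9092");
props.setProperty("group.id", "g1");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        Arrays.asList("etl-test"),   // argument 1: a topic name or list of topic names
        new SimpleStringSchema(),    // argument 2: a DeserializationSchema
        props);                      // argument 3: the Kafka consumer properties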
Argument 1: a topic name, or a list of topic names.
The Flink Kafka consumer needs to know how to turn the binary data coming from Kafka into Java/Scala objects. The DeserializationSchema interface lets the programmer supply this deserialization logic; its T deserialize(byte[] message) method is called for every Kafka message received. We usually extend AbstractDeserializationSchema, which maps the produced Java/Scala type to Flink's type system (TypeInformation) for us. If you implement DeserializationSchema directly, you have to implement getProducedType(...) yourself.
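As an illustrative sketch (the class name is made up), a custom schema built on AbstractDeserializationSchema only needs to implement deserialize; the produced type is inferred from the class's generic parameter:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Minimal sketch: turns each Kafka message into a String (roughly what SimpleStringSchema does)
public class StringDeserializationSchema extends AbstractDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }
}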
For convenience, Flink ships with several ready-made schemas:
(1) TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema), which build a schema from Flink's TypeInformation. They are useful for data that is both written and read by Flink, and this Flink-specific deserialization performs better than generic serialization formats.
(2) JsonDeserializationSchema (and JSONKeyValueDeserializationSchema), which deserialize the JSON payload into an ObjectNode whose fields can be accessed with objectNode.get("field").as(Int/String/...)().
(3) SimpleStringSchema, which deserializes the message into a String. When a message fails to deserialize, there are two options: 1) throw an exception from deserialize(..), which fails the job and causes it to restart; or 2) return null from deserialize(..), which makes the Flink Kafka consumer silently skip the message. Note that if checkpointing is enabled, the consumer's fault tolerance means the failing message will be consumed again and fail again, so the job keeps restarting automatically.
Argument 2: a deserialization schema, which tells Flink how to deserialize the data read from Kafka.
Argument 3: the Kafka consumer's Properties; the following three properties are required:
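Per the Flink Kafka connector documentation these are bootstrap.servers, group.id and, for the legacy Kafka 0.8 connector only, zookeeper.connect (the group id and broker addresses below are illustrative):

bootstrap.servers=node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092
group.id=g1
# zookeeper.connect=... (only needed by the old Kafka 0.8 connector)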
3. Log collection architecture
(1) When we studied the offline data warehouse, data was collected with cascaded Flume agents. The intermediate tier existed to increase throughput (load balancing) and to provide fault tolerance (failover); with multiple sinks both can be achieved at the same time.
Cascading agents like this is now an outdated approach. It was generally used before Flume 1.7; since Flume 1.7 the Kafka channel has replaced it, not least because the first-tier agent cannot achieve fault tolerance this way. A better approach follows.
(2) Use a source + KafkaChannel directly: Kafka itself provides the high throughput and fault tolerance, and the first-tier agent is fault tolerant as well. A sketch of such an agent configuration follows.
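A sketch of such a first-tier agent (a TAILDIR source writing straight into a KafkaChannel, so no sink is needed); the agent name, file paths, broker list and topic are illustrative:

# taildir source -> KafkaChannel, no sink required
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/access.log
a1.sources.r1.positionFile = /var/flume/taildir_position.json

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092
a1.channels.c1.kafka.topic = access-log
a1.channels.c1.parseAsFlumeEvent = false

a1.sources.r1.channels = c1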
4. Side outputs
Side outputs are similar to split + select. If you simply want to filter out one kind of data, filter is more efficient; but to split a stream into several categories, filter would have to be applied multiple times, i.e. the data is processed several times over, which is inefficient. With side outputs a single pass is enough.
SideOutPutDemo

package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutPutDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, String>> tpData = lines.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String[] fields = value.split(" ");
                String event = fields[0];
                String guid = fields[1];
                String timestamp = fields[2];
                return Tuple3.of(event, guid, timestamp);
            }
        });

        OutputTag<Tuple3<String, String, String>> viewTag = new OutputTag<Tuple3<String, String, String>>("view-tag"){};
        OutputTag<Tuple3<String, String, String>> activityTag = new OutputTag<Tuple3<String, String, String>>("activity-tag"){};
        OutputTag<Tuple3<String, String, String>> orderTag = new OutputTag<Tuple3<String, String, String>>("order-tag"){};

        SingleOutputStreamOperator<Tuple3<String, String, String>> outDataStream = tpData.process(new ProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
            @Override
            public void processElement(Tuple3<String, String, String> input, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                // Tag the data
                String type = input.f0;
                if (type.equals("pgview")) {
                    ctx.output(viewTag, input);
                } else if (type.equals("activity")) {
                    ctx.output(activityTag, input);
                } else {
                    ctx.output(orderTag, input);
                }
                // Emit to the main stream; without this, nothing can be read from the main stream downstream
                out.collect(input);
            }
        });

        // A side output can only be obtained via getSideOutput
        // DataStream<Tuple3<String, String, String>> viewDataStream = outDataStream.getSideOutput(viewTag);
        // viewDataStream.print();
        outDataStream.print();

        env.execute();
    }
}
Improved version: parse and split the data directly in the processElement method

package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 1. Parse the input into Tuple3
 * 2. Use side outputs to split the data by type
 */
public class SideOutputsDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // view,pid,2020-03-09 11:42:30
        // activity,a10,2020-03-09 11:42:38
        // order,o345,2020-03-09 11:42:38
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        OutputTag<Tuple3<String, String, String>> viewTag = new OutputTag<Tuple3<String, String, String>>("view-tag") {};
        OutputTag<Tuple3<String, String, String>> activityTag = new OutputTag<Tuple3<String, String, String>>("activity-tag") {};
        OutputTag<Tuple3<String, String, String>> orderTag = new OutputTag<Tuple3<String, String, String>>("order-tag") {};

        // Call process directly on the source stream
        SingleOutputStreamOperator<Tuple3<String, String, String>> tpDataStream = lines.process(new ProcessFunction<String, Tuple3<String, String, String>>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void processElement(String input, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                // 1. Convert the string into a Tuple3
                String[] fields = input.split(",");
                String type = fields[0];
                String id = fields[1];
                String time = fields[2];
                Tuple3<String, String, String> tp = Tuple3.of(type, id, time);

                // 2. Tag the data and emit it to the matching side output
                if (type.equals("view")) {
                    ctx.output(viewTag, tp); // ctx.output emits to a side output
                } else if (type.equals("activity")) {
                    ctx.output(activityTag, tp);
                } else {
                    ctx.output(orderTag, tp);
                }
                // Emit to the main stream
                out.collect(tp);
            }
        });

        // A side output can only be obtained via getSideOutput
        DataStream<Tuple3<String, String, String>> viewDataStream = tpDataStream.getSideOutput(viewTag);
        // Process each type of data separately
        viewDataStream.print();

        env.execute();
    }
}
5. Writing Kafka data to HDFS
- Option 1: use Flume, as shown in the diagram below.
- Option 2: use StreamingFileSink. This is the better approach, since it can roll output files according to your requirements; a sketch follows this list.
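A sketch of option 2, assuming Flink 1.10+ and a row format; the class name, HDFS path and rolling thresholds are illustrative. Note that part files are only finalized when checkpoints complete, so checkpointing must be enabled:

package cn._51doit.flink.day06;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class HdfsSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are only finalized when a checkpoint completes
        env.enableCheckpointing(30000);

        // In the real job this would be the cleaned Kafka stream from FlinkUtils
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs://node-1.51doit.cn:9000/flink/etl"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 min
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 min without new data
                        .withMaxPartSize(128 * 1024 * 1024)                   // or when a part file reaches 128 MB
                        .build())
                .build();

        lines.addSink(hdfsSink);
        env.execute();
    }
}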
6. Using the Kafka producer (FlinkKafkaProducer)
The requirement now is to process the data from Kafka (split it by topic, etc.) and write it back to Kafka, as shown below.
For this we can use Flink's Kafka sink to write the data back to Kafka; the code is as follows.
KafkaSinkDemo (old API, before Flink 1.9)

package cn._51doit.flink.day06;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", // broker list
                "etl-test",                // target topic
                new SimpleStringSchema()); // serialization schema

        myProducer.setWriteTimestampToKafka(true);

        // Write the data to Kafka
        lines.addSink(myProducer);

        env.execute();
    }
}
KafkaSinkDemo2 (Flink 1.9 and later)

package cn._51doit.flink.day06;

import cn._51doit.flink.Utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Uses the newer Kafka sink API
 */
public class KafkaSinkDemo2 {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);

        // Topic to write to
        String topic = "etl-test";

        // Kafka producer properties
        Properties properties = new Properties();
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");

        // Create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                      // target topic
                new KafkaStringSerializationSchema(topic),  // serialization schema used to write to Kafka
                properties,                                 // Kafka producer properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE    // write to Kafka with EXACTLY_ONCE semantics
        );

        // Add the Kafka sink
        lines.addSink(kafkaProducer);

        // Execute
        FlinkUtils.env.execute();
    }
}
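KafkaStringSerializationSchema above is a small helper class; a minimal sketch of what such a class might look like, implementing the connector's KafkaSerializationSchema interface, is:

package cn._51doit.flink.day06;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Turns each String element into a ProducerRecord for the given topic
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public KafkaStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}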
One thing to note here: the following property must be set:
properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");
Kafka brokers default to a maximum transaction timeout of 15 minutes and reject producers that request a longer transaction timeout. By default, however, FlinkKafkaProducer sets the transaction timeout to 1 hour, which exceeds the broker default transaction.max.timeout.ms of 15 minutes. So we simply set the producer's transaction.timeout.ms to a value no larger than 15 minutes.
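For reference, the broker-side ceiling is the transaction.max.timeout.ms setting in each broker's server.properties; Kafka's default value is shown below, and the producer's transaction.timeout.ms must stay at or below it:

# Kafka broker server.properties (default value shown; raise it if longer Flink transactions are needed)
transaction.max.timeout.ms=900000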
7. Exercises (not yet done)