本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1. 各種Connector
1.1Connector是什么鬼
Connectors是數據進出Flink的一套接口和實現,可以實現Flink與各種存儲、系統的連接
注意:數據進出Flink的方式不止Connectors,還有:
1.Async I/O(類Source能力):異步訪問外部數據庫
2.Queryable State(類Sink能力):當讀多寫少時,外部應用程序從Flink拉取需要的數據,而不是Flink把大量數據推入外部系統(后面再講)
1.2哪些渠道獲取connector
預定義Source和Sink:直接就用,無序引入額外依賴,一般用於測試、調試。
捆綁的Connectors:需要專門引入對應的依賴(按需),主要是實現外部數據進出Flink
1.Apache Kafka (source/sink)
2.Apache Cassandra (sink)
3.Amazon Kinesis Streams (source/sink)
4.Elasticsearch (sink)
5.Hadoop FileSystem (sink)
6.RabbitMQ (source/sink)
7.Apache NiFi (source/sink)
8.Twitter Streaming API (source)
Apache Bahir
1.Apache ActiveMQ (source/sink)
2.Apache Flume (sink)
3.Redis (sink)
4.Akka (sink)
5.Netty (source)
1.3預定義Source
預定義Source包含以下幾類:
1.基於文件
readTextFile
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();
DataStream<String> lines = env.readTextFile("file:///path");
readFile
DataStream<String> lines = env.readFile(inputFormat, "file:///path");
2.基於Socket
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();
DataStream<String> socketLines = env .socketTextStream("localhost", 9998);
3.基於Elements 和Collections
fromElements
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();
DataStream<String> names = env.fromElements("hello", "world", "!");
fromCollections
List<String> list = new ArrayList<String>(); list.add("Hello"); list.add("world");
list.add("!");
DataStream<String> names = env.fromCollection(list);
使用場景: 應用本地測試,但是流處理應用會出現Finished的狀態
1.4預定義Sink
stream.print() /printToErr()(注: 線上應用杜絕使用,采用抽樣打印或者日志的方式)
stream.writeAsText("/path/to/file")/ TextOutputFormat
stream.writeAsCsv(“/path/to/file”)/ CsvOutputFormat
writeUsingOutputFormat() / FileOutputFormat
stream.writeToSocket(host, port, SerializationSchema)
1.5隊列系統Connector(捆綁)
支持Source 和 Sink
需要專門引入對應的依賴(按需),主要是實現外部數據進出Flink
1.Kafka(后續專門講)
2.RabbitMQ
1.6存儲系統Connector(捆綁)
只支持Sink
1.HDFS
2.ElasticSearch
3.Redis
4.Apache Cassandra
1.7 Source容錯性保證

1.8 Sink容錯性保證

2. 自定義Source與Sink
2.1自定義Source
1.實現SourceFunction(非並行,並行度為1)
1)適用配置流,通過廣播與時間流做交互
2)繼承SourceFuncion, 實現run 方法
3)cancel 方法需要處理好(cancel 應用的時候,這個方法會被調用)
4)基本不需要做容錯性保證
2.實現ParallelSourceFunction
1)實現ParallelSourceFunction類或者繼承RichParallelSourceFunction。
2)實現切分數據的邏輯
3)實現CheckpointedFunction接口,來保證容錯保證。
4)Source 擁有回溯讀取,可以減少的狀態的保存。
3.繼承RichParallelSourceFunction
2.2自定義Sink
1)實現SinkFunction 接口或者繼承RichSinkFunction。
2)實現CheckpointedFunction, 做容錯性保證。

