Flink學習筆記:Connectors概述


本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

Flink大數據項目實戰:http://t.cn/EJtKhaz

1. 各種Connector

1.1Connector是什么鬼

Connectors是數據進出Flink的一套接口和實現,可以實現Flink與各種存儲、系統的連接

注意:數據進出Flink的方式不止Connectors,還有:

1.Async I/O(類Source能力):異步訪問外部數據庫

2.Queryable State(類Sink能力):當讀多寫少時,外部應用程序從Flink拉取需要的數據,而不是Flink把大量數據推入外部系統(后面再講)

1.2哪些渠道獲取connector

預定義Source和Sink:直接就用,無序引入額外依賴,一般用於測試、調試。

捆綁的Connectors:需要專門引入對應的依賴(按需),主要是實現外部數據進出Flink

1.Apache Kafka (source/sink)

2.Apache Cassandra (sink)

3.Amazon Kinesis Streams (source/sink)

4.Elasticsearch (sink)

5.Hadoop FileSystem (sink)

6.RabbitMQ (source/sink)

7.Apache NiFi (source/sink)

8.Twitter Streaming API (source)

Apache Bahir

1.Apache ActiveMQ (source/sink)

2.Apache Flume (sink)

3.Redis (sink)

4.Akka (sink)

5.Netty (source)

1.3預定義Source

預定義Source包含以下幾類:

1.基於文件

readTextFile

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();

DataStream<String> lines = env.readTextFile("file:///path");

readFile

DataStream<String> lines = env.readFile(inputFormat, "file:///path");

2.基於Socket

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();

DataStream<String> socketLines = env .socketTextStream("localhost", 9998);

3.基於Elements 和Collections

fromElements 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnviro nment();

DataStream<String> names = env.fromElements("hello", "world", "!");

fromCollections

List<String> list = new ArrayList<String>(); list.add("Hello"); list.add("world");

list.add("!");

DataStream<String> names = env.fromCollection(list);

使用場景: 應用本地測試,但是流處理應用會出現Finished的狀態

1.4預定義Sink

stream.print() /printToErr()(注: 線上應用杜絕使用,采用抽樣打印或者日志的方式)

stream.writeAsText("/path/to/file")/ TextOutputFormat

stream.writeAsCsv(“/path/to/file”)/ CsvOutputFormat

writeUsingOutputFormat() / FileOutputFormat 

stream.writeToSocket(host, port, SerializationSchema)

1.5隊列系統Connector(捆綁)

支持Source 和 Sink

需要專門引入對應的依賴(按需),主要是實現外部數據進出Flink

1.Kafka(后續專門講)

2.RabbitMQ

1.6存儲系統Connector(捆綁)

只支持Sink

1.HDFS

2.ElasticSearch

3.Redis

4.Apache Cassandra

1.7 Source容錯性保證

1.8 Sink容錯性保證

2. 自定義Source與Sink

2.1自定義Source

1.實現SourceFunction(非並行,並行度為1)

1)適用配置流,通過廣播與時間流做交互

2)繼承SourceFuncion, 實現run 方法

3)cancel 方法需要處理好(cancel 應用的時候,這個方法會被調用)

4)基本不需要做容錯性保證

2.實現ParallelSourceFunction

1)實現ParallelSourceFunction類或者繼承RichParallelSourceFunction。

2)實現切分數據的邏輯 

3)實現CheckpointedFunction接口,來保證容錯保證。

4)Source 擁有回溯讀取,可以減少的狀態的保存。

3.繼承RichParallelSourceFunction

2.2自定義Sink

1)實現SinkFunction 接口或者繼承RichSinkFunction。

2)實現CheckpointedFunction, 做容錯性保證。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM