1 Flink的前世今生(生態很重要)
原文:https://blog.csdn.net/shenshouniu/article/details/84439459
很多人可能都是在 2015 年才聽到 Flink 這個詞,其實早在 2008 年,Flink 的前身已經是柏林理工大學一個研究性項目, 在 2014 被 Apache 孵化器所接受,然后迅速地成為了 ASF(Apache Software Foundation)的頂級項目之一。
Apache Flink is an open source platform for distributed stream and batch data
processing. Flink’s core is a streaming dataflow engine that provides data
distribution, communication, and fault tolerance for distributed computations
over data streams. Flink builds batch processing on top of the streaming engine,
overlaying native iteration support, managed memory, and program optimization.
Apache Flink 是一個開源的分布式,高性能,高可用,准確的流處理框架。
主要由 Java 代碼實現,提供Java 和scala接口。
支持實時流(stream)處理和批(batch)處理,批數據只是流數據的一個極限特例。
Flink原生支持了迭代計算、內存管理和程序優化。
Flink目前也在重力打造屬於自己的大數據生態。(FinkSQL , Flink ML ,Flink Gelly等)
2 吞吐量悖論
流處理和批處理的糾結選擇和不容水火,Flink通過靈活的執行引擎,能夠同時支持批處理任務與流處理任務,但是悖論是永遠存在的。
流處理:Flink以固定的緩存塊為單位進行網絡數據傳輸,用戶可以通過設置緩存塊超時值指定緩存塊的傳輸時機。如果緩存塊的超時值為0,則Flink的數據傳輸方式類似上文所提到流處理系統的標准模型,此時系統可以獲得最低的處理延遲。
批處理:如果緩存塊的超時值為無限大,則Flink的數據傳輸方式類似上文所提到批處理系統的標准模型,此時系統可以獲得最高的吞吐量。
靈活的秘密:緩存塊的超時值也可以設置為0到無限大之間的任意值。緩存塊的超時閾值越小,則Flink流處理執行引擎的數據處理延遲越低,但吞吐量也會降低,反之亦然。通過調整緩存塊的超時閾值,用戶可根據需求靈活地權衡系統延遲和吞吐量。
3 容錯的抉擇(Flink or Spark)
SparkStreaming :微批次模型,EOS語義,基於RDD Checkpoint進行容錯,基於checkpoint狀態管理。狀態的狀態操作基於DStream模板進行管理,延時中等水平,吞吐量很高。詳情請參考我的SparkStreaming源碼解讀。
Flink :流處理模型,EOS語義,基於兩種狀態管理進行容錯,即:State和checkpoint兩種機制。狀態操作可以細粒化到算子等操作上。延時不僅低,而且吞吐量也非常高。
- State 基於task和operator兩種狀態。State類型進一步細分為
Keyed State和 Operator State 兩種類型
- checkpoint 基全局快照來實現數據容錯,注意:State的狀態保存在java的堆里面,
checkpoint則通過定時實現全局(所有State)狀態的持久化。
說實在的,Flink很狂妄:
4 Stanalone 環境全方位剖析
4.1 Stanalone 模式
集群節點規划(一主兩從)
1 基礎環境:
jdk1.8及以上【需要配置JAVA_HOME】
ssh免密碼登錄(至少要實現主節點能夠免密登錄到從節點)
主機名hostname
/etc/hosts文件配置主機名和ip的映射關系
192.168.1.160 SparkMaster
192.168.1.161 SparkWorker1
192.168.1.162 SparkWorker2
關閉防火牆
2 在SparkMaster節點上主要需要修改的配置信息
cd /usr/local/flink-1.6.1/conf
vi flink-conf.yaml
jobmanager.rpc.address: SparkMaster
3 slaves修改
vi slaves
SparkWorker1
SparkWorker2
4 然后再把修改好的flink目錄拷貝到其他兩個節點即可
scp -rq flink-1.6.1 SparkWorker1:/usr/local/
scp -rq flink-1.6.1 SparkWorker2:/usr/local/
4.2 Stanalone 運行展示
這里發生一個小插曲,因為yarn配置文件不一致,導致 hadoop Web UI 無法正常顯示所有NodeManager。所以注意配置文件的一致性。
SparkMaster節點進程:
14273 SecondaryNameNode
15010 Worker
14038 DataNode
25031 StandaloneSessionClusterEntrypoint
13895 NameNode
14903 Master
14424 ResourceManager
14569 NodeManager
25130 Jps
SparkWorker節點進程:
5732 Worker
10420 NodeManager
10268 DataNode
10540 Jps
8351 TaskManagerRunner
上圖一張:
4.3 Stanalone 簡單任務測試
(1) 增量聚合: 窗口中每進入一條數據,就進行一次計算
實現方法主要有:
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
(2) 全量聚合: 等於窗口內的數據到齊,才開始進行聚合計算
全量聚合:可以實現對窗口內的數據進行排序等需
實現方法主要有:
apply(windowFunction)
process(processWindowFunction)
processWindowFunction比windowFunction提供了更多的上下文信息。
全量聚合詳細案例如下:
public class SocketDemoFullCount {
public static void main(String[] args) throws Exception{
//獲取需要的端口號
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9010--java");
port = 9010;
}
//獲取flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "SparkMaster";
String delimiter = "\n";
//連接socket獲取輸入的數據
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer,Integer> map(String value) throws Exception {
return new Tuple2<>(1,Integer.parseInt(value));
}
});
intData.keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)
throws Exception {
System.out.println("執行process......");
long count = 0;
for(Tuple2<Integer,Integer> element: elements){
count++;
}
out.collect("window:"+context.window()+",count:"+count);
}
}).print();
//這一行代碼一定要實現,否則程序不執行
env.execute("Socket window count");
}
}
(3) 數據源
root@SparkMaster:/usr/local/install/hadoop-2.7.3/sbin# nc -l 9010
(4) 運行結果
4.4 Stanalone 參數調優設置
參數調優設置:
1.jobmanager.heap.mb:jobmanager節點可用的內存大小
2.taskmanager.heap.mb:taskmanager節點可用的內存大小
3.taskmanager.numberOfTaskSlots:每台機器可用的cpu數量
4.parallelism.default:默認情況下任務的並行度
5.taskmanager.tmp.dirs:taskmanager的臨時數據存儲目錄
slot和parallelism總結:
1.slot是靜態的概念,是指taskmanager具有的並發執行能力
2.parallelism是動態的概念,是指程序運行時實際使用的並發能力
3.設置合適的parallelism能提高運算效率,太多了和太少了都不行
4.5 Stanalone 集群啟動與掛機
啟動jobmanager
如果集群中的jobmanager進程掛了,執行下面命令啟動。
bin/jobmanager.sh start
bin/jobmanager.sh stop
啟動taskmanager
添加新的taskmanager節點或者重啟taskmanager節點
bin/taskmanager.sh start
bin/taskmanager.sh stop
5 資源調度環境(Yarn 模式)
5.1 模式1:(常駐session)
開辟資源 yarn - session . sh
1啟動一個一直運行的flink集群
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
2 附着到一個已存在的flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029
3 資源所在地/tmp/.yarn-properties-root.
2018-11-24 17:24:19,644 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
- Found Yarn properties file under /tmp/.yarn-properties-root.
4:yarn資源描述
root@SparkMaster:/usr/local/install/flink-1.6.1# vim /tmp/.yarn-properties-root
#Generated YARN properties file
#Sat Nov 24 17:39:07 CST 2018
parallelism=2
dynamicPropertiesString=
applicationID=application_1543052238521_0001
執行任務flink run
3 執行任務
hadoop fs -mkdir /input/
hadoop fs -put README.txt /input/
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://SparkMaster:9000/input/README.txt -output hdfs://SparkMaster:9000/wordcount-result.txt
4:執行結果
root@SparkMaster:/usr/local/install/flink-1.6.1# hadoop fs -cat /wordcount-result.txt
1 1
13 1
5d002 1
740 1
about 1
account 1
administration 1
停止任務 【web界面或者命令行執行cancel命令】
1
5.2 模式2:(session獨立互不影響)
1 啟動集群,執行任務
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://SparkMaster:9000/input/README.txt -output hdfs://SparkMaster:9000/wordcount-result6.txt
2018-11-24 17:56:18,066 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2018-11-24 17:56:18,078 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2018-11-24 17:56:24,901 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2 :提交一次,生成一個Yarn-session
6 flink run 參數指定:
1 參數必選 :
-n,--container <arg> 分配多少個yarn容器 (=taskmanager的數量)
2 參數可選 :
-D <arg> 動態屬性
-d,--detached 獨立運行
-jm,--jobManagerMemory <arg> JobManager的內存 [in MB]
-nm,--name 在YARN上為一個自定義的應用設置一個名字
-q,--query 顯示yarn中可用的資源 (內存, cpu核數)
-qu,--queue <arg> 指定YARN隊列.
-s,--slots <arg> 每個TaskManager使用的slots數量
-tm,--taskManagerMemory <arg> 每個TaskManager的內存 [in MB]
-z,--zookeeperNamespace <arg> 針對HA模式在zookeeper上創建NameSpace
-id,--applicationId <yarnAppId> YARN集群上的任務id,附着到一個后台運行的yarn session中
3 run [OPTIONS] <jar-file> <arguments>
run操作參數:
-c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定
-m,--jobmanager <host:port> 指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同於配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的並行度。可以覆蓋配置文件中的默認值。
4 啟動一個新的yarn-session,它們都有一個y或者yarn的前綴
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
連接指定host和port的jobmanager:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
啟動一個新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
5 注意:命令行的選項也可以使用./bin/flink 工具獲得。
6 Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main" method or "getPlan()" method.
Only needed if the JAR file does not
specify the class in its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
7 Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
6 結語
Flink 是一個是一個開源的分布式,高性能,高可用,准確的流處理框架。主要由 Java 代碼實現。支持實時流(stream)處理和批(batch)處理,批數據只是流數據的一個極限特例。
Flink原生支持了迭代計算、內存管理和程序優化。本文立意在運行原理兼部署及Yarn運行模式,后續精彩內容請持續關注本博客,辛苦成文,各自珍惜,謝謝!