Installation environment:
flink-1.6.2-bin-hadoop27-scala_2.11.tgz
hadoop-2.7.5
java 1.8
zookeeper 3.4.6
OS: CentOS 6.4
1. Download
Download the binary release from the Flink download page: http://flink.apache.org/downloads.html
2. Extract
tar -zxvf flink-1.6.2-bin-hadoop27-scala_2.11.tgz
3. Set the environment variables in ~/.bash_profile (run source ~/.bash_profile afterwards so they take effect)
export FLINK_HOME=/opt/flink-1.6.2
export PATH=$FLINK_HOME/bin:$PATH
4. Edit the flink-conf.yaml configuration file; a simple standalone setup is enough to start with (the Hadoop nameservice / HA settings are left out here). With the four TaskManager nodes listed in step 5 and 4 slots each, the cluster offers 16 task slots, so the default parallelism of 12 fits.
jobmanager.rpc.address: cdh1
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 12
5. Edit the slaves and masters files, which configure the TaskManager and JobManager hosts respectively:
[hadoop@cdh1 conf]$ cat slaves
cdh2
cdh3
cdh4
cdh5
[hadoop@cdh1 conf]$ cat masters
cdh1:8081
6. Copy the Flink installation and the environment settings to every other node; run the two commands below once for each machine (cdh3 shown here):
scp .bash_profile hadoop@cdh3:~/.bash_profile
scp -r ./flink-1.6.2 hadoop@cdh3:/opt/
7. Start Flink
[hadoop@cdh1 bin]$ ./start-cluster.sh
8. Once startup completes, check with jps: the master shows a StandaloneSessionClusterEntrypoint (JobManager) process and each worker shows a TaskManagerRunner (TaskManager) process.
[hadoop@cdh1 bin]$ jps
3866 StandaloneSessionClusterEntrypoint
[hadoop@cdh2 ~]$ jps
3534 TaskManagerRunner
9. Open the JobManager web UI at http://192.168.18.160:8081
At this point the cluster is up; now let's verify it.
We verify it with ${FLINK_HOME}/bin/start-scala-shell.sh.
This is the interactive client shipped with Flink. It is handy for testing code snippets during development and can run in two modes: locally or against a cluster. Because the cluster is easy to reach in this example, we test against it directly; if connecting to the cluster is inconvenient during development, you can work locally first and deploy to the cluster once local testing passes. If the program depends on extra jars, add them with -a <path/to/jar.jar> or --addclasspath <path/to/jar.jar>.
1. Local mode
${FLINK_HOME}/bin/start-scala-shell.sh local
2. Remote (cluster) mode
${FLINK_HOME}/bin/start-scala-shell.sh remote <hostname> <portnumber>
3. With dependency jars
${FLINK_HOME}/bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
4. Show the help
${FLINK_HOME}/bin/start-scala-shell.sh --help
[hadoop@cdh2 bin]$ ./start-scala-shell.sh --help
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
Command: local [options]
Starts Flink scala shell with a local Flink cluster
-a, --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
<host> Remote host name as string
<port> Remote port as integer
-a, --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
-n, --container arg Number of YARN container to allocate (= Number of TaskManagers)
-jm, --jobManagerMemory arg
Memory for JobManager container
-nm, --name <value> Set a custom name for the application on YARN
-qu, --queue <arg> Specifies YARN queue
-s, --slots <arg> Number of slots per TaskManager
-tm, --taskManagerMemory <arg>
Memory per TaskManager container
-a, --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
--configDir <value> The configuration directory.
-h, --help Prints this usage text
We use the remote (cluster) mode for verification:
[hadoop@cdh1 bin]$ ./start-scala-shell.sh remote 192.168.18.160 8081
Run the following sample code:
scala> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
scala> val counts = text
  .flatMap { _.toLowerCase.split("\\W+") }
  .map { (_, 1) }.groupBy(0).sum(1)
scala> counts.print()
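Besides the batch environment benv, the shell also pre-binds a streaming environment, senv. The sketch below is only illustrative and is not part of the original walkthrough: it assumes a text source such as nc -lk 9000 is running on cdh1 (the host and port are made up for the example), and it is simply a streaming variant of the same word count.
scala> val lines = senv.socketTextStream("cdh1", 9000)   // socket source; the job fails with "Connection refused" if nothing is listening there
scala> val streamCounts = lines
  .flatMap { _.toLowerCase.split("\\W+") }
  .filter { _.nonEmpty }
  .map { (_, 1) }
  .keyBy(0)
  .sum(1)
scala> streamCounts.print()
scala> senv.execute("Streaming WordCount")               // unlike the batch print(), a streaming job only starts when execute() is called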
Run results:
The detailed job information can also be seen in the web UI.
Exception encountered:
In our case the communication failure was caused by a separately installed Scala; removing the Scala environment settings fixed it. The exact root cause is still unclear and will be investigated later.
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
2018-11-19 01:49:52,298 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
- Job Socket Window WordCount (8b38f995aa8e61fd520b61e0888ecd46) switched from state RUNNING to FAILING.
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)