最近組里要對用戶數據做一個數據分析系統,然后組里讓先研究下大數據技術了,所以呢也是帶着一臉懵的就開始google大數據的東西,結果出來了一堆,感覺大數據的知識體系有點龐大,看了一堆就決定先從flink入手了,因為公司有的組主要在開發這個。
本文是最簡單的入門demo,單機搭建,目的是自己先跑起來,知道這東西大概有個什么用處。
步驟一 安裝
安裝flink之前需要先安裝jvm,這個可以自行百度。
下載flink,地址為https://archive.apache.org/dist/flink/ ,這個地址下載速度有點慢,也可以選擇騰訊鏡像地址 https://mirrors.cloud.tencent.com/apache/flink/ 。我是下載的是,linux下應該也是可以直接使用wget下載的
如果沒有linux環境也可以下載1.8版本及以前的,這樣的話可以直接在windows版本下啟動
在安裝包的同級目錄下執行解壓命令
[root@localhost bin]# tar -xvf flink-1.10.3-bin-scala_2.11.tgz
然后進入bin目錄執行安裝啟動腳本,不出意外可以啟動成功(記得默認8081端口不能占用)
[root@localhost package]# cd flink-1.10.3/bin/ [root@localhost bin]# ./start-cluster.sh
如果是windows環境啟動則1.8及以前的版本解壓后在bin目錄下啟動start-cluster.bat啟動
然后記得關閉linux防火牆,瀏覽器訪問8081(默認)端口,出現以下界面就算安裝成功了
步驟二 編寫flink job
此處先大概了解下,一個job的目的是為了接收數據→處理數據→輸出數據,這個入門級的demo參考官方的例子了。大概就是的獲取tcp服務端的信息(服務端會發送單詞),統計單詞出現的數量,最后將數據以hash的形式寫入redis。
本例子是java編寫的maven項目
首先是pom.xml (注意<mainClass>com.flink.starter.FlinkStarter</mainClass> 這項中的類要換成自己的啟動類)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.bigdata</groupId> <artifactId>flink-starter</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink.version>1.8.1</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.0</version> </dependency> </dependencies> <repositories> <repository> <id>aliyun</id> <name>aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.flink.starter.FlinkStarter</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <skip>true</skip> </configuration> </plugin> </plugins> </build> </project>
第一個是創建一個redis數據結構的處理器,也就是定義key和value
public class SinkRedisMapper implements RedisMapper<Tuple2<String,Integer>> { @Override public RedisCommandDescription getCommandDescription() { //hset return new RedisCommandDescription(RedisCommand.HSET,"flink"); } @Override public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) { return stringIntegerTuple2.f0; } @Override public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) { return stringIntegerTuple2.f1.toString(); } }
然后創建一個信息處理器,標識以空格分隔字符串
package com.flink.handler; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * @author zm1204760 * @version Id: LineSplitter, v 0.1 2021/2/25 18:01 zm1204760 Exp $ */ public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> { public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] tokens = s.toLowerCase().split("\\W+"); for(String token : tokens){ if(token.length() > 0){ collector.collect(new Tuple2<String,Integer>(token,1)); } } } }
最后創建job運行類 注意192.168.75.128 6379是redis的連接信息,這兒根據實際情況連接 ,沒有redis也可以將sumed.addSink注釋掉,放出sumed.print(),這樣數據會打印到控制台
public class FlinkStarter { public static void main(String[] args) throws Exception { ParameterTool parameterTool = ParameterTool.fromArgs(args); String address = parameterTool.get("d"); int port = parameterTool.getInt("p"); //1.創建一個 flink steam 程序的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.使用StreamExecutionEnvironment創建DataStream //Source(可以有多個Source) //Socket 監聽本地端口8888(亦可監聽linux環境下的某一台機器) DataStreamSource<String> lines = env.socketTextStream(address, port); //Transformation(s)對數據進行處理操作 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { //切分 String[] words = line.split("\\W+"); //循環, for (String word : words) { //將每個單詞與 1 組合,形成一個元組 Tuple2<String, Integer> tp = Tuple2.of(word, 1); //將組成的Tuple放入到 Collector 集合,並輸出 out.collect(tp); } } }); //進行分組聚合(keyBy:將key相同的分到一個組中) SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return stringIntegerTuple2.f0; } }).sum(1); //Transformation 結束 //3.調用Sink (Sink必須調用) //sumed.print(); //sumed.print(); FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.75.128").setPort(6379).build(); sumed.addSink(new RedisSink<>(conf,new SinkRedisMapper())); //啟動(這個異常不建議try...catch... 捕獲,因為它會拋給上層flink,flink根據異常來做相應的重啟策略等處理) env.execute("StreamWordCount"); } }
本地就可以運行調試了。記住這兒監聽的輸入源需要提前開啟,否則會啟動失敗,本案例將地址和端口以參數的形式的傳入,所以啟動命令如下
記住運行前需要提前開啟數據源,這兒以netassist舉例,本地開啟server模式后運行job,可以看到已經連接的信息
然后我們發一些單詞,就可以在控制台看到打印信息
如果設置了redis的輸出源,那么這些信息也會輸出到redis里面
步驟三 發布job
任務調試成功后,就可以打包發布了
第一步 打包 本案例使用maven
mvn clean package -Dmaven.test.skip=true
打包好后最好本地測試下,確認包沒問題再進行下一步
第二步上傳
此時瀏覽器打開步驟一中啟動的flink界面,然后進入submit new job菜單,然后add new job 找到剛打包好的jar包上傳,不同版本界面可能有些細微變化,但是功能應該是差不多的。記得設置啟動類和啟動參數
最后點擊submit則代表提交任務,如果提交成功則可以在Running jobs里面找到剛剛的任務
最后我們用之前的netassist助手接着發送一些數據,在這兒就可以看到接收和輸出的記錄。到此最簡單的一個flink demo就完成了
控制台也能看到打印信息