flink單機搭建以及快速編寫一個簡單的java job demo運行


  最近組里要對用戶數據做一個數據分析系統,然后組里讓先研究下大數據技術了,所以呢也是帶着一臉懵的就開始google大數據的東西,結果出來了一堆,感覺大數據的知識體系有點龐大,看了一堆就決定先從flink入手了,因為公司有的組主要在開發這個。

本文是最簡單的入門demo,單機搭建,目的是自己先跑起來,知道這東西大概有個什么用處。

 

步驟一 安裝

  安裝flink之前需要先安裝jvm,這個可以自行百度。

  下載flink,地址為https://archive.apache.org/dist/flink/ ,這個地址下載速度有點慢,也可以選擇騰訊鏡像地址 https://mirrors.cloud.tencent.com/apache/flink/ 。我是下載的是,linux下應該也是可以直接使用wget下載的

  如果沒有linux環境也可以下載1.8版本及以前的,這樣的話可以直接在windows版本下啟動

 

   在安裝包的同級目錄下執行解壓命令

[root@localhost bin]# tar -xvf flink-1.10.3-bin-scala_2.11.tgz

  然后進入bin目錄執行安裝啟動腳本,不出意外可以啟動成功(記得默認8081端口不能占用)

[root@localhost package]# cd flink-1.10.3/bin/
[root@localhost bin]# ./start-cluster.sh

  如果是windows環境啟動則1.8及以前的版本解壓后在bin目錄下啟動start-cluster.bat啟動

  然后記得關閉linux防火牆,瀏覽器訪問8081(默認)端口,出現以下界面就算安裝成功了

 

   

步驟二 編寫flink job

  此處先大概了解下,一個job的目的是為了接收數據→處理數據→輸出數據,這個入門級的demo參考官方的例子了。大概就是的獲取tcp服務端的信息(服務端會發送單詞),統計單詞出現的數量,最后將數據以hash的形式寫入redis。

  本例子是java編寫的maven項目

  首先是pom.xml (注意<mainClass>com.flink.starter.FlinkStarter</mainClass> 這項中的類要換成自己的啟動類)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bigdata</groupId>
    <artifactId>flink-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.8.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>

        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flink.starter.FlinkStarter</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

  

  第一個是創建一個redis數據結構的處理器,也就是定義key和value

public class SinkRedisMapper implements RedisMapper<Tuple2<String,Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        //hset
        return new RedisCommandDescription(RedisCommand.HSET,"flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f1.toString();
    }
}

   然后創建一個信息處理器,標識以空格分隔字符串

package com.flink.handler;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author zm1204760
 * @version Id: LineSplitter, v 0.1 2021/2/25 18:01 zm1204760 Exp $
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] tokens = s.toLowerCase().split("\\W+");
        for(String token : tokens){
            if(token.length() > 0){
                collector.collect(new Tuple2<String,Integer>(token,1));
            }
        }
    }
}

  最后創建job運行類   注意192.168.75.128 6379是redis的連接信息,這兒根據實際情況連接 ,沒有redis也可以將sumed.addSink注釋掉,放出sumed.print(),這樣數據會打印到控制台

public class FlinkStarter {

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String address = parameterTool.get("d");
        int port = parameterTool.getInt("p");

        //1.創建一個 flink steam 程序的執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.使用StreamExecutionEnvironment創建DataStream
        //Source(可以有多個Source)
        //Socket 監聽本地端口8888(亦可監聽linux環境下的某一台機器)
        DataStreamSource<String> lines = env.socketTextStream(address, port);

        //Transformation(s)對數據進行處理操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                //切分
                String[] words = line.split("\\W+");
                //循環,
                for (String word : words) {
                    //將每個單詞與 1 組合,形成一個元組
                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                    //將組成的Tuple放入到 Collector 集合,並輸出
                    out.collect(tp);
                }
            }
        });

        //進行分組聚合(keyBy:將key相同的分到一個組中)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        }).sum(1);

        //Transformation 結束

        //3.調用Sink (Sink必須調用)
        //sumed.print();
        //sumed.print();
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.75.128").setPort(6379).build();
        sumed.addSink(new RedisSink<>(conf,new SinkRedisMapper()));
        //啟動(這個異常不建議try...catch... 捕獲,因為它會拋給上層flink,flink根據異常來做相應的重啟策略等處理)

        env.execute("StreamWordCount");
    }
}

  本地就可以運行調試了。記住這兒監聽的輸入源需要提前開啟,否則會啟動失敗,本案例將地址和端口以參數的形式的傳入,所以啟動命令如下

  記住運行前需要提前開啟數據源,這兒以netassist舉例,本地開啟server模式后運行job,可以看到已經連接的信息

 

 

   然后我們發一些單詞,就可以在控制台看到打印信息

 

 如果設置了redis的輸出源,那么這些信息也會輸出到redis里面

步驟三 發布job

  任務調試成功后,就可以打包發布了

  第一步  打包  本案例使用maven

mvn clean package -Dmaven.test.skip=true

   打包好后最好本地測試下,確認包沒問題再進行下一步

  第二步上傳

此時瀏覽器打開步驟一中啟動的flink界面,然后進入submit new job菜單,然后add new job 找到剛打包好的jar包上傳,不同版本界面可能有些細微變化,但是功能應該是差不多的。記得設置啟動類和啟動參數

 

  最后點擊submit則代表提交任務,如果提交成功則可以在Running jobs里面找到剛剛的任務

 

 最后我們用之前的netassist助手接着發送一些數據,在這兒就可以看到接收和輸出的記錄。到此最簡單的一個flink demo就完成了

 控制台也能看到打印信息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM