Build a Flink program that reads from Kafka and writes to MySQL


  Flink has become quite popular lately, so it is worth understanding and using. It is arguably the most popular real-time compute engine right now, combining stream and batch processing: it handles both bounded data and unbounded data, i.e. data that is produced forever. We will not go into the internals here; instead we will build a working pipeline end to end. The overall flow is source -> transform -> sink: read records from a source, transform them from a raw format into the shape we need, and then sink them into a database or a file for storage and display.
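
  To make that shape concrete before wiring up Kafka and MySQL, here is a minimal sketch of the source -> transform -> sink pattern (the class name is illustrative, and a bounded demo source and a stdout sink stand in for the Kafka and MySQL pieces built below):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("1", "2", "3")   //source: a tiny bounded demo source
           .map(Integer::parseInt)        //transform: raw string -> the shape we need
           .print();                      //sink: write the result out (stdout here)

        env.execute("source-transform-sink sketch");
    }
}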

  Next we need to download Flink, Kafka, MySQL and ZooKeeper; I simply downloaded the tar/tgz packages and extracted them.

  After downloading Flink, start it up. For example, I downloaded flink-1.9.1-bin-scala_2.11.tgz and extracted it:

tar -zxvf flink-1.9.1-bin-scala_2.11.tgz
cd flink-1.9.1
./bin/start-cluster.sh

  Once it is up, visit http://localhost:8081 and you should see the Flink web dashboard.
   After downloading ZooKeeper and extracting it, copy zoo_sample.cfg under zookeeper/conf to zoo.cfg and start the server with the commands below. ZooKeeper is needed alongside Kafka, because Kafka uses it to register and discover all of its brokers.

cp zoo_sample.cfg zoo.cfg
cd ../
./bin/zkServer.sh start
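
  To confirm ZooKeeper is actually up, the same script provides a status subcommand:

./bin/zkServer.sh status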

   Next, download and start Kafka, and create a topic named PERSON with one partition and replication factor 1. (The name must match the TOPIC_PERSON constant used in the code below, since Kafka topic names are case-sensitive.)

./bin/kafka-server-start.sh config/server.properties
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic PERSON
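
  As a quick sanity check, the topic can be listed and described (standard Kafka CLI flags for this era):

./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic PERSON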

  For MySQL, install it yourself; once it is running, create a table in your database (the lowercase name matches the insert statement used by the sink below, since MySQL table names can be case-sensitive on some platforms):

CREATE TABLE `person` (
  `id` mediumint NOT NULL auto_increment,
  `name` varchar(255) NOT NULL,
  `age` int(11) DEFAULT NULL,
  `createDate` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
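
  A quick manual check that the schema accepts the rows the job will write (test values are made up):

INSERT INTO person (name, age, createDate) VALUES ('hqs-test', 30, NOW());
SELECT * FROM person;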

  Next, create a Java project using Maven (make sure Maven is installed first and the mvn command works). Running the archetype directly may hang because the remote archetype catalog cannot be downloaded, so I first downloaded the archetype-catalog.xml file from https://repo.maven.apache.org/maven2/ and put it into the local Maven repository; adjust this path of mine to your own machine: /Users/huangqingshi/.m2/repository/org/apache/maven/archetype/archetype-catalog/2.4

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.2 \
    -DgroupId=flink-project \
    -DartifactId=flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false \
    -DarchetypeCatalog=local

  -DinteractiveMode=false turns off the interactive prompts, since the version, group and package are already specified on the command line. -DarchetypeCatalog=local tells Maven to use the catalog file placed in the local repository above, because fetching it remotely is extremely slow and can keep the project from being generated at all.

  I added a few dependencies to the generated project, such as the Kafka connector and database libraries; the full pom.xml is:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink-project</groupId>
    <artifactId>flink-project</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.1-jre</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.1.0</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.20</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>
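
  With the pom in place, the shade plugin produces a fat jar during the package phase. A typical build, and optionally a submit to the local cluster, looks like this (the jar name follows from the artifactId and version above; in this article the main methods are simply run from the IDE instead):

mvn clean package
./bin/flink run -c myflink.DataSourceFromKafka flink-project/target/flink-project-0.1.jar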

  Next, create a POJO used to carry the data around.

package myflink.pojo;

import java.util.Date;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class Person {

    private String name;
    private int age;
    private Date createDate;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }
}
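
  For reference, this is roughly what travels through Kafka: fastjson turns the POJO into a JSON string and parses it back. A minimal round-trip check (illustrative class name and sample values):

package myflink.pojo;

import com.alibaba.fastjson.JSON;

import java.util.Date;

public class PersonJsonDemo {
    public static void main(String[] args) {
        Person person = new Person();
        person.setName("hqs42");
        person.setAge(42);
        person.setCreateDate(new Date());

        //fastjson serializes java.util.Date as epoch milliseconds by default,
        //e.g. {"age":42,"createDate":1575682800000,"name":"hqs42"}
        String json = JSON.toJSONString(person);
        System.out.println(json);

        //and parses the millisecond value back into a Date
        Person parsed = JSON.parseObject(json, Person.class);
        System.out.println(parsed.getName() + " / " + parsed.getCreateDate());
    }
}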

   Now create a Kafka writer task that produces data into Kafka.

package myflink.kafka;

import com.alibaba.fastjson.JSON;
import myflink.pojo.Person;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class KafkaWriter {

    //local Kafka broker list
    public static final String BROKER_LIST = "localhost:9092";
    //Kafka topic to write to
    public static final String TOPIC_PERSON = "PERSON";
    //key serializer: plain strings
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value serializer: plain strings
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka() throws Exception{
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //build a Person object, appending a random number to the name "hqs"
        int randomInt = RandomUtils.nextInt(1, 100000);
        Person person = new Person();
        person.setName("hqs" + randomInt);
        person.setAge(randomInt);
        person.setCreateDate(new Date());
        //serialize to JSON
        String personJson = JSON.toJSONString(person);

        //wrap into a Kafka record (null partition and null key)
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_PERSON, null,
                null, personJson);
        //hand the record to the producer's buffer
        producer.send(record);
        System.out.println("Sent to Kafka: " + personJson);
        //flush immediately, then close the producer to release its resources
        producer.flush();
        producer.close();

    }

    public static void main(String[] args) {
        while(true) {
            try {
                //write one record every three seconds
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}
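
  To verify that records are actually arriving in the topic, the stock console consumer can tail it (standard Kafka CLI):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic PERSON --from-beginning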

  Create a database utility class for obtaining connections. It uses Druid: set the driver, URL, username and password, plus the connection-pool sizes (initial size, max active, min idle), and hand out connections from the pool.

package myflink.db;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class DbUtils {

    private static final DruidDataSource dataSource = new DruidDataSource();

    //configure the pool once, not on every getConnection call
    static {
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/testdb");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        //initial, maximum-active and minimum-idle pool sizes
        dataSource.setInitialSize(10);
        dataSource.setMaxActive(50);
        dataSource.setMinIdle(5);
    }

    public static Connection getConnection() throws Exception {
        //borrow a connection from the pool
        return dataSource.getConnection();
    }

}
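
  A quick sanity check for the pool (illustrative class; it just borrows a connection, runs a trivial query and returns the connection):

package myflink.db;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class DbUtilsDemo {
    public static void main(String[] args) throws Exception {
        //try-with-resources returns the pooled connection when done
        try (Connection conn = DbUtils.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1")) {
            rs.next();
            System.out.println("connection ok: " + rs.getInt(1));
        }
    }
}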
 

  Next, create a MySqlSink that extends RichSinkFunction and overrides its open, invoke and close methods. Flink calls open once before the sink receives any data, which is where we create the database connection; invoke is then called for each incoming element and performs the actual database writes; once the stream is done, close is called to release the connection resources. See the code below.

package myflink.sink;

import myflink.db.DbUtils;
import myflink.pojo.Person;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class MySqlSink extends RichSinkFunction<List<Person>> {

    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //get a database connection and prepare the insert statement
        connection = DbUtils.getConnection();
        String sql = "insert into person(name, age, createDate) values (?, ?, ?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //release resources: close the statement before the connection
        if(ps != null) {
            ps.close();
        }

        if(connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(List<Person> persons, Context context) throws Exception {
        for(Person person : persons) {
            ps.setString(1, person.getName());
            ps.setInt(2, person.getAge());
            ps.setTimestamp(3, new Timestamp(person.getCreateDate().getTime()));
            ps.addBatch();
        }

        //write the whole window as one batch
        int[] count = ps.executeBatch();
        System.out.println("Rows written to MySQL: " + count.length);

    }
}

  Now create the source that reads from Kafka and sinks to the database: configure the Kafka consumer, read each record from Kafka and transform it into a Person object (the transform step described at the beginning), collect everything received within a 5-second window, and finally sink the whole list to MySQL.

package myflink;

import com.alibaba.fastjson.JSONObject;
import myflink.kafka.KafkaWriter;
import myflink.pojo.Person;
import myflink.sink.MySqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class DataSourceFromKafka {

    public static void main(String[] args) throws Exception{
        //build the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Kafka consumer properties
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        prop.put("zookeeper.connect", "localhost:2181");
        prop.put("group.id", KafkaWriter.TOPIC_PERSON);
        //a consumer takes deserializers rather than serializers
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>(
                KafkaWriter.TOPIC_PERSON,
                new SimpleStringSchema(),
                prop
                )).
                //parallelism 1 so console output is not interleaved; does not affect the result
                setParallelism(1);

        //map each JSON string read from Kafka into a Person object
        DataStream<Person> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, Person.class));
        //collect everything received within a 5-second window
        dataStream.timeWindowAll(Time.seconds(5L)).
                apply(new AllWindowFunction<Person, List<Person>, TimeWindow>() {

                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Person> iterable, Collector<List<Person>> out) throws Exception {
                        List<Person> persons = Lists.newArrayList(iterable);

                        if(persons.size() > 0) {
                            System.out.println("5秒的總共收到的條數:" + persons.size());
                            out.collect(persons);
                        }

                    }
                })
                //sink to the database
                .addSink(new MySqlSink());
                //or print to the console instead:
                //.print();


        env.execute("kafka consumer job");
    }

}

  Everything is ready. Run KafkaWriter's main method to write records into Kafka's PERSON topic; the console output confirms each record was sent.

   Then run DataSourceFromKafka's main method to read from Kafka and write to the database; the console shows the window counts and the number of rows written.

 

   Finally, query the database: the rows are there, so the pipeline works end to end.
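
  For example (an illustrative query; adjust to your own schema):

SELECT id, name, age, createDate FROM person ORDER BY id DESC LIMIT 10;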

 

   That's it. If anything here is wrong, corrections are welcome.

  

