Four Types of Flink Sinks



"Sink" literally means to settle downward. In Flink, a sink is the output operation that stores processed data, or, more broadly, sends the processed stream to a designated external storage system.
The print method we have been using all along is itself a sink:

public DataStreamSink<T> print(String sinkIdentifier) {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
   return addSink(printFunction).name("Print to Std. Out");
}

Flink ships with a number of built-in sinks; anything beyond them has to be implemented as a user-defined sink.

The examples below were tested against Flink 1.12.

KafkaSink

1) Add the Kafka connector dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

2) Start the Kafka cluster
A Kafka cluster start/stop script is described here:
https://www.cnblogs.com/traveller-hzq/p/14487977.html
3) Example code for sinking to Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * TODO
 *
 * @author hzq
 * @version 1.0
 * @date 2021/3/5 11:08
 */
public class Flink01_KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);

        // TODO Sink - kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");

        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                "flink0923",
                new SimpleStringSchema(),
                properties
        );

        inputDS.addSink(kafkaSink);

        env.execute();
    }
}

4) Use nc -lk 9999 to send input data
5) Start a console consumer on Linux and check that the data arrives (the topic must match the one the job writes to, flink0923):

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic flink0923
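Note that the fastjson dependency declared above is not actually used here; with SimpleStringSchema the socket lines go to Kafka verbatim. If you want to ship JSON values instead, you can convert each "id,ts,vc" line to a JSON string before the sink. A minimal sketch using only the standard library (the field names and sample values are assumptions, not from the original post):

```java
public class SensorToJson {
    // Convert a CSV line "id,ts,vc" into a JSON string suitable as a Kafka record value.
    public static String toJson(String csvLine) {
        String[] parts = csvLine.split(",");
        return String.format("{\"id\":\"%s\",\"ts\":%s,\"vc\":%s}",
                parts[0], parts[1].trim(), parts[2].trim());
    }

    public static void main(String[] args) {
        System.out.println(toJson("sensor_1,1609459200,38"));
        // {"id":"sensor_1","ts":1609459200,"vc":38}
    }
}
```

In the job this would be applied with a map transformation before addSink(kafkaSink).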

RedisSink

1) Add the Redis connector dependency

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

2) Start the Redis server

./redis-server /etc/redis/6379.conf

3) Example code for sinking to Redis

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * TODO
 *
 * @author hzq
 * @version 1.0
 * @date 2021/3/5 11:08
 */
public class Flink02_RedisSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);

        // TODO Sink - Redis

        FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .setPort(6379)
                .build();

        RedisSink<String> redisSink = new RedisSink<>(
                flinkJedisPoolConfig,
                new RedisMapper<String>() {
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        // first argument: which Redis command to use
                        // second argument: the outer (top-level) Redis key
                        return new RedisCommandDescription(RedisCommand.HSET, "flink0923");
                    }

                    /*
                        Extract the key from the record; for a Hash structure, this becomes the hash field.
                     */
                    @Override
                    public String getKeyFromData(String data) {
                        return data.split(",")[1];
                    }

                    // Extract the value from the record; for a Hash structure, this becomes the hash value.
                    @Override
                    public String getValueFromData(String data) {
                        return data.split(",")[2];
                    }
                }
        );


        inputDS.addSink(redisSink);

        env.execute();
    }
}

Verify in Redis that the data arrived:

redis-cli --raw

Note:
After sending 5 records you may find only 2 entries in Redis. The cause is duplicate hash fields: a later HSET on the same field overwrites the earlier value.
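This overwrite behaviour is just ordinary hash semantics, and can be reproduced with a plain Java map (the sample records below are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HashOverwriteDemo {
    // Mirrors the RedisMapper above: hash field = split[1], hash value = split[2]
    public static Map<String, String> buildHash(String[] records) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (String r : records) {
            String[] parts = r.split(",");
            hash.put(parts[1], parts[2]); // a later put on the same field overwrites the earlier one
        }
        return hash;
    }

    public static void main(String[] args) {
        String[] records = {
                "a,sensor_1,10", "b,sensor_2,20", "c,sensor_1,30",
                "d,sensor_2,40", "e,sensor_1,50"
        };
        Map<String, String> hash = buildHash(records);
        System.out.println(hash.size()); // 2
        System.out.println(hash);        // {sensor_1=50, sensor_2=40}
    }
}
```

Five input records collapse to two hash entries, exactly as observed in Redis.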

ElasticsearchSink

1) Add the Elasticsearch connector dependency

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.12.0</version>
</dependency>

2) Start the ES cluster
3) Example code for sinking to ES

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * TODO
 *
 * @author hzq
 * @version 1.0
 * @date 2021/3/5 11:08
 */
public class Flink03_EsSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);

        // TODO Sink - ElasticSearch
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200));
        httpHosts.add(new HttpHost("hadoop103", 9200));
        httpHosts.add(new HttpHost("hadoop104", 9200));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        Map<String, String> dataMap = new HashMap<>();
                        dataMap.put("data", element);
                        // Build the request with the ES API; in ES 6.x an index has exactly one type
                        IndexRequest indexRequest = Requests.indexRequest("flink0923").type("_doc").source(dataMap);
                        indexer.add(indexRequest);
                    }
                }
        );

        // TODO For this demo the bulk size is 1 so every record flushes immediately; never do this in production
        esSinkBuilder.setBulkFlushMaxActions(1);


        inputDS.addSink(esSinkBuilder.build());


        env.execute();
    }
}
/*
    ES 5.x : index ≈ database, type ≈ table
    ES 6.x : each index may have only one type, so an index is effectively a table
    ES 7.x : types are removed entirely

    Inspect indices over HTTP:

        list indices:   http://hadoop102:9200/_cat/indices?v
        index contents: http://hadoop102:9200/flink0923/_search
 */

Verify in Elasticsearch that the data arrived.

Note:
If the job fails with a log4j-related error, add the log4j2 bridge dependency:

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.14.0</version>
</dependency>

For an unbounded stream you must configure the bulk flush behaviour, otherwise buffered records may never be written out:

esSinkBuilder.setBulkFlushMaxActions(1);
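Flushing on every single record is only sensible for demos. The ElasticsearchSink.Builder also exposes size- and time-based flush thresholds; a sketch of a more production-like configuration (the concrete numbers are assumptions, tune them for your workload):

// Flush a bulk request when any one of these thresholds is reached
esSinkBuilder.setBulkFlushMaxActions(1000);   // at most 1000 buffered actions
esSinkBuilder.setBulkFlushMaxSizeMb(5);       // or 5 MB of buffered data
esSinkBuilder.setBulkFlushInterval(5000);     // or every 5 seconds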

自定義Sink

What if Flink does not provide a connector for the storage system we want to write to? In that case we implement the sink ourselves.
Below we build a custom sink that writes to MySQL.
1) Create the database and table in MySQL

create database test;
use test;
CREATE TABLE `sensor` (
  `id` varchar(20) NOT NULL,
  `ts` bigint(20) NOT NULL,
  `vc` int(11) NOT NULL,
  PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2) Add the MySQL JDBC driver

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

3) Example code for a custom MySQL sink

import com.atguigu.chapter05.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * TODO
 *
 * @author hzq
 * @version 1.0
 * @date 2021/3/5 11:08
 */
public class Flink04_MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                // split the CSV line "id,ts,vc"
                String[] line = value.split(",");
                return new WaterSensor(line[0], Long.valueOf(line[1]), Integer.valueOf(line[2]));

            }
        });
        // TODO Sink - custom MySQL sink
        sensorDS.addSink(new MySinkFunction());


        env.execute();
    }

    public static class MySinkFunction extends RichSinkFunction<WaterSensor> {

        Connection conn;
        PreparedStatement pstmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000");
            pstmt = conn.prepareStatement("insert into sensor values (?,?,?)");
        }

        @Override
        public void close() throws Exception {
            if (pstmt != null) {
                pstmt.close();
            }
            if (conn != null){
                conn.close();
            }
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            pstmt.setString(1, value.getId());
            pstmt.setLong(2, value.getTs());
            pstmt.setInt(3, value.getVc());
            pstmt.execute();
        }
    }
}

Use nc -lk 9999 to feed test input, then check the sensor table in MySQL.
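One caveat with the plain INSERT above: the table's primary key is (id, ts), so replaying the same input makes invoke fail with a duplicate-key error. A common fix (an assumption, not part of the original post) is to make the statement idempotent with MySQL's upsert syntax and prepare that instead:

-- Overwrite vc when a row with the same (id, ts) already exists
INSERT INTO sensor (id, ts, vc) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE vc = VALUES(vc);

The parameter binding in invoke stays exactly the same; only the prepared SQL changes.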

