Flink消費Kafka數據並把實時計算的結果導入到Redis


1. 完成的場景

在很多大數據場景下,要求數據形成數據流的形式進行計算和存儲。上篇博客介紹了Flink消費Kafka數據實現Wordcount計算,這篇博客需要完成的是將實時計算的結果寫到redis。當kafka從其他端獲取數據立刻到Flink計算,Flink計算完后結果寫到Redis,整個過程就像流水一樣形成了數據流的處理

2. 代碼

添加第三方依賴

	<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.10</artifactId>
            <version>1.1.5</version>
        </dependency>

    </dependencies>

注意這里的版本最好統一選1.4.0,flink-redis的版本最好選1.1.5,用低版本或其他版本會遇到包沖突或者不同包的同一類不同等邏輯或者第版本有些類沒有等java通用的一些問題

邏輯代碼

package com.scn;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FilnkCostKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
        properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

        //實例化Flink和Redis關聯類FlinkJedisPoolConfig,設置Redis端口
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //實例化RedisSink,並通過flink的addSink的方式將flink計算的結果插入到redis
        counts.addSink(new RedisSink<Tuple2<String, Integer>>(conf,new RedisExampleMapper()));
        env.execute("WordCount from Kafka data");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    //指定Redis key並將flink數據類型映射到Redis數據類型
    public static final class RedisExampleMapper implements RedisMapper<Tuple2<String,Integer>>{
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "flink");
        }

        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

編寫一個測試類

package com.scn;

import redis.clients.jedis.Jedis;

public class RedisTest {
    public static void main(String args[]){
        Jedis jedis=new Jedis("127.0.0.1");
        System.out.println("Server is running: " + jedis.ping());
        System.out.println("result:"+jedis.hgetAll("flink"));
    }
}

3. 測試

啟動Redis服務

redis-server

執行FilnkCostKafka main方法

沒有跑出異常信息證明啟動沒有問題

在kafka producer端輸出一些數據

執行測試類RedisTest的main方法

會輸出:

Server is running: PONG
result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}

可以看到數據已經流到Redis


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM