16-Flink-Redis-Sink


戳更多文章:

1-Flink入門

2-本地環境搭建&構建第一個Flink應用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式緩存

7-重啟策略

8-Flink中的窗口

9-Flink中的Time

Flink時間戳和水印

Broadcast廣播變量

FlinkTable&SQL

Flink實戰項目實時熱銷排行

Flink寫入RedisSink

17-Flink消費Kafka寫入Mysql

簡介

流式計算中,我們經常有一些場景是消費Kafka數據,進行處理,然后存儲到其他的數據庫或者緩存或者重新發送回其他的消息隊列中。
本文講述一個簡單的Redis作為Sink的案例。
后續,我們會補充完善,比如落入Hbase,Kafka,Mysql等。

關於Redis Sink

Flink提供了封裝好的寫入Redis的包給我們用,首先我們要新增一個依賴:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.10</artifactId> <version>1.1.5</version> </dependency> 

然后我們實現一個自己的RedisSinkExample:

//指定Redis set public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> { public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET, null); } public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } } 

我們用最簡單的單機Redis的SET命令進行演示。

完整的代碼如下,實現一個讀取Kafka的消息,然后進行WordCount,並把結果更新到redis中:


public class RedisSinkTest { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(2000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //連接kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); consumer.setStartFromEarliest(); DataStream<String> stream = env.addSource(consumer); DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1); //實例化FlinkJedisPoolConfig 配置redis FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setHost("6379").build(); //實例化RedisSink,並通過flink的addSink的方式將flink計算的結果插入到redis counts.addSink(new RedisSink<>(conf,new RedisSinkExample())); env.execute("WordCount From Kafka To Redis"); }// public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } //指定Redis set public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> { public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET, null); } public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } } }// 

所有代碼,我放在了我的公眾號,回復Flink可以下載

  • 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
  • 更多大數據技術歡迎和作者一起探討~
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM