項目pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jike.flink</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<flink.version>1.10.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink 11中需要手動添加
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.2</version>
</dependency>
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<scope>system</scope>
<systemPath>${basedir}/lib/flink-connector-redis_2.11-1.1.5.jar</systemPath>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
實現flink寫入redis
實現wordcount功能,並將結果實時寫入redis,這里使用了第三方依賴flink-connector-redis_2.11,該依賴提供了RedisSink可以直接使用,具體代碼如下:
代碼
首先定義數據源處理實現類LineSplitter,該類將一行數據分詞,輸出<單詞,1>元祖
package com.jike.flink.examples.redis;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\\W+");
for(String token : tokens){
if(token.length() > 0){
collector.collect(new Tuple2<String,Integer>(token,1));
}
}
}
}
然后定義數據寫入Redis的配置類,這里面將統計后的所有信息詞頻寫入一個哈希表,哈希表的key為"flink",作為測試使用,哈希表中每個元素key為單詞,value為詞頻
package com.jike.flink.examples.redis;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkRedisMapper implements RedisMapper<Tuple2<String,Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
//hset
return new RedisCommandDescription(RedisCommand.HSET,"flink");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
return stringIntegerTuple2.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
return stringIntegerTuple2.f1.toString();
}
}
最后編寫主程序類,該類中使用了socketTextStream數據源,通過前面定義LineSplitter完成解析,然后根據單詞進行分組統計,最后寫入redis
package com.jike.flink.examples.redis;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
public class Sink2Redis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("實際IP",12345);
DataStream<Tuple2<String,Integer>> counts = dataStreamSource.flatMap(new LineSplitter()).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
}).sum(1);
//控制台打印
counts.print().setParallelism(1);
//定義redis服務器信息
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("redis服務器ip").setPort(redis服務端口).setPassword("redis服務密碼").build();
counts.addSink(new RedisSink<>(conf,new SinkRedisMapper()));
executionEnvironment.execute();
}
}
運行效果
通過nc -l 12345,命令模擬數據源,並輸入一些數據
IDEA中查看打印記錄
查看redis
可以發現數據已寫入redis
總結
flink-connector-redis_2.11中提供了RedisSink類,該類實現了RichSinkFunction,可以直接使用,如果有特殊需求,可以自定義Sink類,繼承RichSinkFunction,實現特殊處理。flink-connector-redis_2.11的源碼比較簡潔,下一篇打算分析學習下。