package utils
import java.net.{InetAddress, InetSocketAddress}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.Properties
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
/**
 * Author: risen
 * Date: 2019/7/25 9:45
 * Desc: sink utilities for Flink streaming jobs (Elasticsearch, DB, Redis, Kafka)
 */
object FlinkStreamSinkUtils {
  // Flink streaming sink to Elasticsearch; note that the transport client uses port 9300 (not the HTTP port 9200)
  def streamSinkEs(clusterName: String, hosts: String, esPort: String, esIndex: String): ElasticsearchSink[util.HashMap[String, String]] = {
    val esConfig = new util.HashMap[String, String]
    esConfig.put("cluster.name", clusterName)
    esConfig.put("bulk.flush.max.actions", "1")
    // 1. whether the bulk-flush retry (backoff) mechanism is enabled
    esConfig.put("bulk.flush.backoff.enable", "true")
    // 2. retry strategy, one of two types:
    //    a. exponential: the interval between retries grows exponentially, e.g. 2 -> 4 -> 8 ...
    //       esConfig.put("bulk.flush.backoff.type", "EXPONENTIAL")
    //    b. constant: the interval between retries is a fixed constant, e.g. 2 -> 2 -> 2 ...
    esConfig.put("bulk.flush.backoff.type", "CONSTANT")
    // 3. interval between retries; for the exponential type this is the initial base
    esConfig.put("bulk.flush.backoff.delay", "2")
    // 4. number of retries after a failure
    esConfig.put("bulk.flush.backoff.retries", "3")
    // for a cluster, multiple hosts can be passed as a comma-separated list
    val transportAddresses = new util.ArrayList[InetSocketAddress]
    for (host <- hosts.split(",")) {
      transportAddresses.add(new InetSocketAddress(InetAddress.getByName(host), esPort.toInt))
    }
    new ElasticsearchSink(esConfig, transportAddresses, new ElasticsearchSinkFunction[util.HashMap[String, String]] {
      // implement the Elasticsearch operation logic (add/delete/update/query); here an index request is built per record
      def createIndexRequest(dataMap: util.HashMap[String, String]): IndexRequest = {
        // the incoming map is used as the document source; the element type does not have to be
        // String here, it depends on the type of the DataStream being sunk
        val esType = dataMap.get("es_type")
        val esId = dataMap.get("es_id")
        dataMap.remove("es_type")
        dataMap.remove("es_id")
        Requests.indexRequest().index(esIndex).`type`(esType).source(dataMap).id(esId)
      }

      override def process(t: util.HashMap[String, String], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        requestIndexer.add(createIndexRequest(t))
      }
    })
  }
  // Flink streaming sink to Redis
  def streamSinkRedis(redisHost: String, redisPort: String): RedisSink[(String, String)] = {
    val jedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost(redisHost).setPort(redisPort.toInt).build()
    new RedisSink[(String, String)](jedisPoolConfig, new MyRedisMapper())
  }
  // Flink streaming sink to Kafka
  def streamSinkKafka(topicName: String, brokerList: String, groupId: String): FlinkKafkaProducer010[String] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", brokerList)
    // note: group.id is a consumer-side setting; the producer does not use it
    properties.setProperty("group.id", groupId)
    new FlinkKafkaProducer010[String](topicName, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), properties)
  }
  // Flink streaming sink to a relational database via JDBC
  def streamSinkDB(classDriverName: String, jdbcUrl: String, username: String, password: String, sql: String): MyDbMapper = {
    new MyDbMapper(classDriverName, jdbcUrl, username, password, sql)
  }
}
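
// ---------------------------------------------------------------------------
// Hedged usage sketch (not part of the original utilities): shows how the
// streaming sinks above can be attached to a DataStream. The cluster name,
// hosts, ports, topic and index names below are placeholder assumptions; only
// the "es_type"/"es_id" map entries reflect the actual contract of streamSinkEs.
// ---------------------------------------------------------------------------
object FlinkStreamSinkUtilsExample {
  import org.apache.flink.streaming.api.scala._

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka sink: plain String records to an assumed topic / broker list
    val textStream: DataStream[String] = env.fromElements("event-1", "event-2")
    textStream.addSink(FlinkStreamSinkUtils.streamSinkKafka("demo_topic", "localhost:9092", "demo_group"))

    // Redis sink: each (key, value) tuple is LPUSHed under its key (see MyRedisMapper)
    val kvStream: DataStream[(String, String)] = env.fromElements(("demo_list", "value-1"))
    kvStream.addSink(FlinkStreamSinkUtils.streamSinkRedis("localhost", "6379"))

    // Elasticsearch sink: every HashMap must carry "es_type" and "es_id" entries,
    // which streamSinkEs strips out before indexing the remaining fields
    val doc = new util.HashMap[String, String]()
    doc.put("es_type", "doc")
    doc.put("es_id", "1")
    doc.put("name", "risen")
    val esStream: DataStream[util.HashMap[String, String]] = env.fromElements(doc)
    esStream.addSink(FlinkStreamSinkUtils.streamSinkEs("my-cluster", "localhost", "9300", "demo_index"))

    env.execute("FlinkStreamSinkUtils usage sketch")
  }
}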

// RedisMapper implementation: LPUSHes the tuple's second element under the key given by its first element
class MyRedisMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.LPUSH)
  }

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}

// RichSinkFunction implementation: writes each map of (JDBC parameter index -> value) through a PreparedStatement
class MyDbMapper(classDriverName: String, jdbcUrl: String, username: String, password: String, sql: String) extends RichSinkFunction[util.HashMap[Integer, Object]] {
  var conn: Connection = _
  var pres: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName(classDriverName)
    conn = DriverManager.getConnection(jdbcUrl, username, password)
    pres = conn.prepareStatement(sql)
  }
  override def invoke(dataMap: util.HashMap[Integer, Object], context: SinkFunction.Context[_]): Unit = {
    import scala.collection.JavaConverters._
    // the map keys are the 1-based parameter indices of the "?" placeholders in the SQL
    for (index <- dataMap.keySet().asScala) {
      val value = dataMap.get(index)
      pres.setObject(index, value)
    }
    println(dataMap.values())
    pres.executeUpdate()
  }
  override def close(): Unit = {
    // close the statement before the connection
    if (pres != null) pres.close()
    if (conn != null) conn.close()
  }
}
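
// ---------------------------------------------------------------------------
// Hedged usage sketch (assumption, not original code): shows the input format
// MyDbMapper expects. The map keys are the 1-based JDBC parameter indices of
// the "?" placeholders in the SQL passed to streamSinkDB; the driver class,
// JDBC URL, credentials and table/column names are placeholder assumptions.
// ---------------------------------------------------------------------------
object FlinkStreamSinkDbExample {
  import org.apache.flink.streaming.api.scala._

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // one row: parameter 1 -> name, parameter 2 -> age
    val row = new util.HashMap[Integer, Object]()
    row.put(Integer.valueOf(1), "risen")
    row.put(Integer.valueOf(2), Integer.valueOf(30))
    val rowStream: DataStream[util.HashMap[Integer, Object]] = env.fromElements(row)

    // two "?" placeholders, one per map entry above
    rowStream.addSink(FlinkStreamSinkUtils.streamSinkDB(
      "com.mysql.jdbc.Driver",
      "jdbc:mysql://localhost:3306/demo",
      "root",
      "password",
      "INSERT INTO user(name, age) VALUES (?, ?)"))

    env.execute("MyDbMapper usage sketch")
  }
}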