Flink custom sinks to DB, ES, Redis, and Kafka


package utils

import java.net.{InetAddress, InetSocketAddress}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/**
  * Author: risen
  * Date: 2019/7/25 9:45
  * Desc: utility object with Flink streaming sinks (ES, DB, Redis, Kafka)
  */
object FlinkStreamSinkUtils {

  //Flink streaming sink to Elasticsearch; note that the transport port is 9300
  def streamSinkEs(clusterName:String,hosts:String,esPort:String,esIndex:String): ElasticsearchSink[util.HashMap[String,String]] ={
    val esConfig = new util.HashMap[String,String]
    esConfig.put("cluster.name", clusterName)
    esConfig.put("bulk.flush.max.actions", "1")
    //1. whether to enable the retry (backoff) mechanism
    esConfig.put("bulk.flush.backoff.enable", "true")
    //2. retry strategy, which comes in two flavors:
    //a. EXPONENTIAL: the interval between retries grows exponentially, e.g. 2 -> 4 -> 8 ...
    //esConfig.put("bulk.flush.backoff.type", "EXPONENTIAL");
    //b. CONSTANT: the interval between retries is a fixed constant, e.g. 2 -> 2 -> 2 ...
    esConfig.put("bulk.flush.backoff.type", "CONSTANT")
    //3. delay between retries; for EXPONENTIAL this is the initial base
    esConfig.put("bulk.flush.backoff.delay", "2")
    //4. number of retries on failure
    esConfig.put("bulk.flush.backoff.retries", "3")
    //for a cluster, pass the hosts as a comma-separated list
    val transportAddresses = new util.ArrayList[InetSocketAddress]
    for(host <- hosts.split(",")){
      transportAddresses.add(new InetSocketAddress(InetAddress.getByName(host), esPort.toInt))
    }
    new ElasticsearchSink(esConfig, transportAddresses, new ElasticsearchSinkFunction[util.HashMap[String,String]] {

      //build the Elasticsearch index request (add/update/delete logic goes here)
      def createIndexRequest(dataMap: util.HashMap[String,String]): IndexRequest = {
        //the element does not have to be a String map; its type depends on the DataStream's element type
        val esType = dataMap.get("es_type")
        val esId = dataMap.get("es_id")
        dataMap.remove("es_type")
        dataMap.remove("es_id")
        Requests.indexRequest().index(esIndex).`type`(esType).source(dataMap).id(esId)
      }
      override def process(t: util.HashMap[String,String], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        requestIndexer.add(createIndexRequest(t))
      }
    })
  }

  //Flink streaming sink to Redis
  def streamSinkRedis(redisHost:String,redisPort:String): RedisSink[(String,String)] ={
    val jedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost(redisHost).setPort(redisPort.toInt).build()
    new RedisSink[(String, String)](jedisPoolConfig,new MyRedisMapper())
  }

  //Flink streaming sink to Kafka
  def streamSinkKafka(topicName:String,brokerList:String,groupId:String): FlinkKafkaProducer010[String] ={
    val properties = new Properties()
    properties.setProperty("bootstrap.servers",brokerList)
    properties.setProperty("group.id",groupId)
    new FlinkKafkaProducer010[String](topicName,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),properties)
  }

  //Flink streaming sink to a relational database via JDBC
  def streamSinkDB(classDriverName:String,jdbcUrl:String,username:String,password:String,sql:String):MyDbMapper ={
    new MyDbMapper(classDriverName,jdbcUrl,username,password,sql)
  }
}

//RedisMapper implementation: pushes (key, value) tuples with LPUSH
class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.LPUSH)
  }
  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}

//RichSinkFunction implementation that writes each record via a JDBC PreparedStatement
class MyDbMapper(classDriverName:String,jdbcUrl:String,username:String,password:String,sql:String) extends RichSinkFunction[util.HashMap[Integer,Object]]{
  var conn: Connection = _
  var pres:PreparedStatement = _
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName(classDriverName)
    conn = DriverManager.getConnection(jdbcUrl, username, password)
    pres = conn.prepareStatement(sql)
  }

  override def invoke(dataMap: util.HashMap[Integer,Object], context: SinkFunction.Context[_]): Unit = {
    import scala.collection.JavaConversions._
    //keys of dataMap are the 1-based positional parameter indices of the prepared statement
    for (index <- dataMap.keySet()){
      val value = dataMap.get(index)
      pres.setObject(index,value)
    }
    println(dataMap.values())
    pres.executeUpdate()
  }
  override def close(): Unit = {
    //close the statement before the connection, guarding against a failed open()
    if (pres != null) pres.close()
    if (conn != null) conn.close()
  }
}
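
Below is a minimal usage sketch showing how these sinks could be attached to DataStreams. It is not part of the utility class above: the object name SinkUsageExample, the hosts, ports, cluster name, topic, index, driver class, JDBC URL, credentials, and SQL statement are all placeholder assumptions, and it presumes the same Flink Scala streaming API and connector versions imported at the top of the file. Adapt the values to your environment.

//Hypothetical usage sketch; all connection parameters below are placeholders
import org.apache.flink.streaming.api.scala._
import utils.FlinkStreamSinkUtils

object SinkUsageExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Redis: (key, value) tuples are pushed with LPUSH by MyRedisMapper
    val kvStream: DataStream[(String, String)] = env.fromElements(("list_key", "hello"), ("list_key", "world"))
    kvStream.addSink(FlinkStreamSinkUtils.streamSinkRedis("localhost", "6379"))

    //Kafka: plain strings serialized with SimpleStringSchema
    val strStream: DataStream[String] = env.fromElements("msg-1", "msg-2")
    strStream.addSink(FlinkStreamSinkUtils.streamSinkKafka("demo_topic", "localhost:9092", "demo_group"))

    //Elasticsearch: each HashMap must carry "es_type" and "es_id" entries,
    //which are stripped before the remaining fields are indexed as the document source
    val esStream: DataStream[java.util.HashMap[String, String]] = strStream.map { s =>
      val m = new java.util.HashMap[String, String]()
      m.put("es_type", "doc")
      m.put("es_id", s)
      m.put("content", s)
      m
    }
    esStream.addSink(FlinkStreamSinkUtils.streamSinkEs("my-cluster", "localhost", "9300", "demo_index"))

    //JDBC: map keys are the 1-based positional parameter indices of the prepared statement
    val dbStream: DataStream[java.util.HashMap[Integer, Object]] = strStream.map { s =>
      val m = new java.util.HashMap[Integer, Object]()
      m.put(1, s)
      m
    }
    dbStream.addSink(FlinkStreamSinkUtils.streamSinkDB(
      "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/test", "user", "password",
      "insert into demo_table(content) values (?)"))

    env.execute("sink usage example")
  }
}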

