Flink | source | transform | sink


1. Flink DataStream API

 1.1 Source

How Flink + Kafka achieves exactly-once semantics

  Flink uses checkpoints to persist the state of whether data has been fully processed.

  The JobManager coordinates the TaskManagers to take checkpoints, and checkpoints are stored in a StateBackend. The default StateBackend is memory-based, but it can be switched to a file-based backend for durable storage.

Execution is effectively a two-phase commit: each operator performs a "pre-commit" when it finishes its part, and once the sink operation completes, a "confirm commit" is issued. If execution fails, the pre-commits are discarded.

After a crash, the job is restored from the StateBackend; only operations whose commits were confirmed can be recovered.
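
A minimal sketch of wiring this up, assuming the same flink-connector-kafka-0.11 dependency used later in this post; the topic, checkpoint interval and socket source are made up for illustration:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // checkpoint every 5s; the checkpoint barriers drive the two-phase commit described above
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop101:9092")
    // with Semantic.EXACTLY_ONCE the producer pre-commits a Kafka transaction on every
    // checkpoint and only commits it once the checkpoint has been confirmed
    val producer = new FlinkKafkaProducer011[String](
      "output_topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      props,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    env.socketTextStream("localhost", 9999).addSink(producer)
    env.execute()
  }
}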


 To do stateful processing in Spark you need updateStateByKey or an external store such as Redis;

Flink, on the other hand, records it in the StateBackend: results of keyBy and similar operations are kept there (processed-but-uncommitted while in flight, committed once processing is complete),

and it also records another kind of state, keyed state, such as the running results accumulated by keyBy.

If you do not want the StateBackend to live in memory, it can also be stored on a local filesystem or in HDFS. Running from the IDEA IDE, only the memory backend is available, so state is lost on restart; once deployed to a Linux cluster, file-based storage is supported.
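
A minimal sketch of switching to a durable backend; the HDFS path is hypothetical:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object StateBackendSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // keep checkpointed state on HDFS instead of in memory, so it survives a restart
    env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/flink/checkpoints"))
    env.enableCheckpointing(10000)
    // ... define sources, transformations and sinks here, then call env.execute()
  }
}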

Kafka's auto commit ("enable.auto.commit"): when records flow from Kafka into, say, Spark Streaming, the consumer commits offsets automatically as soon as they are read. If something downstream then fails to be written out (e.g. to Redis or ES), the processing has failed but the Kafka offsets have already moved on. The moment at which offsets are committed is therefore critical; see the sketch below.
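
A minimal sketch of tying offset commits to Flink's checkpoints rather than Kafka's auto commit, reusing the MyKafkaUtil helper defined later in this post:

import org.apache.flink.streaming.api.scala._

object OffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)

    val consumer = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    // with checkpointing enabled, offsets are committed back to Kafka only when a
    // checkpoint completes, not immediately when the records are read
    consumer.setCommitOffsetsOnCheckpoints(true)

    val dstream: DataStream[String] = env.addSource(consumer)
    dstream.print()
    env.execute()
  }
}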

1.2 Transform operators

map


val streamMap = stream.map { x => x * 2 }

 

object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1)  // sanity check: verify data flows from Kafka into Flink

    //parse the JSON string into a StartupLog object
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

    //requirement 1: accumulate the counts per channel
    val sumDStream: DataStream[(String, Int)] = startupLogDStream
      .map { startuplog => (startuplog.ch, 1) }
      .keyBy(0)
      .reduce { (startuplogCount1, startuplogCount2) =>
        val newCount: Int = startuplogCount1._2 + startuplogCount2._2
        (startuplogCount1._1, newCount)
      }
    //equivalently: val sumDStream: DataStream[(String, Int)] = startupLogDStream.map{startuplog => (startuplog.ch,1)}.keyBy(0).sum(1)
    //sumDStream.print()

    env.execute()
  }
}

  flatMap  

The signature of flatMap (written as a standalone function) is: def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
For example, flatMap(List(1,2,3))(i ⇒ List(i,i))
gives List(1,1,2,2,3,3),
while List("a b", "c d").flatMap(line ⇒ line.split(" "))
gives List(a, b, c, d).

val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}

 

  Filter

  

val streamFilter = stream.filter{
    x => x == 1
}

 KeyBy


DataStream → KeyedStream: logically splits a stream into disjoint partitions, each containing the elements with the same key (when keying by position, as with keyBy(0), the input must be a Tuple type); internally this is implemented with hashing.

Partitioning is based on the key's hash code, so elements with different keys may still end up in the same partition.
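
A minimal sketch of the two keying styles used in this post; startupLogDStream is the stream built in the map example above, and sensorStream stands in for a DataStream[SensorReading] like the one in the Reduce example below:

// positional key on a tuple stream (this is why the input must be a Tuple type)
val keyedByPosition = startupLogDStream.map(startuplog => (startuplog.ch, 1)).keyBy(0)
// key by field name on a case-class stream
val keyedById = sensorStream.keyBy("id")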

 Rolling Aggregation operators

These operators aggregate each partition of a KeyedStream: you must keyBy first and only then apply the aggregation, i.e. sum(), min(), max(), minBy(), maxBy(); a sketch follows below.
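
A minimal sketch, assuming sensorStream is a DataStream[SensorReading] like the one built in the Reduce example below; sum/min/max aggregate a single field, while minBy/maxBy return the whole element that holds the extreme value:

val keyed = sensorStream.keyBy("id")
keyed.sum("temperature")    // running sum of the temperature field per sensor id
keyed.min("temperature")    // running minimum of temperature (the other fields are not guaranteed to come from the same record)
keyed.maxBy("temperature")  // the complete SensorReading that currently holds the maximum temperature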

 Reduce

KeyedStream → DataStream: an aggregation over a keyed stream that combines the current element with the last aggregated value and produces a new value. The returned stream contains the result of every aggregation step, not just the final result.

val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
  .map( data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  })
  .keyBy("id")
  .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )

 

Split and Select

  

DataStream → (split) → SplitStream → (select) → DataStream: split one DataStream into two or more DataStreams according to some characteristic of the elements.

SplitStream → DataStream: select retrieves one or more DataStreams from a SplitStream.

    //requirement 2: split the data into two streams, the appstore channel vs. all other channels
    val splitableStream: SplitStream[StartupLog] = startupLogDStream.split { startuplog =>
      var flagList: List[String] = List()
      if (startuplog.ch.equals("appstore")) {
        flagList = List("apple")
      } else {
        flagList = List("other")
      }
      flagList
    }
    val appleStream: DataStream[StartupLog] = splitableStream.select("apple")
    //appleStream.print("this is apple").setParallelism(1)
    val otherdStream: DataStream[StartupLog] = splitableStream.select("other")
    //otherdStream.print("this is other").setParallelism(1)

 Connect and CoMap


The two streams passed to connect may have different data types, but connect can only combine two streams at a time.

DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their types. After connect, the two streams are merely placed inside one ConnectedStreams; internally each keeps its own data and form unchanged, and the two remain independent of each other.

ConnectedStreams → DataStream: operates on a ConnectedStreams and behaves like map and flatMap, applying a separate map/flatMap function to each of the two inner streams.

 //requirement 3: merge the two streams above into one
    val connStream: ConnectedStreams[StartupLog, StartupLog] = appleStream.connect(otherdStream)
    val allDataStream: DataStream[String] = connStream.map((startuplog1: StartupLog) => startuplog1.ch, (startuplog2: StartupLog) => startuplog2.ch)
    allDataStream.print("all").setParallelism(1)

 CoMap, CoFlatMap

As described above, CoMap/CoFlatMap apply one map/flatMap function to each of the two inner streams of a ConnectedStreams and produce a single DataStream:

// high and low are assumed to be two DataStream[SensorReading] produced by an earlier split of the sensor stream
val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
val connected = warning.connect(low)

val coMap = connected.map(
    warningData => (warningData._1, warningData._2, "warning"),
    lowData => (lowData.id, "healthy")
)

 

Union


DataStream → DataStream: unions two or more DataStreams into a new DataStream that contains all elements of all the inputs. Note: if you union a DataStream with itself, every element will appear twice in the resulting stream.

Union can merge more than two streams, but they must all have the same data type.

    //merge the streams with union
    val unionDStream: DataStream[StartupLog] = appleStream.union(otherdStream)
    unionDStream.print("union").setParallelism(1)

Differences between Connect and Union:

1. Union requires the two streams to have the same type; Connect does not, and the types can be unified afterwards in the coMap.

2. Connect can only operate on two streams; Union can operate on many.

 

 

 1.3 Sink

Flink has no foreach method like Spark's that lets users iterate over the results themselves; all output to external systems has to go through a sink. The final output of a job is wired up roughly as follows:

   myDstream.addSink(new MySink(xxxx)) 

The official distribution provides sinks for a number of frameworks; for everything else, you need to implement a custom sink (see the sketch below).
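
A minimal, hypothetical sketch of such a custom sink (the MySQL sink at the end of this post is a full example of the same pattern):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// toy sink that just prints every element; open/close are where connections would be acquired/released
class MyPrintSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = { /* acquire connections here */ }
  override def invoke(value: String): Unit = println(s"sink got: $value")
  override def close(): Unit = { /* release connections here */ }
}

// usage: myDstream.addSink(new MyPrintSink)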


 

 Kafka

object MyKafkaUtil {
  val prop = new Properties()
  prop.setProperty("bootstrap.servers","hadoop101:9092")
  prop.setProperty("group.id","gmall")

  def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
    val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }

  def getProducer(topic:String):FlinkKafkaProducer011[String]={
    new FlinkKafkaProducer011[String]("hadoop101:9092",topic,new SimpleStringSchema())
  }

}


//sink to Kafka
    unionDStream.map(_.toString).addSink(MyKafkaUtil.getProducer("gmall_union"))
    ///opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic gmall_union

 

From Kafka to Kafka

Start Kafka.

Kafka producer: [kris@hadoop101 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic sensor

Kafka consumer:

[kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic sinkTest --from-beginning
SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_6,1547718201,15.402984393403084)
SensorReading(sensor_7,1547718202,6.720945201171228)
SensorReading(sensor_10,1547718205,38.101067604893444)
SensorReading(sensor_1,1547718206,35.1)
SensorReading(sensor_1,1547718207,35.6)

 

 

 

Redis

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object MyRedisUtil {
  private val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](config, new MyRedisMapper)
  }

}
class MyRedisMapper extends RedisMapper[(String, String)]{
  //which Redis command to use for writing
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "channel_sum") //HSET channel_sum <channel> <count>, e.g. apple -> 111
  }
  //which part of the stream element is the hash field (key)
  override def getKeyFromData(channel_sum: (String, String)): String = channel_sum._1
  //which part of the stream element is the value
  override def getValueFromData(channel_sum: (String, String)): String = channel_sum._2

}


object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1)  // sanity check: verify data flows from Kafka into Flink

    //parse the JSON string into a StartupLog object
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

    //sink to Redis
    //store the per-channel counts in a Redis hash: key channel_sum, field ch, value count
    //accumulate per channel
    val chCountDStream: DataStream[(String, Int)] = startupLogDStream.map(startuplog => (startuplog.ch, 1)).keyBy(0).sum(1)
    //convert the (String, Int) results to (String, String)
    val channelDStream: DataStream[(String, String)] = chCountDStream.map(chCount => (chCount._1, chCount._2.toString))
    channelDStream.addSink(MyRedisUtil.getRedisSink())

    env.execute()
  }
}

ES

object MyEsUtil {
  val hostList: util.List[HttpHost] = new util.ArrayList[HttpHost]()
  hostList.add(new HttpHost("hadoop101", 9200, "http"))
  hostList.add(new HttpHost("hadoop102", 9200, "http"))
  hostList.add(new HttpHost("hadoop103", 9200, "http"))

  def getEsSink(indexName: String): ElasticsearchSink[String] = {
    //anonymous implementation of the ElasticsearchSinkFunction interface -> one method to implement
    val esSinkFunc: ElasticsearchSinkFunction[String] = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        val jSONObject: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jSONObject)
        indexer.add(indexRequest)
      }
    }
    val esSinkBuilder = new ElasticsearchSink.Builder[String](hostList, esSinkFunc)
    esSinkBuilder.setBulkFlushMaxActions(10)
    val esSink: ElasticsearchSink[String] = esSinkBuilder.build()
    esSink
  }
}



object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
    //sink 3: write to Elasticsearch
    val esSink: ElasticsearchSink[String] = MyEsUtil.getEsSink("gmall_startup")
    dstream.addSink(esSink) //dstream comes straight from the Kafka source

    env.execute()
  }
}

Query the index (e.g. from Kibana) to verify the data:

GET gmall_startup/_search

MySQL

class MyjdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop101:3306/gmall?useSSL=false"
  val username = "root"
  val password = "123456"
  val maxActive = "20"
  var connection: Connection = null

  // create the connection (via a Druid connection pool)
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName",driver)
    properties.put("url",url)
    properties.put("username",username)
    properties.put("password",password)
    properties.put("maxActive",maxActive)

    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // write each Array[Any] as one row of the target table
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
    ps.close()
  }


  override def close(): Unit = {
    if (connection != null){
      connection.close()
    }
  }
}


object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1)  // sanity check: verify data flows from Kafka into Flink

    //parse the JSON string into a StartupLog object
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

   //sink 4: write to MySQL
    startupLogDStream.map(startuplog => Array(startuplog.mid, startuplog.uid, startuplog.ch, startuplog.area,startuplog.ts))
      .addSink(new MyjdbcSink("insert into fink_startup values(?,?,?,?,?)"))

    env.execute()

  }
}

 

