Common Source and Sink Operations in Flink Stream Processing


Flink's sources for stream processing are essentially the same as those for batch processing. They fall into roughly four categories:

1. Collection-based source (from a local collection)

2. File-based source

3. Socket-based source

4. Custom source

Collection-based source

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

import scala.collection.immutable.{Queue, Stack}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object DataSource001 {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    //0. Create a DataStream from individual elements (fromElements)
    val ds0: DataStream[String] = senv.fromElements("spark", "flink")
    ds0.print()

    //1. Create a DataStream from tuples (fromElements)
    val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))
    ds1.print()

    //2. Create a DataStream from an Array
    val ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))
    ds2.print()

    //3. Create a DataStream from an ArrayBuffer
    val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()

    //4. Create a DataStream from a List
    val ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))
    ds4.print()

    //5. Create a DataStream from a ListBuffer
    val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()

    //6. Create a DataStream from a Vector
    val ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))
    ds6.print()

    //7. Create a DataStream from a Queue
    val ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))
    ds7.print()

    //8. Create a DataStream from a Stack
    val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))
    ds8.print()

    //9. Create a DataStream from a Stream (Stream is a lazy List, which avoids materializing intermediate collections)
    val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))
    ds9.print()

    //10. Create a DataStream from a Seq
    val ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))
    ds10.print()

    //11. Create a DataStream from a Set (not supported)
    //val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))
    //ds11.print()

    //12. Create a DataStream from an Iterable (not supported)
    //val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))
    //ds12.print()

    //13. Create a DataStream from an ArraySeq
    val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()

    //14. Create a DataStream from an ArrayStack
    val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()

    //15. Create a DataStream from a Map (not supported)
    //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    //ds15.print()

    //16. Create a DataStream from a Range
    val ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))
    ds16.print()

    //17. Create a DataStream with generateSequence
    val ds17: DataStream[Long] = senv.generateSequence(1, 9)
    ds17.print()
    
    senv.execute(this.getClass.getName)
  }
}

File-based source

//TODO 2. File-based source
//0. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TODO 1. Read a local file
val text1 = env.readTextFile("data2.csv")
text1.print()
//TODO 2. Read a file from HDFS
val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt")
text2.print()
env.execute()

Socket-based source

val source = env.socketTextStream("IP", PORT)
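For completeness, here is a minimal runnable sketch of a socket source. The hostname and port (localhost:9999, which you could feed with, for example, nc -lk 9999) are placeholder assumptions for illustration.

import org.apache.flink.streaming.api.scala._

object SocketSourceDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to a plain-text socket; "localhost" and 9999 are placeholder values
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)
    source.print()
    env.execute("socket-source-demo")
  }
}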

Custom source (Custom-source, using Kafka as an example)

Basic Kafka commands:

 ● List all topics on the current server
bin/kafka-topics.sh --list --zookeeper hadoop01:2181
 ● Create a topic
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test
 ● Delete a topic
sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or the brokers have to be restarted).
 ● Send messages from the shell
sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test
 ● Consume messages from the shell
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1
 ● Check consumer offsets
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
 ● Describe a topic
bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
 ● Change the number of partitions
kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

Consuming Kafka messages with Flink (a rough example; strictly speaking, you need to take care of offset management yourself, see the checkpointing sketch after the example):

import java.util.Properties

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
/**
  * Created by angel;
  */
object DataSource_kafka {
  def main(args: Array[String]): Unit = {
    //1. Specify the Kafka connection information
    val zkCluster = "hadoop01,hadoop02,hadoop03:2181"
    val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val kafkaTopicName = "test"
    //2. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //3. Create the Kafka data stream
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect", zkCluster)
    properties.setProperty("group.id", kafkaTopicName)

    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName,
      new SimpleStringSchema(), properties)
    //4. Add the data source: addSource(kafka09)
    val text = env.addSource(kafka09).setParallelism(4)

    /**
      * Sample record:
      * test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie
      * */
    val values: DataStream[ProcessedData] = text.map{
      line =>
        val values = line.split("#CS#")
        val valuesLength = values.length
        val regionalRequest = if (valuesLength > 1) values(1) else ""
        val requestMethod = if (valuesLength > 2) values(2) else ""
        val contentType = if (valuesLength > 3) values(3) else ""
        //Body of the POST request
        val requestBody = if (valuesLength > 4) values(4) else ""
        //http_referrer
        val httpReferrer = if (valuesLength > 5) values(5) else ""
        //Client IP
        val remoteAddr = if (valuesLength > 6) values(6) else ""
        //Client user agent
        val httpUserAgent = if (valuesLength > 7) values(7) else ""
        //Server time in ISO 8601 format
        val timeIso8601 = if (valuesLength > 8) values(8) else ""
        //Server address
        val serverAddr = if (valuesLength > 9) values(9) else ""
        //Cookie string from the raw record
        val cookiesStr = if (valuesLength > 10) values(10) else ""
        ProcessedData(regionalRequest,
          requestMethod,
          contentType,
          requestBody,
          httpReferrer,
          remoteAddr,
          httpUserAgent,
          timeIso8601,
          serverAddr,
          cookiesStr)
    }
    values.print()
    val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)
    remoteAddr.print()

    //5. Trigger execution
    env.execute("flink-kafka-wordcount")
  }
}

//Holds the parsed, structured record
case class ProcessedData(regionalRequest: String,
                         requestMethod: String,
                         contentType: String,
                         requestBody: String,
                         httpReferrer: String,
                         remoteAddr: String,
                         httpUserAgent: String,
                         timeIso8601: String,
                         serverAddr: String,
                         cookiesStr: String
                         )
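As noted above, this example does not deal with offsets explicitly. A minimal sketch of one common approach, assuming the same env and kafka09 consumer as in the example, is to enable checkpointing so the consumer's offsets are committed as part of Flink's checkpoints:

// Sketch only: enable checkpointing so Kafka offsets are committed on completed checkpoints.
// Place this before env.addSource(kafka09) in the example above.
env.enableCheckpointing(5000) // checkpoint every 5 seconds
kafka09.setCommitOffsetsOnCheckpoints(true) // commit offsets back to Kafka when a checkpoint completes
val text = env.addSource(kafka09).setParallelism(4)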

 

