Spark Streaming
----------------
Stream computing, running continuously.
The Spark Streaming module is implemented as batch computation: the stream is cut into static
data sets by time slice (micro-batches).

    // Specify the time slice when creating the context.
    val ssc = new StreamingContext(conf, Seconds(1))
    // The time slice is now fixed.
    ssc.socketTextStream(...)
    ...

The socket text stream runs on the executor side, not on the driver.

SocketTextStream execution process
-------------------------------
On the driver a StreamingContext object is created. When the context starts, it creates the
JobScheduler and the ReceiverTracker in turn and calls their start methods.
In its start method the ReceiverTracker sends a "start receiver" message to a remote executor;
the message carries the ServerSocket address information. On the executor side the message is
handled via the ReceiverTrackerEndpoint: its content is extracted, a receiver RDD is built from
it with the SparkContext, and finally the RDD is submitted to the Spark cluster.

Windowed processing in stream computing
------------------------
An extension built on top of batches.
The window length and the slide interval (computation frequency) must both be integer
multiples of the batch interval.

    reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(3))
    window(...).reduceByKey(...)

A complete runnable windowed word-count sketch appears after the foreachRDD example below.

DStream partitioning
------------------------
Partitioning a DStream means partitioning each RDD inside it.

    dstream.repartition(num)
    // internally implemented as: this.transform(_.repartition(numPartitions))

updateStateByKey()
-----------------------
Counts each word since the streaming application started.
The state update can be combined with a window. (updateStateByKey requires a checkpoint
directory, e.g. ssc.checkpoint(...).)

    val ds3 = ds2.window(Seconds(5), Seconds(3))

    // Keep, per key, the (timestamp, count) pairs from roughly the last 4 seconds.
    def update(a: Seq[Int], state: Option[ArrayBuffer[(Long, Int)]])
        : Option[ArrayBuffer[(Long, Int)]] = {
      val count = a.sum
      val time = System.currentTimeMillis()
      if (state.isEmpty) {
        val buf = ArrayBuffer[(Long, Int)]()
        buf.append((time, count))
        Some(buf)
      } else {
        // Drop entries older than 4000 ms, then append the count of the current batch.
        val buf2 = ArrayBuffer[(Long, Int)]()
        for (t <- state.get) {
          if ((time - t._1) <= 4000) {
            buf2 += t
          }
        }
        buf2.append((time, count))
        Some(buf2)
      }
    }

    val ds4 = ds3.updateStateByKey(update _)

How partitions are computed in Spark Streaming
-----------------------------------
A DStream partition is an RDD partition. Partitioning is controlled by
conf.set("spark.streaming.blockInterval", "200ms"): received data is cut into small blocks at
the specified interval, and each block corresponds to one partition.

DStream.foreachRDD
----------------------
Operates on every RDD in the stream.

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingForeachRDDScala {

  def createNewConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.231.1:3306/big9", "root", "root")
    conn
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount")
    conf.setMaster("local[4]")

    // batch interval is 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("file:///d:/java/chk")

    // create the socket text stream
    val ds1 = ssc.socketTextStream("s101", 8888)
    val ds2 = ds1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    ds2.foreachRDD(rdd => {
      rdd.foreachPartition(it => {
        // executed on the executor, once per partition
        val conn = createNewConnection()
        val ppst = conn.prepareStatement("insert into wc(word,cnt) values(?,?)")
        conn.setAutoCommit(false)
        for (e <- it) {
          ppst.setString(1, e._1)
          ppst.setInt(2, e._2)
          ppst.executeUpdate()
        }
        conn.commit()
        ppst.close()
        conn.close()
      })
    })

    // start the stream
    ssc.start()
    ssc.awaitTermination()
  }
}
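As referenced in the windowing section above, here is a minimal self-contained sketch of a
windowed word count. It is not from the original notes; the object name is made up and it
assumes the same s101:8888 socket source used elsewhere. With a 2-second batch, a 6-second
window and a 4-second slide are valid because both are integer multiples of the batch interval.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCountScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowedWordCount").setMaster("local[2]")

    // batch interval: 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("s101", 8888)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))

    // window length 6 s, slide interval 4 s -- both multiples of the 2 s batch
    val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(6), Seconds(4))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

This variant of reduceByKeyAndWindow (without an inverse reduce function) does not need a
checkpoint directory.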
Spark Streaming + Spark SQL used together
--------------------------------
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingWordCountSparkSQLScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount")
    conf.setMaster("local[2]")

    // batch interval is 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("file:///d:/java/chk")

    // create the socket text stream
    val lines = ssc.socketTextStream("s101", 8888)

    // flatten into a stream of words
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD(rdd => {
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val df1 = rdd.toDF("word")
      df1.createOrReplaceTempView("_temp")
      spark.sql("select word, count(*) from _temp group by word").show()
    })

    // start the stream
    ssc.start()
    ssc.awaitTermination()
  }
}

Kafka
--------------------
A messaging system. For partitions: n - 1.

Spark Streaming integration with Kafka
-------------------------
1. Note: spark-streaming-kafka-0-10_2.11 is not compatible with earlier broker versions;
   spark-streaming-kafka-0-8_2.11 is compatible with 0.9 and 0.10.
2. Start the Kafka cluster and create the topic.
     xkafka.sh start
3. Verify that Kafka works.
   3.1) Start a consumer:
        kafka-console-consumer.sh --zookeeper s102:2181 --topic t1
   3.2) Start a producer:
        kafka-console-producer.sh --broker-list s102:9092 --topic t1
   3.3) Send messages
        ...
4. Add the Maven dependency:
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
       <version>2.1.0</version>
     </dependency>
5. Write the program:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingKafkaScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    conf.setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,                              // location strategy
      Subscribe[String, String](topics, kafkaParams) // consumer strategy
    )

    val ds2 = stream.map(record => (record.key, record.value))
    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

6. Send messages from the console producer.

Direct stream (createDirectStream) and Kafka partitions
--------------------------------
Each Kafka topic partition corresponds to one RDD partition.
Spark can cap the number of messages consumed per partition per second with the
spark.streaming.kafka.maxRatePerPartition setting.
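A minimal sketch of that rate cap (the object name and the value 100 are illustrative, not
from the original notes):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MaxRatePerPartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafkaMaxRate")
      .setMaster("local[*]")
      // at most 100 records per second from each Kafka partition
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // with a 2-second batch, each partition contributes at most 2 * 100 = 200 records per batch
    val ssc = new StreamingContext(conf, Seconds(2))
    // ... create the direct stream as in the example above ...
  }
}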
LocationStrategies
----------------
Location strategies control on which executor a given topic partition is consumed,
i.e. how consumers are scheduled onto executors for the topic partitions.
The chosen location is a relative preference. There are three strategies:

1. PreferBrokers
   Prefer the Kafka brokers. Only usable when the Kafka brokers and the executors run on the
   same hosts.
2. PreferConsistent
   Prefer a consistent distribution. Used most of the time: all partitions of the subscribed
   topics are spread evenly across the available executors, making full use of the cluster's
   compute resources.
3. PreferFixed
   Prefer a fixed placement. If the load is unbalanced, this strategy places specific topic
   partitions on specific nodes; it is a manual-control scheme. Partitions without an explicit
   placement fall back to strategy (2).

ConsumerStrategies
--------------------
Consumer strategies control how the consumer objects are created and configured, i.e. they
define what on Kafka gets consumed: for example partitions 0 and 1 of topic t1, or a specific
offset range on specific partitions.
The class is extensible, so you can implement your own. The built-in strategies are listed
below (a short construction sketch follows this list):

1. ConsumerStrategies.Assign
   Subscribes to a fixed collection of partitions; the most fine-grained scope.

     def Assign[K, V](
         topicPartitions: Iterable[TopicPartition],
         kafkaParams: collection.Map[String, Object],
         offsets: collection.Map[TopicPartition, Long])

2. ConsumerStrategies.Subscribe
   Subscribes to a fixed collection of topics.
3. ConsumerStrategies.SubscribePattern
   Uses a regular expression to specify the topics of interest.
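A short sketch showing how each of the three strategies is constructed. The topic names and
offsets mirror the examples in these notes and are purely illustrative:

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

object ConsumerStrategyExamples {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "s102:9092,s103:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "g1",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // 1. Assign: fixed partitions, optionally with explicit starting offsets.
  val tps = Seq(new TopicPartition("t1", 0), new TopicPartition("t1", 1))
  val offsets = Map(new TopicPartition("t1", 0) -> 3L)
  val assign = ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)

  // 2. Subscribe: a fixed collection of topics.
  val subscribe = ConsumerStrategies.Subscribe[String, String](Array("t1", "t2"), kafkaParams)

  // 3. SubscribePattern: all topics matching a regular expression.
  val subscribePattern =
    ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("t.*"), kafkaParams)
}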
Consumer strategies and the semantic model
-----------------------------
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingKafkaScala {

  // send diagnostic info (ip, pid, thread name, message, object id) to a socket server
  def sendInfo(msg: String, objStr: String) = {
    // ip
    val ip = java.net.InetAddress.getLocalHost.getHostAddress
    // pid
    val rr = java.lang.management.ManagementFactory.getRuntimeMXBean()
    val pid = rr.getName().split("@")(0)
    // thread name
    val tname = Thread.currentThread().getName
    // object id
    val sock = new java.net.Socket("s101", 8888)
    val out = sock.getOutputStream
    val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + msg + "\t:" + objStr + "\r\n"
    out.write(m.getBytes)
    out.flush()
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    // conf.setMaster("spark://s101:7077")
    conf.setMaster("local[8]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // fixed placement: all four partitions of t1 consumed on host s102
    val map = scala.collection.mutable.Map[TopicPartition, String]()
    map.put(new TopicPartition("t1", 0), "s102")
    map.put(new TopicPartition("t1", 1), "s102")
    map.put(new TopicPartition("t1", 2), "s102")
    map.put(new TopicPartition("t1", 3), "s102")
    val locStra = LocationStrategies.PreferFixed(map)
    val consit = LocationStrategies.PreferConsistent // alternative location strategy

    val topics = Array("t1")

    // collection of topic partitions
    val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
    tps += new TopicPartition("t1", 0)
    // tps += new TopicPartition("t2", 1)
    // tps += new TopicPartition("t3", 2)

    // collection of starting offsets
    val offsets = scala.collection.mutable.Map[TopicPartition, Long]()
    offsets.put(new TopicPartition("t1", 0), 3)
    // offsets.put(new TopicPartition("t2", 1), 3)
    // offsets.put(new TopicPartition("t3", 2), 0)

    val conss = ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)

    // create the Kafka direct stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      locStra,
      conss
    )

    val ds2 = stream.map(record => {
      val t = Thread.currentThread().getName
      val key = record.key()
      val value = record.value()
      val offset = record.offset()
      val par = record.partition()
      val topic = record.topic()
      val tt = ("k:" + key, "v:" + value, "o:" + offset, "p:" + par, "t:" + topic, "T:" + t)
      //sendInfo(tt.toString(), this.toString)
      tt
    })

    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Kafka consumption semantics (based on the offset carried in tt above)
-------------------
1. at most once
     commit(offset)   // if processing below then fails, the message is lost
     xxx(tt)          // process
2. at least once
     xxx(tt)          // process
     commit(offset)   // if the commit fails, the message is processed again
3. exactly once
   Each message is processed exactly once.
   mysql Assign: --->
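The note breaks off here. One common pattern it appears to point at is to store the offsets in
MySQL in the same transaction as the results and to restart via Assign from the stored offsets.
The following is only a sketch under that assumption, not the author's code: the object name,
the table stream_offsets(topic, part, offs) and the result-writing placeholder are invented for
illustration; the connection URL reuses the one from the foreachRDD example.

import java.sql.DriverManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object ExactlyOnceSketchScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("exactlyOnce").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 1. Load the last committed offsets from MySQL (assumed table: stream_offsets(topic, part, offs)).
    val conn0 = DriverManager.getConnection("jdbc:mysql://192.168.231.1:3306/big9", "root", "root")
    val rs = conn0.createStatement().executeQuery("select topic, part, offs from stream_offsets")
    var offsets = Map[TopicPartition, Long]()
    while (rs.next()) {
      offsets += new TopicPartition(rs.getString(1), rs.getInt(2)) -> rs.getLong(3)
    }
    conn0.close()

    // 2. Assign the partitions explicitly and resume from the stored offsets.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](offsets.keys, kafkaParams, offsets)
    )

    // 3. Write the batch results and the new offsets in one MySQL transaction on the driver,
    //    so a failure rolls back both and replaying the batch is safe.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val conn = DriverManager.getConnection("jdbc:mysql://192.168.231.1:3306/big9", "root", "root")
      conn.setAutoCommit(false)
      // ... write the processed results of this batch here (e.g. from rdd.collect()) ...
      val ps = conn.prepareStatement("replace into stream_offsets(topic, part, offs) values(?,?,?)")
      for (r <- ranges) {
        ps.setString(1, r.topic)
        ps.setInt(2, r.partition)
        ps.setLong(3, r.untilOffset)
        ps.executeUpdate()
      }
      ps.close()
      conn.commit()
      conn.close()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}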