一些概念
一個partition 對應一個task,一個task 必定存在於一個Executor,一個Executor 對應一個JVM.
- Partition 是一個可迭代數據集合
- Task 本質是作用於Partition的線程
問題
Task 里如何使用Kafka Producer 將數據發送到Kafka呢。 其他譬如HBase/Redis/MySQL 也是如此。
解決方案
直觀的解決方案自然是能夠在Executor(JVM)里有個Prodcuer Pool(或者共享單個Producer實例),但是我們的代碼都是
先在Driver端執行,然后將一些函數序列化到Executor端執行,這里就有序列化問題,正常如Pool,Connection都是無法序列化的。
一個簡單的解決辦法是定義個Object 類,
譬如
object SimpleHBaseClient { private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181" private lazy val (table, conn) = createConnection def bulk(items:Iterator) = { items.foreach(conn.put(_)) conn.flush.... } ...... }
然后保證這個類在map,foreachRDD等函數下使用,譬如:
dstream.foreachRDD{ rdd => rdd.foreachPartition{iter=> SimpleHBaseClient.bulk(iter) } }
為什么要保證放到foreachRDD/map 等這些函數里呢?
Spark的機制是先將用戶的程序作為一個單機運行(運行者是Driver),Driver通過序列化機制,將對應算子規定的函數發送到Executor進行執行。這里,foreachRDD/map 等函數都是會發送到Executor執行的,Driver端並不會執行。里面引用的object 類 會作為一個stub 被序列化過去,object內部屬性的初始化其實是在Executor端完成的,所以可以避過序列化的問題。
Pool也是類似的做法。然而我們並不建議使用pool,因為Spark 本身已經是分布式的,舉個例子可能有100個executor,如果每個executor再搞10個connection
的pool,則會有100*10 個鏈接,Kafka也受不了。一個Executor 維持一個connection就好。
關於Executor掛掉丟數據的問題,其實就看你什么時候flush,這是一個性能的權衡。