Spark布隆過濾器(bloomFilter)


     數據過濾在很多場景都會應用到,特別是在大數據環境下。在數據量很大的場景實現過濾或者全局去重,需要存儲的數據量和計算代價是非常龐大的。很多小伙伴第一念頭肯定會想到布隆過濾器,有一定的精度損失,但是存儲性能和計算性能可以達到幾何級別的提升。很多第三方框架也實現了相應的功能,比如hbase框架實現的布隆過濾器性能是非常的棒,redis也可以實現相應的功能。這些需要借助於第三方框架,需要維護第三方框架。如果公司沒有部署相應架構,單獨為使用布隆過濾器部署一套集群,代價還是非常大的。

        我們在做流式計算時需要實現數據小時級別去重和天級別數據去重,初始功能版本使用的是基於redis實現的布隆過濾器。性能也非常的好,三個節點的redis集群(三主三從,主從交叉策略)性能可以達到每秒十幾萬的處理性能。在后期的使用中主要瓶頸就在redis的吞吐量的性能上。一直想在這塊做一定的性能優化。

        后來,發現spark官方封裝了基於DataFrame的布隆過濾器,使用起來相當方便。性能不再受制於第三方框架的吞吐量限制,依賴於spark的並行資源。可以減少架構設計的復雜度,提高可維護性。在流式計算應用中可以將布隆過濾器做成driver級別的全局變量,在batch結束更新布隆過濾器。如果考慮容錯,可以將布隆過濾器數據定期持久化到磁盤(hdfs/redis)。

       直接上代碼,看一下使用方法

 val bf = df.stat.bloomFilter("dd",dataLen,0.01) val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))

首先,在生成布隆過濾器直接調用bloomFilter(colName:String,expectedNumItems:Long,fpp:Double)就可以了,第一個參數是使用的數據列,第二個參數是數據量期望會有多少,第三個參數是損失精度。損失精度越低生成的布隆數組長度就會越長,占用的空間就會越多,計算過程就會越漫長。

        在用有些場景布隆過濾器還需要合並,官方也提供了相應的API

   mergeInPlace(BloomFilter var1):BloomFilter

    判定數據是否存在,官方一共提供了四個方法:

    mightContain(Object var1),     mightContainString(String val1),     mightContainLong(long var1),     mightContainBinary(byte[] var1)

不同的方法適用於不同的類型,bloomFilter(calname:String...)這個方法中使用列的數據類型一定要和以上四個方法對應,否則會出問題。

        官方還很貼心的提供了序列化和反序列化工具:writeTo和readFrom,可以很方便的將布隆過濾器序列化到磁盤和從磁盤加載布隆過濾器。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM