Flink之布隆過濾器


大數據處理中,用去去重,布隆過濾器十分常見

1、代碼

// 定義一個布隆過濾器
class Bloom(size: Long) extends Serializable {
  //布隆過濾器的默認大小是32M
  //32 * 1024 * 1024 * 8
  //2^5  2^10   2^10 * 2^3

  //1后面28個0
  private val cap = if (size > 0) size else 1 << 28

  //定義hash函數的結果,當做位圖的offset
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for( i <- 0 until value.length ){
      //各種方法去實現都行
      result += result * seed + value.charAt(i)
    }
    //他們之間進行&運算結果一定在位圖之間
    result  & ( cap - 1 ) //0后面28個1
  }
}

2、使用

//1、定義一個對象
lazy val bloom = new Bloom(1<<28)
//2、使用布隆對象對數據進行hash,從而獲取偏移量
val offset = bloom.hash(userId, 61) 

 =================================

除了自定義的布隆過濾器,還可以使用Twitter 的開源包

使用案例:https://mp.weixin.qq.com/s/0LyPCADTAHCmV1eLA5h9oQ
<dependency>
    <groupId>com.clearspring.analytics</groupId>
    <artifactId>stream</artifactId>
    <version>2.7.0</version>
</dependency>
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.clearspring.analytics.stream.membership.BloomFilter;
 
public class BloomFilterFunction implements Function<String, Void> {
    BloomFilter filter = new BloomFilter(20, 20);
 
    Void process(String input, Context context) throws Exception {
      if (!filter.isPresent(input)) {
        filter.add(input);
        // Route to “not seen” topic
        context.publish(“notSeenTopic”, input);
      }
      return null;
   }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM