The HyperLogLog Algorithm


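What is HyperLogLog?

  • An algorithm for estimating cardinality (the number of distinct elements) over very large data sets. It uses little memory and its error is small, but the result is approximate, so some precision is lost (in Kylin, a bitmap can be used when exact counts are required).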

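As data practitioners, why should we understand it?

  • It is relevant to part of our day-to-day business, and understanding the principle helps us do that work better.
    • Frameworks that use the HyperLogLog algorithm: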
      • Redis
      • Apache Kylin

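How to understand it

  • There are two ways to understand it

    • In the ideal case, a set of data is hashed into [0, 1] so that the distance d between every two adjacent points is equal; the cardinality of the set is then 1/d.

      • In practice this ideal rarely holds, so the cardinality can only be approximated by bucketing and taking kmax (similar to integration?).
      • Bucketing divides the data into m groups, each group takes the value at the k-th position, and the largest of these values is kmax; (k - 1)/kmax is then the estimated cardinality.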
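    • Thinking of it as coin tossing

      • One trial consists of tossing a coin until the first tail appears; a tail is recorded as 1 and a head as 0.

      • When the number of tosses k is large, the probability that no tail ever appears is essentially 0.

      • Carried over to cardinality estimation, the idea is that the number n of 0s before the first 1 can be used to estimate the cardinality.

      • When the cardinality is roughly 2^(n+1), the coin-toss statistics can be written as:

        \[\frac{1}{2}\cdot 1+\frac{1}{4}\cdot 2+\frac{1}{8}\cdot 3+\cdots\]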
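
The two views above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class and method names are hypothetical, the hash is a stand-in (the SplitMix64 finalizer), and the first view is read as the usual k-minimum-values estimator, where kmax is the k-th smallest hashed point. The second view uses a single run-length counter, so it gives no more than a rough order of magnitude.

```java
import java.util.TreeSet;

final class CardinalityIntuitions {

    // Stand-in 64-bit hash (the SplitMix64 finalizer); any well-mixed hash would do.
    static long mix64(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    // Map a 64-bit hash onto [0, 1) using its top 53 bits.
    static double toUnitInterval(long hash) {
        return (hash >>> 11) * 0x1.0p-53;
    }

    // First view: keep the k smallest distinct hashed points; kmax is the largest of them.
    static double kMinimumValuesEstimate(long[] items, int k) {
        TreeSet<Double> smallest = new TreeSet<>();
        for (long item : items) {
            smallest.add(toUnitInterval(mix64(item)));
            if (smallest.size() > k) {
                smallest.pollLast(); // drop the largest so only k points remain
            }
        }
        if (smallest.size() < k) {
            return smallest.size(); // fewer than k distinct points: just count them
        }
        double kmax = smallest.last();
        return (k - 1) / kmax;
    }

    // Second view: the longest run of 0s seen before the first 1 in any hash.
    static int maxLeadingZeros(long[] items) {
        int max = 0;
        for (long item : items) {
            max = Math.max(max, Long.numberOfLeadingZeros(mix64(item)));
        }
        return max;
    }

    public static void main(String[] args) {
        int distinct = 100_000;
        long[] items = new long[2 * distinct];
        for (int i = 0; i < items.length; i++) {
            items[i] = 1 + (i % distinct); // every value appears twice
        }
        System.out.printf("k-minimum-values estimate: %.0f%n",
                kMinimumValuesEstimate(items, 1024));
        int n = maxLeadingZeros(items);
        System.out.printf("longest run of zeros n = %d, 2^(n+1) = %.0f%n",
                n, Math.pow(2, n + 1));
    }
}
```

Duplicates do not change either result, because equal items hash to the same point. The single-counter figure fluctuates a lot from one data set to another, which is the motivation for splitting the data into many buckets and combining them.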

Algorithm pseudocode

[Figure: HyperLogLog algorithm pseudocode (image not included)]

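  • Summary of the procedure:
    • Hash each element to a 32-bit value and find the position of its leftmost 1-bit
    • Initialize m registers, m ∈ [2^4, 2^16]
    • For each group, compute the maximum leading-zero position
    • Compute the cardinality estimate and adjust it according to its magnitude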
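
The four steps above can be sketched as a compact Java class. This is a minimal illustration, not the implementation listed below: the names MiniHyperLogLog, hash32, add, and estimate are hypothetical, the hash is a stand-in (the MurmurHash3 32-bit finalizer applied to an int key), and only the small-range adjustment (linear counting while some registers are still empty) is applied.

```java
final class MiniHyperLogLog {
    private final int b;            // number of index bits
    private final int m;            // number of registers, m = 2^b
    private final byte[] registers; // per-register maximum rank

    MiniHyperLogLog(int b) {
        if (b < 4 || b > 16) {
            throw new IllegalArgumentException("b must be in [4, 16]");
        }
        this.b = b;
        this.m = 1 << b;
        this.registers = new byte[m];
    }

    // Stand-in 32-bit hash: the MurmurHash3 finalizer applied to an int key.
    static int hash32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    void add(int item) {
        int x = hash32(item);
        int j = x >>> (Integer.SIZE - b); // the first b bits select the register
        int w = x << b;                   // the remaining 32 - b bits
        // rank = position of the leftmost 1-bit in w, capped when w is all zeros
        int rank = Math.min(Integer.numberOfLeadingZeros(w) + 1, Integer.SIZE - b + 1);
        if (rank > registers[j]) {
            registers[j] = (byte) rank;
        }
    }

    long estimate() {
        double sum = 0.0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2.0, -r);
            if (r == 0) {
                zeros++;
            }
        }
        double raw = alpha(m) * m * (double) m / sum; // normalized harmonic mean
        if (raw <= 2.5 * m && zeros > 0) {
            // small range: fall back to linear counting on the empty registers
            return Math.round(m * Math.log((double) m / zeros));
        }
        return Math.round(raw);
    }

    static double alpha(int m) {
        switch (m) {
            case 16: return 0.673;
            case 32: return 0.697;
            case 64: return 0.709;
            default: return 0.7213 / (1 + 1.079 / m);
        }
    }

    public static void main(String[] args) {
        MiniHyperLogLog hll = new MiniHyperLogLog(10); // 1024 registers
        for (int i = 1; i <= 100_000; i++) {
            hll.add(i);
            hll.add(i); // duplicates do not change the registers
        }
        System.out.println("estimated distinct count: " + hll.estimate());
    }
}
```

With 1024 registers the expected relative error is about 1.04/sqrt(1024), roughly 3%, so the printed estimate should land near 100000 rather than exactly on it.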

An open-source Java implementation of HyperLogLog

/*
 * Copyright (C) 2012 Clearspring Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.clearspring.analytics.stream.cardinality;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.Serializable;

import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.util.Bits;
import com.clearspring.analytics.util.IBuilder;

/**
 * Java implementation of HyperLogLog (HLL) algorithm from this paper:
 * <p/>
 * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
 * <p/>
 * HLL is an improved version of LogLog that is capable of estimating
 * the cardinality of a set with accuracy = 1.04/sqrt(m) where
 * m = 2^b.  So we can control accuracy vs space usage by increasing
 * or decreasing b.
 * <p/>
 * The main benefit of using HLL over LL is that it only requires 64%
 * of the space that LL does to get the same accuracy.
 * <p/>
 * This implementation implements a single counter.  If a large (millions)
 * number of counters are required you may want to refer to:
 * <p/>
 * http://dsiutils.di.unimi.it/
 * <p/>
 * It has a more complex implementation of HLL that supports multiple counters
 * in a single object, drastically reducing the java overhead from creating
 * a large number of objects.
 * <p/>
 * This implementation leveraged a javascript implementation that Yammer has
 * been working on:
 * <p/>
 * https://github.com/yammer/probablyjs
 * <p>
 * Note that this implementation does not include the long range correction function
 * defined in the original paper.  Empirical evidence shows that the correction
 * function causes more harm than good.
 * </p>
 * <p/>
 * <p>
 * Users have different motivations to use different types of hashing functions.
 * Rather than try to keep up with all available hash functions and to remove
 * the concern of causing future binary incompatibilities this class allows clients
 * to offer the value in hashed int or long form.
 * This way clients are free to change their hash function on their own time line.
 * We recommend using Google's Guava Murmur3_128 implementation as it provides good
 * performance and speed when high precision is required.
 * In our tests the 32bit MurmurHash function included in this project is faster and
 * produces better results than the 32 bit murmur3 implementation google provides.
 * </p>
 */
public class HyperLogLog implements ICardinality, Serializable {

    // Register set
    private final RegisterSet registerSet;
    private final int log2m;
    private final double alphaMM;


    /**
     * Create a new HyperLogLog instance using the specified standard deviation.
     *
     * @param rsd - the relative standard deviation for the counter.
     *            smaller values create counters that require more space
     *            (a trade-off between precision and space).
     */
    public HyperLogLog(double rsd) {
        this(log2m(rsd));
    }

    private static int log2m(double rsd) {
        return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
    }

    private static double rsd(int log2m) {
        return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
    }

    private static double logBase(double exponent, double base) {
        return Math.log(exponent) / Math.log(base);
    }

    private static int accuracyToLog2m(double accuracy) {
        return Math.toIntExact(2 * Math.round(logBase(1.04 / (1 - accuracy), 2)));
    }

    private static void validateLog2m(int log2m) {
        if (log2m < 0 || log2m > 30) {
            throw new IllegalArgumentException("log2m argument is "
                                               + log2m + " and is outside the range [0, 30]");
        }
    }

    /**
     * Create a new HyperLogLog instance.  The log2m parameter defines the accuracy
     * of the counter.  The larger the log2m the better the accuracy.
     * <p/>
     * accuracy = 1 - 1.04/sqrt(2^log2m)
     *
     * @param log2m - the number of bits to use as the basis for the HLL instance
     */
    public HyperLogLog(int log2m) {
        this(log2m, new RegisterSet(1 << log2m));
    }

    /**
     * Creates a new HyperLogLog instance using the given registers.
     * Used for unmarshalling a serialized
     * instance and for merging multiple counters together.
     *
     * @param registerSet - the initial values for the register set
     */
    @Deprecated
    public HyperLogLog(int log2m, RegisterSet registerSet) {
        validateLog2m(log2m);
        this.registerSet = registerSet;
        this.log2m = log2m;
        int m = 1 << this.log2m;

        alphaMM = getAlphaMM(log2m, m);
    }

    @Override
    public boolean offerHashed(long hashedValue) {
        // j becomes the binary address determined by the first b (= log2m) bits of x,
        // so j will be between 0 and 2^log2m - 1.
        // >>> is the unsigned (logical) right shift: the vacated high bits are filled
        // with 0 whether hashedValue is positive or negative. With n = Long.SIZE - log2m:
        //   hashedValue >= 0:  j = hashedValue >> n = floor(hashedValue / 2^n)
        //   hashedValue <  0:  j = (hashedValue >> n) + 2^log2m
        // updateIfGreater compares the value stored in register j with the new value r
        // and stores r only if it is larger.
        final int j = (int) (hashedValue >>> (Long.SIZE - log2m));
        final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
        return registerSet.updateIfGreater(j, r);
    }

    @Override
    public boolean offerHashed(int hashedValue) {
        // j becomes the binary address determined by the first b log2m of x
        // j will be between 0 and 2^log2m
        final int j = hashedValue >>> (Integer.SIZE - log2m);
        final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
        return registerSet.updateIfGreater(j, r);
    }

    @Override
    public boolean offer(Object o) {
        final int x = MurmurHash.hash(o);
        return offerHashed(x);
    }


    @Override
    public long cardinality() {
        double registerSum = 0;
        int count = registerSet.count;
        double zeros = 0.0;
        for (int j = 0; j < registerSet.count; j++) {
            int val = registerSet.get(j);
            registerSum += 1.0 / (1 << val);
            if (val == 0) {
                zeros++;
            }
        }

        double estimate = alphaMM * (1 / registerSum);

        if (estimate <= (5.0 / 2.0) * count) {
            // Small Range Estimate
            return Math.round(linearCounting(count, zeros));
        } else {
            return Math.round(estimate);
        }
    }

    @Override
    public int sizeof() {
        return registerSet.size * 4;
    }

    @Override
    public byte[] getBytes() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutput dos = new DataOutputStream(baos);
        writeBytes(dos);
        baos.close();

        return baos.toByteArray();
    }

    private void writeBytes(DataOutput serializedByteStream) throws IOException {
        serializedByteStream.writeInt(log2m);
        serializedByteStream.writeInt(registerSet.size * 4);
        for (int x : registerSet.readOnlyBits()) {
            serializedByteStream.writeInt(x);
        }
    }

    /**
     * Add all the elements of the other set to this set.
     * <p/>
     * This operation does not imply a loss of precision.
     *
     * @param other A compatible Hyperloglog instance (same log2m)
     * @throws CardinalityMergeException if other is not compatible
     */
    public void addAll(HyperLogLog other) throws CardinalityMergeException {
        if (this.sizeof() != other.sizeof()) {
            throw new HyperLogLogMergeException("Cannot merge estimators of different sizes");
        }

        registerSet.merge(other.registerSet);
    }

    @Override
    public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
        HyperLogLog merged = new HyperLogLog(log2m, new RegisterSet(this.registerSet.count));
        merged.addAll(this);

        if (estimators == null) {
            return merged;
        }

        for (ICardinality estimator : estimators) {
            if (!(estimator instanceof HyperLogLog)) {
                throw new HyperLogLogMergeException("Cannot merge estimators of different class");
            }
            HyperLogLog hll = (HyperLogLog) estimator;
            merged.addAll(hll);
        }

        return merged;
    }

    private Object writeReplace() {
        return new SerializationHolder(this);
    }

    /**
     * This class exists to support Externalizable semantics for
     * HyperLogLog objects without having to expose a public
     * constructor, public write/read methods, or pretend final
     * fields aren't final.
     *
     * In short, Externalizable allows you to skip some of the more
     * verbose meta-data default Serializable gets you, but still
     * includes the class name. In that sense, there is some cost
     * to this holder object because it has a longer class name. I
     * imagine people who care about optimizing for that have their
     * own work-around for long class names in general, or just use
     * a custom serialization framework. Therefore we make no attempt
     * to optimize that here (eg. by raising this from an inner class
     * and giving it an unhelpful name).
     */
    private static class SerializationHolder implements Externalizable {

        HyperLogLog hyperLogLogHolder;

        public SerializationHolder(HyperLogLog hyperLogLogHolder) {
            this.hyperLogLogHolder = hyperLogLogHolder;
        }

        /**
         * required for Externalizable
         */
        public SerializationHolder() {

        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            hyperLogLogHolder.writeBytes(out);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            hyperLogLogHolder = Builder.build(in);
        }

        private Object readResolve() {
            return hyperLogLogHolder;
        }
    }

    public static class Builder implements IBuilder<ICardinality>, Serializable {
        private static final long serialVersionUID = -2567898469253021883L;

        private final double rsd;
        private transient int log2m;

        /**
         * Uses the given RSD percentage to determine how many bytes the constructed HyperLogLog will use.
         *
         * @deprecated Use {@link #withRsd(double)} instead. This builder's constructors did not match the (already
         * themselves ambiguous) constructors of the HyperLogLog class, but there is no way to make them match without
         * risking behavior changes downstream.
         */
        @Deprecated
        public Builder(double rsd) {
            this.log2m = log2m(rsd);
            validateLog2m(log2m);
            this.rsd = rsd;
        }

        /**
         * This constructor is private to prevent behavior change for ambiguous usages. (Legacy support).
         */
        private Builder(int log2m) {
            this.log2m = log2m;
            validateLog2m(log2m);
            this.rsd = rsd(log2m);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            this.log2m = log2m(rsd);
        }

        @Override
        public HyperLogLog build() {
            return new HyperLogLog(log2m);
        }

        @Override
        public int sizeof() {
            int k = 1 << log2m;
            return RegisterSet.getBits(k) * 4;
        }

        public static Builder withLog2m(int log2m) {
            return new Builder(log2m);
        }

        public static Builder withRsd(double rsd) {
            return new Builder(rsd);
        }

        public static Builder withAccuracy(double accuracy) { return new Builder(accuracyToLog2m(accuracy)); }

        public static HyperLogLog build(byte[] bytes) throws IOException {
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            return build(new DataInputStream(bais));
        }

        public static HyperLogLog build(DataInput serializedByteStream) throws IOException {
            int log2m = serializedByteStream.readInt();
            int byteArraySize = serializedByteStream.readInt();
            return new HyperLogLog(log2m,
                    new RegisterSet(1 << log2m, Bits.getBits(serializedByteStream, byteArraySize)));
        }
    }

    @SuppressWarnings("serial")
    protected static class HyperLogLogMergeException extends CardinalityMergeException {

        public HyperLogLogMergeException(String message) {
            super(message);
        }
    }

    protected static double getAlphaMM(final int p, final int m) {
        // See the paper.
        switch (p) {
            case 4:
                return 0.673 * m * m;
            case 5:
                return 0.697 * m * m;
            case 6:
                return 0.709 * m * m;
            default:
                return (0.7213 / (1 + 1.079 / m)) * m * m;
        }
    }

    protected static double linearCounting(int m, double V) {
        return m * Math.log(m / V);
    }
}
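
A few of the helper computations in the listing above can be checked in isolation. The sketch below copies the formulas of log2m(double) and rsd(int) and the index computation from offerHashed(long) into a hypothetical class, ListingArithmetic, so that they run without the rest of the library. It suggests two things worth knowing: the (int) cast truncates, so a requested relative standard deviation of 0.05 yields 8 index bits, whose own rsd by the same formula is closer to 0.069; and the unsigned shift >>> keeps the register index non-negative even when the hash is negative.

```java
final class ListingArithmetic {

    // Same formula as log2m(double rsd) in the listing; the (int) cast truncates.
    static int log2m(double rsd) {
        return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
    }

    // Same formula as rsd(int log2m) in the listing.
    static double rsd(int log2m) {
        return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
    }

    // Register index as in offerHashed(long): the top log2m bits, read as unsigned.
    static int registerIndex(long hashedValue, int log2m) {
        return (int) (hashedValue >>> (Long.SIZE - log2m));
    }

    public static void main(String[] args) {
        int bits = log2m(0.05);
        System.out.println("log2m(0.05) = " + bits);          // 8
        System.out.println("registers   = " + (1 << bits));   // 256
        System.out.println("rsd(" + bits + ")      = " + rsd(bits)); // about 0.069

        long negativeHash = -1L; // all 64 bits set
        int shift = Long.SIZE - 4;
        System.out.println("unsigned shift: " + registerIndex(negativeHash, 4)); // 15
        System.out.println("signed shift:   " + (negativeHash >> shift));        // -1
    }
}
```

For a negative hash, adding 2^log2m to the signed-shift result gives the same index as the unsigned shift (here -1 + 16 = 15).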

