.NET/JAVA/GO 固定時間窗口算法實現(無鎖線程安全)


一.前言

最近有一個生成 APM TraceId 的需求,公司的APM系統的 TraceId 的格式為:APM AgentId+毫秒級時間戳+自增數字,根據此規則生成的 Id 可以保證全局唯一(有 NTP 時間同步),前兩個字段好說,最后一個字段也不復雜,我的想法是按秒來進行自增。比如說1秒的時候,自增計數為100,在2秒的時候會重置為0,然后進行自增。其實這個思想就是固定時間窗口算法,這個算法一般常用在限流、Id生成器等場景。

二. .NET 代碼實現

long _currentTime;
long _current;
public long FixedWindow()
{
    var now = DateTimeOffset.Now.ToUnixTimeSeconds();
    var ct = Interlocked.Read(ref _currentTime);
    if (now > ct)
    {
        if (Interlocked.CompareExchange(ref _currentTime, now, ct)==ct)
        {
            Interlocked.Exchange(ref _current, 0);
        }
    }
    
    return Interlocked.Increment(ref _current);
}

代碼沒多少,每調用一次就返回計數,采用的 C# CAS API Interlocked ,保證每個計數操作都是原子操作,從而達到無鎖。

測試代碼,使用10個線程並發調用,每個線程調用 1w次,最終期望計數應該是10w。

private static long _currentTime;
private static long _current;
private static Semaphore _semaphore = new Semaphore(0, 10);
static void Main(string[] args)
{
    _currentTime = DateTimeOffset.Now.ToUnixTimeSeconds();
    _current = 0;
    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
            for (int j = 0; j < 10000; j++)
            {
                FixedWindow();
            }

            _semaphore.Release(1);
        });
    }

    for (int i = 0; i < 10; i++)
    {
        _semaphore.WaitOne();
    }
    
    Console.WriteLine(_current);
    Console.WriteLine("sleep 2s");
    Thread.Sleep(2000);
    Console.WriteLine(FixedWindow());
}

執行結果:

image-20220217141347106

符合預期,10線程的計數在 1s 內能執行完畢,所以最終計數是10w,然后sleep 2s,重置計數,再次調用就返回了 1

三.JAVA代碼實現

static AtomicLong currentTime = new AtomicLong();
static AtomicLong currentNumber = new AtomicLong();

public static long fixedWindow() {
    long now = currentTimeSeconds();
    long ct = currentTime.get();
    if (now > ct) {
        if (currentTime.compareAndSet(ct, now)) {
            currentNumber.set(0);
        }
    }

    return currentNumber.incrementAndGet();
}

public static long currentTimeSeconds(){
    return System.currentTimeMillis() / 1000;
}

測試代碼:

public static void main(String[] args) throws InterruptedException {
    currentTime.set(currentTimeSeconds());
    currentNumber.set(0);

    long num = 0;
    for (int i = 0; i < 1000; i++) {
        num = fixedWindow();
    }
    System.out.println(num);
    Thread.sleep(2000);
    System.out.println(fixedWindow());
}

執行結果:

符合預期,但是以上代碼用在生產環境,需要自定替換 currentTimeSeconds 方法的實現,不能每次都調用 System.currentTimeMillis(),在多線程同時調用下,會有性能問題,可以自己實現一個定時器來返回當前時間

四.GO代碼實現

var currentTime atomic.Int64
var currentNumber atomic.Int64

func fixedWindow() int64 {
	now := time.Now().Unix()
	ct := currentTime.Load()
	if now > ct {
		if currentTime.CAS(ct, now) {
			currentNumber.Store(0)
		}
	}

	return currentNumber.Inc()

}

測試代碼:

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			for j := 0; j < 10000; j++ {
				fixedWindow()
			}
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println(currentNumber.Load())
	time.Sleep(2 * time.Second)
	fmt.Println(fixedWindow())
}

執行結果:

符合預期,10個協程的計數在 1s 內能執行完畢,所以最終計數是10w,然后sleep 2s,重置計數,再次調用就返回了 1

五.資料

本文 Demo 代碼:github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM