go goroutine channel 和C# Task BlockingCollection 以及python該如何實現


首先說結論吧,個人感覺go的goroutine 和C# 的Task 相似,goroutine 和Task 可以近似理解為邏輯線程, 至於多個goroutine 或Task 對應操作系統幾個物理線程 是底層決定的,我們可以不用太關心;但是一定是多對多【這個我們可以簡單理解多對一, 一個或多個goroutine 或Task 對應底層一個物理線程】, 具體的blockingcollection可以參考 https://blog.csdn.net/ma_jiang/article/details/54561684, go channel 可以參考https://blog.csdn.net/ma_jiang/article/details/84497607

channel 和BlockingCollection 可以再多線程之間通信,尤其是在同步通信 都是運用它們阻塞的特定來做了。比如常見的接力賽: 使用無緩沖的通道,在 goroutine 之間同步數據,來模擬接力比賽。在接力比賽里,4 個跑步者圍繞賽道輪流跑。第二個、第三個和第四個跑步者要接到前一位跑步者的接力棒后才能起跑。比賽中最重要的部分是要傳遞接力棒,要求同步傳遞。在同步接力棒的時候,參與接力的兩個跑步者必須在同一時刻准備好交接。

go的代碼:

package main

import (
    "fmt"
    "sync"
    "time"
)

// wg 用來等待程序結束
var wg sync.WaitGroup

// main 是所有Go 程序的入口
func main() {
    // 創建一個無緩沖的通道
    baton := make(chan int)
    // 為最后一位跑步者將計數加1
    wg.Add(1)
    // 第一位跑步者持有接力棒
    go Runner(baton)
    // 開始比賽
    baton <- 1
    // 等待比賽結束
    wg.Wait()
}

// Runner 模擬接力比賽中的一位跑步者
func Runner(baton chan int) {
    var newRunner int
    // 等待接力棒
    runner := <-baton
    // 開始繞着跑道跑步
    fmt.Printf("Runner %d Running With Baton\n", runner)
    // 創建下一位跑步者
    if runner != 4 {
        newRunner = runner + 1
        fmt.Printf("Runner %d To The Line\n", newRunner)
        go Runner(baton)
    }
    // 圍繞跑道跑
    time.Sleep(100 * time.Millisecond)
    // 比賽結束了嗎?
    if runner == 4 {
        fmt.Printf("Runner %d Finished, Race Over\n", runner)
        wg.Done()
        return
    }
    // 將接力棒交給下一位跑步者
    fmt.Printf("Runner %d Exchange With Runner %d\n",
        runner,
        newRunner)
    baton <- newRunner
}

C#代碼:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace demo
{
    class Program
    {
        static void Main(string[] args)
        {
            // 創建一個無緩沖的通道
            var baton =new BlockingCollection<int>(1);
            // 第一位跑步者持有接力棒
            Task.Factory.StartNew(x => Runner((BlockingCollection<int>)x), baton);
            baton.Add(1);
            while(!baton.IsCompleted){
                Thread.Sleep(1000);
            }
            Console.Read();
        }
        static void Runner(BlockingCollection<int> baton){
           int  newRunner=0 ;
            // 等待接力棒 
            int runner=baton.Take();
            // 開始繞着跑道跑步
            Console.WriteLine($"Runner {runner} Running With Baton");
            // 創建下一位跑步者
            if (runner!=4){
                newRunner=runner+1;
                Console.WriteLine($"Runner {runner} To The Line");
                Task.Factory.StartNew(x=>Runner((BlockingCollection<int>)x),baton);
            }
            // 圍繞跑道跑
            Thread.Sleep(100);
            // 比賽結束了嗎?
            if(runner==4){
                Console.WriteLine($"Runner {runner} Finished, Race Over");
                 baton.CompleteAdding();
                return;
            }
            Console.WriteLine($"Runner {runner} Exchange With Runner {newRunner}");
            baton.Add(newRunner);

        }
    }
}

運行結果:

Python, 首先python 有隊列queue 但是它不是線程安全的, 也沒有阻塞的功能, 因此 我們需要自己實現一個 線程安全的隊列, 並且具有阻塞功能threadSafeQueue.py 如下:

import time
import threading
 
# 線程安全的隊列
class ThreadSafeQueue(object):
 
    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.condition = threading.Condition()
 
    # 當前隊列元素的數量
    def size(self):
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size
 
    # 往隊列里面放入元素
    def put(self, item):
        if self.max_size != 0 and self.size() > self.max_size:
            return Exception()
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()
        pass
 
    def batch_put(self, item_list):
        if not isinstance(item_list, list):
            item_list = list(item_list)
        for item in item_list:
            self.put(item)
 
    # 從隊列取出元素
    def pop(self, block=True, timeout=None):
        if self.size() == 0:
            # 需要阻塞等待
            if block:
                self.condition.acquire()
                self.condition.wait(timeout=timeout)
                self.condition.release()
            else:
                return None
        self.lock.acquire()
        item = None
        if len(self.queue) > 0:
            item = self.queue.pop()
        self.lock.release()
        return item
 
    def get(self, index):
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()
        return item

調用如下:

from threadSafeQueue import ThreadSafeQueue
import threading
import time
 
 
def Runner(baton):
    newRunner=0
    runner=baton.pop()
    print("Runner %s Running With Baton" % runner)
    if runner!=4:
        newRunner=int(runner)+1
        print("Runner %d To The Line" % runner)
        t=threading.Thread(target=Runner,args=(baton,))
        t.start()
    time.sleep(10)
    if runner==4:
        print("Runner %d Finished, Race Over" %runner)
        return
    print("Runner %d Exchange With Runner %d" %(runner,newRunner))
    baton.put(newRunner)
 
 
 
if __name__ == '__main__':
    baton=ThreadSafeQueue(1)
    baton.put(1)
    t=threading.Thread(target = Runner, args=(baton,))
    t.start()
    str1 = input()
    print(str1)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM