Go語言同步和異步執行多個任務封裝


同步適合多個連續執行的,每一步的執行依賴於上一步操作,異步執行則和任務執行順序無關(如從10個站點抓取數據)

同步執行類RunnerAsync

支持返回超時檢測,系統中斷檢測

錯誤常量定義,task/err.go

package task

import "errors"


//超時錯誤
var ErrTimeout = errors.New("received timeout")

//操作系統系統中斷錯誤
var ErrInterrupt = errors.New("received interrupt")

實現代碼如下,task/runner_async.go

package task

import (
    "os"
    "os/signal"
    "time"
)

//同步執行任務
type RunnerAsync struct {
    //操作系統的信號檢測
    interrupt chan os.Signal

    //記錄執行完成的狀態
    complete chan error

    //超時檢測
    timeout <-chan time.Time

    //保存所有要執行的任務,順序執行
    tasks []func(id int)
}

//new一個RunnerAsync對象
func NewRunnerAsync(d time.Duration) *RunnerAsync {
    return &RunnerAsync{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
    }
}

//添加一個任務
func (this *RunnerAsync) Add(tasks ...func(id int)) {
    this.tasks = append(this.tasks, tasks...)
}

//啟動RunnerAsync,監聽錯誤信息
func (this *RunnerAsync) Start() error {

    //接收操作系統信號
    signal.Notify(this.interrupt, os.Interrupt)

    //執行任務
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回執行結果
    case err := <-this.complete:
        return err

        //超時返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//順序執行所有的任務
func (this *RunnerAsync) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }
        //執行任務
        task(id)
    }
    return nil
}

//判斷是否接收到操作系統中斷信號
func (this *RunnerAsync) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收別的信號
        signal.Stop(this.interrupt)
        return true
        //正常執行
    default:
        return false
    }
}

使用方法    

Add添加一個任務,任務為接收int類型的一個閉包

Start開始執行傷,返回一個error類型,nil為執行完畢, ErrTimeout代表執行超時,ErrInterrupt代表執行被中斷(類似Ctrl + C操作)

 

測試代碼

task/runner_async_test.go

package task

import (
	"fmt"
	"os"
	"runtime"
	"testing"
	"time"
)

func TestRunnerAsync_Start(t *testing.T) {

	//開啟多核
	runtime.GOMAXPROCS(runtime.NumCPU())

	//創建runner對象,設置超時時間
	runner := NewRunnerAsync(8 * time.Second)
	//添加運行的任務
	runner.Add(
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
	)

	fmt.Println("同步執行任務")

	//開始執行任務
	if err := runner.Start(); err != nil {
		switch err {
		case ErrTimeout:
			fmt.Println("執行超時")
			os.Exit(1)
		case ErrInterrupt:
			fmt.Println("任務被中斷")
			os.Exit(2)
		}
	}

	t.Log("執行結束")

}

//創建要執行的任務
func createTaskAsync() func(id int) {
	return func(id int) {
		fmt.Printf("正在執行%v個任務\n", id)
		//模擬任務執行,sleep兩秒
		//time.Sleep(1 * time.Second)
	}
}

執行結果

同步執行任務
正在執行0個任務
正在執行1個任務
正在執行2個任務
正在執行3個任務
正在執行4個任務
正在執行5個任務
正在執行6個任務
正在執行7個任務
正在執行8個任務
正在執行9個任務
正在執行10個任務
正在執行11個任務
正在執行12個任務

  

異步執行類Runner

支持返回超時檢測,系統中斷檢測

實現代碼如下,task/runner.go

package task

import (
    "os"
    "time"
    "os/signal"
    "sync"
)

//異步執行任務
type Runner struct {
    //操作系統的信號檢測
    interrupt chan os.Signal

    //記錄執行完成的狀態
    complete chan error

    //超時檢測
    timeout <-chan time.Time

    //保存所有要執行的任務,順序執行
    tasks []func(id int) error

    waitGroup sync.WaitGroup

    lock sync.Mutex

    errs []error
}

//new一個Runner對象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}

//添加一個任務
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}

//啟動Runner,監聽錯誤信息
func (this *Runner) Start() error {

    //接收操作系統信號
    signal.Notify(this.interrupt, os.Interrupt)

    //並發執行任務
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回執行結果
    case err := <-this.complete:
        return err
        //超時返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//異步執行所有的任務
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }

        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()

            //執行任務
            err := task(id)
            //加鎖保存到結果集中
            this.errs = append(this.errs, err)

            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()

    return nil
}

//判斷是否接收到操作系統中斷信號
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收別的信號
        signal.Stop(this.interrupt)
        return true
        //正常執行
    default:
        return false
    }
}

//獲取執行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

使用方法    

Add添加一個任務,任務為接收int類型,返回類型error的一個閉包

Start開始執行傷,返回一個error類型,nil為執行完畢, ErrTimeout代表執行超時,ErrInterrupt代表執行被中斷(類似Ctrl + C操作)

getErrs獲取所有的任務執行結果

 

測試示例代碼

task/runner_test.go

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunner_Start(t *testing.T) {
    //開啟多核心
    runtime.GOMAXPROCS(runtime.NumCPU())

    //創建runner對象,設置超時時間
    runner := NewRunner(18 * time.Second)
    //添加運行的任務
    runner.Add(
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
    )

    fmt.Println("異步執行任務")

    //開始執行任務
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("執行超時")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任務被中斷")
            os.Exit(2)
        }
    }

    t.Log("執行結束")

    t.Log(runner.GetErrs())

}

//創建要執行的任務
func createTask() func(id int) error {
    return func(id int) error {
        fmt.Printf("正在執行%v個任務\n", id)
        //模擬任務執行,sleep
        //time.Sleep(1 * time.Second)
        return nil
    }
}

執行結果

異步執行任務
正在執行2個任務
正在執行1個任務
正在執行4個任務
正在執行3個任務
正在執行6個任務
正在執行5個任務
正在執行9個任務
正在執行7個任務
正在執行10個任務
正在執行13個任務
正在執行8個任務
正在執行11個任務
正在執行12個任務
正在執行0個任務 

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM