Python之threading模塊的使用


作用:同一個進程空間並發運行多個操作,專業術語簡稱為:【多線程】

1、任務函數不帶參數多線程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

def worker():
    print('worker...')

threads = []
for i in range(3):
    task = threading.Thread(target=worker)
    threads.append(task)
    task.start()

print(threads)
threading_simple.py

 運行效果

[root@ mnt]# python3 threading_simple.py 
worker...
worker...
worker...
[<Thread(Thread-1, stopped 140040860006144)>, <Thread(Thread-2, stopped 140040860006144)>, <Thread(Thread-3, stopped 140040860006144)>]

 2、任務函數帶參數多線程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

def worker(num):
    print('worker %s...' % num)

threads = []
for i in range(3):
    task = threading.Thread(target=worker, args=(i,))
    threads.append(task)
    task.start()

print(threads)
threading_simple_args.py

運行效果

[root@ mnt]# python3 threading_simple_args.py 
worker 0...
worker 1...
worker 2...
[<Thread(Thread-1, stopped 140326829315840)>, <Thread(Thread-2, stopped 140326829315840)>, <Thread(Thread-3, stopped 140326829315840)>]

 3、線程標識名字設置和獲取

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time

def worker():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Exiting')

def my_service():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Exiting')

my_service_task = threading.Thread(name='my_service', target=my_service)
worker_task = threading.Thread(name='worker', target=worker)
default_task = threading.Thread(target=worker)  # 使用默認的名字Thread-1 

my_service_task.start()
worker_task.start()
default_task.start()
threading_name.py

運行效果

[root@ mnt]# python3 threading_name.py 
my_service Starting
worker Starting
Thread-1 Starting
worker Exiting
Thread-1 Exiting
my_service Exiting

 4、線程標識名字設置和獲取,利用logging模塊打印出來日志,調試一般不建議用print打印出來

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time
import logging

def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    # 以下格式化,參考官方文檔:https://docs.python.org/3/library/logging.html
    format='[%(levelname)s] (%(thread)d) (%(threadName)-10s) %(message)s',
)

my_service_task = threading.Thread(name='my_service', target=my_service)
worker_task = threading.Thread(name='worker', target=worker)
default_task = threading.Thread(target=worker)  # 使用默認的名字Thread-1

my_service_task.start()
worker_task.start()
default_task.start()
threading_name_logging.py

運行效果

[root@ mnt]# python3 threading_name_logging.py 
[DEBUG] (140555433457408) (my_service) Starting
[DEBUG] (140555354896128) (worker    ) Starting
[DEBUG] (140555346503424) (Thread-1  ) Starting
[DEBUG] (140555354896128) (worker    ) Exiting
[DEBUG] (140555346503424) (Thread-1  ) Exiting
[DEBUG] (140555433457408) (my_service) Exiting

 5、守護線程隨着主程序退出而關閉線程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time
import logging

def daemon():
    logging.debug('daemon Starting...')
    time.sleep(0.2)
    logging.debug('daemon Exiting...')


def non_daemon():
    logging.debug('non_daemon Starting...')
    logging.debug('non_daemon Exiting...')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s'
)

daemon_task = threading.Thread(name='daemon', target=daemon, daemon=True) #設置為首護線程
non_daemon_task = threading.Thread(name='non_daemon_task', target=non_daemon)

daemon_task.start()
non_daemon_task.start()
threading_daemon.py

運行效果

[root@mnt]# python3 threading_daemon.py 
(daemon    ) daemon Starting...
(non_daemon_task) non_daemon Starting...
(non_daemon_task) non_daemon Exiting...
#由於守護線程還沒有執行完,主進程已退出,守護線程即隨之被終止,從而導致【daemon Exiting...】沒打印出來

 6、等待守護線程運行結束,才關閉主程序

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time
import logging

def daemon():
    logging.debug('daemon Starting...')
    time.sleep(0.2)
    logging.debug('daemon Exiting...')

def non_daemon():
    logging.debug('non_daemon Starting...')
    logging.debug('non_daemon Exiting...')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s'
)

daemon_task = threading.Thread(name='daemon', target=daemon, daemon=True)  # 設置為首護線程
non_daemon_task = threading.Thread(name='non_daemon_task', target=non_daemon)

daemon_task.start()
non_daemon_task.start()

daemon_task.join()
non_daemon_task.join()
threading_daemon_join

運行效果

[root@ mnt]# python3 threading_daemon_join.py 
(daemon    ) daemon Starting...
(non_daemon_task) non_daemon Starting...
(non_daemon_task) non_daemon Exiting...
(daemon    ) daemon Exiting...
#由於使用的join等待線程運行完成,才結束,所以
【daemon Exiting...】可以打印出來

 7、設置守護線程的超時時間,防止進入無限阻塞

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time
import logging

def daemon():
    logging.debug('daemon Starting...')
    time.sleep(0.2)
    logging.debug('daemon Exiting...')


def non_daemon():
    logging.debug('non_daemon Starting...')
    logging.debug('non_daemon Exiting...')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s'
)

daemon_task = threading.Thread(name='daemon', target=daemon, daemon=True)  # 設置為首護線程
non_daemon_task = threading.Thread(name='non_daemon_task', target=non_daemon)

daemon_task.start()
non_daemon_task.start()

daemon_task.join(0.1) #設置的超時時間0.1秒,因為該任務的函數睡眠0.2s,所以沒有運行完成,就已經超時,結束該守護線程
print('isAlive()', daemon_task.isAlive())
non_daemon_task.join()
threading_daemon_join_timeout.py

運行效果

[root@ mnt]# python3 threading_daemon_join_timeout.py 
(daemon    ) daemon Starting...
(non_daemon_task) non_daemon Starting...
(non_daemon_task) non_daemon Exiting...
isAlive() True #這里是判斷線程是否在運行,True表在線程還在運行

 8、利用threading.enumerate()枚舉的方法,設置守護線程等待運行完成

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time
import logging
import random

def worker():
    pause = random.randint(1, 5) / 10
    logging.debug('睡眠 %0.2f 秒', pause)
    time.sleep(pause)
    logging.debug('worker 結束')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s'
)

for i in range(3):
    task = threading.Thread(target=worker, daemon=True)
    task.start()

main_thread = threading.main_thread()
for task_obj in threading.enumerate(): #threading.enumerate():返回當前運行守護線程的實例
    if task_obj is main_thread:
        continue
    logging.debug('等待 %s', task_obj.getName())
    task_obj.join()
threading_enumerate.py

運行效果

[root@ mnt]# python3 threading_enumerate.py 
(Thread-1  ) 睡眠 0.30 秒
(Thread-2  ) 睡眠 0.10 秒
(Thread-3  ) 睡眠 0.10 秒
(MainThread) 等待 Thread-1
(Thread-2  ) worker 結束
(Thread-3  ) worker 結束
(Thread-1  ) worker 結束
(MainThread) 等待 Thread-2
(MainThread) 等待 Thread-3

 9、利用繼承threading.Thread類,實現無參的多線程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging

class MyThread(threading.Thread):
    def run(self):
        logging.debug('運行...')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    task = MyThread()
    task.start()
threading_subclass.py

運行效果

[root@ mnt]# 
[root@python-mysql mnt]# python3 threading_subclass.py 
(Thread-1  ) 運行...
(Thread-2  ) 運行...
(Thread-3  ) 運行...
(Thread-4  ) 運行...
(Thread-5  ) 運行...

  10、利用繼承threading.Thread類,實現有參的多線程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging

class MyThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None, daemon=None, args=(), kwargs=None):
        super(MyThread, self).__init__(group=group, target=target, name=name, daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('運行...args : %s,kwargs : %s', self.args, self.kwargs)

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    task = MyThread(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    task.start()
threading_subclass_args.py

運行效果

[root@ mnt]# python3 threading_subclass_args.py 
(Thread-1  ) 運行...args : (0,),kwargs : {'a': 'A', 'b': 'B'}
(Thread-2  ) 運行...args : (1,),kwargs : {'a': 'A', 'b': 'B'}
(Thread-3  ) 運行...args : (2,),kwargs : {'a': 'A', 'b': 'B'}
(Thread-4  ) 運行...args : (3,),kwargs : {'a': 'A', 'b': 'B'}
(Thread-5  ) 運行...args : (4,),kwargs : {'a': 'A', 'b': 'B'}

 11、定時器線程threading.Timer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def delayed():
    logging.debug('delayed運行...')

logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s"
)

task_1 = threading.Timer(0.3, delayed)
task_1.setName('task_1')

task_2 = threading.Timer(0.3, delayed)
task_2.setName('task_2')

logging.debug('開始運行Timer')
task_1.start()
task_2.start()

logging.debug('取消前等待')
time.sleep(0.2)
logging.debug('取消 %s' % task_2.getName())
task_2.cancel()
logging.debug('取消完成')
threading_timer.py

運行效果

[root@ mnt]# python3 threading_timer.py 
(MainThread) 開始運行Timer
(MainThread) 取消前等待
(MainThread) 取消 task_2
(MainThread) 取消完成
(task_1    ) delayed運行...
#task_2已經被取消,所以沒有運行

 12、線程間信號相互傳送threading.Event

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def wait_for_event(event_obj):
    logging.debug('等待事件的開始')
    event_is_set = event_obj.wait()
    logging.debug('事件設置 %s' % event_is_set)

def wait_for_event_timeout(event_obj, timeout):
    while not event_obj.is_set():
        logging.debug('等待事件超時開始')
        event_is_set = event_obj.wait(timeout)
        if event_is_set:
            logging.debug('處理事件')
        else:
            logging.debug('做其他工作')


logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s"
)

event_obj = threading.Event()
task_1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(event_obj,)
)
task_1.start()

task_2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(event_obj, 2)
)
task_2.start()

logging.debug('在呼叫前等待 Event.set()')
time.sleep(0.3)
event_obj.set()
logging.debug('事件已經設置')
threading_event.py

運行效果

[root@ mnt]# python3 threading_event.py 
(block     ) 等待事件的開始
(nonblock  ) 等待事件超時開始
(MainThread) 在呼叫前等待 Event.set()
(MainThread) 事件已經設置
(block     ) 事件設置 True
(nonblock  ) 處理事件

 13、控制資源訪問_計數器_多線程加阻塞鎖的示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time
import random

class Counter(object):
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        logging.debug('等待鎖')
        self.lock.acquire()
        try:
            logging.debug('獲取鎖')
            self.value += 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('睡眠 %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('worker運行結束')

logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s"
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('等待線程worker運行結束')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)
threading_lock.py

 運行效果

[root@mnt]# python3 threading_lock.py 
(Thread-1  ) 睡眠 0.36
(Thread-2  ) 睡眠 0.77
(MainThread) 等待線程worker運行結束
(Thread-1  ) 等待鎖
(Thread-1  ) 獲取鎖
(Thread-1  ) 睡眠 0.43
(Thread-2  ) 等待鎖
(Thread-2  ) 獲取鎖
(Thread-2  ) 睡眠 0.12
(Thread-1  ) 等待鎖
(Thread-1  ) 獲取鎖
(Thread-1  ) worker運行結束
(Thread-2  ) 等待鎖
(Thread-2  ) 獲取鎖
(Thread-2  ) worker運行結束
(MainThread) Counter: 4  #運行4次,所以顯示4

  14、控制資源訪問_計數器_多線程非阻塞鎖的示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def lock_holder(lock_obj):
    logging.debug('lock_holder 開始')
    while True:
        lock_obj.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not Holding')
            lock_obj.release()
        time.sleep(0.5)

def worker(lock_obj):
    logging.debug('worker 開始')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('嘗試獲取鎖')
        have_it = lock_obj.acquire(0)  # 主要核心代碼在這里,不斷嘗試獲取鎖,通過這個判斷鎖是否釋放可以獲取
        try:
            num_tries += 1
            if have_it:
                logging.debug('重試次數 %d: 得到鎖', num_tries)
                num_acquires += 1
            else:
                logging.debug('重試次數 %d: 沒有得到鎖', num_tries)
        finally:
            if have_it:
                lock_obj.release()
    logging.debug('獲取鎖一共嘗試的 %d 次', num_tries)


logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s"
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()
threading_lock_nonblock.py

運行結果

[root@ mnt]# python3 threading_lock_nonblock.py 
(LockHolder) lock_holder 開始
(LockHolder) Holding
(Worker    ) worker 開始
(LockHolder) Not Holding
(Worker    ) 嘗試獲取鎖
(Worker    ) 重試次數 1: 得到鎖
(LockHolder) Holding
(Worker    ) 嘗試獲取鎖
(Worker    ) 重試次數 2: 沒有得到鎖
(LockHolder) Not Holding
(Worker    ) 嘗試獲取鎖
(Worker    ) 重試次數 3: 得到鎖
(LockHolder) Holding
(Worker    ) 嘗試獲取鎖
(Worker    ) 重試次數 4: 沒有得到鎖
(LockHolder) Not Holding
(Worker    ) 嘗試獲取鎖
(Worker    ) 重試次數 5: 得到鎖
(Worker    ) 獲取鎖一共嘗試的 5 次
#嘗試的次數是隨機,這樣子的好處,不會因為阻塞占有CPU資源

 15、互斥鎖

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading

lock = threading.Lock()

print('第一個鎖', lock.acquire())
print('第二個鎖', lock.acquire(0))
threading_lock_reacquire.py

運行效果

[root@ mnt]# python3 threading_lock_reacquire.py
第一個鎖 True
第二個鎖 False

16、同步鎖

import threading

lock = threading.RLock()

print('第一個鎖', lock.acquire())
print('第二個鎖', lock.acquire(0))
threading_rlock.py

運行效果

[root@ mnt]# python3 threading_rlock.py 
第一個鎖 True
第二個鎖 True

 17、利用with管理鎖,無需每次都釋放鎖lock.release()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging

def worker_with(lock):
    with lock:
        logging.debug('當前運行是用with 獲取鎖')

def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('當前運行是用 lock.acquire() 獲取鎖')
    finally:
        lock.release()

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

with_open_lock = threading.Thread(target=worker_with, args=(lock,))
no_with_open_lock = threading.Thread(target=worker_no_with, args=(lock,))

with_open_lock.start()
no_with_open_lock.start()
threading_lock_with.py

運行效果

[root@ mnt]# python3 threading_with_lock.py 
(Thread-1  ) 當前運行是用with 獲取鎖
(Thread-2  ) 當前運行是用 lock.acquire() 獲取鎖

 18、利用threading.Condition(),實現線程同步,下面是生產者和消費者的示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def consumer(condition_obj):
    """消費者"""
    logging.debug('消費者線程開啟')
    with condition_obj:
        condition_obj.wait()
        logging.debug('資源可供消費者使用。')

def producer(condition_obj):
    """生產者"""
    logging.debug('生產者線程開啟')
    with condition_obj:
        logging.debug('生產可用資源')
        condition_obj.notifyAll()

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

condition_obj = threading.Condition()

# 消費者實例1
c1 = threading.Thread(
    name='c1',
    target=consumer,
    args=(condition_obj,)
)

# 消費者實例1
c2 = threading.Thread(
    name='c2',
    target=consumer,
    args=(condition_obj,)
)

# 生產者實例
p = threading.Thread(
    name='p',
    target=producer,
    args=(condition_obj,)
)

c1.start()
time.sleep(0.2) 
c2.start()
time.sleep(0.2)
p.start()
threading_condition.py

 運行效果

[root@ mnt]# python3 threading_condition.py 
(c1        ) 消費者線程開啟
(c2        ) 消費者線程開啟
(p         ) 生產者線程開啟
(p         ) 生產可用資源
(c1        ) 資源可供消費者使用。
(c2        ) 資源可供消費者使用。

 19、線程同步threading.Barrier() ,作用:等待所有線程一起開啟后,再全部一起執行主要的功能

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def worker(barrier):
    logging.debug('當前線程名字: %s 與 %s 其他人一起等待后面的功能' % (threading.current_thread().name, barrier.n_waiting))
    worker_id = barrier.wait()
    logging.debug('%s 已經等待完畢 %s' % (threading.current_thread().name, worker_id))

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,)
    ) for i in range(NUM_THREADS)
]  # 這里是實例化3個線程,[<Thread(worker-0, initial)>, <Thread(worker-1, initial)>, <Thread(worker-2, initial)>]

for thread_obj in threads:
    logging.debug('%s 開啟' % thread_obj.name)
    thread_obj.start()
    time.sleep(0.2)

for thread_obj in threads:
    thread_obj.join()
threading_barrier.py

運行效果

[root@ mnt]# python3 threading_barrier.py 
(MainThread) worker-0 開啟
(worker-0  ) 當前線程名字: worker-00 其他人一起等待后面的功能
(MainThread) worker-1 開啟
(worker-1  ) 當前線程名字: worker-11 其他人一起等待后面的功能
(MainThread) worker-2 開啟
(worker-2  ) 當前線程名字: worker-22 其他人一起等待后面的功能
(worker-2  ) worker-2 已經等待完畢 2
(worker-0  ) worker-0 已經等待完畢 0
(worker-1  ) worker-1 已經等待完畢 1

 20、線程同步barrier.abort()中斷的操作,作用:等待所有線程運行后面的功能,然后對它進行中斷的操作,使用停止運行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

def worker(barrier):
    logging.debug('當前線程名字: %s 與 %s 其他人一起等待后面的功能' % (threading.current_thread().name, barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.debug('%s 中斷', threading.current_thread().name)
    else:
        logging.debug('%s 已經等待完畢 %s' % (threading.current_thread().name, worker_id))

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

NUM_THREADS = 3

# barrier.abort()必須多加一個線程,所以這里需要加1
barrier = threading.Barrier(NUM_THREADS + 1)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,)
    ) for i in range(NUM_THREADS)
]  # 這里是實例化3個線程,[<Thread(worker-0, initial)>, <Thread(worker-1, initial)>, <Thread(worker-2, initial)>]

for thread_obj in threads:
    logging.debug('%s 開啟' % thread_obj.name)
    thread_obj.start()
    time.sleep(0.1)

#中斷線程一起同步運行
barrier.abort()

for thread_obj in threads:
    thread_obj.join()
threading_barrier_abort.py

運行效果

[root@ mnt]# python3 threading_barrier_abort.py 
(MainThread) worker-0 開啟
(worker-0  ) 當前線程名字: worker-00 其他人一起等待后面的功能
(MainThread) worker-1 開啟
(worker-1  ) 當前線程名字: worker-11 其他人一起等待后面的功能
(MainThread) worker-2 開啟
(worker-2  ) 當前線程名字: worker-22 其他人一起等待后面的功能
(worker-0  ) worker-0 中斷
(worker-1  ) worker-1 中斷
(worker-2  ) worker-2 中斷

 21、threading.Semaphore(),自定義線程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import logging
import time

class ActivePool(object):
    """活動池"""

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        """獲取一個鎖,把活動的名字增加於列表中"""
        with self.lock:
            self.active.append(name)
            logging.debug('運行:%s' % self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('運行:%s', self.active)


def worker(semaphore_obj, pool):
    logging.debug('正在等待加入池')
    with semaphore_obj:
        name = threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-10s) %(message)s',
)

pool = ActivePool()
semaphore_obj = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(semaphore_obj, pool)
    )
    t.start()
threading_semaphore.py

運行效果

[root@ mnt]# python3 threading_semaphore.py 
2019-12-08 15:31:47,387 (0         ) 正在等待加入池
2019-12-08 15:31:47,387 (0         ) 運行:['0']
2019-12-08 15:31:47,388 (1         ) 正在等待加入池
2019-12-08 15:31:47,389 (1         ) 運行:['0', '1']
2019-12-08 15:31:47,389 (2         ) 正在等待加入池
2019-12-08 15:31:47,390 (3         ) 正在等待加入池
2019-12-08 15:31:47,489 (0         ) 運行:['1']
2019-12-08 15:31:47,491 (1         ) 運行:[]
2019-12-08 15:31:47,492 (2         ) 運行:['2']
2019-12-08 15:31:47,494 (3         ) 運行:['2', '3']
2019-12-08 15:31:47,593 (2         ) 運行:['3']
2019-12-08 15:31:47,596 (3         ) 運行:[]

 22、threading.local(),本地線程任務運行隔離

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import threading
import logging

def show_value(local_obj):
    try:
        val = local_obj.value
    except AttributeError:
        logging.debug('值不存在')
    else:
        logging.debug('value=%s', val)

def worker(local_obj):
    show_value(local_obj)
    local_obj.value = random.randint(1, 100)
    show_value(local_obj)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-10s) %(message)s',
)

local_obj = threading.local()
# 第一次運行沒有設置value值,會報AttributeError異常
show_value(local_obj)

# 第二次運行有設置value值,所以會顯示出值
local_obj.value = 1000
show_value(local_obj)

for i in range(2):
    t = threading.Thread(
        target=worker,
        args=(local_obj,),
    )
    t.start()
threading_local.py

運行效果

[root@ mnt]# python3 threading_local.py
2019-12-08 16:23:17,696 (MainThread) 值不存在
2019-12-08 16:23:17,697 (MainThread) value=1000
2019-12-08 16:23:17,697 (Thread-1  ) 值不存在
2019-12-08 16:23:17,698 (Thread-1  ) value=26
2019-12-08 16:23:17,698 (Thread-2  ) 值不存在
2019-12-08 16:23:17,698 (Thread-2  ) value=39

 23、threading.local(),本地線程任務運行隔離,初始化全局變量值

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import threading
import logging

def show_value(local_obj):
    try:
        val = local_obj.value
    except AttributeError:
        logging.debug('值不存在')
    else:
        logging.debug('value=%s', val)

def worker(local_obj):
    show_value(local_obj)
    local_obj.value = random.randint(1, 100)
    show_value(local_obj)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-10s) %(message)s',
)

class MyLocal(threading.local):
    def __init__(self, value, *args, **kwargs):
        super(MyLocal, self).__init__(*args, **kwargs)
        logging.debug('初始化值 %r', self)
        self.value = value

local_obj = MyLocal(1000)
show_value(local_obj)

for i in range(2):
    t = threading.Thread(
        target=worker,
        args=(local_obj,)
    )
    t.start()
threading_local.default.py

測試效果

[root@ mnt]# python3 threading_local.default.py 
2019-12-08 16:28:48,168 (MainThread) 初始化值 <__main__.MyLocal object at 0x7fe58b6e1408>
2019-12-08 16:28:48,168 (MainThread) value=1000
2019-12-08 16:28:48,169 (Thread-1  ) 初始化值 <__main__.MyLocal object at 0x7fe58b6e1408>
2019-12-08 16:28:48,169 (Thread-1  ) value=1000
2019-12-08 16:28:48,169 (Thread-1  ) value=11
2019-12-08 16:28:48,170 (Thread-2  ) 初始化值 <__main__.MyLocal object at 0x7fe58b6e1408>
2019-12-08 16:28:48,170 (Thread-2  ) value=1000
2019-12-08 16:28:48,170 (Thread-2  ) value=7


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM