Python之线程和cpu多道技术


一 多道技术

   1 技术背景

      cpu在执行一个任务过程中,若需要操作硬盘的指令,指令一旦发出,硬盘上的机械手臂滑动读取数据到内存中,这一段时间,cpu需要等待,时间可能很短,但对于cpu来说已经很长很长,长到可以让cpu做很多其他的任务,如果我们让cpu在这段时间内切到去做其他任务,这样cpu不就充分利用了吗?这正是多道技术产生的技术背景.

   2 多道技术的含义

    多到技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用.

    空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序.

 

 

 

 时间上的复用:当一个程序在等待I/O,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%.(操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限.这种切换不仅会在一个进程遇到io时进行,若一个进程占用cpu时间过程也会切换,或者说被操作系统夺走cpu的执行权限)

 

 在A程序计算时,I/O空闲,A程序I/O操作时,CPU空闲(B程序也是同样);必须A工作完成后,B 才能进入内存中开始工作,两者是串行的,全部完成共需时间=T1+T2

 

 空间上复用最大的问题是:程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制,若内存彼此不分割,则一个程序可以访问另一个程序的内存.

 首先丧失的是安全性,比如qq程序可以访问操作系统的内存,这意味着qq可以拿到操作系统的所有权限.

 其次丧失的是稳定性,某个程序崩溃时可能把别的程序的内存也回收了,比方说把操作系统的内存给回收了,则操作系统崩溃

3 分时系统

 由于CPU速度不断提高和采用分时技术,一台计算机可同时连接多个用户终端,而每个用户可在自己的终端上联机使用计算机,好像自己独占机器一样.

 分时技术:把处理机的运行时间分成很短的时间片,按时间片轮流把处理机分配给各联机作业使用.

       若某个作业在分配给它的时间片内不能完成其计算,则该作业暂时中断,把处理机让给另一个作业使用,等待下一轮时再继续其运行.由于计算机速度很快作业轮转得很快,给每个用户的印象是,好像他独占了一台计算机.而每个用户可以通过自己的终端向系统发出各种操作控制命令,在充分的人机交互情况下,完成作业的运行.

       具有上述特征的计算机系统称为分时系统,它允许多个用户同时联机使用计算机.

      特点:

        ①多路性.若干个用户同时使用一台计算机.微观上看是各用户轮流使用计算机;宏观上看是各用户并行工作.

        ②交互性.用户可根据系统对请求的响应结果,进一步向系统提出新的请求.这种能使用户与系统进行人机对话的工作方式,明显地有别于批处理系统,因而,分时系统又被称为交互式系统.

        ③独立性.用户之间可以相互独立操作,互不干扰.系统保证各用户程序运行的完整性.不会发生相互混淆或破坏现象.

        ④及时性.系统可对用户的输入及时作出响应.分时系统性能的主要指标之一是响应时间.它是指:从终端发出命令到系统予以应答所需的时间.

二 线程

 1 什么是线程

    在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程.

        线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程

        车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线,所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位.

   多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源.

   Python3对多线程支持的是threading模块,应用这个模块,可以创建多线程程序,并且在多线程间进行同步和通信.在Python3中,可以通过以两种方式来创建线程.即通过threading.Thread直接在线程中运行函数;通过继承threading.Thread类来创建线程

 2 为何要用多线程

  多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程.详细的讲分为4点:

    ①多线程共享一个进程的地址空间.

    ②线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用.

   ③若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量I/O处理,拥有多个线程允许这些活动批次重叠运行,从而加快程序执行的速度.

  ④在多cou系统中,为了最大限度的利用多核,可以开启多个线程,比开启进程开销小的多(这一条并不适用于python.详见GIL)

 3 开启线程的两种方式

  ①,使用threading.Thread直接在线程中运行函数

    threading.Thread的基本使用方法如下:

     Thread(group=None,target=None,name=None,args=(),kwargs={},*,demon=None)

     其中target参数就是要运行的函数,args是传入函数的参数元组.

#!/usr/bin/env python
#-*coding:utf-8-*-
import threading
import time


def sayhi(num):
    print('running on number:%s'%num)
    time.sleep(3)

if __name__=='__main__':
    t1=threading.Thread(target=sayhi,args=(1,))
    t2=threading.Thread(target=sayhi,args=(2,))
    t1.start()
    t2.start()
通过threading.Thread直接在线程中运行函数

   ②通过继承threading.Thread类来创建线程

 这种方法只要重载threading.Thread类的run方法,然后调用类的start()就能够创建线程,并运行run()函数中的代码;继承threading.Thread()类中的子类中,如果需要重载__init__()方法,必须首先调用父类的__init__()方法,否则会引发AttributeError异常.

#!/usr/bin/env python
#-*coding:utf-8-*-
import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num):
        super().__init__()
        self.num=num

    def run(self):
        print('running on number:%s'%self.num)
        time.sleep(3)
if __name__=='__main__':
    t1=MyThread(1)
    t2=MyThread(2)
    t1.start()
    t2.start()
    print('ending......')
通过继承的方式创建线程

 4 线程相关的方法

   (1)threading.thread的实例方法

      ①join方法

           作用:当某个线程或函数执行时需等待另一个线程完成操作后才能继续,则应调用另一个线程的join()方法;其中的可选参数timeout用于指定线程运行的最长时间

#!/usr/bin/env python
#-*coding:utf-8-*-
import threading
import time

def thrfun(x,y,thr=None):
    if thr:
        thr.join()
    else:
        time.sleep(2)
    for i in range(x,y):
        print(str(i*i)+';')

ta=threading.Thread(target=thrfun,args=(1,6))
tb=threading.Thread(target=thrfun,args=(16,21,ta))
ta.start()
tb.start()

----------------------运行结果--------------------
D:\代码\MyDjango\venv\Scripts\python.exe D:/代码/MyDjango/Python基础学习/进程和线程/thread1.py
1;
4;
9;
16;
25;
256;
289;
324;
361;
400;

Process finished with exit code 0
join方法示例

      ②isAlive()方法

           作用:用于查看线程是否运行

      ③daemon&setDaemon(True)

           作用:daemon属性和setDaemon(True)方法作用一样,将线程声明为守护线程,必须在start()方法调用之前设置.当我们在程序运行时,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就兵分两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成.若子线程未完成,则主线程会等待子线程完成后再退出.但是有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemin方法或daemon属性了,即用来设置线程是否随主线程退出而退出,一般来说,其属性值为True时会随主线程退出而退出;

#!/usr/bin/env python
#-*coding:utf-8-*-
import threading
import time

class myThread(threading.Thread):
    def __init__(self,mynum):
        super().__init__()
        self.mynum=mynum

    def run(self):
        time.sleep(1)
        for i in range(self.mynum,self.mynum+5):
            print(str(i*i)+';')

def main():
    print('start...')
    ma=myThread(1)
    mb=myThread(16)
    #ma.daemon=True
    ma.setDaemon(True)
    #mb.daemon=True
    mb.setDaemon(True)
    ma.start()
    mb.start()
    print('end...')
if __name__=='__main__':
    main()
setdaemon&daemon示例
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必须在t.start()之前设置
    t.start()

    print('主线程')
    print(t.is_alive())
    '''
    主线程
    True
    '''
守护线程示例2

 注: 关于守护线程的注意点

      ①无论是进程还是线程,都遵循:守护XXX会等待主XXX运行完毕后被销毁.

      需要强调的是:运行完毕并非终止运行

#1.对主进程来说,运行完毕指的是主进程代码运行完毕

#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕


-------详细解释--------
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,

#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
运行完毕的解释

 (2) threading模块提供的一些方法

  threading.currentThread(): 返回当前的线程变量。

  threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

     threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

5 同步锁(Lock)

import time
import threading

def addNum():
    #在每个线程中都获取这个全局变量
    global num
    temp=num
    time.sleep(0.001)
    #对此公共变量进行-1操作
    num=temp-1
num=100 #设定一个共享变量
thread_list=[]

for i in range(100):
    t=threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:#等待所有子线程执行完毕
    t.join()
print('final num:',num)


--------------执行结果----------------
预期结果为0,但实际不为0
原因:当一个子线程正在执行函数时,还没有执行完毕cpu轮询时间就到了,CPU的使用权强制交给了下一个子线程,造成共享数据破坏
问题引出

 

 

 

 

解决方案:可使用同步锁来解决上述问题

    当一个进程拥有多个线程之后,如果它们各做各的任务没有关系还行,可是既然同属于一个进程,它们之间总是具有一定关系的.比如多个线程都要多某个数据进行修改,则可能会出现不可预料的结果.为了保证操作正确,就要对多个线程进行同步.

  Python中可以使用threading模块中的对象Lock和RLock进行简单的线程同步.对于同一时刻只允许一个线程操作的数据对象,可以把操作过程放在Lock和RLock的acquire方法和release方法之间.RLock可以在同一调用链中多次请求而不会锁死,Lock则会锁死.(详见下面的死锁)

 注:acquire和release这一对方法若前一个调用n次,后一个也要调用n次,锁才能真正地释放.

import time
import threading

R=threading.Lock()

def addNum():
    #在每个线程中都获取这个全局变量
    global num
    #加入锁
    R.acquire()
    temp=num
    time.sleep(0.001)
    #对此公共变量进行-1操作
    num=temp-1
    #释放锁
    R.release()
num=100 #设定一个共享变量
thread_list=[]

for i in range(100):
    t=threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:#等待所有子线程执行完毕
    t.join()
print('final num:',num)
解决方案

6 死锁现象与递归锁

死锁:是指两个或两个以上的进程或线程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这样永远在互相等待的进程称为死锁进程,如下就是死锁现象:

#!/usr/bin/env python
#-*coding:utf-8-*-
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''
死锁现象

解决方案:递归锁,在Python中为了支持在同一线程中多次请求同一资源,Python提供了可重入锁RLock.

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require.直到一个线程所有的acqiure都被release,其他的线程才能获得资源.若使用RLock代替Lock,则不会发生死锁:

#!/usr/bin/env python
#-*coding:utf-8-*-
import threading
import time
'''
一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,
这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
'''

mutexA=mutexB=threading.RLock()


class MyThread(threading.Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()
        
        mutexA.release()
       

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死锁解决方案

7 信号量 Semaphore

      信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

      计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

      BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

from threading import Thread,Semaphore
import threading
import time

def func(sm):
    sm.acquire()
    print('%s get sm'%threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__=='__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func,args=(sm,))
        t.start()
信号量示例

 8 同步条件(Event)

 线程的一个关键特性是每个线程都是独立运行且状态不可预测.如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手.为了解决这些问题,我们需要使用threading库中的Event对象.对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生.在初始情况下,Event对象中的信号标志设置为假.若有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程会被一种阻塞直至该标志为真.一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程.如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行.

 

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

 

import threading
import time
#演示了两个线程通过Event来唤醒对方,模拟任务对话

evt=threading.Event()
class myThreada(threading.Thread):
    def run(self):
        #print(self.name,'当前状态:', evt.is_set())
        evt.wait()
        print(self.name,':Good morning!')
        evt.clear()
        time.sleep(1)
        evt.set()
        time.sleep(1)
        evt.wait()
        print(self.name,"I'm fine,thank you.")

class myThreadb(threading.Thread):

    def run(self):
        #print(self.name,'当前状态:', evt.is_set())
        print(self.name,':Good morning!')
        evt.set()
        time.sleep(1)
        evt.wait()
        print(self.name,'How are you?')
        evt.clear()
        time.sleep(1)
        evt.set()
def main():
    John=myThreada()
    John.name="John"
    Smith=myThreadb()
    Smith.name='Smith'
    John.start()
    Smith.start()
if __name__=='__main__':
    main()
Event示例

 9 多线程利器---队列(queue)

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except Exception as e:
            print('----',a,e)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
列表是不安全的数据结构
import queue
'''
队列的长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.
若maxsize小于1就表示队列长度无限
'''

q=queue.Queue(maxsize=3)

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。   class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。               class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。        class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
队列的相关方法

 

10 GIL全局解释锁

 <1>介绍

 

'''
定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''
结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

 

首先需要明确一点是GIL并不是Python的特性,它是在实现Python解释器(CPython)时所引入的一个概念.但是JPython就没有GIL.然而因为CPython是大部分环境下默认的Python执行环境.所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷.所以这里要明确一点:GIL并不是Python的特性,Python完全可以不依赖GIL

 <2>GIL介绍

   GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全.

   可以肯定的一点是:保护不同的数据安全,就应该加不同的锁.

   要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程.例如python aaa.py python bbb.py python ccc.py会产生3个不同的Python进程

   在一个python的进程中,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程中

#1 所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。

#2 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。

 综上:

    如果多个线程的target=work,那么执行流程是

    多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行

    解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图GIL,保证Python解释器同一时间只能执行一个任务的代码

 

 <3>GIL与Lock

GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图:

 

GIL与多线程的介绍:https://www.cnblogs.com/xiaoyuanqujing/protected/articles/11715730.html  密码:xiaoyuanqujing@666

线程的其他知识点:https://www.cnblogs.com/xiaoyuanqujing/protected/articles/11715702.html


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM