线程和进程

进程是资源分配的最小单位,线程是CPU调度的最小单位。

对比维度 多进程 多线程 总结
数据共享、同步 数据共享复杂,同步简单 数据共享简单,同步复杂 各有优劣
内存、CPU 占用内存多,切换复杂,CPU利用率低 占用内存少,切换简单,CPU利用率高 线程占优
创建、销毁、切换 复杂,速度慢 简单,速度快 线程占优
编程、调试 编程简单,调试简单 编程复杂,调试复杂 进程占优
可靠性 进程间不会互相影响 一个线程挂掉将导致整个进程挂掉 进程占优
分布式 适用于多核、多机,扩展到多台机器简单 适合于多核 进程占优

threading:基于线程的并行

CPython实现细节:由于GIL存在,cpython的多线程不是真的并行,每次只会执行一个线程。

threading.Thread

线程对象封装了要在单独的线程中运行的活动,所谓的活通常是callable对象。

重要的成员/方法:

  • 构造方法:Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) ,target指定要run的callable对象,args是target的位置参数,kwargs是target的关键字参数,name:线程的名称,默认是‘Thread-N’形式。
  • start(): 启动/运行线程,实际调用了run方法;
  • run(): 运行线程,默认执行target指定的callable对象,可以被用户覆盖;、
  • join(timeout=None): 等待线程结束,会阻塞调用的线程;

threading.Lock

线程锁,当一个线程得到锁后,其他尝试获取锁的线程会阻塞。

重要方法:

  • acquire(blocking=True, timeout=-1):非阻塞模式下,返回值表示是否获得锁;
  • release():释放锁;
  • locked(): 检查锁是否已被获得;

multiprocessing:基于进程的并行

该模块提供基于进程的并行接口。

创建管理进程模块:

  • Process(用于创建进程)
  • Pool(用于创建管理进程池)
  • Queue(用于进程通信,资源共享)
  • Value,Array(用于进程通信,资源共享)
  • Pipe(用于管道通信)
  • Manager(用于资源共享)

同步子进程模块:

  • Condition(条件变量)
  • Event(事件)
  • Lock(互斥锁)
  • RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)
  • Semaphore(信号量)

context和启动方法

context视作multiprocessing模块的另一个接口。

根据平台不同,multiprocessing模块底层使用不同的方法产生新python进程。

spawn

spawn启动方法创建一个全新的Python解释器进程作为子进程,子进程不会继承主进程的内存状态,它们是完全独立的进程。

支持Unix和Windows平台,Windows和MacOS默认使用方式

fork

fork启动方法通过复制主进程创建一个子进程,子进程是主进程的副本,包括内存状态和资源。它们之间共享同一份内存状态,需要注意并发问题。

仅支持Unix,Unix默认方式

forkserver

创建一个forkserver进程,负责更安全地生成子进程。

支持通过管道传递文件描述符的Unix平台

可通过set_start_method()选择平台支持的启动进程方法,但只能调用一次 为了多次使用不同的启动方法,可以用 get_context() 方法获取一个context对象,跟multiprocessing接口相同

ctx1 = mp.get_context('spawn')
ctx2 = mp.get_context('fork')

The Process class

multiprocessing中,每一个进程都用一个Process类来表示。

构造方法:Process([group [, target [, name [, args [, kwargs]]]]])

调用Process对象的start()方法来生成进程

from multiporcessing import Process

def f(name):
     print('hello', name)

if __name__ == '__main__':
     p = Process(target=f, args=('bob',))
     p.start()
     p.join()

实例方法:

  • start():启动进程,并调用该子进程中的p.run()
  • run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • is_alive():返回进程是否在运行。如果p仍然运行,返回True
  • join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。

属性介绍:

  • daemon:默认值为False,如果设为True,代表p为后台运行的守护进程;当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程;必须在p.start()之前设置
  • name:进程的名称
  • pid:进程的pid
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

此外,子进程可以通过os模块的方法获取进程相关信息:

  • os.getppid(),获取父进程pid
  • os.getpid(),获取本子进程pid

进程池

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

构造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])


from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

进程间交换对象

multiprocessing 支持进程之间两种通信通道: Queue: 进程间共享队列,线程和进程安全,是queue.Queue 的相似实现;

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Pipe: 提供一对管道,管道是双工的,既可以父发子收,也可以子发父收。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello,father.'])
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send([41, None, 'hello, son.'])
    p.join()

进程间共享状态

注意数据交换和数据共享的区别,应当尽量避免数据共享。

共享内存:Value Array

multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。

服务器进程:Manager

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager模块常与Pool模块一起使用。

进程间同步

这里“同步”指的是各进程不混淆,比如同一时刻只有一个进程打印标准输出。

注意对于通用的概念,既适用于进程,也适用于线程。

Lock(互斥锁)

mutex(互斥量)是锁机制的一种实现,当一个变量被一个进程获取后,除非释放,否则就无非被其他进程获取,为“互斥”,互斥的变量即为互斥量。

Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

from multiprocessing import Process, Lock

def l(lock, num):

    lock.acquire()

    print("Hello Num: %s" % (num))

    lock.release()

if __name__ == '__main__':

    lock = Lock()  # 这个一定要定义为全局

    for num in range(20):

        Process(target=l, args=(lock, num)).start()

RLock(可重入锁)

RLock(可重入锁)是一个可以被同一个进程程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个进程拥有。拥有RLock的进程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

Semaphore(信号量)

信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的进程数超过信号量时进程才阻塞。这允许了多个进程可以同时访问相同的代码区。

Condition(条件变量)

threading.Condition的别名。

条件变量的作用是用于多线程之间的线程同步。线程同步是指线程间需要按照预定的先后顺序进行的行为,比如我想要线程1完成了某个步骤之后,才允许线程2开始工作,这个时候就可以使用条件变量来达到目的。

Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。

构造方法:Condition([lock/rlock])

实例方法:

  • acquire([timeout]):首先进行acquire,然后判断一些条件。如果条件不满足则wait
  • release():释放 Lock
  • wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。处于wait状态的线程接到通知后会重新判断条件。
  • notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
import multiprocessing
import time


def stage_1(cond):

    """perform first stage of work,

    then notify stage_2 to continue

    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):

    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()

    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)]

    for c in s2_clients:
        c.start()
        time.sleep(1)

    s1.start()
    s1.join()

    for c in s2_clients:
        c.join()

Event(事件)

threading.Event 的克隆。

Event内部包含了一个标志位,初始的时候为false。可以使用set()来将其设置为true;或者使用clear()将其从新设置为false;可以使用is_set()来检查标志位的状态;另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

concurrent:高级并发API

标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。

本包目前包括的module:

  • futures:为异步执行可调用对象提供高级接口。

concurrent.futures基础组件是executor和future。

Executor

Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

submit()方法

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。

Executor.submit(fn, *args, **kwargs)

  • fn:需要异步执行的函数
  • *args, **kwargs:fn参数
from concurrent import futures

def test(num):
    import time
    return time.ctime(), num

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 1)
    print(future.result())

map()方法

除了submit,Exectuor还为我们提供了map方法,这个方法返回一个map(func, *iterables)迭代器,就像经常对列表用的map函数一样,区别是func是异步执行,且立刻收集而非lazy收集。

map()方法会自动处理任务的提交、结果的收集和异常的处理。

Executor.map(func, *iterables, timeout=None, chunksize=1)

  • func:需要异步执行的函数
  • *iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
  • timeout:设置每次异步操作的超时时间,timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。
  • 使用 ProcessPoolExecutor 时,此方法将可迭代对象分割成多个块,并将这些块作为单独的任务提交到池中。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。长列表建议使用大chunksize。
# 同一个线程进行map操作
results = map(task, items)
# 不同的线程进行异步map操作
# create a thread pool
executor = ThreadPoolExecutor(20)
# execute each function call in a separate thread
results = executor.map(task, items)

不同于submit()方法返回Future对象,map()方法返回一个可迭代对象,每个元素是执行后的返回值。

shutdown()方法

释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。

Executor.shutdown(wait=True, *, cancel_futures=False)

  • 如果 wait 为 True,则在所有挂起的 futures执行完毕并且与执行器关联的资源被释放之前,此方法将不会返回。
  • 如果 wait 为 False,则此方法将立即返回,但与执行器关联的资源要在futures都执行完毕后才释放。
  • 如果cancel_futures为True,所有尚未开始运行的futures都会取消执行。

使用with表达式,将默认调用Executor.shutdown(wait=True, cancel_futures=False)

Future

Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint


def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)


pool = ThreadPoolExecutor(5)
futures = []

for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

print(1)

for x in as_completed(futures):
    print(x.result())

print(2)

获得返回值

单个Future实例获得返回值,直接调用Future.result() 成员方法,调用会阻塞直到获得返回值。

concurrent.futures.as_completed(fs, timeout=None)

想要异步获得一批Future实例的返回值,可以使用concurrent.futures.as_completed() 方法。它是 concurrent.futures 模块中的一个函数,用于迭代返回已完成的 Future 对象。Future 对象代表一个异步操作的结果,可以是一个线程、进程或其他可执行任务。

as_completed() 函数接受一个 Future 对象的可迭代集合,并返回一个迭代器。在迭代过程中,它会不断返回已经完成的 Future 对象,无论它们是成功完成还是发生了异常。

关于异常

在使用线程/进程池时,如果线程池中的某个线程发生异常,该线程的异常通常不会导致整个线程池终止。相反,线程池会继续执行其他任务,并且异常的线程会被标记为已完成。

当在线程中发生异常时,异常会被捕获并存储在相应的 Future 对象中。

可以通过调用 Future.result() 方法获取 Future 对象的结果,进而处理异常。

subprocess:子进程交互

该模块提供接口生成子进程,连接到它们的输入/输出/错误管道,并获取它们的返回码。

subprocess.run

推荐接口,执行args指定的命令,等待命令执行完毕,返回CompletedProcess 实例对象。其实它包装了PopenPopen.communicate()

>>> subprocess.run(["ls", "-l"])  # doesn't capture output
CompletedProcess(args=['ls', '-l'], returncode=0)

>>> subprocess.run(["ls", "-l", "/dev/null"], capture_output=True)
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n', stderr=b'')

>>> import subprocess
>>> import sys
>>> result = subprocess.run(["python", "-c", "print('ocean')"])
ocean

使用run()接口可以实现一些便捷的操作,如:

执行命令,并获得命令的返回值。

run(...).returncode

执行命令,并检查命令是否执行成功。如果失败则抛出异常。

run(..., check=True)

执行命令,并获得命令的标准输出/标准错误。如果失败则抛出异常。

run(..., check=True, stdout=PIPE).stdout

subprocess.Popen

高级接口,可以和子进程的输入/输出交互。创建子进程即开始执行。

import subprocess
# 创建子进程,标准输出和标准错误保存到管道
process = subprocess.Popen(shell_cmd,
                     stdin = subprocess.PIPE,
                     stdout = subprocess.PIPE, 
                     stderr = subprocess.PIPE,
                     text = True,
                     shell = True
                     )
# 可以交互,子进程在显式调用后才会结束
std_out, std_err = process.communicate(input=data)
# 此时子进程才结束,否则会等待直到超时
std_out.strip(), std_err

subprocess.PIPE 本身是一个整数,指代的是管道(文件描述符),即内存中一块空间。它的size是有限的,所以当管道数据满了,未被取走时,要么进程会阻塞(I/O阻塞),要么数据会丢失(网络数据)。

Popen的关键属性和方法:

  • stdin/stdout/stderr:如果开启的话,根据text标志,是一个TextIOWrapperBufferedReader ;
  • pid
  • returncode:communicate之后,子进程结束后才会有返回值。
  • Popen.communicate(input=None, timeout=None) :和子进程进行交互,给标准输入发送输出,从标准输出和标准错误中读取数据直到EOF,并且设置返回码。然后等待子进程执行结束,可设置超时。
  • Popen.wait(timeout=None) : 等待子进程结束,不会取任何数据,所以可能因为管道满了而阻塞。
  • Popen.kill() :杀死子进程,发送SIGKILL。
  • Popen.terminate() :终止子进程,发送SIGTERM。

就像linux的管道用法一样,PIPE可以方便的将两个子命令串联。

p1 = Popen(["dmesg"], stdout=PIPE)
p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]

queue:同步队列

底层利用锁,实现了多进程/线程之间的数据交换安全。

该模块实现了三个数据对象:

  • Queue(maxsize=0):FIFO,先进先出队列;
  • LifoQueue(maxsize=0):LIFO,后进先出队列,类似栈;
  • PriorityQueue(maxsize=0):优先队列,最小值先出队列,底层使用heapq,即堆实现; maxsize表示队列中最多同时存在的对象数量,达到上限插入新对象会被阻塞,等于0表示对象数量无限制。

信号和终止进程

终止进程可以调用p.terminate()或p.kill(),区别在于发送了不同的OS信号,大部分情况等价,都不会终止子进程。

SIGTERM可以被进程捕获并处理,相当于可以被用户代码捕获并作反应。

The SIGTERM signal is sent to a process to request its termination. Unlike the SIGKILL signal, it can be caught and interpreted or ignored by the process. This allows the process to perform nice termination releasing resources and saving state if appropriate. SIGINT is nearly identical to SIGTERM.

python的signal库可以用于捕获信号和注册处理函数。

# SuperFastPython.com
# example of terminating a child process, and handling the signal
from time import sleep
from multiprocessing import Process
from signal import signal
from signal import SIGTERM
import sys

# handle signal
def handler(sig, frame):
    print('Child process cleaning up...')
    sleep(2)
    # kill the process
    sys.exit(0)

# custom task function
def task():
    # handle sigterm
    signal(SIGTERM, handler)
    # execute a task in a loop
    while True:
        # block for a moment
        sleep(1)
        # report a message
        print('Worker process running...', flush=True)

# entry point
if __name__ == '__main__':
    # create a process
    process = Process(target=task)
    # run the process
    process.start()
    # wait for a moment
    sleep(5)
    # terminate the process
    process.terminate()
    # continue on...
    print('Parent is continuing on...')

SIGKILL信号不可以被进程捕获或忽略,相当于强制命令。

The SIGKILL signal is sent to a process to cause it to terminate immediately (kill). In contrast to SIGTERM and SIGINT, this signal cannot be caught or ignored, and the receiving process cannot perform any clean-up upon receiving this signal.


<
Previous Post
元类
>
Next Post
异步IO和协程