快速了解Python并發編程的工程實現(上)

關于我
一個有思想的程序猿,終身學習實踐者,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。
Github:https://github.com/hylinux1024
微信公眾號:終身開發者(angrycode)

0x00 前言

前面的文章中Python協程的概念和實現做了簡單地介紹。為了對Python并發編程有更加全面地認識,我也對Python線程和進程的概念和相關技術的使用進行了學習,于是有了這篇文字。

0x01 線程與進程

當我們在手機或者PC上打開一個應用時,操作系統就會創建一個進程實例,并開始執行進程里的主線程,它有獨立的內存空間和數據結構。線程是輕量級的進程。在同一個進程里,多個線程共享內存和數據結構,線程間的通信也更加容易。

0x02 使用線程實現并發

熟悉Java編程的同學就會發現Python中的線程模型與Java非常類似。下文我們將主要使用Python中的線程???code>threading包。(對于低級別的API???code>thread不推薦初學者使用。本文所有代碼將使用Python 3.7的環境)

threading

要使用線程我們要導入threading包,這個包是在_thread包(即上文提到的低級別thread??椋┑幕∩戲庾傲誦磯喔嘸兜?code>API,在開發中應當首選threading包。

常見地,有兩種方式構建一個線程:通過Thread的構造函數傳遞一個callable對象,或繼承Thread類并重寫run方法。

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')

    t1.start()
    t2.start()
    
    # join方法讓主線程等待子線程執行完畢
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))
    
# do in thread 1
# do in thread 2
# duration 2.001628875732422 

還可以通過繼承threading.Thread類定義線程

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)

    
class MyThread(threading.Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


def start_thread_2():
    start_time = time.time()

    print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)

    mt1.start()
    mt2.start()
    
    # join方法讓主線程等待子線程執行完畢
    mt1.join()
    mt2.join() 
    
    
# do in thread 3
# do in thread 4
# duration 2.004937171936035 

join方法的作用是讓調用它的線程等待其執行完畢。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

定義線程時可以通過指定構造方法的name參數設置線程名稱。
target用于指定callable對象,將在run方法中被調用。
args設置target對象被調用時的參數,類型是元組(),例如上文中的do_in_thread(arg)方法的參數。
kwargs是一個字典類型的參數,也用于target對象的參數。
daemon設置守護線程的標識,如果設置為True那么這個線程就是守護線程,此時如果主線程結束了,那么守護線程也會立即被殺死。所以當有在守護線程中打開文件、數據庫等資源操作時,釋放資源就有可能出錯。

線程池

程序中若有大量的線程創建和銷毀,則對性能影響較大。我們可以使用線程池。同樣地,它的APIJava極為相似。

Executor
concurrent.futures.Executor

這是一個抽象類,定義了線程池的接口。

  • submit(fn, *args, **kwargs)
    執行fn(args,kwargs) 并會返回一個future對象,通過future可獲取到執行結果
  • map(func, *iterables, timeout=None, chunksize=1)
    這個方法與map(func,*iterables)類似
  • shutdown(wait=True)
    關閉線程池
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers參數指定線程池中線程的最大數量為2
with ThreadPoolExecutor(max_workers=2) as executor:
    # 提交任務到線程池
    future = executor.submit(pow, 2, 31) # 計算2^31
    future2 = executor.submit(pow, 1024, 2)
    # 使用future 獲取執行結果
    print(future.result())
    print(future2.result())

# 執行結果
# 2147483648
# 1048576
同步

若有多個線程對同一個資源或內存進行訪問或操作就有會產生競爭條件。
Python提供了鎖、信號量、條件和事件等同步原語可幫忙我們實現線程的同步機制。

Lock

Lock有兩種狀態:lockedunlocked。它有兩個基本方法:acquire()release(),且都是原子操作的。
一個線程通過acquire()獲取到鎖,Lock的狀態變成locked,而其它線程調用acquire()時只能等待鎖被釋放。 當線程調用了release()Lock的狀態就變成了unlocked,此時其它等待線程中只有一個線程將獲得鎖。

import threading

share_mem_lock = 0
share_mem = 0
count = 1000000

locker = threading.Lock()


def add_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread():
    global share_mem

    for i in range(count):
        share_mem += 1


def minus_in_thread():
    global share_mem

    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)

    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)

    t1.start()
    t2.start()

    t3.start()
    t4.start()

    t1.join()
    t2.join()

    t3.join()
    t4.join()

    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# 執行結果
# share_mem_lock :  0
# share_mem :  51306

沒有使用鎖機制的代碼執行后最后的值很有可能就不為0。而有鎖的代碼則可以保證同步。

RLock

RLockReentrant Lock,就是可以重復進入的鎖,也叫遞歸鎖。它有3個特點:

  • 誰獲取鎖誰釋放。如A線程獲取了鎖,那么只有A線程才能釋放該鎖
  • 同一線程可重復多次獲取該鎖。即可以調用acquire多次
  • acquire多少次,對應release就多少次,且最后一次release才會釋放鎖。
Condition

條件是另一種同步原語機制。其實它的內部是封裝了RLock,它的acquire()release()方法就是RLock的方法。
Condition常用的API還有wait()、notify()notify_all()方法。 wait()方法會釋放鎖,然后進入阻塞狀態直到其它線程通過notify()notify_all()喚醒自己。wait()方法重新獲取到鎖就會返回。
notify()會喚醒其中一個等待的線程,而notify_all()會喚醒所有等待的線程。
需要注意的是notify()notify_all()執行后并不會釋放鎖,只有調用了release()方法后鎖才會釋放。
讓我們看一個來自于《Python并行編程手冊》中的一個生產者與消費者例子

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # 獲取鎖
        condition.acquire()
        if len(items) == 0:
            # 當items為空時,釋放了鎖,并等待生產者notify
            condition.wait()
            print("Consumer notify : no item to consume")
        # 開始消費
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        # 消費之后notify喚醒生產者,因為notify不會釋放鎖,所以還要調用release釋放鎖
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(2)
            self.consume()


class producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            # 若items時滿的,則執行wait,釋放鎖,并等待消費者notify
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # 開始生產
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        # 生產后notify消費者,因為notify不會釋放鎖,所以還執行了release釋放鎖。
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
Semaphore

信號量內部維護一個計數器。acquire()會減少這個計數,release()會增加這個計數,這個計數器永遠不會小于0。當計數器等于0時,acquire()方法就會等待其它線程調用release()。
還是借助一個生產者與消費者的例子來理解

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# 默認情況內部計數為1,這里設置為0。
# 若設置為負數則會拋出ValueError
semaphore = threading.Semaphore(0)


def consumer():
    print("consumer is waiting.")
    # 獲取一個信號量,因為初始化時內部計數設置為0,所以這里一開始時是處于等待狀態
    semaphore.acquire()
    # 開始消費
    print("Consumer notify : consumed item number %s " % item)


def producer():
    global item
    time.sleep(2)
    # create a random item
    item = random.randint(0, 1000)
    # 開始生產
    print("producer notify : produced item number %s" % item)
    # 釋放信號量, 內部計數器+1。當有等待的線程發現計數器大于0時,就會喚醒并從acquire方法中返回
    semaphore.release()


if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")

信號量經?;嵊糜謐試慈萘咳范ǖ某【?,比如數據庫連接池等。

Event

事件在線程間的通信方式非常簡單。一個線程發送事件另一個線程等待接收。
Event對象內部維護了一個bool變量flag。通過set()方法設置該變量為True,clear()方法設置flagFalse。wait()方法會一直等待直到flag變成True

結合例子

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # 等待事件
            self.event.wait()
            # 開始消費
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        while True:
            time.sleep(2)
            # 開始生產
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # 發送事件通知消費者消費
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            # 設置事件內部變量為False,隨后消費者線程調用wait()方法時,進入阻塞狀態
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
Timer

定時器TimerThread的子類。用于處理定時執行的任務。啟動定時器使用start(),取消定時器使用cancel()。

from threading import Timer

def hello():
    print("hello, world")

t = Timer(3.0, hello)
t.start()  # 3秒后 打印 "hello, world"
with語法

Lock、RLock、ConditionSemaphore可以使用with語法。
這幾個對象都實現擁有acquire()release()方法,且都實現了上下文管理協議。

with some_lock:
    # do something...

等價于

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

0x03 小結

本文主要介紹Python中線程的使用,主要是對threading??櫓?code>Thread對象、線程池Executor常見用法的展示?;沽私飭訟叱痰耐皆?code>Lock、RLock、Condition、Semaphore、Event以及TimerAPI的使用。

0x04 引用

posted @ 2019-08-21 16:40 hylinux1024 閱讀(...) 評論(...) 編輯 收藏