更新於 2024/03/22閱讀時間約 8 分鐘

[Python][微進階]Queue佇列中的資料被多個執行緒並行處理

在Python中,queue是一個非常有用的模块。

它提供了多種佇列(queue)實現,用於在多線程環境中安全地交換信息或者數據

佇列(queue)是一種先進先出(FIFO)的數據結構,允許在佇列的一端插入元素,另一端取出元素。(FIFO First In, First Out 的縮寫)

FIFO


以下是Python中queue模塊的基本用法及相關說明:

創建佇列

使用Queue類來創建一個佇列對象。可以指定佇列的最大長度,如果不指定,則默認為無限長度的佇列。

import queue

q = queue.Queue(maxsize=10) # 創建一個最大長度為10的佇列



放入元素

使用put()方法向佇列中放入一個元素。

q.put(item)



取出元素

使用get()方法從佇列中取出一個元素,如果佇列為空,則會阻塞直到有元素可用。

item = q.get()


判斷佇列是否為空

使用empty()方法可以判斷佇列是否為空。

if q.empty():
print("Queue is empty")


判斷佇列是否已滿

使用full()方法可以判斷佇列是否已滿,僅適用於有限長度的佇列。


設置阻塞與非阻塞操作

默認情況下,佇列的put()get()操作是阻塞的,如果你想要非阻塞的行為,可以使用put_nowait()get_nowait()方法。

q.put_nowait(item)
item = q.get_nowait()


佇列的阻塞操作

佇列的阻塞操作意味着當佇列為空時,get()操作將會等待直到有元素可用;當佇列已滿時,put()操作將會等待直到有空間可用。這樣可以很容易地實現生產者-消費者模式。

程式範例說明解釋阻塞與非阻塞操作



非阻塞操作

import queue

my_queue = queue.Queue(maxsize=2)

# 使用 put_nowait() 方法放入元素,即使佇列已滿也不會等待
try:
my_queue.put_nowait("Item 1")
my_queue.put_nowait("Item 2")
my_queue.put_nowait("Item 3") # 這裡會引發異常,因為佇列已滿
except queue.Full:
print("Queue is full, unable to put item.")

# 使用 get_nowait() 方法取出元素,即使佇列為空也不會等待
try:
item1 = my_queue.get_nowait()
item2 = my_queue.get_nowait()
item3 = my_queue.get_nowait() # 這裡會引發異常,因為佇列為空
except queue.Empty:
print("Queue is empty, unable to get item.")



輸出

Queue is full, unable to put item.
Queue is empty, unable to get item.


阻塞操作

import queue
import time
my_queue = queue.Queue(maxsize=2)

# 使用 put_nowait() 方法放入元素,即使佇列已滿也不會等待
# 時間計數來確認,等待多久才,等待多久才引發異常
start_time = time.time()
try:
my_queue.put("Item 1", timeout=1) #若未設定timout將會無限期等待
my_queue.put("Item 2", timeout=1)
my_queue.put("Item 3", timeout=1) # 這裡會引發異常,因為佇列已滿,等待一秒若無資料則會引發異常
except queue.Full:
print("Queue is full, unable to put item.")

end_time = time.time()
print(f'put處理時間 :{(end_time - start_time):6f}秒')
# 使用 get_nowait() 方法取出元素,即使佇列為空也不會等待

start_time = time.time()
try:
item1 = my_queue.get(timeout=1)
item2 = my_queue.get(timeout=1)
item3 = my_queue.get(timeout=1) # 這裡會引發異常,因為佇列為空
except queue.Empty:
print("Queue is empty, unable to get item.")

end_time = time.time()
print(f'put處理時間 :{(end_time - start_time):6f}秒')



輸出

Queue is full, unable to put item.
put處理時間 :1.001525
Queue is empty, unable to get item.
put處理時間 :1.000238

程式範例

主要流程如下:

1.創建一個佇列 my_queue,並將10個資料放入佇列中

放入佇列示意圖

2.創建兩個Plan執行緒 Plan_APlan_B,並將佇列物件和執行緒編號傳遞給它們。

3.啟動兩個執行緒,使其同時開始取出佇列中的資料。

取出Queue資料

4.使用join()方法等待兩個執行緒完成處理。

5.印出 "Done." 表示處理完成

import time
import threading
import queue

# Plan 類別
class Plan(threading.Thread):
def __init__(self, queue, num):
threading.Thread.__init__(self)
self.queue = queue
self.num = num

def run(self):
while self.queue.qsize() > 0:
# 取得新的資料
msg = self.queue.get()
# 印出資料
print(f'Plan_{self.num} : {msg}')
time.sleep(1)

# 建立佇列
my_queue = queue.Queue()

for i in range(10):
my_queue.put(f'Data : {i}')
print(f'查看佇列資料長度: {my_queue.qsize()}')

# 建立兩個 Plan
Plan_A = Plan(my_queue, 'A')
Plan_B = Plan(my_queue, 'B')

# 讓 Worker 開始取出資料
Plan_A.start()
Plan_B.start()

# 等待所有 Worker 結束
Plan_A.join()
Plan_B.join()

print("Done.")

輸出

查看佇列資料長度: 10
Plan_A : Data : 0
Plan_B : Data : 1
Plan_A : Data : 2
Plan_B : Data : 3
Plan_A : Data : 4
Plan_B : Data : 5
Plan_A : Data : 6
Plan_B : Data : 7
Plan_A : Data : 8
Plan_B : Data : 9
Done.

參考文獻

[Python][微進階]threading 多執行緒平行處理

https://docs.python.org/zh-tw/3/library/queue.html#module-queue

分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.