在Python中,queue
是一個非常有用的模块。
它提供了多種佇列(queue)實現,用於在多線程環境中安全地交換信息或者數據。
佇列(queue)是一種先進先出(FIFO)的數據結構,允許在佇列的一端插入元素,另一端取出元素。(FIFO 是First In, First Out 的縮寫)
queue
模塊的基本用法及相關說明:使用Queue
類來創建一個佇列對象。可以指定佇列的最大長度,如果不指定,則默認為無限長度的佇列。
import queue
q = queue.Queue(maxsize=10) # 創建一個最大長度為10的佇列
放入元素:
使用put()
方法向佇列中放入一個元素。
q.put(item)
取出元素:
使用get()
方法從佇列中取出一個元素,如果佇列為空,則會阻塞直到有元素可用。
item = q.get()
判斷佇列是否為空:
使用empty()
方法可以判斷佇列是否為空。
if q.empty():
print("Queue is empty")
判斷佇列是否已滿:
使用full()
方法可以判斷佇列是否已滿,僅適用於有限長度的佇列。
設置阻塞與非阻塞操作:
默認情況下,佇列的put()
和get()
操作是阻塞的,如果你想要非阻塞的行為,可以使用put_nowait()
和get_nowait()
方法。
q.put_nowait(item)
item = q.get_nowait()
佇列的阻塞操作:
佇列的阻塞操作意味着當佇列為空時,get()
操作將會等待直到有元素可用;當佇列已滿時,put()
操作將會等待直到有空間可用。這樣可以很容易地實現生產者-消費者模式。
非阻塞操作
import queue
my_queue = queue.Queue(maxsize=2)
# 使用 put_nowait() 方法放入元素,即使佇列已滿也不會等待
try:
my_queue.put_nowait("Item 1")
my_queue.put_nowait("Item 2")
my_queue.put_nowait("Item 3") # 這裡會引發異常,因為佇列已滿
except queue.Full:
print("Queue is full, unable to put item.")
# 使用 get_nowait() 方法取出元素,即使佇列為空也不會等待
try:
item1 = my_queue.get_nowait()
item2 = my_queue.get_nowait()
item3 = my_queue.get_nowait() # 這裡會引發異常,因為佇列為空
except queue.Empty:
print("Queue is empty, unable to get item.")
Queue is full, unable to put item.
Queue is empty, unable to get item.
阻塞操作
import queue
import time
my_queue = queue.Queue(maxsize=2)
# 使用 put_nowait() 方法放入元素,即使佇列已滿也不會等待
# 時間計數來確認,等待多久才,等待多久才引發異常
start_time = time.time()
try:
my_queue.put("Item 1", timeout=1) #若未設定timout將會無限期等待
my_queue.put("Item 2", timeout=1)
my_queue.put("Item 3", timeout=1) # 這裡會引發異常,因為佇列已滿,等待一秒若無資料則會引發異常
except queue.Full:
print("Queue is full, unable to put item.")
end_time = time.time()
print(f'put處理時間 :{(end_time - start_time):6f}秒')
# 使用 get_nowait() 方法取出元素,即使佇列為空也不會等待
start_time = time.time()
try:
item1 = my_queue.get(timeout=1)
item2 = my_queue.get(timeout=1)
item3 = my_queue.get(timeout=1) # 這裡會引發異常,因為佇列為空
except queue.Empty:
print("Queue is empty, unable to get item.")
end_time = time.time()
print(f'put處理時間 :{(end_time - start_time):6f}秒')
Queue is full, unable to put item.
put處理時間 :1.001525秒
Queue is empty, unable to get item.
put處理時間 :1.000238秒
my_queue
,並將10個資料放入佇列中Plan
執行緒 Plan_A
和Plan_B
,並將佇列物件和執行緒編號傳遞給它們。join()
方法等待兩個執行緒完成處理。import time
import threading
import queue
# Plan 類別
class Plan(threading.Thread):
def __init__(self, queue, num):
threading.Thread.__init__(self)
self.queue = queue
self.num = num
def run(self):
while self.queue.qsize() > 0:
# 取得新的資料
msg = self.queue.get()
# 印出資料
print(f'Plan_{self.num} : {msg}')
time.sleep(1)
# 建立佇列
my_queue = queue.Queue()
for i in range(10):
my_queue.put(f'Data : {i}')
print(f'查看佇列資料長度: {my_queue.qsize()}')
# 建立兩個 Plan
Plan_A = Plan(my_queue, 'A')
Plan_B = Plan(my_queue, 'B')
# 讓 Worker 開始取出資料
Plan_A.start()
Plan_B.start()
# 等待所有 Worker 結束
Plan_A.join()
Plan_B.join()
print("Done.")
查看佇列資料長度: 10
Plan_A : Data : 0
Plan_B : Data : 1
Plan_A : Data : 2
Plan_B : Data : 3
Plan_A : Data : 4
Plan_B : Data : 5
Plan_A : Data : 6
Plan_B : Data : 7
Plan_A : Data : 8
Plan_B : Data : 9
Done.
[Python][微進階]threading 多執行緒平行處理
https://docs.python.org/zh-tw/3/library/queue.html#module-queue