在Python 3.2版本之後加入了「concurrent.futures」啟動平行任務, 它可以更好的讓我們管理多執行緒/多行程的應用場景,讓我們在面對這種併發問題時可以不必害怕, 用一個非常簡單的方式就能夠處裡, 底下我們將為您展示一段程式碼:
import concurrent.futures
import time
# 定義一個任務,這裡示範了一個簡單的任務,打印數字並暫停一段時間
def task(number):
print(f"Starting task {number}")
time.sleep(2) # 模擬任務執行時間
print(f"Task {number} completed")
return f"Task {number} result"
if __name__ == "__main__":
# 創建 ThreadPoolExecutor,設置最大執行緒數量為3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 使用 map 方法提交任務
results = executor.map(task, range(5)) # 提交5個任務
# 直接迭代獲取結果
for result in results:
print(f"Task result: {result}")
是不是使用起來非常的簡單又直觀? 單純看看上面的程式碼當然看不出來, 讓我們為您展示一下過往的處理方式:
import threading
import time
# 定義一個任務,這裡示範了一個簡單的任務,打印數字並暫停一段時間
def task(number):
print(f"Starting task {number}")
time.sleep(2) # 模擬任務執行時間
print(f"Task {number} completed")
if __name__ == "__main__":
threads = [] # 創建一個空列表,用於存放執行緒
# 創建並啟動5個執行緒,每個執行緒執行一個任務
for i in range(5):
thread = threading.Thread(target=task, args=(i,))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
print("All tasks completed")
傳統在使用多執行緒處理任務時需要自行管理, 而並非像是「concurrent.futures」那麼的直觀, 簡單的事情簡單做, 錯誤會更少,因此我們應該考慮這種封裝到最簡易使用的技巧, 雖說語法糖在尚未了解其背後原理的狀況之下很危險, 但也不能完全不使用, 而是應該去認識它,並試著採用它, 讓我們複雜的程式更加的乾淨整潔, 以達到永續維護的效果。
P.S 上述的程式碼都是以多執行緒的方式展示, 而多行程也大同小異, 歡迎參考「ProcessPoolExecutor」。
相信在使用AI模型進行任務(NLP、NLU、語音辨識...)時, 最後期望資源最大化時就是multiprocess/multithread派上用場的時刻, 但問題來了,假設我們每個job都重新載入模型到記憶體運作的過程, 真的有比較省時嗎? 光是I/O就耗費大量的時間了, 還不如單核的快,因此我們需要的是如何預先建置多個worker的模型在等待我們的工作, 正巧在在3.7版之後,新增 initializer 與 initargs 引數,就讓我們一起來看看這個神奇的初始化參數吧!
我們可以利用幾個關鍵的元素來完成這件事情...
# pid, 模型變數
models: Dict[int, 模型型別] = {}
def _init_models():
"""為每個worker初始化模型
"""
# 使用process id作為模型的識別碼, 讓正確的process取得正確的模型
pid = multiprocessing.current_process().pid
logger.debug('[模型載入中] %s', pid)
model = Model(...)
models[pid] = model
logger.debug('[模型載入完畢] %s', pid)
if __name__ == "__main__":
# 創建 ThreadPoolExecutor,設置最大執行緒數量為3
with concurrent.futures.ProcessPoolExecutor(
max_workers=3,
initializer=_init_models
) as executor:
# 使用 map 方法提交任務
results = executor.map(task, ...) # 提交N個任務
# 直接迭代獲取結果
for result in results:
print(f"Task result: {result}")
透過這樣的模式就能夠預先規劃資源分配, 最多就幾個模型會被載入到記憶體之中, 我們也知道GPU一張貴貴的, 不可能無限制的載入到記憶體運作,因此這方式可以讓我們在資源最大化的運作之下完成複雜的模型任務。
資源有限的情況之下, 我們只能設法善用資源, 讓資源運用最佳化, 避免浪費, 畢竟硬體也都是成本的堆疊啊! 而在使用「multiprocess/multithread」時也是需要具備許多基礎知識才能夠更好的運用, 這次就針對多行程共用模型的情境進行分享, 若有更進階的使用方式也歡迎留言分享, 讓我們共同學習更進一步吧!