更新於 2024/10/24閱讀時間約 11 分鐘

【💊 Python的解憂錦囊】如何在multithread/multiprocess傳遞固定參數?

raw-image


撰寫Python的朋友都知道multithread/multiprocess能為我們帶來效能的改進,減少硬體資源的閒置,但在撰寫的過程中常常會發現到我們所設計的工作池模式會需要將「待辦清單」的工作項目當成參數傳遞進去執行, 除了「待辦清單」之外, 其餘的參數基本上都是固定的, 基於這樣的需求之下, 我們要怎麼完成呢? 讓我們耐心的看完這個篇章。

我們通常會這樣做…

假設我們設計了一個工作但尚未實作工作詳細內容, 僅印出工作資訊如下 :

def job(name: str, action: str, item: str):
"""工作內容

Args :
name (str): 什麼樣的工作
action (str): 工作的行為(加工、蓋房、...)
item: 工作的項目
Retruns:
None
"""
result = f'{name} 正在 {action} {item}'
return result

接著我們在主程序設計好我們要執行「什麼樣的工作」、「工作的行為」, 接著我們會有許多的「待辦事項」需要執行, 接著招聘好工人(num_workers)之後就可以根據作業區擴展廠區(pool),每個作業區獨立運作這些待辦清單, 那我們可能會這樣撰寫程式:


import concurrent.futures

if __name__ == "__main__":
# 設計今天的主題
name = '食品加工廠'
action = '製作'

# 待辦事項
todo_list = ['熱狗', '炸雞', '薯條', '肉乾']

num_workers = 3

# 創建一個多進程池,根據上述的工人數量擴展工作池
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:

# 我們將大量的待辦事項轉換成每個worker需要執行的
params = [(name, action, item) for item in todo_list]
results = executor.map(job, params)

# 印出執行結果
for result in results:
print(result)


但上述的作法真的好嗎?

我們的params不會隨著「待辦清單」越多(上千萬個事項), 導致記憶體爆掉嗎? 不妨來看看我們更改後的範例, 假設有1000萬的「待辦清單」時會發生什麼狀況? 我們也順便埋入執行的估測時間來實際看看結果。

import multiprocessing
import time
import random
import sys

def job(name: str, action: str, item: str):
"""工作內容

Args :
name (str): 工作名稱
action (str): 動作
item: 工作的項目
Retruns:
None
"""
secs = random.random()
result = f'{name} 正在 {action} {item} 花費了 {secs} 秒'
time.sleep(secs)
return result

if __name__ == "__main__":
# 固定的參數
name = '食品加工廠'
action = '製作'

# 主程序提供的待辦事項清單
# todo_list = ['熱狗', '炸雞', '薯條', '肉乾']
todo_list = [f"工作{i}" for i in range(1, 10000000)]

num_workers = 3

# 創建一個多進程池,這裡使用3個進程
with multiprocessing.Pool(processes=num_workers) as pool:
start_time = time.time()
params = [(name, action, item) for item in todo_list]
end_time = time.time()

use_bytes = sys.getsizeof(params)
use_mb = use_bytes / 1048576
print(f'渲染參數花費的時間: {end_time - start_time} 秒, 耗用的記憶體: {use_mb} MB')

results = pool.starmap(job, params)
for result in results:
print(result)

我們會發現以下光是簡單的參數就花費如此之多的記憶體耗用量, 那面對大數據時怎麼辦?

因此我們可以這樣做…

我們在「【Python 軍火庫🧨 - functools】使用partial來設計函數樣板」有介紹到「functools.partial」這個工具庫, 我們可以利用partial的技巧製造出固定參數的新函式, 以不變應萬變,套用到multiprocess之前就不需要一堆複製的資源耗費…。

那在進入主題之前, 我們先來複習一下關於functools.partial函式, 他可以幫我們製作出固定參數的樣版。


但聰明的大家有沒有觀察到一個狀況, 那就是動態的參數通常在前面(a, b), 而固定的參數放在後段(c), 因此底下我們的job的參數設計勢必要改一改, 由於我們的item會隨著todolist而變化, 因此需要將函式參數順序稍微修改一下成「job(item: str, name: str, action: str)」。

另外在於multiprocess的部份, 我們原先使用的是「starmap」接受多參數的模式, 但經上述演示之後覺得對於大數據的處理不太妥當, 因此我們可以更換成「map(func, iterable[, chunksize])」, 他們的差異主要在於

import multiprocessing
import time
import random
import sys
from functools import partial

def job(item: str, name: str, action: str):
"""工作內容

Args :
item (str): 工作的項目 [可變]
name (str): 工作名稱 [固定]
action (str): 動作 [固定]

Retruns:
None
"""
secs = random.random()
result = f'{name} 正在 {action} {item} 花費了 {secs} 秒'
return result

if __name__ == "__main__":
# 固定的參數
name = '食品加工廠'
action = '製作'

# 主程序提供的待辦事項清單
# todo_list = ['熱狗', '炸雞', '薯條', '肉乾']
todo_list = [f"工作{i}" for i in range(1, 10000000)]

num_workers = 24

# 創建一個多進程池,這裡使用3個進程
with multiprocessing.Pool(processes=num_workers) as pool:
start_time = time.time()
job_func = partial(job, name=name, action=action)
end_time = time.time()

use_bytes = sys.getsizeof(job_func)
use_mb = use_bytes / 1048576
print(f'渲染參數花費的時間: {end_time - start_time} 秒, 耗用的記憶體: {use_mb} MB')

results = pool.map(job_func, todo_list)
for result in results:
print(result)

乍看之下是不是會誤以為花費更久的時間了呢? 請仔細看一下它的數字是否怪怪的, 有負數的出現…, 這是科學記號, 代表是一個非常小的數字, 也就是幾乎沒有耗損的過程, 這面對於大數據的處理, 相對能夠帶來極大的效能增幅。

結語

上述的例子仍有一些缺陷, 雖然實際上仍是以multiprocess的方式並行處理我們的運算, 但必須等待全部的工作完成後, 才回到主線程, 這對於我們跟蹤程式進度的應用會稍微不利, 不過沒關係, 我們後續也會針對這個部份進行重點分享, 歡迎持續追蹤, 讓我們一起探究軟體開發的大小事。

學習軟體開發的路上常常苦於網路資訊爆炸嗎? 教學何其多,但卻遇到無法明確選擇的困境呢? 歡迎加入「🔒 阿Han的軟體心法實戰營」, 這裡不給您冗餘的雜訊, 單刀直入直接送您業界開發重點, 避開選擇障礙的困境, 讓您獲得業界標準的開發起手式, 成為Top 1的頂尖人才。

分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.