【💊 Python的解憂錦囊 - FastAPI】多個worker如何共享數據?

【💊 Python的解憂錦囊 - FastAPI】多個worker如何共享數據?

更新於 發佈於 閱讀時間約 13 分鐘

要如何使用unicorn啟動多個FastAPI服務, 歡迎參考我們的「【💊 Python的解憂錦囊 - FastAPI】如何啟動多個Workers」。


當我們試著設計帶入模組化時…

我們在「【💊 Python的解憂錦囊 - FastAPI】使用 lifespan 來共享資料與管理生命週期」有分享FastAPI如何共享資料, 我們依樣畫葫蘆來試著寫一段代碼如下:

import uvicorn
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

def run(args: dict):
@asynccontextmanager
async def lifespan(app: FastAPI):
# 載入ML模型
app.state.args = args
yield



app = FastAPI(lifespan=lifespan)

@app.get("/args")
async def get_args(args: dict = Depends(get_api_shared_args)):
return {"args": args}

uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)

if __name__ == '__main__':
# 假設這邊我們設計了args使用者輸入參數, 可能會是來自其他模組的輸入參數或者callback, 所以我們會將args放在這
args = {
'arg1': 'arg1'
}
run(args)


執行後會發生找不到模組的狀況:

raw-image


這是因為我們的app被包在函數裡面了, 所以才會這樣, 那我們再試著改改, 把所有app邏輯搬移到最外層:

import uvicorn
from contextlib import asynccontextmanager
from typing import Dict

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

# 創建一個全局變量來存儲args
_shared_args: Dict = {}

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

@asynccontextmanager
async def lifespan(app: FastAPI):
# 使用全局變量中的args
app.state.args = _shared_args
yield

app = FastAPI(lifespan=lifespan)

@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}

def run(args: dict):
# 將args存儲到全局變量
global _shared_args
_shared_args = args

uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)

if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)


但一樣存取不到我們的 _shared_args:

raw-image



⭐ 我們可以怎麼解決?

大致上的架構會如下圖, 我們會先透過multiprocessing的shared_memory來儲存我們需要共享的參數, 並封裝成IPCManager, 最後藉由fastapi的lifespan來傳遞共同參數到各個API裡面。

raw-image


上圖有一個IPCManager主要是將SharedMemory抽出去獨立一個模組, 這部份就交由各為自行抽離囉, 如有任何問題歡迎留言討論, 我們這邊為了讓您容易理解, 會將所有代碼都盡量放在同一個模組之中,我們也不希望需要的朋友單純只是複製貼上, 而是要能夠真正吸收, 內化成自己的知識點。

import os
import uvicorn
import json
from contextlib import asynccontextmanager
from typing import Dict
from multiprocessing import shared_memory
import numpy as np

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

# 創建共享內存的名稱
SHARED_MEMORY_NAME = "fastapi_args"
# 預設分配的共享內存大小(根據需要調整)
SHARED_MEMORY_SIZE = 1024

def dict_to_bytes(d: dict) -> bytes:
return json.dumps(d).encode('utf-8')

def bytes_to_dict(b: bytes) -> dict:
return json.loads(b.decode('utf-8').rstrip('\\x00'))

def create_shared_memory(data: dict):
# 將字典轉換為bytes
data_bytes = dict_to_bytes(data)
# 創建共享內存
try:
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)
except FileExistsError:
# 如果已存在,則先刪除再創建
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)

# 將數據寫入共享內存
shm.buf[:len(data_bytes)] = data_bytes
return shm

def get_shared_memory_data() -> dict:
try:
# 連接到現有的共享內存
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME)
# 讀取數據直到遇到空字節
data = bytes(shm.buf[:SHARED_MEMORY_SIZE])
return bytes_to_dict(data)
except FileNotFoundError:
return {}

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

@asynccontextmanager
async def lifespan(app: FastAPI):
# 從共享內存中讀取參數
app.state.args = get_shared_memory_data()
yield
# 應用關閉時清理共享內存
try:
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
except FileNotFoundError:
pass

app = FastAPI(lifespan=lifespan)

@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}

def run(args: dict):
# 創建共享內存並存儲參數
shm = create_shared_memory(args)
main_pid = os.getpid()

try:
uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=5,
)
finally:
if os.getpid() == main_pid:
try:
shm.close()
shm.unlink()
except FileNotFoundError:
pass

if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)



除了SharedMemory之外還有別的方法嗎?

答案是有的, 我們可以透過Memcached或者Redis會是比較好的選擇, 尤其是多台伺服器的情況之下, 但因為我們這邊僅針對單一伺服器底下又不想架設Proxy來分流時, 可以直接使用unicorn來完成多個worker的功能, 只是共享參數會有一些需要注意的地方。


結語

這個案例我們也是卡關了好一段時間, 不明白明明app就寫的好好的呀, 怎麼會透過字串帶入unicorn之後就變了樣了呢? 原來是我們對於生命週期及多行程沒有理解到透徹才會這樣, 過程中也是不斷的詢問ChatGPT, 多虧了AI的指導讓我們能夠快速定位問題, 並找出解決方案, 善用AI是我們這個時代必備的技能了, 我們在「🔒 阿Han的軟體心法實戰營 - 📜 江湖一點訣」也會持續分享我們的開發經驗與AI時代下必要的技能, 歡迎一同加入學習, 共同成長。

avatar-img
阿Han的沙龍
128會員
281內容數
哈囉,我是阿Han,是一位 👩‍💻 軟體研發工程師,喜歡閱讀、學習、撰寫文章及教學,擅長以圖代文,化繁為簡,除了幫助自己釐清思路之外,也希望藉由圖解的方式幫助大家共同學習,甚至手把手帶您設計出高品質的軟體產品。
留言
avatar-img
留言分享你的想法!
阿Han的沙龍 的其他內容
🤔 簡單且靜態就足夠了? 相信我們在開發Python應用程式的過程中, 常常會借用Enum來定義我們可能的選項, 就像顏色紅、綠、黃會有這樣的結構: class Color(str, Enum): RED = 'red' GREED = 'green' YELLOW = 'yel
當我們的系統發展到一定程度時, 難免會面臨到正式上線的問題, 要如何讓維運更加簡易呢? 尤其隨著複雜的客製化配置的出現時, 我們應該如何有效的管理, 甚至驗證配置是否如預期資料型態、格式…, 而正好 pydantic 可以滿足這樣的需求, 就讓我們來看看怎麼使用吧! 需安裝的套件 pip i
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有說明如何使用uvicorn來啟動FastAPI服務, 假設今天我們的API是一個CPU密集型的運算服務時, 通常我們會希望開啟多個行程來幫忙處理, 那麼大致上的撰寫方式會像這樣: app = FastAPI( ti
🤔 簡單且靜態就足夠了? 相信我們在開發Python應用程式的過程中, 常常會借用Enum來定義我們可能的選項, 就像顏色紅、綠、黃會有這樣的結構: class Color(str, Enum): RED = 'red' GREED = 'green' YELLOW = 'yel
當我們的系統發展到一定程度時, 難免會面臨到正式上線的問題, 要如何讓維運更加簡易呢? 尤其隨著複雜的客製化配置的出現時, 我們應該如何有效的管理, 甚至驗證配置是否如預期資料型態、格式…, 而正好 pydantic 可以滿足這樣的需求, 就讓我們來看看怎麼使用吧! 需安裝的套件 pip i
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有說明如何使用uvicorn來啟動FastAPI服務, 假設今天我們的API是一個CPU密集型的運算服務時, 通常我們會希望開啟多個行程來幫忙處理, 那麼大致上的撰寫方式會像這樣: app = FastAPI( ti