要如何使用unicorn啟動多個FastAPI服務, 歡迎參考我們的「【💊 Python的解憂錦囊 - FastAPI】如何啟動多個Workers」。
我們在「【💊 Python的解憂錦囊 - FastAPI】使用 lifespan 來共享資料與管理生命週期」有分享FastAPI如何共享資料, 我們依樣畫葫蘆來試著寫一段代碼如下:
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection
def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args
def run(args: dict):
@asynccontextmanager
async def lifespan(app: FastAPI):
# 載入ML模型
app.state.args = args
yield
app = FastAPI(lifespan=lifespan)
@app.get("/args")
async def get_args(args: dict = Depends(get_api_shared_args)):
return {"args": args}
uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)
if __name__ == '__main__':
# 假設這邊我們設計了args使用者輸入參數, 可能會是來自其他模組的輸入參數或者callback, 所以我們會將args放在這
args = {
'arg1': 'arg1'
}
run(args)
執行後會發生找不到模組的狀況:
這是因為我們的app被包在函數裡面了, 所以才會這樣, 那我們再試著改改, 把所有app邏輯搬移到最外層:
import uvicorn
from contextlib import asynccontextmanager
from typing import Dict
from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection
# 創建一個全局變量來存儲args
_shared_args: Dict = {}
def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args
@asynccontextmanager
async def lifespan(app: FastAPI):
# 使用全局變量中的args
app.state.args = _shared_args
yield
app = FastAPI(lifespan=lifespan)
@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}
def run(args: dict):
# 將args存儲到全局變量
global _shared_args
_shared_args = args
uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)
if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)
但一樣存取不到我們的 _shared_args:
大致上的架構會如下圖, 我們會先透過multiprocessing的shared_memory來儲存我們需要共享的參數, 並封裝成IPCManager, 最後藉由fastapi的lifespan來傳遞共同參數到各個API裡面。
上圖有一個IPCManager主要是將SharedMemory抽出去獨立一個模組, 這部份就交由各為自行抽離囉, 如有任何問題歡迎留言討論, 我們這邊為了讓您容易理解, 會將所有代碼都盡量放在同一個模組之中,我們也不希望需要的朋友單純只是複製貼上, 而是要能夠真正吸收, 內化成自己的知識點。
import os
import uvicorn
import json
from contextlib import asynccontextmanager
from typing import Dict
from multiprocessing import shared_memory
import numpy as np
from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection
# 創建共享內存的名稱
SHARED_MEMORY_NAME = "fastapi_args"
# 預設分配的共享內存大小(根據需要調整)
SHARED_MEMORY_SIZE = 1024
def dict_to_bytes(d: dict) -> bytes:
return json.dumps(d).encode('utf-8')
def bytes_to_dict(b: bytes) -> dict:
return json.loads(b.decode('utf-8').rstrip('\\x00'))
def create_shared_memory(data: dict):
# 將字典轉換為bytes
data_bytes = dict_to_bytes(data)
# 創建共享內存
try:
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)
except FileExistsError:
# 如果已存在,則先刪除再創建
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)
# 將數據寫入共享內存
shm.buf[:len(data_bytes)] = data_bytes
return shm
def get_shared_memory_data() -> dict:
try:
# 連接到現有的共享內存
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME)
# 讀取數據直到遇到空字節
data = bytes(shm.buf[:SHARED_MEMORY_SIZE])
return bytes_to_dict(data)
except FileNotFoundError:
return {}
def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args
@asynccontextmanager
async def lifespan(app: FastAPI):
# 從共享內存中讀取參數
app.state.args = get_shared_memory_data()
yield
# 應用關閉時清理共享內存
try:
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
except FileNotFoundError:
pass
app = FastAPI(lifespan=lifespan)
@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}
def run(args: dict):
# 創建共享內存並存儲參數
shm = create_shared_memory(args)
main_pid = os.getpid()
try:
uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=5,
)
finally:
if os.getpid() == main_pid:
try:
shm.close()
shm.unlink()
except FileNotFoundError:
pass
if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)
答案是有的, 我們可以透過Memcached或者Redis會是比較好的選擇, 尤其是多台伺服器的情況之下, 但因為我們這邊僅針對單一伺服器底下又不想架設Proxy來分流時, 可以直接使用unicorn來完成多個worker的功能, 只是共享參數會有一些需要注意的地方。
這個案例我們也是卡關了好一段時間, 不明白明明app就寫的好好的呀, 怎麼會透過字串帶入unicorn之後就變了樣了呢? 原來是我們對於生命週期及多行程沒有理解到透徹才會這樣, 過程中也是不斷的詢問ChatGPT, 多虧了AI的指導讓我們能夠快速定位問題, 並找出解決方案, 善用AI是我們這個時代必備的技能了, 我們在「🔒 阿Han的軟體心法實戰營 - 📜 江湖一點訣」也會持續分享我們的開發經驗與AI時代下必要的技能, 歡迎一同加入學習, 共同成長。