【💊 Python的解憂錦囊 - FastAPI】多個worker如何共享數據?

閱讀時間約 13 分鐘

要如何使用unicorn啟動多個FastAPI服務, 歡迎參考我們的「【💊 Python的解憂錦囊 - FastAPI】如何啟動多個Workers」。


當我們試著設計帶入模組化時…

我們在「【💊 Python的解憂錦囊 - FastAPI】使用 lifespan 來共享資料與管理生命週期」有分享FastAPI如何共享資料, 我們依樣畫葫蘆來試著寫一段代碼如下:

import uvicorn
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

def run(args: dict):
@asynccontextmanager
async def lifespan(app: FastAPI):
# 載入ML模型
app.state.args = args
yield



app = FastAPI(lifespan=lifespan)

@app.get("/args")
async def get_args(args: dict = Depends(get_api_shared_args)):
return {"args": args}

uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)

if __name__ == '__main__':
# 假設這邊我們設計了args使用者輸入參數, 可能會是來自其他模組的輸入參數或者callback, 所以我們會將args放在這
args = {
'arg1': 'arg1'
}
run(args)


執行後會發生找不到模組的狀況:

raw-image


這是因為我們的app被包在函數裡面了, 所以才會這樣, 那我們再試著改改, 把所有app邏輯搬移到最外層:

import uvicorn
from contextlib import asynccontextmanager
from typing import Dict

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

# 創建一個全局變量來存儲args
_shared_args: Dict = {}

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

@asynccontextmanager
async def lifespan(app: FastAPI):
# 使用全局變量中的args
app.state.args = _shared_args
yield

app = FastAPI(lifespan=lifespan)

@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}

def run(args: dict):
# 將args存儲到全局變量
global _shared_args
_shared_args = args

uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=1,
)

if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)


但一樣存取不到我們的 _shared_args:

raw-image



⭐ 我們可以怎麼解決?

大致上的架構會如下圖, 我們會先透過multiprocessing的shared_memory來儲存我們需要共享的參數, 並封裝成IPCManager, 最後藉由fastapi的lifespan來傳遞共同參數到各個API裡面。

raw-image


上圖有一個IPCManager主要是將SharedMemory抽出去獨立一個模組, 這部份就交由各為自行抽離囉, 如有任何問題歡迎留言討論, 我們這邊為了讓您容易理解, 會將所有代碼都盡量放在同一個模組之中,我們也不希望需要的朋友單純只是複製貼上, 而是要能夠真正吸收, 內化成自己的知識點。

import os
import uvicorn
import json
from contextlib import asynccontextmanager
from typing import Dict
from multiprocessing import shared_memory
import numpy as np

from fastapi import FastAPI, Depends
from fastapi.requests import HTTPConnection

# 創建共享內存的名稱
SHARED_MEMORY_NAME = "fastapi_args"
# 預設分配的共享內存大小(根據需要調整)
SHARED_MEMORY_SIZE = 1024

def dict_to_bytes(d: dict) -> bytes:
return json.dumps(d).encode('utf-8')

def bytes_to_dict(b: bytes) -> dict:
return json.loads(b.decode('utf-8').rstrip('\\x00'))

def create_shared_memory(data: dict):
# 將字典轉換為bytes
data_bytes = dict_to_bytes(data)
# 創建共享內存
try:
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)
except FileExistsError:
# 如果已存在,則先刪除再創建
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME, create=True, size=SHARED_MEMORY_SIZE)

# 將數據寫入共享內存
shm.buf[:len(data_bytes)] = data_bytes
return shm

def get_shared_memory_data() -> dict:
try:
# 連接到現有的共享內存
shm = shared_memory.SharedMemory(name=SHARED_MEMORY_NAME)
# 讀取數據直到遇到空字節
data = bytes(shm.buf[:SHARED_MEMORY_SIZE])
return bytes_to_dict(data)
except FileNotFoundError:
return {}

def get_api_shared_args(connection: HTTPConnection) -> dict:
return connection.app.state.args

@asynccontextmanager
async def lifespan(app: FastAPI):
# 從共享內存中讀取參數
app.state.args = get_shared_memory_data()
yield
# 應用關閉時清理共享內存
try:
shared_memory.SharedMemory(name=SHARED_MEMORY_NAME).unlink()
except FileNotFoundError:
pass

app = FastAPI(lifespan=lifespan)

@app.get('/args')
async def get_args(args: dict = Depends(get_api_shared_args)):
return {'args': args}

def run(args: dict):
# 創建共享內存並存儲參數
shm = create_shared_memory(args)
main_pid = os.getpid()

try:
uvicorn.run(
'test:app',
host='0.0.0.0',
port=9880,
workers=5,
)
finally:
if os.getpid() == main_pid:
try:
shm.close()
shm.unlink()
except FileNotFoundError:
pass

if __name__ == '__main__':
args = {'arg1': 'arg1'}
run(args)



除了SharedMemory之外還有別的方法嗎?

答案是有的, 我們可以透過Memcached或者Redis會是比較好的選擇, 尤其是多台伺服器的情況之下, 但因為我們這邊僅針對單一伺服器底下又不想架設Proxy來分流時, 可以直接使用unicorn來完成多個worker的功能, 只是共享參數會有一些需要注意的地方。


結語

這個案例我們也是卡關了好一段時間, 不明白明明app就寫的好好的呀, 怎麼會透過字串帶入unicorn之後就變了樣了呢? 原來是我們對於生命週期及多行程沒有理解到透徹才會這樣, 過程中也是不斷的詢問ChatGPT, 多虧了AI的指導讓我們能夠快速定位問題, 並找出解決方案, 善用AI是我們這個時代必備的技能了, 我們在「🔒 阿Han的軟體心法實戰營 - 📜 江湖一點訣」也會持續分享我們的開發經驗與AI時代下必要的技能, 歡迎一同加入學習, 共同成長。

avatar-img
118會員
264內容數
哈囉,我是阿Han,是一位 👩‍💻 軟體研發工程師,喜歡閱讀、學習、撰寫文章及教學,擅長以圖代文,化繁為簡,除了幫助自己釐清思路之外,也希望藉由圖解的方式幫助大家共同學習,甚至手把手帶您設計出高品質的軟體產品。
留言0
查看全部
avatar-img
發表第一個留言支持創作者!
阿Han的沙龍 的其他內容
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有說明如何使用uvicorn來啟動FastAPI服務, 假設今天我們的API是一個CPU密集型的運算服務時, 通常我們會希望開啟多個行程來幫忙處理, 那麼大致上的撰寫方式會像這樣: app = FastAPI( ti
MinIO 是一個高性能的物件存儲系統,設計用於大規模的數據存儲需求, 甚至是各種非結構化數據也都能往這邊儲存, 也支持群集擴展, 非常適合正在尋找儲存方案的朋友們。 我們在「【💎 Message Queue - Kafka 案例篇】如何將檔案流上傳到minio - 完整檔案 」介紹了如
訊息的即時傳遞已然成為現代社會的趨勢了, 影音也是如此, 即時! 即時! 即時! 已經是目前使用者體驗的必要元素了, 在這邊我們要分享的主題是如何在python程式語言的情境下使用ffmpeg來將音檔串流的轉換格式, 為什麼會有這樣的需求呢? 因為我們處理音檔時可能會需要統一輸出的格式, 當然背後也
我們在學習kafka的過程中最不習慣的就是不管什麼樣的資料, 在kafka的傳輸過程都會是binary的資料格式, 因此我們在撰寫程式的過程中並不是那麼的直觀, 必須將資料從float、int…資料型態轉型成binary才能順利傳送, 那麼基於這樣的前提之下, python這套程式語言可以怎麼做
情境描述 我們在「🔒 阿Han的軟體心法實戰營 - kafka」有關於kafka的教學文章, 那麼在開發過程中我們遇到了 👻 詭異事件, 那就是我們嘗試在做一個檔案串流時, 發現Producer明明傳送了大約16MB檔案大小的封包到kafka, 每一包約(1024 * 1024 ) bytes
更快、更短、更即時是串流傳輸必要的元素, 而我們常常在使用Python請求API時都是等待式回應, 也就是一個請求過去之後, 待對方處理完畢後再行回應, 但假設需要下載的檔案、內容非常大時, 是不是使用者只能傻傻的等待整個傳輸結束後才能顯示? 這樣的使用者體驗也實在太糟糕了, 對於使用者來說除了完全
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有說明如何使用uvicorn來啟動FastAPI服務, 假設今天我們的API是一個CPU密集型的運算服務時, 通常我們會希望開啟多個行程來幫忙處理, 那麼大致上的撰寫方式會像這樣: app = FastAPI( ti
MinIO 是一個高性能的物件存儲系統,設計用於大規模的數據存儲需求, 甚至是各種非結構化數據也都能往這邊儲存, 也支持群集擴展, 非常適合正在尋找儲存方案的朋友們。 我們在「【💎 Message Queue - Kafka 案例篇】如何將檔案流上傳到minio - 完整檔案 」介紹了如
訊息的即時傳遞已然成為現代社會的趨勢了, 影音也是如此, 即時! 即時! 即時! 已經是目前使用者體驗的必要元素了, 在這邊我們要分享的主題是如何在python程式語言的情境下使用ffmpeg來將音檔串流的轉換格式, 為什麼會有這樣的需求呢? 因為我們處理音檔時可能會需要統一輸出的格式, 當然背後也
我們在學習kafka的過程中最不習慣的就是不管什麼樣的資料, 在kafka的傳輸過程都會是binary的資料格式, 因此我們在撰寫程式的過程中並不是那麼的直觀, 必須將資料從float、int…資料型態轉型成binary才能順利傳送, 那麼基於這樣的前提之下, python這套程式語言可以怎麼做
情境描述 我們在「🔒 阿Han的軟體心法實戰營 - kafka」有關於kafka的教學文章, 那麼在開發過程中我們遇到了 👻 詭異事件, 那就是我們嘗試在做一個檔案串流時, 發現Producer明明傳送了大約16MB檔案大小的封包到kafka, 每一包約(1024 * 1024 ) bytes
更快、更短、更即時是串流傳輸必要的元素, 而我們常常在使用Python請求API時都是等待式回應, 也就是一個請求過去之後, 待對方處理完畢後再行回應, 但假設需要下載的檔案、內容非常大時, 是不是使用者只能傻傻的等待整個傳輸結束後才能顯示? 這樣的使用者體驗也實在太糟糕了, 對於使用者來說除了完全
你可能也想看
Google News 追蹤
Thumbnail
嘿,大家新年快樂~ 新年大家都在做什麼呢? 跨年夜的我趕工製作某個外包設計案,在工作告一段落時趕上倒數。 然後和兩個小孩過了一個忙亂的元旦。在深夜時刻,看到朋友傳來的解籤網站,興致勃勃熬夜體驗了一下,覺得非常好玩,或許有人玩過了,但還是想寫上來分享紀錄一下~
Thumbnail
相信大家現在都有在使用網銀的習慣 以前因為打工和工作的關係,我辦過的網銀少說也有5、6間,可以說在使用網銀App方面我可以算是個老手了。 最近受邀參加國泰世華CUBE App的使用測試 嘿嘿~殊不知我本身就有在使用他們的App,所以這次的受測根本可以說是得心應手
上兩篇有關List的文章,此篇文上兩章的延續,整理一些常用的方法和操作。 [Python]List(列表)新增、修改、刪除元素 [Python基礎]容器 list(列表),tuple(元組) 還有一些常用的 list 方法和操作,讓你能更靈活地處理列表數據
Thumbnail
打開 jupyter notebook 寫一段 python 程式,可以完成五花八門的工作,這是玩程式最簡便的方式,其中可以獲得很多快樂,在現今這種資訊發達的時代,幾乎沒有門檻,只要願意,人人可享用。 下一步,希望程式可以隨時待命聽我吩咐,不想每次都要開電腦,啟動開發環境,只為完成一個重複性高
Thumbnail
更快、更短、更即時是串流傳輸必要的元素, 而我們常常在使用Python請求API時都是等待式回應, 也就是一個請求過去之後, 待對方處理完畢後再行回應, 但假設需要下載的檔案、內容非常大時, 是不是使用者只能傻傻的等待整個傳輸結束後才能顯示? 這樣的使用者體驗也實在太糟糕了, 對於使用者來說除了完全
Thumbnail
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有分享 FastAPI 這套API框架, 那麼當我們想要在應用程式剛執行時就註冊一些事件或者共享GPU運算模型、變數…等,當整個應用程式關閉時也進行釋放作業, 這樣的一個週期循環就是所謂的生命週期, 而在FastAPI這
Thumbnail
關於FastAPI這個框架為什麼有什麼樣的優勢, 為什麼會這麼熱門? 歡迎參考「【Python 技術選型】如何選出適合的API框架呢?」。 站在巨人的肩膀上 FastAPI主要基於以下兩個重要的元件組成, Starlette與Pydantic, 就讓我們來看看兩者的關係吧! 安裝 pip
Thumbnail
當你需要在 Python 中執行多個任務,但又不希望它們相互阻塞時,可以使用 threading 模組。 threading 模組允許你在單個程序中創建多個執行緒,這些執行緒可以同時運行,從而實現並行執行多個任務的效果。
Thumbnail
當我們在撰寫一套系統的時候, 總是會提供一個介面讓使用者來觸發功能模組並回傳使用者所需的請求, 而傳統的安裝包模式總是太侷限, 需要個別主機獨立安裝, 相當繁瑣, 但隨著時代的演進與互聯網的崛起, 大部分的工作都可以藉由網頁端、裝置端來觸發, 而伺服端則是負責接收指令、運算與回傳結果, 雲端
Thumbnail
先前幾篇筆記介紹了網路請求,瀏覽器儲存資料的方式,那麼實務上,前端最常需要發送網路請求的時候,就是透過呼叫 API,去向後端工程師發送/請求資料,所以今天來記錄什麼是 API吧!
Thumbnail
關於多執行緒/多行程的使用方式 在Python 3.2版本之後加入了「concurrent.futures」啟動平行任務, 它可以更好的讓我們管理多執行緒/多行程的應用場景,讓我們在面對這種併發問題時可以不必害怕, 用一個非常簡單的方式就能夠處裡, 底下我們將為您展示一段程式碼: imp
Thumbnail
提到後端工程師,似乎就只是開發 API,但一個複雜的系統其實不太可能只透過 API 就能完成,例如一個簡單的功能,註冊會員,其實是由好幾個不同類型的工作互相配合,您才能收到開通信,才確保資料庫不會有一堆未開通帳號等。所以今天就來聊聊一個系統有幾種不同執行方式的工作。
Thumbnail
嘿,大家新年快樂~ 新年大家都在做什麼呢? 跨年夜的我趕工製作某個外包設計案,在工作告一段落時趕上倒數。 然後和兩個小孩過了一個忙亂的元旦。在深夜時刻,看到朋友傳來的解籤網站,興致勃勃熬夜體驗了一下,覺得非常好玩,或許有人玩過了,但還是想寫上來分享紀錄一下~
Thumbnail
相信大家現在都有在使用網銀的習慣 以前因為打工和工作的關係,我辦過的網銀少說也有5、6間,可以說在使用網銀App方面我可以算是個老手了。 最近受邀參加國泰世華CUBE App的使用測試 嘿嘿~殊不知我本身就有在使用他們的App,所以這次的受測根本可以說是得心應手
上兩篇有關List的文章,此篇文上兩章的延續,整理一些常用的方法和操作。 [Python]List(列表)新增、修改、刪除元素 [Python基礎]容器 list(列表),tuple(元組) 還有一些常用的 list 方法和操作,讓你能更靈活地處理列表數據
Thumbnail
打開 jupyter notebook 寫一段 python 程式,可以完成五花八門的工作,這是玩程式最簡便的方式,其中可以獲得很多快樂,在現今這種資訊發達的時代,幾乎沒有門檻,只要願意,人人可享用。 下一步,希望程式可以隨時待命聽我吩咐,不想每次都要開電腦,啟動開發環境,只為完成一個重複性高
Thumbnail
更快、更短、更即時是串流傳輸必要的元素, 而我們常常在使用Python請求API時都是等待式回應, 也就是一個請求過去之後, 待對方處理完畢後再行回應, 但假設需要下載的檔案、內容非常大時, 是不是使用者只能傻傻的等待整個傳輸結束後才能顯示? 這樣的使用者體驗也實在太糟糕了, 對於使用者來說除了完全
Thumbnail
我們在「【🔒 Python API框架篇 - FastAPI】Ep.1 啟航」有分享 FastAPI 這套API框架, 那麼當我們想要在應用程式剛執行時就註冊一些事件或者共享GPU運算模型、變數…等,當整個應用程式關閉時也進行釋放作業, 這樣的一個週期循環就是所謂的生命週期, 而在FastAPI這
Thumbnail
關於FastAPI這個框架為什麼有什麼樣的優勢, 為什麼會這麼熱門? 歡迎參考「【Python 技術選型】如何選出適合的API框架呢?」。 站在巨人的肩膀上 FastAPI主要基於以下兩個重要的元件組成, Starlette與Pydantic, 就讓我們來看看兩者的關係吧! 安裝 pip
Thumbnail
當你需要在 Python 中執行多個任務,但又不希望它們相互阻塞時,可以使用 threading 模組。 threading 模組允許你在單個程序中創建多個執行緒,這些執行緒可以同時運行,從而實現並行執行多個任務的效果。
Thumbnail
當我們在撰寫一套系統的時候, 總是會提供一個介面讓使用者來觸發功能模組並回傳使用者所需的請求, 而傳統的安裝包模式總是太侷限, 需要個別主機獨立安裝, 相當繁瑣, 但隨著時代的演進與互聯網的崛起, 大部分的工作都可以藉由網頁端、裝置端來觸發, 而伺服端則是負責接收指令、運算與回傳結果, 雲端
Thumbnail
先前幾篇筆記介紹了網路請求,瀏覽器儲存資料的方式,那麼實務上,前端最常需要發送網路請求的時候,就是透過呼叫 API,去向後端工程師發送/請求資料,所以今天來記錄什麼是 API吧!
Thumbnail
關於多執行緒/多行程的使用方式 在Python 3.2版本之後加入了「concurrent.futures」啟動平行任務, 它可以更好的讓我們管理多執行緒/多行程的應用場景,讓我們在面對這種併發問題時可以不必害怕, 用一個非常簡單的方式就能夠處裡, 底下我們將為您展示一段程式碼: imp
Thumbnail
提到後端工程師,似乎就只是開發 API,但一個複雜的系統其實不太可能只透過 API 就能完成,例如一個簡單的功能,註冊會員,其實是由好幾個不同類型的工作互相配合,您才能收到開通信,才確保資料庫不會有一堆未開通帳號等。所以今天就來聊聊一個系統有幾種不同執行方式的工作。