新手實戰 RabbitMQ:使用 Python 實作五種常見架構

更新於 發佈於 閱讀時間約 23 分鐘
RabbitMQ 賽博龐克風格 - by ChatGPT-5

RabbitMQ 賽博龐克風格 - by ChatGPT-5

上篇『新手實戰 RabbitMQ:安裝與 Server 的建立』我們將 RabbitMQ Server 成功啟動,本篇實現幾種常見的使用模式:

  1. Simple(簡單模式)
  2. Work Queue(工作佇列)
  3. Publish/Subscribe(發布/訂閱)
  4. Routing(路由模式)
  5. Topic(主題模式)

以上的模式都會使用 Python 來進行,完整的程式碼也會在我的 GitHub 上,那我們就開始吧!


在此之前,我們先把 RabbitMQ 的 Server 開啟

啟動 RabbitMQ Server

啟動 RabbitMQ Server

我們回到程式內,開啟你習慣使用的 IDE,這邊我使用的是 VSCode。開啟一個新專案、建立虛擬環境
*虛擬環境的指令 python -m venv (環境名稱),因為不是重點就不展開說明了

首先安裝必要的套件

pip install pika
安裝 pika 套件

安裝 pika 套件

確認安裝後就 OK 了,我們接著介紹五種常見模式:

  • Simple
    Simple 模式是 RabbitMQ 最基本的訊息傳遞模式,適合在普通一對一的場景。這個模式下一個 Producer 傳輸到 Queue 內,Consumer 再接收訊息並處理,訊息是透過預設的 Exchange = '' 直接傳輸,沒有任何限制。

    我們要分別完成 Producer 與 Consumer,這邊建立兩個 .py 檔
# Producer(Simple)
import pika

# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 宣告一個 queue
channel.queue_declare(queue='hello')

# 發送訊息到 queue
message = 'Hello RabbitMQ!'
channel.basic_publish(exchange='',
                     routing_key='hello',
                     body=message)

print(f" [x] Sent '{message}'")

# 關閉連線
connection.close()
# Producer(Simple)
import pika

# 處理接收的消息
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")

# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 宣告一個 queue (與 send.py 中相同)
channel.queue_declare(queue='hello')

# 設定 consumer,使用 callback 處理接收到的消息
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)

# 開始處理訊息
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

接著開兩個 Terminal 依序執行

python consumer.py​
啟動 consumer - simple

啟動 consumer - simple

python producer.py
啟動 producer - simple

啟動 producer - simple

此時再回頭看 consumer 執行的 Terminal,可以發現收到訊息了

consumer 接收訊息 - simple

consumer 接收訊息 - simple

以上就是最基礎的 RabbitMQ 運行模式,當然在實務上很少這麼簡易,可能會加上 Exchange 或是 routing_key 來指定傳送的 Queue,以下我們接著介紹其他方法。

  • Work Queue
    Work Queue 是 RabbitMQ 用於任務分發的模式,允許將訊息分發給多個 Consumer(這裡稱為 worker 比較正確)。此方法在實務上常用於大量或耗時任務的場景中。

    這邊一樣建立兩個 .py 檔
# Producer(Work Queue)
import pika
import sys

# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 宣告一個 queue
channel.queue_declare(queue='task_queue', durable=True)

# 發送訊息到 queue
message = ' '.join(sys.argv[1:]) or "Hello RabbitMQ!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue', # 指定 queue 名稱
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 設定消息持久化,避免關閉 Server 後資料遺失
    ))

print(f" [x] Sent '{message}'")

# 關閉連線
connection.close()
# Consumer(Work Queue)
import pika
import time

def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
   
# 模擬處理時間
    time.sleep(body.count(b'.'))  
    print(" [x] Done")

    # 確認消息已被處理
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 宣告一個持久化的 queue
channel.queue_declare(queue='task_queue', durable=True)

# 設定公平分發,限制每次只處理一條訊息
channel.basic_qos(prefetch_count=1)

# 開始處理訊息
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

為了模擬多工的場景,我們開啟三個 Terminal,其中兩個是 Cousumer;一個是 Producer。

首先,執行 Producer.py 把 task_queue 建立起來,然後再分別開啟兩個Consumer.py,此時會發現其中一個 Consumer 已收到訊息:

consumer 多工模擬 1 - Work queue

consumer 多工模擬 1 - Work queue

我們再執行一次 Producer 送出訊息,可以發現另一邊也收到,這樣即達成分流效果,可以多次執行觀察

consumer 多工模擬  2 - Work queue

consumer 多工模擬 2 - Work queue

接著介紹第三種模式

  • Pub/Sub
    發布/訂閱(Publish/Subscribe, 簡稱 Pub/Sub),這個模式跟第二種很像,只是它會把訊息發送到所有訂閱的 Consumer 手上,而非只有其中一個。使用的場景就是像訊息即時通知、推播類型的分發,每個訂閱的 Consumer 都會有一份相同的資料。一樣新增兩個 .py:
# Producer(Pub/Sub)
import pika
import sys

# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個 exchange,類型為 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 從命令列參數獲取訊息,若無則使用預設訊息
message = ' '.join(sys.argv[1:]) or "info: Hello RabbitMQ!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent '{message}'")

# 關閉連線
connection.close()
# Consumer(Pub/Sub)
import pika

# 處理訊息
def callback(ch, method, properties, body):
    print(f" [x] Received '{body.decode()}'")

# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個 exchange,類型為 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 創建一個臨時佇列(獨立且隨機生成)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 將佇列綁定到交換器
channel.queue_bind(exchange='logs', queue=queue_name)

# 設定回調函數並開始消費訊息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

一樣開啟三個 Terminal 來觀察狀態,啟動兩個 Consumer(訂閱者)

訂閱者模擬 1 - Pub/Sub

訂閱者模擬 1 - Pub/Sub

接著啟用 Producer 丟出訊息

訂閱者模擬 2 -Pub/Sub

訂閱者模擬 2 -Pub/Sub

可以看到,雖然只發送了一次訊息,但兩個 Consumer 都成功接收到。這主要是將Exchange 設定為 fanout,確保訊息會推播到所有綁定的 Consumer 上。

當然還有其他的進階參數可以調整,像是 routing_key 用來控制符合條件的 Consumer 才能收到對應訊息。

  • Routing
    Routing 的模式是使用直接交換器(Direct Exchange),允許 Producer 根據不同的 routing key 將訊息發送到特定的 Queue,Consumer 再選擇特定匹配的訊息,從而過濾、分配訊息到不同地方。適用的場景像是發生警報的嚴重性分級(info、warning、error),程式碼如下:
# Producer(Routing)
import pika
import sys

# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個直接交換器
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 從命令列參數獲取路由鍵和訊息
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello RabbitMQ!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent '{severity}:{message}'")

# 關閉連線
connection.close()
# Consumer(Routing)
import pika
import sys

# 處理訊息
def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個直接交換器
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 創建一個 queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 從命令列參數獲取匹配的 routing keys
severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

# 處理消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

一樣,開啟三個 Terminal,這邊執行需要攜帶參數,先執行兩個 Consumer,後面記得攜帶參數:
*如果直接執行 Consumer 沒反應的話,可以先執行一次 Producer 試試看,有可能是宣告問題

# 只接收 info 有關的消息
python​ consumer.py info

# 只接收 error 有關的消息
python consumer.py error​
配對 routing key 1 - Routing

配對 routing key 1 - Routing

執行後我們就有只接收 info 與只接收 error 的 Consumer,接著在第三個 Terminal 輸入:

python producer.py info "info test"
python producer.py error "error test"
配對 routing key 2 - Routing

配對 routing key 2 - Routing

輸入後可以發現兩個 Consumer 會接收對應的訊息。接下來說明最後一種

  • Topics
    Topics 模式主要使用主題交換器(Topic Exchange),也使用 routing key 來做匹配,與 Routing 比較不同的是使用的 key 有點像模糊匹配的概念,類似於 *.info 可以匹配到 A.info 與 B.info;或是 info.# 可以匹配到 info.A 與 info.A.B。
    *Routing 採用完全匹配,Topics 則支援部分模糊匹配。

    以下是程式範例:
# Producer(Topic)
import pika
import sys

# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個主題交換器
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 從命令列參數獲取路由鍵和訊息
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello RabbitMQ!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent '{routing_key}:{message}'")

# 關閉連線
connection.close()
# Consumer(Topic)
import pika
import sys

# 處理訊息
def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告一個主題交換器
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 創建一個臨時 queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 從命令列參數獲取匹配的 routing keys
binding_keys = sys.argv[1:] if len(sys.argv) > 1 else ['#.info']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

# 處理訊息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

Routing 類似,開啟三個 Terminal,其中兩個設定不同 routing key:

python consumer.py "*.info"
python consumer.py "#.error"
不同的 routing key 1  - Topics

不同的 routing key 1 - Topics

接著在第三個 Terminal 輸入:

python producer.py auth.info "Login info"​
python producer.py gate.A.info "Gate A error"
不同的 routing key 2  - Topics

不同的 routing key 2 - Topics

就可以看到匹配後的結果,分別收到對應的資料。

相信看到這邊的讀者,已經對 RabbitMQ 的運作機制有了基本的理解。從簡單的 Simple 一對一通訊開始,再到 Work Queue 的負載分配、Pub/Sub 的推播模式、Routing 的精準配對,以及 Topics 靈活的模糊匹配。

當然,實務應用中可能遇到更複雜的需求,不過只要基於這五種模式的基礎,相信再怎麼變化也難不倒你們。此系列暫時在這邊告一個段落,未來有使用到類似的技術會再加開番外篇,感謝看到這邊的各位,謝謝!

留言
avatar-img
留言分享你的想法!
avatar-img
Alan的沙龍
0會員
12內容數
不定期技術文章、旅遊、人生見解分享
Alan的沙龍的其他內容
2025/08/02
上篇『零基礎入門 RabbitMQ:Producer、Exchange、Queue 和 Consumer 的基本介紹』我們初步認識了 RabbitMQ 的核心部件,本篇我們從安裝到建立 RabbitMQ Server 都會詳細介紹。
Thumbnail
2025/08/02
上篇『零基礎入門 RabbitMQ:Producer、Exchange、Queue 和 Consumer 的基本介紹』我們初步認識了 RabbitMQ 的核心部件,本篇我們從安裝到建立 RabbitMQ Server 都會詳細介紹。
Thumbnail
2025/07/26
RabbitMQ 是一個基於 AMQP 協定的開源訊息佇列應用程式,用於接收、暫存和傳遞訊息。 本文介紹 RabbitMQ 的核心概念,包括 Producer、Exchange、Queue 和 Consumer 的角色和功能。
Thumbnail
2025/07/26
RabbitMQ 是一個基於 AMQP 協定的開源訊息佇列應用程式,用於接收、暫存和傳遞訊息。 本文介紹 RabbitMQ 的核心概念,包括 Producer、Exchange、Queue 和 Consumer 的角色和功能。
Thumbnail
2025/07/19
本篇文章介紹如何使用 HeidiSQL 備份 MySQL 資料庫,除了說明透過圖形介面手動匯出資料之外,也實作了利用 Python 腳本結合 mysqldump 指令,實現每日自動備份的流程,包含參數設定、錯誤處理與檔案命名範例。文章適合希望將備份自動化的開發者參考使用。
Thumbnail
2025/07/19
本篇文章介紹如何使用 HeidiSQL 備份 MySQL 資料庫,除了說明透過圖形介面手動匯出資料之外,也實作了利用 Python 腳本結合 mysqldump 指令,實現每日自動備份的流程,包含參數設定、錯誤處理與檔案命名範例。文章適合希望將備份自動化的開發者參考使用。
Thumbnail
看更多
你可能也想看
Thumbnail
常常被朋友問「哪裡買的?」嗎?透過蝦皮分潤計畫,把日常購物的分享多加一個步驟,就能轉換成現金回饋。門檻低、申請簡單,特別適合學生與上班族,讓零碎時間也能創造小確幸。
Thumbnail
常常被朋友問「哪裡買的?」嗎?透過蝦皮分潤計畫,把日常購物的分享多加一個步驟,就能轉換成現金回饋。門檻低、申請簡單,特別適合學生與上班族,讓零碎時間也能創造小確幸。
Thumbnail
嗨!歡迎來到 vocus vocus 方格子是台灣最大的內容創作與知識變現平台,並且計畫持續拓展東南亞等等國際市場。我們致力於打造讓創作者能夠自由發表、累積影響力並獲得實質收益的創作生態圈!「創作至上」是我們的核心價值,我們致力於透過平台功能與服務,賦予創作者更多的可能。 vocus 平台匯聚了
Thumbnail
嗨!歡迎來到 vocus vocus 方格子是台灣最大的內容創作與知識變現平台,並且計畫持續拓展東南亞等等國際市場。我們致力於打造讓創作者能夠自由發表、累積影響力並獲得實質收益的創作生態圈!「創作至上」是我們的核心價值,我們致力於透過平台功能與服務,賦予創作者更多的可能。 vocus 平台匯聚了
Thumbnail
打開 jupyter notebook 寫一段 python 程式,可以完成五花八門的工作,這是玩程式最簡便的方式,其中可以獲得很多快樂,在現今這種資訊發達的時代,幾乎沒有門檻,只要願意,人人可享用。 下一步,希望程式可以隨時待命聽我吩咐,不想每次都要開電腦,啟動開發環境,只為完成一個重複性高
Thumbnail
打開 jupyter notebook 寫一段 python 程式,可以完成五花八門的工作,這是玩程式最簡便的方式,其中可以獲得很多快樂,在現今這種資訊發達的時代,幾乎沒有門檻,只要願意,人人可享用。 下一步,希望程式可以隨時待命聽我吩咐,不想每次都要開電腦,啟動開發環境,只為完成一個重複性高
Thumbnail
在網路速度有限的情況下,依序記錄不斷產生的資訊,能統計使用者在頁面上操作了哪些功能。
Thumbnail
在網路速度有限的情況下,依序記錄不斷產生的資訊,能統計使用者在頁面上操作了哪些功能。
Thumbnail
利用文字紀錄,明確寫下自己的採購項目......
Thumbnail
利用文字紀錄,明確寫下自己的採購項目......
Thumbnail
Function的使用方式
Thumbnail
Function的使用方式
Thumbnail
以銷售解決方案為目標的網路軟體業務們
Thumbnail
以銷售解決方案為目標的網路軟體業務們
Thumbnail
觀察者模式透過主題訂閱/訊息通知的機制,極度增強系統的可擴展性、靈活性以及降低組件間的耦合度。概念直觀簡單,是非常實用的設計模式。
Thumbnail
觀察者模式透過主題訂閱/訊息通知的機制,極度增強系統的可擴展性、靈活性以及降低組件間的耦合度。概念直觀簡單,是非常實用的設計模式。
Thumbnail
#底層邏輯 #百萬網紅也是這樣教 單純製作短視頻的順序~(簡要版) . #規劃與觀察 第一、選擇賽道、觀察對手先開帳號去跟有興趣的帳號互動,了解不同平台熱門的賽道,進而選擇適合自己的賽道找出競品差異,建立精準的人設。 . #決心很重要 第二、準備設備和決心其實穩定器、指向麥克風、手機幾乎就是全部的
Thumbnail
#底層邏輯 #百萬網紅也是這樣教 單純製作短視頻的順序~(簡要版) . #規劃與觀察 第一、選擇賽道、觀察對手先開帳號去跟有興趣的帳號互動,了解不同平台熱門的賽道,進而選擇適合自己的賽道找出競品差異,建立精準的人設。 . #決心很重要 第二、準備設備和決心其實穩定器、指向麥克風、手機幾乎就是全部的
Thumbnail
當我們在撰寫一套系統的時候, 總是會提供一個介面讓使用者來觸發功能模組並回傳使用者所需的請求, 而傳統的安裝包模式總是太侷限, 需要個別主機獨立安裝, 相當繁瑣, 但隨著時代的演進與互聯網的崛起, 大部分的工作都可以藉由網頁端、裝置端來觸發, 而伺服端則是負責接收指令、運算與回傳結果, 雲端
Thumbnail
當我們在撰寫一套系統的時候, 總是會提供一個介面讓使用者來觸發功能模組並回傳使用者所需的請求, 而傳統的安裝包模式總是太侷限, 需要個別主機獨立安裝, 相當繁瑣, 但隨著時代的演進與互聯網的崛起, 大部分的工作都可以藉由網頁端、裝置端來觸發, 而伺服端則是負責接收指令、運算與回傳結果, 雲端
Thumbnail
提到後端工程師,似乎就只是開發 API,但一個複雜的系統其實不太可能只透過 API 就能完成,例如一個簡單的功能,註冊會員,其實是由好幾個不同類型的工作互相配合,您才能收到開通信,才確保資料庫不會有一堆未開通帳號等。所以今天就來聊聊一個系統有幾種不同執行方式的工作。
Thumbnail
提到後端工程師,似乎就只是開發 API,但一個複雜的系統其實不太可能只透過 API 就能完成,例如一個簡單的功能,註冊會員,其實是由好幾個不同類型的工作互相配合,您才能收到開通信,才確保資料庫不會有一堆未開通帳號等。所以今天就來聊聊一個系統有幾種不同執行方式的工作。
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News