
RabbitMQ 賽博龐克風格 - by ChatGPT-5
上篇『新手實戰 RabbitMQ:安裝與 Server 的建立』我們將 RabbitMQ Server 成功啟動,本篇實現幾種常見的使用模式:
- Simple(簡單模式)
- Work Queue(工作佇列)
- Publish/Subscribe(發布/訂閱)
- Routing(路由模式)
- Topic(主題模式)
以上的模式都會使用 Python 來進行,完整的程式碼也會在我的 GitHub 上,那我們就開始吧!
在此之前,我們先把 RabbitMQ 的 Server 開啟

啟動 RabbitMQ Server
我們回到程式內,開啟你習慣使用的 IDE,這邊我使用的是 VSCode。開啟一個新專案、建立虛擬環境
*虛擬環境的指令 python -m venv (環境名稱),因為不是重點就不展開說明了
首先安裝必要的套件
pip install pika

安裝 pika 套件
確認安裝後就 OK 了,我們接著介紹五種常見模式:
- Simple
Simple 模式是 RabbitMQ 最基本的訊息傳遞模式,適合在普通一對一的場景。這個模式下一個 Producer 傳輸到 Queue 內,Consumer 再接收訊息並處理,訊息是透過預設的 Exchange = '' 直接傳輸,沒有任何限制。
我們要分別完成 Producer 與 Consumer,這邊建立兩個 .py 檔
# Producer(Simple)
import pika
# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 宣告一個 queue
channel.queue_declare(queue='hello')
# 發送訊息到 queue
message = 'Hello RabbitMQ!'
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(f" [x] Sent '{message}'")
# 關閉連線
connection.close()
# Producer(Simple)
import pika
# 處理接收的消息
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 宣告一個 queue (與 send.py 中相同)
channel.queue_declare(queue='hello')
# 設定 consumer,使用 callback 處理接收到的消息
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
# 開始處理訊息
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
接著開兩個 Terminal 依序執行
python consumer.py

啟動 consumer - simple
python producer.py

啟動 producer - simple
此時再回頭看 consumer 執行的 Terminal,可以發現收到訊息了

consumer 接收訊息 - simple
以上就是最基礎的 RabbitMQ 運行模式,當然在實務上很少這麼簡易,可能會加上 Exchange 或是 routing_key 來指定傳送的 Queue,以下我們接著介紹其他方法。
- Work Queue
Work Queue 是 RabbitMQ 用於任務分發的模式,允許將訊息分發給多個 Consumer(這裡稱為 worker 比較正確)。此方法在實務上常用於大量或耗時任務的場景中。
這邊一樣建立兩個 .py 檔
# Producer(Work Queue)
import pika
import sys
# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 宣告一個 queue
channel.queue_declare(queue='task_queue', durable=True)
# 發送訊息到 queue
message = ' '.join(sys.argv[1:]) or "Hello RabbitMQ!"
channel.basic_publish(
exchange='',
routing_key='task_queue', # 指定 queue 名稱
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 設定消息持久化,避免關閉 Server 後資料遺失
))
print(f" [x] Sent '{message}'")
# 關閉連線
connection.close()
# Consumer(Work Queue)
import pika
import time
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模擬處理時間
time.sleep(body.count(b'.'))
print(" [x] Done")
# 確認消息已被處理
ch.basic_ack(delivery_tag=method.delivery_tag)
# 連接到 RabbitMQ 服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 宣告一個持久化的 queue
channel.queue_declare(queue='task_queue', durable=True)
# 設定公平分發,限制每次只處理一條訊息
channel.basic_qos(prefetch_count=1)
# 開始處理訊息
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
為了模擬多工的場景,我們開啟三個 Terminal,其中兩個是 Cousumer;一個是 Producer。
首先,執行 Producer.py 把 task_queue 建立起來,然後再分別開啟兩個Consumer.py,此時會發現其中一個 Consumer 已收到訊息:

consumer 多工模擬 1 - Work queue
我們再執行一次 Producer 送出訊息,可以發現另一邊也收到,這樣即達成分流效果,可以多次執行觀察

consumer 多工模擬 2 - Work queue
接著介紹第三種模式
- Pub/Sub
發布/訂閱(Publish/Subscribe, 簡稱 Pub/Sub),這個模式跟第二種很像,只是它會把訊息發送到所有訂閱的 Consumer 手上,而非只有其中一個。使用的場景就是像訊息即時通知、推播類型的分發,每個訂閱的 Consumer 都會有一份相同的資料。一樣新增兩個 .py:
# Producer(Pub/Sub)
import pika
import sys
# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個 exchange,類型為 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 從命令列參數獲取訊息,若無則使用預設訊息
message = ' '.join(sys.argv[1:]) or "info: Hello RabbitMQ!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent '{message}'")
# 關閉連線
connection.close()
# Consumer(Pub/Sub)
import pika
# 處理訊息
def callback(ch, method, properties, body):
print(f" [x] Received '{body.decode()}'")
# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個 exchange,類型為 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 創建一個臨時佇列(獨立且隨機生成)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 將佇列綁定到交換器
channel.queue_bind(exchange='logs', queue=queue_name)
# 設定回調函數並開始消費訊息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
一樣開啟三個 Terminal 來觀察狀態,啟動兩個 Consumer(訂閱者)

訂閱者模擬 1 - Pub/Sub
接著啟用 Producer 丟出訊息

訂閱者模擬 2 -Pub/Sub
可以看到,雖然只發送了一次訊息,但兩個 Consumer 都成功接收到。這主要是將Exchange 設定為 fanout,確保訊息會推播到所有綁定的 Consumer 上。
當然還有其他的進階參數可以調整,像是 routing_key 用來控制符合條件的 Consumer 才能收到對應訊息。
- Routing
Routing 的模式是使用直接交換器(Direct Exchange),允許 Producer 根據不同的 routing key 將訊息發送到特定的 Queue,Consumer 再選擇特定匹配的訊息,從而過濾、分配訊息到不同地方。適用的場景像是發生警報的嚴重性分級(info、warning、error),程式碼如下:
# Producer(Routing)
import pika
import sys
# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個直接交換器
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 從命令列參數獲取路由鍵和訊息
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello RabbitMQ!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent '{severity}:{message}'")
# 關閉連線
connection.close()
# Consumer(Routing)
import pika
import sys
# 處理訊息
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
# 建立到 RabbitMQ 的連線
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個直接交換器
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 創建一個 queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 從命令列參數獲取匹配的 routing keys
severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
# 處理消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
一樣,開啟三個 Terminal,這邊執行需要攜帶參數,先執行兩個 Consumer,後面記得攜帶參數:
*如果直接執行 Consumer 沒反應的話,可以先執行一次 Producer 試試看,有可能是宣告問題
# 只接收 info 有關的消息
python consumer.py info
# 只接收 error 有關的消息
python consumer.py error

配對 routing key 1 - Routing
執行後我們就有只接收 info 與只接收 error 的 Consumer,接著在第三個 Terminal 輸入:
python producer.py info "info test"
python producer.py error "error test"

配對 routing key 2 - Routing
輸入後可以發現兩個 Consumer 會接收對應的訊息。接下來說明最後一種
- Topics
Topics 模式主要使用主題交換器(Topic Exchange),也使用 routing key 來做匹配,與 Routing 比較不同的是使用的 key 有點像模糊匹配的概念,類似於 *.info 可以匹配到 A.info 與 B.info;或是 info.# 可以匹配到 info.A 與 info.A.B。
*Routing 採用完全匹配,Topics 則支援部分模糊匹配。
以下是程式範例:
# Producer(Topic)
import pika
import sys
# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個主題交換器
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 從命令列參數獲取路由鍵和訊息
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello RabbitMQ!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent '{routing_key}:{message}'")
# 關閉連線
connection.close()
# Consumer(Topic)
import pika
import sys
# 處理訊息
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
# 連接到 RabbitMQ 伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告一個主題交換器
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 創建一個臨時 queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 從命令列參數獲取匹配的 routing keys
binding_keys = sys.argv[1:] if len(sys.argv) > 1 else ['#.info']
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
# 處理訊息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
與 Routing 類似,開啟三個 Terminal,其中兩個設定不同 routing key:
python consumer.py "*.info"
python consumer.py "#.error"

不同的 routing key 1 - Topics
接著在第三個 Terminal 輸入:
python producer.py auth.info "Login info"
python producer.py gate.A.info "Gate A error"

不同的 routing key 2 - Topics
就可以看到匹配後的結果,分別收到對應的資料。
相信看到這邊的讀者,已經對 RabbitMQ 的運作機制有了基本的理解。從簡單的 Simple 一對一通訊開始,再到 Work Queue 的負載分配、Pub/Sub 的推播模式、Routing 的精準配對,以及 Topics 靈活的模糊匹配。
當然,實務應用中可能遇到更複雜的需求,不過只要基於這五種模式的基礎,相信再怎麼變化也難不倒你們。此系列暫時在這邊告一個段落,未來有使用到類似的技術會再加開番外篇,感謝看到這邊的各位,謝謝!