【Message Queue - RabbitMQ】訊息的如何插隊?(Priority)

【Message Queue - RabbitMQ】訊息的如何插隊?(Priority)

更新於 發佈於 閱讀時間約 7 分鐘

圖片來源

雖然我們的一般的狀況下我們都希望訊息能夠按照順序被處理, 但有時候我們仍希望某些重要的訊息能夠優先被處理,也就是插隊的概念,正好RabbitMQ也有提供這樣需求的解決方案,以下是需要知道的幾個重點。

  • 宣告Queue的時候必須要設定 x-max-priority, 通常10就夠用了。
  • 數字越大優先序越高。
  • 一般不要設定太大,因為這算是特殊狀況,正常狀況下應該照順序來處理。
  • 沒有指定優先序的訊息會預設為0。
  • 生產端發送的訊息優先序若超過配置的最大值,則會以最大值做為優先序的標示。

配置範例

這邊會以管理界面作為範例, 其他配置方式則大同小異, 可以參考官方文件。

  • 首先進到管理界面的Queues,並加入新的Queue,並配置優先序最大號碼為20。
  • 參數的欄位填上: x-max-priority:20 。
  • 型態記得要選number。

圖片來源

  • 配置完我們的Queue會有Pri的標記。
  • 接著我們將Queue綁定到交換機上。

圖片來源

開始進行實驗

  • 實驗方向:依照訊息重要程度分為1~20個號碼, 雖然我們先送出一個19號的訊息,再送出20號的訊息,但正確堆積在佇列的訊息應該要確保20號的訊息優先於19號。
  • Producer端會送出1、5、3、20的訊息。
(async () => {
const conn = require("amqplib").connect(
"amqp://test:test@127.0.0.1:5672/test"
);
const channel = await (await conn).createChannel();
const priorityNums = [1, 5, 3, 20];
const routingKey = "priority";
const exchange = "qa_exchange_direct";
for (let i = 0; i < priorityNums.length; i++) {
const priority = priorityNums[i];
const msg = `Priority_${priority}`;
channel.publish(exchange, routingKey, Buffer.from(msg), {
priority,
});
console.log(`Send Message: ${msg}`)
}
})();Send Message: Priority_1
Send Message: Priority_5
Send Message: Priority_3
Send Message: Priority_20
  • Comsumer端則會依照優先序接收20、5、3、1的訊息
(async() => {
const conn = require('amqplib').connect('amqp://inuqa:inuqa@127.0.0.1:5672/inuqa');
const channel = await (await conn).createChannel();
await channel.prefetch(1); // 最多能消費unacked的數量
channel.consume("優先序隊列", (msg) => {
setTimeout(() => {
console.log(msg.content.toString());
channel.ack(msg);
}, 1000)
}, {
noAck: false
})
})()Priority_20
Priority_5
Priority_3
Priority_1

注意事項

  • RabbitMQ預設是不支援優先序的, 因為每個隊列的每個優先級都會有一些CPU跟磁碟的開銷成本。
  • 官方建議不要使用Policy的方式來配置, 因為Policy是動態的, 可以在Queue宣告後進行參數的修改, 但優先序隊列在宣告後就無法更改其數量。
  • 假設一開始Queue沒有指定優先序,後續要加上優先序會需要將原本的Queue移除再重新宣告,因此最好一開始規劃功能時就將這一塊給考慮進去!!!!!!!!!!

如何取得訊息中某個優先序號碼剩下的數量?

雖然有了優先序可以幫我們依照訊息的重要程度來分配工作,但就應用面上來看, 我們可能會需要去掌握某個優先序號碼在Queue裡面還剩下幾個未處理, 此時我們可以藉由Management Plugin的API來進行查詢, 查詢方式如下:

<http://$>{ip}/api/queues/${vhost}/${隊列名稱}
// <http://127.0.0.1:8083/api/queues/test/優先序隊列>

GET之後會得到以下的Response, 我們只需要取backing_queue_status.priority_lengths.${優先序號碼}, 就能知道剩餘數量有多少:

{
...,
"backing_queue_status": {
...,
"priority_lengths": {
"0": 0,
"1": 2,
"2": 0,
"3": 2,
"4": 0,
"5": 2,
"6": 0,
"7": 0,
"8": 0,
"9": 0,
"10": 0,
"11": 0,
"12": 0,
"13": 0,
"14": 0,
"15": 0,
"16": 0,
"17": 0,
"18": 0,
"19": 0,
"20": 2
},
...
},
...
}
avatar-img
阿Han的沙龍
126會員
280內容數
哈囉,我是阿Han,是一位 👩‍💻 軟體研發工程師,喜歡閱讀、學習、撰寫文章及教學,擅長以圖代文,化繁為簡,除了幫助自己釐清思路之外,也希望藉由圖解的方式幫助大家共同學習,甚至手把手帶您設計出高品質的軟體產品。
留言
avatar-img
留言分享你的想法!
阿Han的沙龍 的其他內容
相信對於這一篇感興趣的朋友們都已經玩過kafka的Schema Registry了吧! 沒玩過得朋友也沒關係, 歡迎至「【🔒Message Queue - Kafka】傳輸訊息的標準格式制定者 Schema Registry」了解一下這是什麼玩意兒, 好了, 廢話不多說, 讓我們直接切入主題吧
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少因
對於軟體世界中Message Queue有興趣的朋友可以先閱讀這一篇「【資訊軟體知識】井然有序的處理機制 - Message Queue」建立基礎知識之後,再來看看這一篇會更容易進入情境唷! 這次就進入我們一般常見的MQ軟體「RabbitMQ」, 我們先以圖示來了解RabbitMQ的模型架構, 之後
相信對於這一篇感興趣的朋友們都已經玩過kafka的Schema Registry了吧! 沒玩過得朋友也沒關係, 歡迎至「【🔒Message Queue - Kafka】傳輸訊息的標準格式制定者 Schema Registry」了解一下這是什麼玩意兒, 好了, 廢話不多說, 讓我們直接切入主題吧
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少因
對於軟體世界中Message Queue有興趣的朋友可以先閱讀這一篇「【資訊軟體知識】井然有序的處理機制 - Message Queue」建立基礎知識之後,再來看看這一篇會更容易進入情境唷! 這次就進入我們一般常見的MQ軟體「RabbitMQ」, 我們先以圖示來了解RabbitMQ的模型架構, 之後