2023-04-29|閱讀時間 ‧ 約 7 分鐘

【Message Queue - RabbitMQ】訊息的如何插隊?(Priority)

雖然我們的一般的狀況下我們都希望訊息能夠按照順序被處理, 但有時候我們仍希望某些重要的訊息能夠優先被處理,也就是插隊的概念,正好RabbitMQ也有提供這樣需求的解決方案,以下是需要知道的幾個重點。
  • 宣告Queue的時候必須要設定 x-max-priority, 通常10就夠用了。
  • 數字越大優先序越高。
  • 一般不要設定太大,因為這算是特殊狀況,正常狀況下應該照順序來處理。
  • 沒有指定優先序的訊息會預設為0。
  • 生產端發送的訊息優先序若超過配置的最大值,則會以最大值做為優先序的標示。

配置範例

這邊會以管理界面作為範例, 其他配置方式則大同小異, 可以參考官方文件。
  • 首先進到管理界面的Queues,並加入新的Queue,並配置優先序最大號碼為20。
  • 參數的欄位填上: x-max-priority:20 。
  • 型態記得要選number。
  • 配置完我們的Queue會有Pri的標記。
  • 接著我們將Queue綁定到交換機上。

開始進行實驗

  • 實驗方向:依照訊息重要程度分為1~20個號碼, 雖然我們先送出一個19號的訊息,再送出20號的訊息,但正確堆積在佇列的訊息應該要確保20號的訊息優先於19號。
  • Producer端會送出1、5、3、20的訊息。
(async () => {
 const conn = require("amqplib").connect(
   "amqp://test:test@127.0.0.1:5672/test"
 );
 const channel = await (await conn).createChannel();
 const priorityNums = [1, 5, 3, 20];
 const routingKey = "priority";
 const exchange = "qa_exchange_direct";
 for (let i = 0; i < priorityNums.length; i++) {
   const priority = priorityNums[i];
   const msg = `Priority_${priority}`;
   channel.publish(exchange, routingKey, Buffer.from(msg), {
     priority,
   });
   console.log(`Send Message: ${msg}`)
 }
})();Send Message: Priority_1
Send Message: Priority_5
Send Message: Priority_3
Send Message: Priority_20
  • Comsumer端則會依照優先序接收20、5、3、1的訊息
(async() => {
   const conn = require('amqplib').connect('amqp://inuqa:inuqa@127.0.0.1:5672/inuqa');
   const channel = await (await conn).createChannel();
   await channel.prefetch(1); // 最多能消費unacked的數量
   channel.consume("優先序隊列", (msg) => {
       setTimeout(() => {
           console.log(msg.content.toString());
           channel.ack(msg);
       }, 1000)
   }, {
       noAck: false
   })
})()Priority_20
Priority_5
Priority_3
Priority_1

注意事項

  • RabbitMQ預設是不支援優先序的, 因為每個隊列的每個優先級都會有一些CPU跟磁碟的開銷成本。
  • 官方建議不要使用Policy的方式來配置, 因為Policy是動態的, 可以在Queue宣告後進行參數的修改, 但優先序隊列在宣告後就無法更改其數量。
  • 假設一開始Queue沒有指定優先序,後續要加上優先序會需要將原本的Queue移除再重新宣告,因此最好一開始規劃功能時就將這一塊給考慮進去!!!!!!!!!!

如何取得訊息中某個優先序號碼剩下的數量?

雖然有了優先序可以幫我們依照訊息的重要程度來分配工作,但就應用面上來看, 我們可能會需要去掌握某個優先序號碼在Queue裡面還剩下幾個未處理, 此時我們可以藉由Management Plugin的API來進行查詢, 查詢方式如下:
<http://$>{ip}/api/queues/${vhost}/${隊列名稱}
// <http://127.0.0.1:8083/api/queues/test/優先序隊列>
GET之後會得到以下的Response, 我們只需要取backing_queue_status.priority_lengths.${優先序號碼}, 就能知道剩餘數量有多少:
{
 ...,
 "backing_queue_status": {
   ...,
   "priority_lengths": {
     "0": 0,
     "1": 2,
     "2": 0,
     "3": 2,
     "4": 0,
     "5": 2,
     "6": 0,
     "7": 0,
     "8": 0,
     "9": 0,
     "10": 0,
     "11": 0,
     "12": 0,
     "13": 0,
     "14": 0,
     "15": 0,
     "16": 0,
     "17": 0,
     "18": 0,
     "19": 0,
     "20": 2
   },
    ...
 },
 ...
}
分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.