2024-11-22|閱讀時間 ‧ 約 0 分鐘

使用 Docker 部署 RabbitMQ 與 Node.js 實現訊息生產與消費

RabbitMQ 安裝與設定

我們將使用 Docker 安裝 RabbitMQ 並啟動服務。

1. 安裝 RabbitMQ: 使用以下命令來啟動 RabbitMQ 容器並設置必要的配置:

docker run -d --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13

這裡,-p 參數將 RabbitMQ 的端口映射到主機上:

  • 15672:RabbitMQ 管理界面
  • 5672:RabbitMQ 的 AMQP 協議端口
  • 5552:RabbitMQ 的管理插件端口

2.啟用 RabbitMQ 插件: 啟用流插件(rabbitmq_stream)和管理插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

3.確認服務是否運行: 打開瀏覽器,訪問 http://localhost:15672/,您應該能夠看到 RabbitMQ 的管理界面,並使用預設的帳戶(用戶名:admin,密碼:admin)進行登入。


Node.js 整合 RabbitMQ

在這一部分,我們將使用 Node.js 搭建一個簡單的應用,並通過 RabbitMQ 進行訊息的生產與消費。

1. 安裝必要的套件

在開始之前,需要安裝 expressamqplib 兩個套件:

npm install express amqplib

2. 創建生產者(Producer)與消費者(Consumer)

producer.js:負責將消息發送到 RabbitMQ 隊列。

// producer.js
const amqp = require('amqplib');

const QUEUE = 'test_queue';

async function sendMessageToQueue(message) {
try {
const connection = await amqp.connect('amqp://admin:admin@localhost:5672');
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });

channel.sendToQueue(QUEUE, Buffer.from(message));
console.log(`[Producer] Message sent: ${message}`);
await channel.close();
await connection.close();
} catch (error) {
console.error('Error sending message to queue:', error);
}
}

module.exports = { sendMessageToQueue };

consumer.js:負責從 RabbitMQ 隊列中接收並處理消息。

// consumer.js
const amqp = require('amqplib');

const QUEUE = 'test_queue';

async function consumeMessages() {
try {
const connection = await amqp.connect('amqp://admin:admin@localhost:5672');
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });

channel.consume(QUEUE, (msg) => {
if (msg) {
console.log(`[Consumer] Received message: ${msg.content.toString()}`);
channel.ack(msg); // 確認消息
}
}, { noAck: false });

console.log(`[Consumer] Waiting for messages in queue: "${QUEUE}"`);
} catch (error) {
console.error('Error consuming messages:', error);
}
}

module.exports = { consumeMessages };

3. 創建 API 服務

我們將在 server.js 中設置一個 Express 伺服器,並在其中創建一個路由來觸發生產者發送消息,同時啟動消費者來處理隊列中的消息。

server.js

// server.js
const express = require('express');
const { sendMessageToQueue } = require('./producer');
const { consumeMessages } = require('./consumer');

const app = express();
const port = 3000;

// 設置解析 JSON 請求體
app.use(express.json());

// API 路由 - 發送消息到 RabbitMQ
app.post('/send-message', async (req, res) => {
const { message } = req.body;

if (!message) {
return res.status(400).json({ error: 'Message is required' });
}

try {
await sendMessageToQueue(message);
res.status(200).json({ status: 'Message sent to queue', message });
} catch (error) {
res.status(500).json({ error: 'Failed to send message to queue' });
}
});

// 啟動消費者,開始處理消息
consumeMessages();

// 啟動 Express 伺服器
app.listen(port, () => {
console.log(`Server running at http://localhost:${port}`);
});

4. 測試整合

  • 啟動服務器: 執行以下命令啟動 Node.js 伺服器:
node server.js
  • 發送消息: 您可以使用 Postman 或其他 API 測試工具,向 http://localhost:3000/send-message 發送一個 POST 請求,請求體應包含 message 字段:
{
"message": "Hello RabbitMQ from API!"
}

如果消息成功發送,您將收到以下回應:

{
"status": "Message sent to queue",
"message": "Hello RabbitMQ from API!"
}
  • 消費者處理消息: 當消息發送到 RabbitMQ 隊列後,消費者會接收到消息並處理。您應該會在控制台中看到類似如下的輸出:
[Consumer] Received message: Hello RabbitMQ from API!

總結

  • 生產者(Producer)producer.js 負責將消息發送到 RabbitMQ 隊列。
  • 消費者(Consumer)consumer.js 用來從 RabbitMQ 隊列中接收並處理消息。
  • API 整合:在 server.js 中,我們提供了一個 API 路由來觸發消息生產,同時啟動消費者處理消息。

這個範例展示了如何將 RabbitMQ 與 Node.js 整合,建立一個簡單的生產者和消費者模型,並通過 API 發送消息。這樣的架構可應用於多種場景,例如事件驅動架構、分佈式系統等。






分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.