我們將使用 Docker 安裝 RabbitMQ 並啟動服務。
docker run -d --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.13
這裡,-p
參數將 RabbitMQ 的端口映射到主機上:
rabbitmq_stream
)和管理插件:docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
http://localhost:15672/
,您應該能夠看到 RabbitMQ 的管理界面,並使用預設的帳戶(用戶名:admin,密碼:admin)進行登入。在這一部分,我們將使用 Node.js 搭建一個簡單的應用,並通過 RabbitMQ 進行訊息的生產與消費。
在開始之前,需要安裝 express
和 amqplib
兩個套件:
npm install express amqplib
producer.js:負責將消息發送到 RabbitMQ 隊列。
// producer.js
const amqp = require('amqplib');
const QUEUE = 'test_queue';
async function sendMessageToQueue(message) {
try {
const connection = await amqp.connect('amqp://admin:admin@localhost:5672');
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });
channel.sendToQueue(QUEUE, Buffer.from(message));
console.log(`[Producer] Message sent: ${message}`);
await channel.close();
await connection.close();
} catch (error) {
console.error('Error sending message to queue:', error);
}
}
module.exports = { sendMessageToQueue };
consumer.js:負責從 RabbitMQ 隊列中接收並處理消息。
// consumer.js
const amqp = require('amqplib');
const QUEUE = 'test_queue';
async function consumeMessages() {
try {
const connection = await amqp.connect('amqp://admin:admin@localhost:5672');
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });
channel.consume(QUEUE, (msg) => {
if (msg) {
console.log(`[Consumer] Received message: ${msg.content.toString()}`);
channel.ack(msg); // 確認消息
}
}, { noAck: false });
console.log(`[Consumer] Waiting for messages in queue: "${QUEUE}"`);
} catch (error) {
console.error('Error consuming messages:', error);
}
}
module.exports = { consumeMessages };
我們將在 server.js
中設置一個 Express 伺服器,並在其中創建一個路由來觸發生產者發送消息,同時啟動消費者來處理隊列中的消息。
server.js:
// server.js
const express = require('express');
const { sendMessageToQueue } = require('./producer');
const { consumeMessages } = require('./consumer');
const app = express();
const port = 3000;
// 設置解析 JSON 請求體
app.use(express.json());
// API 路由 - 發送消息到 RabbitMQ
app.post('/send-message', async (req, res) => {
const { message } = req.body;
if (!message) {
return res.status(400).json({ error: 'Message is required' });
}
try {
await sendMessageToQueue(message);
res.status(200).json({ status: 'Message sent to queue', message });
} catch (error) {
res.status(500).json({ error: 'Failed to send message to queue' });
}
});
// 啟動消費者,開始處理消息
consumeMessages();
// 啟動 Express 伺服器
app.listen(port, () => {
console.log(`Server running at http://localhost:${port}`);
});
node server.js
http://localhost:3000/send-message
發送一個 POST 請求,請求體應包含 message
字段:{
"message": "Hello RabbitMQ from API!"
}
如果消息成功發送,您將收到以下回應:
{
"status": "Message sent to queue",
"message": "Hello RabbitMQ from API!"
}
[Consumer] Received message: Hello RabbitMQ from API!
producer.js
負責將消息發送到 RabbitMQ 隊列。consumer.js
用來從 RabbitMQ 隊列中接收並處理消息。server.js
中,我們提供了一個 API 路由來觸發消息生產,同時啟動消費者處理消息。這個範例展示了如何將 RabbitMQ 與 Node.js 整合,建立一個簡單的生產者和消費者模型,並通過 API 發送消息。這樣的架構可應用於多種場景,例如事件驅動架構、分佈式系統等。