2024-11-22|閱讀時間 ‧ 約 12 分鐘

使用 Docker 部署 Kafka 與 Node.js 實現訊息生產與消費

Kafka 是一個高效能、可擴展的分布式事件流平台,常用於處理大量的實時數據流。在這篇文章中將介紹如何使用 Docker 部署 Kafka 和 Zookeeper,並通過 Node.js 實現 Kafka 生產者與消費者的簡單範例,協助你快速理解 Kafka 的基本用法。

Kafka 和 Zookeeper 部署

Kafka 需要 Zookeeper 來協調和管理分布式環境中的服務狀態。可以使用以下的 docker-compose.yml 文件來搭建 Kafka 和 Zookeeper:

version: '3'

services:

zookeeper:

image: wurstmeister/zookeeper:latest

container_name: zookeeper

ports:

- "2181:2181"

kafka:

image: wurstmeister/kafka:latest

container_name: kafka

ports:

- "9093:9093"

environment:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://192.168.0.193:9093

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT

KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093

KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

depends_on:

- zookeeper

服務啟動後確認 Kafka 是否正常運行

啟動 Docker 容器後,可以使用以下命令來檢查 Kafka 是否成功啟動:

docker exec kafka kafka-topics.sh --list --bootstrap-server 192.168.0.193:9093

如果成功列出現有的 topic,則表示 Kafka 正常運行

使用 Kafka UI 監控消息發送情況

若希望視覺化監控 Kafka 的消息流動,可以使用 Kafka UI 工具來進行監控。使用以下命令來啟動 Kafka UI:

podman run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

這將啟動一個可以透過瀏覽器訪問的 UI,幫助你查看和管理 Kafka 中的 topic 和消息。

Node.js 與 Kafka 整合

安裝所需套件

在 Node.js 中,我們可以使用 KafkaJS 來操作 Kafka。首先需要安裝 kafkajs

npm install kafkajs

目錄結構

以下是範例專案的目錄結構:

project/
├── producer.js // Kafka 生產者
├── consumer.js // Kafka 消費者
├── server.js // Node.js 伺服器

Kafka 生產者 (producer.js)

Kafka 生產者負責將消息發送到 Kafka 的指定 topic 中。以下是簡單的 Kafka 生產者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const producer = kafka.producer();

const sendMessage = async (topic, messages) => {
await producer.connect();
await producer.send({
topic,
messages: Array.isArray(messages) ? messages.map((value) => ({ value })) : [{ value: messages }],
});
console.log(`Messages sent to topic: ${topic}`);
await producer.disconnect();
};

// 如果是直接執行,則進行測試發送
if (require.main === module) {
const topic = 'test-topic';
const messages = ['Hello Kafka!', 'This is a standalone message.'];

sendMessage(topic, messages)
.then(() => console.log('Producer finished successfully.'))
.catch((err) => console.error('Producer encountered an error:', err));
}

module.exports = { sendMessage };

Kafka 消費者 (consumer.js)

Kafka 消費者從 Kafka 中訂閱指定的 topic,並處理收到的消息。以下是簡單的 Kafka 消費者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const consumer = kafka.consumer({ groupId: 'test-group' });

const startConsumer = async (topic, messageHandler) => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });

await consumer.run({
eachMessage: async ({ message }) => {
messageHandler(message.value.toString());
},
});
console.log(`Consumer started for topic: ${topic}`);
};

// 如果是直接執行,則啟動並列印收到的訊息
if (require.main === module) {
const topic = 'test-topic';

startConsumer(topic, (message) => {
console.log(`Standalone Consumer received message: ${message}`);
}).catch((err) => console.error('Consumer encountered an error:', err));
}

module.exports = { startConsumer };

Node.js 伺服器 (server.js)

接下來,我們將使用 Express 框架啟動一個簡單的 HTTP 伺服器,並透過 API 路由觸發 Kafka 生產者和消費者。

const express = require('express');
const { sendMessage } = require('./producer');
const { startConsumer } = require('./consumer');

const app = express();
const PORT = 3000;
const topic = 'test-topic';

// API 路由:發送訊息
app.get('/send', async (req, res) => {
const message = req.query.message || 'Default message';
try {
await sendMessage(topic, [message]);
res.send(`Message "${message}" sent to Kafka.`);
} catch (error) {
console.error('Error sending message:', error);
res.status(500).send('Failed to send message.');
}
});

// API 路由:啟動消費者
app.get('/consume', async (req, res) => {
try {
await startConsumer(topic, (message) => {
console.log(`Received message: ${message}`);
});
res.send('Consumer started. Check logs for received messages.');
} catch (error) {
console.error('Error starting consumer:', error);
res.status(500).send('Failed to start consumer.');
}
});

// 啟動伺服器
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});

啟動服務

  1. 啟動 Kafka 伺服器,確保 Kafka 正常運行。
  2. 啟動 Kafka 消費者(選擇性,僅用於查看消息):
    node consumer.js
  3. 啟動 Express 伺服器:
    node server.js

測試 API

你可以透過以下 API 發送和接收 Kafka 消息:

注意事項

  1. Kafka 設定:確保 Kafka Broker 運行於 localhost:9092(或根據實際情況更新為正確的地址)。
  2. Topic 設定:若 test-topic 尚未建立,KafkaJS 會自動創建它。
  3. 錯誤處理:生產者和消費者應加入完整的錯誤處理邏輯,以應對網絡問題或 Kafka 錯誤。



這篇文章簡單介紹了如何在 Docker 中部署 Kafka 和 Zookeeper,並且提供了在 Node.js 中使用 Kafka 進行訊息生產與消費的範例。希望能幫助你快速上手 Kafka,並了解如何與 Node.js 整合。



分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.