Kafka 是一個高效能、可擴展的分布式事件流平台,常用於處理大量的實時數據流。在這篇文章中將介紹如何使用 Docker 部署 Kafka 和 Zookeeper,並通過 Node.js 實現 Kafka 生產者與消費者的簡單範例,協助你快速理解 Kafka 的基本用法。
Kafka 需要 Zookeeper 來協調和管理分布式環境中的服務狀態。可以使用以下的 docker-compose.yml
文件來搭建 Kafka 和 Zookeeper:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://192.168.0.193:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
啟動 Docker 容器後,可以使用以下命令來檢查 Kafka 是否成功啟動:
docker exec kafka kafka-topics.sh --list --bootstrap-server 192.168.0.193:9093
如果成功列出現有的 topic,則表示 Kafka 正常運行
若希望視覺化監控 Kafka 的消息流動,可以使用 Kafka UI 工具來進行監控。使用以下命令來啟動 Kafka UI:
podman run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
這將啟動一個可以透過瀏覽器訪問的 UI,幫助你查看和管理 Kafka 中的 topic 和消息。
在 Node.js 中,我們可以使用 KafkaJS
來操作 Kafka。首先需要安裝 kafkajs
:
npm install kafkajs
以下是範例專案的目錄結構:
project/
├── producer.js // Kafka 生產者
├── consumer.js // Kafka 消費者
├── server.js // Node.js 伺服器
Kafka 生產者負責將消息發送到 Kafka 的指定 topic 中。以下是簡單的 Kafka 生產者程式碼:
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});
const producer = kafka.producer();
const sendMessage = async (topic, messages) => {
await producer.connect();
await producer.send({
topic,
messages: Array.isArray(messages) ? messages.map((value) => ({ value })) : [{ value: messages }],
});
console.log(`Messages sent to topic: ${topic}`);
await producer.disconnect();
};
// 如果是直接執行,則進行測試發送
if (require.main === module) {
const topic = 'test-topic';
const messages = ['Hello Kafka!', 'This is a standalone message.'];
sendMessage(topic, messages)
.then(() => console.log('Producer finished successfully.'))
.catch((err) => console.error('Producer encountered an error:', err));
}
module.exports = { sendMessage };
Kafka 消費者從 Kafka 中訂閱指定的 topic,並處理收到的消息。以下是簡單的 Kafka 消費者程式碼:
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});
const consumer = kafka.consumer({ groupId: 'test-group' });
const startConsumer = async (topic, messageHandler) => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async ({ message }) => {
messageHandler(message.value.toString());
},
});
console.log(`Consumer started for topic: ${topic}`);
};
// 如果是直接執行,則啟動並列印收到的訊息
if (require.main === module) {
const topic = 'test-topic';
startConsumer(topic, (message) => {
console.log(`Standalone Consumer received message: ${message}`);
}).catch((err) => console.error('Consumer encountered an error:', err));
}
module.exports = { startConsumer };
接下來,我們將使用 Express 框架啟動一個簡單的 HTTP 伺服器,並透過 API 路由觸發 Kafka 生產者和消費者。
const express = require('express');
const { sendMessage } = require('./producer');
const { startConsumer } = require('./consumer');
const app = express();
const PORT = 3000;
const topic = 'test-topic';
// API 路由:發送訊息
app.get('/send', async (req, res) => {
const message = req.query.message || 'Default message';
try {
await sendMessage(topic, [message]);
res.send(`Message "${message}" sent to Kafka.`);
} catch (error) {
console.error('Error sending message:', error);
res.status(500).send('Failed to send message.');
}
});
// API 路由:啟動消費者
app.get('/consume', async (req, res) => {
try {
await startConsumer(topic, (message) => {
console.log(`Received message: ${message}`);
});
res.send('Consumer started. Check logs for received messages.');
} catch (error) {
console.error('Error starting consumer:', error);
res.status(500).send('Failed to start consumer.');
}
});
// 啟動伺服器
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});
node consumer.js
node server.js
你可以透過以下 API 發送和接收 Kafka 消息:
localhost:9092
(或根據實際情況更新為正確的地址)。test-topic
尚未建立,KafkaJS 會自動創建它。這篇文章簡單介紹了如何在 Docker 中部署 Kafka 和 Zookeeper,並且提供了在 Node.js 中使用 Kafka 進行訊息生產與消費的範例。希望能幫助你快速上手 Kafka,並了解如何與 Node.js 整合。