使用 Docker 部署 Kafka 與 Node.js 實現訊息生產與消費

更新於 發佈於 閱讀時間約 12 分鐘
Kafka 是一個高效能、可擴展的分布式事件流平台,常用於處理大量的實時數據流。在這篇文章中將介紹如何使用 Docker 部署 Kafka 和 Zookeeper,並通過 Node.js 實現 Kafka 生產者與消費者的簡單範例,協助你快速理解 Kafka 的基本用法。

Kafka 和 Zookeeper 部署

Kafka 需要 Zookeeper 來協調和管理分布式環境中的服務狀態。可以使用以下的 docker-compose.yml 文件來搭建 Kafka 和 Zookeeper:

version: '3'

services:

zookeeper:

image: wurstmeister/zookeeper:latest

container_name: zookeeper

ports:

- "2181:2181"

kafka:

image: wurstmeister/kafka:latest

container_name: kafka

ports:

- "9093:9093"

environment:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://192.168.0.193:9093

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT

KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093

KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

depends_on:

- zookeeper

服務啟動後確認 Kafka 是否正常運行

啟動 Docker 容器後,可以使用以下命令來檢查 Kafka 是否成功啟動:

docker exec kafka kafka-topics.sh --list --bootstrap-server 192.168.0.193:9093

如果成功列出現有的 topic,則表示 Kafka 正常運行

使用 Kafka UI 監控消息發送情況

若希望視覺化監控 Kafka 的消息流動,可以使用 Kafka UI 工具來進行監控。使用以下命令來啟動 Kafka UI:

podman run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

這將啟動一個可以透過瀏覽器訪問的 UI,幫助你查看和管理 Kafka 中的 topic 和消息。

Node.js 與 Kafka 整合

安裝所需套件

在 Node.js 中,我們可以使用 KafkaJS 來操作 Kafka。首先需要安裝 kafkajs

npm install kafkajs

目錄結構

以下是範例專案的目錄結構:

project/
├── producer.js // Kafka 生產者
├── consumer.js // Kafka 消費者
├── server.js // Node.js 伺服器

Kafka 生產者 (producer.js)

Kafka 生產者負責將消息發送到 Kafka 的指定 topic 中。以下是簡單的 Kafka 生產者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const producer = kafka.producer();

const sendMessage = async (topic, messages) => {
await producer.connect();
await producer.send({
topic,
messages: Array.isArray(messages) ? messages.map((value) => ({ value })) : [{ value: messages }],
});
console.log(`Messages sent to topic: ${topic}`);
await producer.disconnect();
};

// 如果是直接執行,則進行測試發送
if (require.main === module) {
const topic = 'test-topic';
const messages = ['Hello Kafka!', 'This is a standalone message.'];

sendMessage(topic, messages)
.then(() => console.log('Producer finished successfully.'))
.catch((err) => console.error('Producer encountered an error:', err));
}

module.exports = { sendMessage };

Kafka 消費者 (consumer.js)

Kafka 消費者從 Kafka 中訂閱指定的 topic,並處理收到的消息。以下是簡單的 Kafka 消費者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const consumer = kafka.consumer({ groupId: 'test-group' });

const startConsumer = async (topic, messageHandler) => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });

await consumer.run({
eachMessage: async ({ message }) => {
messageHandler(message.value.toString());
},
});
console.log(`Consumer started for topic: ${topic}`);
};

// 如果是直接執行,則啟動並列印收到的訊息
if (require.main === module) {
const topic = 'test-topic';

startConsumer(topic, (message) => {
console.log(`Standalone Consumer received message: ${message}`);
}).catch((err) => console.error('Consumer encountered an error:', err));
}

module.exports = { startConsumer };

Node.js 伺服器 (server.js)

接下來,我們將使用 Express 框架啟動一個簡單的 HTTP 伺服器,並透過 API 路由觸發 Kafka 生產者和消費者。

const express = require('express');
const { sendMessage } = require('./producer');
const { startConsumer } = require('./consumer');

const app = express();
const PORT = 3000;
const topic = 'test-topic';

// API 路由:發送訊息
app.get('/send', async (req, res) => {
const message = req.query.message || 'Default message';
try {
await sendMessage(topic, [message]);
res.send(`Message "${message}" sent to Kafka.`);
} catch (error) {
console.error('Error sending message:', error);
res.status(500).send('Failed to send message.');
}
});

// API 路由:啟動消費者
app.get('/consume', async (req, res) => {
try {
await startConsumer(topic, (message) => {
console.log(`Received message: ${message}`);
});
res.send('Consumer started. Check logs for received messages.');
} catch (error) {
console.error('Error starting consumer:', error);
res.status(500).send('Failed to start consumer.');
}
});

// 啟動伺服器
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});

啟動服務

  1. 啟動 Kafka 伺服器,確保 Kafka 正常運行。
  2. 啟動 Kafka 消費者(選擇性,僅用於查看消息):
    node consumer.js
  3. 啟動 Express 伺服器:
    node server.js

測試 API

你可以透過以下 API 發送和接收 Kafka 消息:

注意事項

  1. Kafka 設定:確保 Kafka Broker 運行於 localhost:9092(或根據實際情況更新為正確的地址)。
  2. Topic 設定:若 test-topic 尚未建立,KafkaJS 會自動創建它。
  3. 錯誤處理:生產者和消費者應加入完整的錯誤處理邏輯,以應對網絡問題或 Kafka 錯誤。



這篇文章簡單介紹了如何在 Docker 中部署 Kafka 和 Zookeeper,並且提供了在 Node.js 中使用 Kafka 進行訊息生產與消費的範例。希望能幫助你快速上手 Kafka,並了解如何與 Node.js 整合。



留言
avatar-img
留言分享你的想法!
avatar-img
嘿洽啦
1會員
7內容數
軟體開發 & 金融投資的日常筆記
嘿洽啦的其他內容
2024/11/22
RabbitMQ 和 Kafka 是兩種流行的消息處理工具,各自擅長不同的應用場景。RabbitMQ 以低延遲和靈活的消息路由著稱,適合即時通信和微服務;Kafka 則專注於高吞吐量和數據持久化,適用於大規模數據流和實時分析。本文比較了它們的性能、擴展性和安全性,幫助你選擇最符合需求的解決方案。
Thumbnail
2024/11/22
RabbitMQ 和 Kafka 是兩種流行的消息處理工具,各自擅長不同的應用場景。RabbitMQ 以低延遲和靈活的消息路由著稱,適合即時通信和微服務;Kafka 則專注於高吞吐量和數據持久化,適用於大規模數據流和實時分析。本文比較了它們的性能、擴展性和安全性,幫助你選擇最符合需求的解決方案。
Thumbnail
2024/11/22
Message Queue 和 Streaming Process 是分佈式系統中最重要的技術之一,但它們的應用場景和特性有明顯區別。消息隊列適合可靠的低延遲通信,而流處理專注於大規模數據流的實時分析。本文深入比較兩者特性,幫助你根據需求選擇合適的技術,打造更高效的系統架構!
Thumbnail
2024/11/22
Message Queue 和 Streaming Process 是分佈式系統中最重要的技術之一,但它們的應用場景和特性有明顯區別。消息隊列適合可靠的低延遲通信,而流處理專注於大規模數據流的實時分析。本文深入比較兩者特性,幫助你根據需求選擇合適的技術,打造更高效的系統架構!
Thumbnail
2024/11/22
本文介紹如何使用 Docker 安裝 RabbitMQ,並將其與 Node.js 結合,實現訊息的生產與消費。這個架構可以應用於分佈式系統或事件驅動架構中,幫助讀者理解如何整合 RabbitMQ 與 Node.js。
Thumbnail
2024/11/22
本文介紹如何使用 Docker 安裝 RabbitMQ,並將其與 Node.js 結合,實現訊息的生產與消費。這個架構可以應用於分佈式系統或事件驅動架構中,幫助讀者理解如何整合 RabbitMQ 與 Node.js。
Thumbnail
看更多
你可能也想看
Thumbnail
「欸!這是在哪裡買的?求連結 🥺」 誰叫你太有品味,一發就讓大家跟著剁手手? 讓你回購再回購的生活好物,是時候該介紹出場了吧! 「開箱你的美好生活」現正召喚各路好物的開箱使者 🤩
Thumbnail
「欸!這是在哪裡買的?求連結 🥺」 誰叫你太有品味,一發就讓大家跟著剁手手? 讓你回購再回購的生活好物,是時候該介紹出場了吧! 「開箱你的美好生活」現正召喚各路好物的開箱使者 🤩
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少
Thumbnail
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少
Thumbnail
Kafka是一個先進的分佈式流處理平臺,具有高吞吐量、可擴展性、容錯性和低延遲特性,提供瞭解耦、非同步和削峰特點。本文介紹了Kafka的通訊模式、適合的應用場景和未來發展趨勢,旨在幫助使用者更好地理解和應用Kafka。
Thumbnail
Kafka是一個先進的分佈式流處理平臺,具有高吞吐量、可擴展性、容錯性和低延遲特性,提供瞭解耦、非同步和削峰特點。本文介紹了Kafka的通訊模式、適合的應用場景和未來發展趨勢,旨在幫助使用者更好地理解和應用Kafka。
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News