如何在 AWS 使用 SASL/SCRAM 建立 MSK

閱讀時間約 3 分鐘

👨‍💻簡介

Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 Amazon 推出的完全代管、具有高可用以及安全的 Apache Kafka 服務。在這篇文章中,會使用 AWS 建立 MSK,並使用 SASL/SCRAM 的驗證方式來完成整個設定。

🎯setup

建立叢集

進入到 MSK 頁面,然後創建叢集

raw-image
  1. 叢集設定

創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可

raw-image
raw-image
raw-image

2. 聯網

聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可

raw-image

3. 安全

安全這邊選擇 SASL/SCRAM 身分驗證,加密靜態資料選擇使用 AWS 受管金鑰,待會會提到

SASL/SCRAM(Simple Authentication andSecurity Layer/ Salted Challenge Response Mechanism),是一種使用帳號和密碼來完成驗證的方式,但它的前提是 Client 端與 Proxy 之間必須使用 TLS 加密。

raw-image

4. 監控和標籤

這裡監控只要選基本的即可

raw-image

接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘

建立KMS Key

AmazonMSK 使用 Amazon Secret Manager 來儲存 MSK 使用的帳號與密碼。在設定 Secret Manager 之前,必須先在 Amazon KMS(Key Management Service) 創建一個使用者託管的 Key。

KMS是 Amazon 託管的密鑰管理服務

raw-image
  1. 設定金鑰

建立過程都先選默認的,如果後續要調整可以再改。

raw-image

2. 新增標籤

別名叫kafka-kms

raw-image

3. 定義金鑰管理許可

選擇自己的 IAM user

raw-image

4. 定義金鑰使用許可

一樣選擇自己的 IAM user 即可

raw-image

回到主頁就可以看到自己的金鑰建立完成了

raw-image

建立 SecretsManager

Amazon Secrets Manager 是密碼管理服務,可以把敏感資料儲存在這,不需要放在程式裡,可以透過 API 調用取得,避免在程式外洩。 AmazonMSK 通過 Secrets Manager 來儲存帳號與密碼。

這裡要注意一點,使用預設 AWS KMS 金鑰建立的密碼無法與 Amazon MSK 叢集搭配使用。

raw-image
raw-image
  1. 選擇機密類型
    在 Secrets Manager 中選擇其他類型的密碼,這邊要打上我們需要驗證的 username 以及 password,加密的金鑰要選擇前面建立的 KMS 密鑰。
raw-image

2. 設定機密

在建立密碼時,密碼名稱必須包含前綴為 AmazonMSK_

raw-image

3. 設定輪換

選擇預設即可

raw-image

關聯密鑰到 MSK Cluster

回到主頁面的屬性,往下滑找到關聯機密,選擇剛剛建立的機密即可

raw-image

設定權限

建立 topic 之前,我們需要先透過 Broker 去修改帳號的權限,接著開放公開存取。 在相同子網路上建立 ec2,透過私有連線下去修改權限,先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。

wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

下載完成後解壓縮。

tar -xzf kafka_2.13-3.8.0.tgz

如果機器上沒有安裝 java 也一併安裝

sudo yum -y install java-11

接著要先取得 BootstrapConnectString,將使用者權限設定完成。 在 MSK 主頁面點擊檢視用戶端資訊可以取得 BootstrapConnectString。

raw-image
raw-image

再來到 Client 端機器上的 kafka 目錄下,因為在執行權限設定時,需要進行身份驗證,所以需要建立一份驗證檔,取名為 client_sasl.properties,裡面會有 Secrets Manager 設定的帳號密碼,將這檔案建立在 kafka 目錄下

client_sasl.properties 內容如下

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

執行以下命令來修改權限,替換 BootStrapConnectString 為剛剛複製的。如果 Cluster 的節點數小於3個,replication-factor要設定小於3。

export TOPIC="*"
export USER="alan"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-3.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096"

## 允許使用者所有權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALL \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

以下是單獨權限設定

## 授予指定的使用者建立指定的主題。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Create \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定使用者對所有群組和指定主題的讀取權限。
## 該使用者可以從指定的主題讀取消息,並且適用於所有消費者群組。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Read --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的寫入權限。
## 該使用者可以向指定的主題發送消息。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Write \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對所有群組和指定的主題的描述權限。
## 該使用者可以查看主題和群組的元數據信息,例如消費者的偏移量和配置詳情。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Describe --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的刪除權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation DELETE \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 允許使用者修改資源(如 topic 的配置)。這個權限對於更改 topic partition 數或設定非常重要。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALTER \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

設定完權限後,即可開始透過外網建立 topic

開放外網存取

使用外網存取 msk,必須先開啟公開存取,因此須先設定叢集組態檔,將 allow.everyone.if.no.acl.found=false 加上去,讓叢集使用這份組態檔

點擊左側菜單欄的叢集組態 -> 建立叢集組態 -> 加上 allow.everyone.if.no.acl.found=false

raw-image
raw-image
raw-image

接著回到叢集裡面的屬性,替換剛剛新增的設定檔

raw-image
raw-image

好了之後回到叢集裡面的屬性,編輯聯網設定,開啟公開存取

raw-image
raw-image

使用帳號與密碼連接到MSK Cluster

可以存取外網後,就可以開始測試外部 Client 端是否可以操作了。先到一台需要透過外網存取的機器,先取得外網 ip,將這 ip 加到MSK 的 SG 裡,開放可以連到 9196 port

raw-image
raw-image

安裝kafka以及java,並到 kafka 目錄下設定 client_sasl.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

首先先建立 topic,一樣使用 Broker 的連線資訊,但是是使用外網,先到叢集那邊複製公有端點

raw-image

到 kafka 目錄下執行指令建立 topic

export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

接著開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為producer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_sasl.properties

接著讓第二個視窗成為consumer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_sasl.properties

執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息

raw-image

如果要用 python 來測試,須先安裝 boto3 以及 kafka-python package,並且設定好 aws-cli,因為需要取得 secret

pip install boto3
pip install kafka-python

建立一份 producer.py,成為producer

import boto3
import base64
import json
from kafka import KafkaProducer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
# For a list of exceptions thrown, see
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e
secret = get_secret_value_response['SecretString']
return json.loads(secret)
topic = 'mytopic'
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
producer = KafkaProducer(bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password'],
)
try:
while True:
message = input("請輸入您要發送的消息 (輸入 'exit' 退出): ")
if message.lower() == 'exit':
print("結束輸入,關閉生產者...")
break
# 將用戶輸入的消息編碼為 bytes
producer.send(topic, message.encode('utf-8'))
producer.flush() # 確保消息已經被發送
print("消息發送成功")
except Exception as e:
print(f"發送消息失敗: {e}")
finally:
producer.close()
print("Kafka 生產者已關閉。")

建立一份consumer.py,成為 consumer

import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = base64.b64decode(get_secret_value_response['SecretBinary'])
return json.loads(secret)
TOPIC = "mytopic"
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password']
)
print("開始接收消息...")
try:
for message in consumer:
print(f"接收到消息: {message.value.decode('utf-8')}")
except Exception as e:
print(f"接收消息失敗: {e}")
finally:
consumer.close()
raw-image

📚Reference

17會員
83內容數
golang
留言0
查看全部
發表第一個留言支持創作者!
Alan的開發者天地 的其他內容
本文介紹如何使用 MongoDB 的命令行工具 Mongorestore 將先前備份的資料還原到資料庫中。Mongorestore 支援資料庫的整體恢復、特定集合的恢復,以及從遠端伺服器進行恢復等功能。無論是初學者還是有經驗的使用者,都能夠快速掌握如何使用 Mongorestore 工具。
👨‍💻簡介 在資料庫管理和系統維護中,備份是非常重要的一環。對於使用 MongoDB 的開發者和資料庫管理員來說,mongodump 是一個非常實用的命令行工具,能夠快速且輕鬆地完成資料庫的備份和恢復。無論是進行資料遷移、系統升級,還是面對突發的故障,mongodump 都能提供穩定的資料保護
本文介紹如何對 Telegram 憑證監控機器人的代碼進行優化,包括新增指令、讀取變數、提高可讀性和可維護性。
在這篇文章中,將繼續介紹 TG Bot 整合 MongoDB 的相關操作。主要包括對 domain 進行驗證操作,使用的工具有 Python 、MongoDB 和 TG Bot。具體的功能需求包括新增 domain 前檢查 domain 憑證以及透過 TG Bot 檢查所有 domain 是否過期。
本文介紹如何使用 TG Bot 來操作 MongoDB,包括讀取所有 domain、讀取特定 domain、新增 domain、批量新增 domain、修改 domain 和刪除 domain。透過 TG Bot 的指令操作,實現了自動化管理和多環境管理。
👨‍💻 簡介 今天這篇主要是帶大家快速建立屬於自己的 Telegram bot,申請 bot 的部分我會附上網址,請準備好之後再來開始。 🛠️ 使用工具 Python 3.9.6 TG Bot 📝 功能需求 輸入指令讓 TG Bot 回傳訊息 接受傳入參數並進行簡單回傳 設定
本文介紹如何使用 MongoDB 的命令行工具 Mongorestore 將先前備份的資料還原到資料庫中。Mongorestore 支援資料庫的整體恢復、特定集合的恢復,以及從遠端伺服器進行恢復等功能。無論是初學者還是有經驗的使用者,都能夠快速掌握如何使用 Mongorestore 工具。
👨‍💻簡介 在資料庫管理和系統維護中,備份是非常重要的一環。對於使用 MongoDB 的開發者和資料庫管理員來說,mongodump 是一個非常實用的命令行工具,能夠快速且輕鬆地完成資料庫的備份和恢復。無論是進行資料遷移、系統升級,還是面對突發的故障,mongodump 都能提供穩定的資料保護
本文介紹如何對 Telegram 憑證監控機器人的代碼進行優化,包括新增指令、讀取變數、提高可讀性和可維護性。
在這篇文章中,將繼續介紹 TG Bot 整合 MongoDB 的相關操作。主要包括對 domain 進行驗證操作,使用的工具有 Python 、MongoDB 和 TG Bot。具體的功能需求包括新增 domain 前檢查 domain 憑證以及透過 TG Bot 檢查所有 domain 是否過期。
本文介紹如何使用 TG Bot 來操作 MongoDB,包括讀取所有 domain、讀取特定 domain、新增 domain、批量新增 domain、修改 domain 和刪除 domain。透過 TG Bot 的指令操作,實現了自動化管理和多環境管理。
👨‍💻 簡介 今天這篇主要是帶大家快速建立屬於自己的 Telegram bot,申請 bot 的部分我會附上網址,請準備好之後再來開始。 🛠️ 使用工具 Python 3.9.6 TG Bot 📝 功能需求 輸入指令讓 TG Bot 回傳訊息 接受傳入參數並進行簡單回傳 設定
你可能也想看
Google News 追蹤
Thumbnail
這個秋,Chill 嗨嗨!穿搭美美去賞楓,裝備款款去露營⋯⋯你的秋天怎麼過?秋日 To Do List 等你分享! 秋季全站徵文,我們準備了五個創作主題,參賽還有機會獲得「火烤兩用鍋」,一起來看看如何參加吧~
Thumbnail
美國總統大選只剩下三天, 我們觀察一整週民調與金融市場的變化(包含賭局), 到本週五下午3:00前為止, 誰是美國總統幾乎大概可以猜到60-70%的機率, 本篇文章就是以大選結局為主軸來討論近期甚至到未來四年美股可能的改變
Thumbnail
Faker昨天真的太扯了,中國主播王多多點評的話更是精妙,分享給各位 王多多的點評 「Faker是我們的處境,他是LPL永遠繞不開的一個人和話題,所以我們特別渴望在決賽跟他相遇,去直面我們的處境。 我們曾經稱他為最高的山,最長的河,以為山海就是盡頭,可是Faker用他28歲的年齡...
Thumbnail
透過充分利用 AWS Organizations 和 CloudFormation StackSets,您可以更好地實現企業級的雲端管理與控制,為業務的持續發展提供穩固的技術支撐。
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
AWS DataSync 是一種線上資料移動和探索服務,可簡化並加速向 AWS 的資料遷移,以及在內部部署儲存、邊緣節點、其他雲端和 AWS 儲存服務移入和移出資料[1]。 在某些架構上會,使用該服務會需要安裝 DataSync Agent 來傳輸檔案 您需要 DataSync Agen
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
雲端已經成為App開發的核心,而Amazon的AWS(Amazon Web Services是開發者常用的平台,可以幫助開發者建立、整合和擴展App。
Thumbnail
這篇文章教你如何搭建Kubernetes Cluster,包括節點安裝前設定、軟體套件安裝、Control-Plane部署和加入運算節點等步驟。在建置之後,作者會分享一些基礎服務的安裝。希望這篇文章對你有所幫助。
Thumbnail
本文章將說明如果您想要從頭建置一組具有Loadbalancer HA架構的Kubernetes Cluster時,你可能會需要做的事前準備工作。
Thumbnail
在沒有分環境之前,每一隻lambda只有一個code console給所有人一起編輯,開發好了就deploy,根據設定的trigger觸發執行。 現在我們希望能夠在code console開發,然後deploy到不同的stage,目標是不同stage的api gateway能夠調用該lambda的
Thumbnail
這個秋,Chill 嗨嗨!穿搭美美去賞楓,裝備款款去露營⋯⋯你的秋天怎麼過?秋日 To Do List 等你分享! 秋季全站徵文,我們準備了五個創作主題,參賽還有機會獲得「火烤兩用鍋」,一起來看看如何參加吧~
Thumbnail
美國總統大選只剩下三天, 我們觀察一整週民調與金融市場的變化(包含賭局), 到本週五下午3:00前為止, 誰是美國總統幾乎大概可以猜到60-70%的機率, 本篇文章就是以大選結局為主軸來討論近期甚至到未來四年美股可能的改變
Thumbnail
Faker昨天真的太扯了,中國主播王多多點評的話更是精妙,分享給各位 王多多的點評 「Faker是我們的處境,他是LPL永遠繞不開的一個人和話題,所以我們特別渴望在決賽跟他相遇,去直面我們的處境。 我們曾經稱他為最高的山,最長的河,以為山海就是盡頭,可是Faker用他28歲的年齡...
Thumbnail
透過充分利用 AWS Organizations 和 CloudFormation StackSets,您可以更好地實現企業級的雲端管理與控制,為業務的持續發展提供穩固的技術支撐。
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
AWS DataSync 是一種線上資料移動和探索服務,可簡化並加速向 AWS 的資料遷移,以及在內部部署儲存、邊緣節點、其他雲端和 AWS 儲存服務移入和移出資料[1]。 在某些架構上會,使用該服務會需要安裝 DataSync Agent 來傳輸檔案 您需要 DataSync Agen
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
雲端已經成為App開發的核心,而Amazon的AWS(Amazon Web Services是開發者常用的平台,可以幫助開發者建立、整合和擴展App。
Thumbnail
這篇文章教你如何搭建Kubernetes Cluster,包括節點安裝前設定、軟體套件安裝、Control-Plane部署和加入運算節點等步驟。在建置之後,作者會分享一些基礎服務的安裝。希望這篇文章對你有所幫助。
Thumbnail
本文章將說明如果您想要從頭建置一組具有Loadbalancer HA架構的Kubernetes Cluster時,你可能會需要做的事前準備工作。
Thumbnail
在沒有分環境之前,每一隻lambda只有一個code console給所有人一起編輯,開發好了就deploy,根據設定的trigger觸發執行。 現在我們希望能夠在code console開發,然後deploy到不同的stage,目標是不同stage的api gateway能夠調用該lambda的