如何在 AWS 使用 SASL/SCRAM 建立 MSK

如何在 AWS 使用 SASL/SCRAM 建立 MSK

wang alan-avatar-img
發佈於AWS
更新於 發佈於 閱讀時間約 3 分鐘

👨‍💻簡介

Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 Amazon 推出的完全代管、具有高可用以及安全的 Apache Kafka 服務。在這篇文章中,會使用 AWS 建立 MSK,並使用 SASL/SCRAM 的驗證方式來完成整個設定。

🎯setup

建立叢集

進入到 MSK 頁面,然後創建叢集

raw-image
  1. 叢集設定

創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可

raw-image
raw-image
raw-image

2. 聯網

聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可

raw-image

3. 安全

安全這邊選擇 SASL/SCRAM 身分驗證,加密靜態資料選擇使用 AWS 受管金鑰,待會會提到

SASL/SCRAM(Simple Authentication andSecurity Layer/ Salted Challenge Response Mechanism),是一種使用帳號和密碼來完成驗證的方式,但它的前提是 Client 端與 Proxy 之間必須使用 TLS 加密。

raw-image

4. 監控和標籤

這裡監控只要選基本的即可

raw-image

接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘

建立KMS Key

AmazonMSK 使用 Amazon Secret Manager 來儲存 MSK 使用的帳號與密碼。在設定 Secret Manager 之前,必須先在 Amazon KMS(Key Management Service) 創建一個使用者託管的 Key。

KMS是 Amazon 託管的密鑰管理服務

raw-image
  1. 設定金鑰

建立過程都先選默認的,如果後續要調整可以再改。

raw-image

2. 新增標籤

別名叫kafka-kms

raw-image

3. 定義金鑰管理許可

選擇自己的 IAM user

raw-image

4. 定義金鑰使用許可

一樣選擇自己的 IAM user 即可

raw-image

回到主頁就可以看到自己的金鑰建立完成了

raw-image

建立 SecretsManager

Amazon Secrets Manager 是密碼管理服務,可以把敏感資料儲存在這,不需要放在程式裡,可以透過 API 調用取得,避免在程式外洩。 AmazonMSK 通過 Secrets Manager 來儲存帳號與密碼。

這裡要注意一點,使用預設 AWS KMS 金鑰建立的密碼無法與 Amazon MSK 叢集搭配使用。

raw-image
raw-image
  1. 選擇機密類型
    在 Secrets Manager 中選擇其他類型的密碼,這邊要打上我們需要驗證的 username 以及 password,加密的金鑰要選擇前面建立的 KMS 密鑰。
raw-image

2. 設定機密

在建立密碼時,密碼名稱必須包含前綴為 AmazonMSK_

raw-image

3. 設定輪換

選擇預設即可

raw-image

關聯密鑰到 MSK Cluster

回到主頁面的屬性,往下滑找到關聯機密,選擇剛剛建立的機密即可

raw-image

設定權限

建立 topic 之前,我們需要先透過 Broker 去修改帳號的權限,接著開放公開存取。 在相同子網路上建立 ec2,透過私有連線下去修改權限,先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。

wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

下載完成後解壓縮。

tar -xzf kafka_2.13-3.8.0.tgz

如果機器上沒有安裝 java 也一併安裝

sudo yum -y install java-11

接著要先取得 BootstrapConnectString,將使用者權限設定完成。 在 MSK 主頁面點擊檢視用戶端資訊可以取得 BootstrapConnectString。

raw-image
raw-image

再來到 Client 端機器上的 kafka 目錄下,因為在執行權限設定時,需要進行身份驗證,所以需要建立一份驗證檔,取名為 client_sasl.properties,裡面會有 Secrets Manager 設定的帳號密碼,將這檔案建立在 kafka 目錄下

client_sasl.properties 內容如下

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

執行以下命令來修改權限,替換 BootStrapConnectString 為剛剛複製的。如果 Cluster 的節點數小於3個,replication-factor要設定小於3。

export TOPIC="*"
export USER="alan"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-3.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096"

## 允許使用者所有權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALL \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

以下是單獨權限設定

## 授予指定的使用者建立指定的主題。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Create \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定使用者對所有群組和指定主題的讀取權限。
## 該使用者可以從指定的主題讀取消息,並且適用於所有消費者群組。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Read --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的寫入權限。
## 該使用者可以向指定的主題發送消息。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Write \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對所有群組和指定的主題的描述權限。
## 該使用者可以查看主題和群組的元數據信息,例如消費者的偏移量和配置詳情。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Describe --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的刪除權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation DELETE \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 允許使用者修改資源(如 topic 的配置)。這個權限對於更改 topic partition 數或設定非常重要。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALTER \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

設定完權限後,即可開始透過外網建立 topic

開放外網存取

使用外網存取 msk,必須先開啟公開存取,因此須先設定叢集組態檔,將 allow.everyone.if.no.acl.found=false 加上去,讓叢集使用這份組態檔

點擊左側菜單欄的叢集組態 -> 建立叢集組態 -> 加上 allow.everyone.if.no.acl.found=false

raw-image
raw-image
raw-image

接著回到叢集裡面的屬性,替換剛剛新增的設定檔

raw-image
raw-image

好了之後回到叢集裡面的屬性,編輯聯網設定,開啟公開存取

raw-image
raw-image

使用帳號與密碼連接到MSK Cluster

可以存取外網後,就可以開始測試外部 Client 端是否可以操作了。先到一台需要透過外網存取的機器,先取得外網 ip,將這 ip 加到MSK 的 SG 裡,開放可以連到 9196 port

raw-image
raw-image

安裝kafka以及java,並到 kafka 目錄下設定 client_sasl.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

首先先建立 topic,一樣使用 Broker 的連線資訊,但是是使用外網,先到叢集那邊複製公有端點

raw-image

到 kafka 目錄下執行指令建立 topic

export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

接著開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為producer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_sasl.properties

接著讓第二個視窗成為consumer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_sasl.properties

執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息

raw-image

如果要用 python 來測試,須先安裝 boto3 以及 kafka-python package,並且設定好 aws-cli,因為需要取得 secret

pip install boto3
pip install kafka-python

建立一份 producer.py,成為producer

import boto3
import base64
import json
from kafka import KafkaProducer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
# For a list of exceptions thrown, see
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e
secret = get_secret_value_response['SecretString']
return json.loads(secret)
topic = 'mytopic'
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
producer = KafkaProducer(bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password'],
)
try:
while True:
message = input("請輸入您要發送的消息 (輸入 'exit' 退出): ")
if message.lower() == 'exit':
print("結束輸入,關閉生產者...")
break
# 將用戶輸入的消息編碼為 bytes
producer.send(topic, message.encode('utf-8'))
producer.flush() # 確保消息已經被發送
print("消息發送成功")
except Exception as e:
print(f"發送消息失敗: {e}")
finally:
producer.close()
print("Kafka 生產者已關閉。")

建立一份consumer.py,成為 consumer

import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = base64.b64decode(get_secret_value_response['SecretBinary'])
return json.loads(secret)
TOPIC = "mytopic"
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password']
)
print("開始接收消息...")
try:
for message in consumer:
print(f"接收到消息: {message.value.decode('utf-8')}")
except Exception as e:
print(f"接收消息失敗: {e}")
finally:
consumer.close()
raw-image

📚Reference

avatar-img
Alan的開發者天地
18會員
83內容數
golang
留言
avatar-img
留言分享你的想法!