我們在「🔒 阿Han的軟體心法實戰營 - kafka」有關於kafka的教學文章, 那麼在開發過程中我們遇到了 👻 詭異事件, 那就是我們嘗試在做一個檔案串流時, 發現Producer明明傳送了大約16MB檔案大小的封包到kafka, 每一包約(1024 * 1024 ) bytes, 但奇怪的是kafka的查詢介面裡卻只有發現到大約 953KB的訊息…, 照理來說應該要有10幾包才對呀!
我們透過open file 來讀取檔案並且設定一包讀取 1024 * 1024 bytes大小的封包送到kafka, 就以程式碼來看完全沒有錯誤呀! 怎麼會沒有訊息咧?
bootstrap_servers = ["localhost:9092"]
topic_name = "file-stream"
chunk_size = 1024 * 1024 # 1 MB
def send_file(filename):
# Create a producer
producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_servers)
# Read the file in chunks
with open(filename, "rb") as f:
while True:
# Read a chunk of data from the file
chunk = f.read(chunk_size)
# If the chunk is empty, break the loop
if not chunk:
break
print(f"send: {len(chunk)}")
# Send the chunk to Kafka
producer.send(topic_name, key=bytes(filename, 'utf-8'), value=chunk)
# Flush the producer to ensure all messages are sent
producer.flush()
Kafka 主題中每個訊息的預設限制為 1MB,這是因為非常大的消息被認為是低效的。
原來kafka很多背景知識需要了解, 常常我們會在實作的過程中慢慢發現, 就讓我們一步步探索kafka的奧妙吧! 也歡迎訂閱「🔒 阿Han的軟體心法實戰營」一起學習軟體相關的知識與疑難雜症。