相信對於這一篇感興趣的朋友們都已經玩過kafka的Schema Registry了吧! 沒玩過得朋友也沒關係, 歡迎至「【🔒Message Queue - Kafka】傳輸訊息的標準格式制定者 Schema Registry」了解一下這是什麼玩意兒, 好了, 廢話不多說, 讓我們直接切入主題吧!
我們根據 【🔒Message Queue - Kafka】傳輸訊息的標準格式制定者 Schema Registry 提供的範例, 進行Producer的執行之後, 接著我們因為需求的變更需要增加一個欄位, 因此在「avro_producer.py」調整了schema如下:
schema = """
{
"name": "Payment",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "amount",
"type": "double"
},
{
"name": "currency",
"type": "string",
}
]
}
"""
接著再執行一次「avro_producer.py」, 就發生了以下的訊息:
confluent_kafka.schema_registry.error.SchemaRegistryError:
Schema being registered is incompatible with an earlier schema for subject "avro_test-value",
details: [
{
errorType:'READER_FIELD_MISSING_DEFAULT_VALUE',
description:'The field 'currency' at path '/fields/2' in the new schema has no default value and is missing in the old schema',
additionalInfo:'currency'
},
{
oldSchemaVersion: 1
},
{
oldSchema: '{"type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'},
{
validateFields: 'false',
compatibility: 'BACKWARD'}
] (HTTP status code 409, SR code 409)
這個錯誤訊息主要告訴我們schema的兼容性出了問題, 但究竟為什麼呢? kafka的Schema Registry不是號稱很彈性, 容易擴充嗎? 怎麼今天一擴充下去就GG了呢😢 ?
讓我們再仔細看看內容, 發現到這樣的關鍵字:
new schema has no default value and is missing in the old schema
這意思就是我們新的Schema欄位並沒有「預設值」, 因此很容易發生向後兼容的問題, 建議我們設計個「預設值」才能夠讓新的Schema成功更新。
讓我們重新調整一下Schema, 對於新欄位「currency」設定「default」值即可:
schema = """
{
"name": "Payment",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "amount",
"type": "double"
},
{
"name": "currency",
"type": "string",
"default": "null"
}
]
}
"""
最後我們重新執行一下「avro_producer.py 」。
非常棒👍👍👍 成功了!!!
仔細的觀察每一條訊息的內文, 我們都能夠從中找出蛛絲馬跡, 再請試著搭配一些AI技巧, 可以讓我們的問題排查速度變得更快, 這篇我們要推薦您閱讀一下「【🔒 江湖一點訣】年中復盤 - AI 潮流下的學習模式轉變與升級」, 幫助您提升效率, 精準的解決問題!