2021-10-26|閱讀時間 ‧ 約 8 分鐘

[ETL] Nifi 初體驗 - Join

解決完Inert的問題之後,下一個想在測試的流程就是SQL中的Join,是因為這方式在原本的ETL工具中使用最多功能。
簡單的說明流程:
Join Flow
Join Flow
在測試Join的時候,遇到了很多問題,除了Solution非常的少,大多的建議都是Nifi並不適合使用SQL中的Join。有幸可以找到這篇文章,雖然我們使用的資料來源不一樣,但依照相同的設定與概念建立Nifi,還是可以達到一樣的效果。 Reference : https://medium.com/@surajnagendra/merge-csv-files-apache-nifi-21ba44e1b719

在Nifi的設計如下:
Nifi Join Setting — 1
Nifi Join Setting — 2
Nifi Insert Setting

設定的方式區分為兩個區塊,區塊一是指Process表面的設定,區塊二是指Flow Configure中Reader和Write的設定。
區塊一說明:
1. ExcuteSQL: 從DB倒出資料的Process. 其實在Nifi中,可以導出SQL的Process 很多,除了這個以外,還有先前有提到的QueryDatabaseTable,我並沒有很專研Nifi,但就功能性來說ExcuteSQL還有包含Pre-SQL與Post-SQL,在設計上因為最終是希望可以將資料寫入DB中,因此可以在這Process搭配Delete SQL。另一方面,這Process也可以搭配控制流程的GenerateFlowFile,因此就選擇了這個Process。但相信越是簡單的Process一定也有它的道理。
ExecuteSQL Setting

2. UpdateAttribte for classfing ata : 在流程中有兩段UpdateAttribute的Processes,在這一段主要是區分資料流,各自給予參數與相對的值,此設定是要在流程後利用此參數來區分資料,才能進行Join。 Attribute Description:
  • fragment.index = 0, 1. → 此參數為MergeContent使用。
  • metric = a, b.
UpdateAttribute Setting for classifying

3. UpdateAttribute for Joining style : 流程中的第二段參數設定,主要是要設定Join 的型態。 Attribute Description:
  • fragment.count = 2.
  • fragment.idenifier = X,以上的參數設定,也適用於MergeContent
  • schema.name = metric.
UpdateAttribute for Joining style

4. UpdateRecord:對 record 設定參數。 Attribute Description:
  • /m = ${metric}. ← 此寫法是抓取此流程以上的Processes中有參數設定寫入的參數值。
${metric} value
UpdateRecord Setting

5. MergeContent : 這個Process是讓我在測試Join時,撞到最多牆的一個地方,在merge時,是利用先前設定的參數進行的,相關的參數可以參考官方文件。
MergeContent description
MergeContent Setting
the view after merging
6. QueryRecord: 使用SQL來將已經做分類的資料流,利用參數設定來進行Join。 Attribute Description:
  • query:
attribute query setting
QueryRecord Setting
以上就是Join"表面"的區塊一設定,細節藏在魔鬼中。

關鍵的設定看到在UpdateRecord中的Record Reader和Record Write,這就是Join真正的驅動者。
區塊二設定: 在第一個出現Record Reader and Write的Process是在UpdateRecord.
  • Record Reader : AvroReader.
AvroReader Setting
  • Record Write : AvroRecordSetWriter. Setting Description: a. Schema Access Strategy (very IMPORTANT): AvroSchemaRegistry. b. Schema Name : ${schema.name}. 這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。
AvroRecordSetWriter Setting
第二次出現Reader and Write的設定是在QueryRecord,設定內容如下:
  • Record Reader: AvroReader-Query. Setting Description: a. Schema Registry: AvroSchemaRegistry. b. Schema Name : ${schema.name}.這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。
AvroReader-Query Setting
  • Record Write : AvroRecordSetWriter-Query. Setting Description: a. Schema Registry: AvroSchemaRegistry. b. Schema Name: metric.
AvroRecordSetWriter-Query Setting

設定以上兩組Flow Configuration時,都會設定到Schema Registry,在Join中,就是根據這個設定,來確定最終需要顯示的欄位是什麼,使用的是Apache特有格式Avor. Schema Registry設定如下:
  • metric =
attribute metric Setting
AvroSchemaRegistry Setting
最後Join的結果如下:
Result

Nifi的測試就到這裡了,在測試的期間,可以感受到Nifi的功能很廣,但測試目的主要是處理DB的基本功能,例如Join, Group by等等,因此Nifi並不適合。
分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.