解決完Inert的問題之後,下一個想在測試的流程就是SQL中的Join,是因為這方式在原本的ETL工具中使用最多功能。
簡單的說明流程:

Join Flow
在測試Join的時候,遇到了很多問題,除了Solution非常的少,大多的建議都是Nifi並不適合使用SQL中的Join。有幸可以找到這篇文章,雖然我們使用的資料來源不一樣,但依照相同的設定與概念建立Nifi,還是可以達到一樣的效果。
Reference : https://medium.com/@surajnagendra/merge-csv-files-apache-nifi-21ba44e1b719
在Nifi的設計如下:

Nifi Join Setting — 1

Nifi Join Setting — 2

Nifi Insert Setting
設定的方式區分為兩個區塊,區塊一是指Process表面的設定,區塊二是指Flow Configure中Reader和Write的設定。
區塊一說明:
1. ExcuteSQL: 從DB倒出資料的Process. 其實在Nifi中,可以導出SQL的Process 很多,除了這個以外,還有先前有提到的QueryDatabaseTable,我並沒有很專研Nifi,但就功能性來說ExcuteSQL還有包含Pre-SQL與Post-SQL,在設計上因為最終是希望可以將資料寫入DB中,因此可以在這Process搭配Delete SQL。另一方面,這Process也可以搭配控制流程的GenerateFlowFile,因此就選擇了這個Process。但相信越是簡單的Process一定也有它的道理。

ExecuteSQL Setting
2. UpdateAttribte for classfing ata : 在流程中有兩段UpdateAttribute的Processes,在這一段主要是區分資料流,各自給予參數與相對的值,此設定是要在流程後利用此參數來區分資料,才能進行Join。
Attribute Description:
- fragment.index = 0, 1. → 此參數為MergeContent使用。
- metric = a, b.
MergeContent詳細說明可參考官方文件: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.MergeContent/index.html

UpdateAttribute Setting for classifying
3. UpdateAttribute for Joining style : 流程中的第二段參數設定,主要是要設定Join 的型態。
Attribute Description:
- fragment.count = 2.
- fragment.idenifier = X,以上的參數設定,也適用於MergeContent
- schema.name = metric.

UpdateAttribute for Joining style
4. UpdateRecord:對 record 設定參數。
Attribute Description:
- /m = ${metric}. ← 此寫法是抓取此流程以上的Processes中有參數設定寫入的參數值。

${metric} value

UpdateRecord Setting
5. MergeContent : 這個Process是讓我在測試Join時,撞到最多牆的一個地方,在merge時,是利用先前設定的參數進行的,相關的參數可以參考官方文件。

MergeContent description

MergeContent Setting

the view after merging
6. QueryRecord: 使用SQL來將已經做分類的資料流,利用參數設定來進行Join。
Attribute Description:
- query:

attribute query setting

QueryRecord Setting
以上就是Join"表面"的區塊一設定,細節藏在魔鬼中。
關鍵的設定看到在UpdateRecord中的Record Reader和Record Write,這就是Join真正的驅動者。
區塊二設定:
在第一個出現Record Reader and Write的Process是在UpdateRecord.
- Record Reader : AvroReader.

AvroReader Setting
- Record Write : AvroRecordSetWriter.
Setting Description:
a. Schema Access Strategy (very IMPORTANT): AvroSchemaRegistry.
b. Schema Name : ${schema.name}. 這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。

AvroRecordSetWriter Setting
第二次出現Reader and Write的設定是在QueryRecord,設定內容如下:
- Record Reader: AvroReader-Query.
Setting Description:
a. Schema Registry: AvroSchemaRegistry.
b. Schema Name : ${schema.name}.這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。

AvroReader-Query Setting
- Record Write : AvroRecordSetWriter-Query.
Setting Description:
a. Schema Registry: AvroSchemaRegistry.
b. Schema Name: metric.

AvroRecordSetWriter-Query Setting
設定以上兩組Flow Configuration時,都會設定到Schema Registry,在Join中,就是根據這個設定,來確定最終需要顯示的欄位是什麼,使用的是Apache特有格式Avor.
Schema Registry設定如下:
- metric =

attribute metric Setting

AvroSchemaRegistry Setting
最後Join的結果如下:

Result
Nifi的測試就到這裡了,在測試的期間,可以感受到Nifi的功能很廣,但測試目的主要是處理DB的基本功能,例如Join, Group by等等,因此Nifi並不適合。