[ETL] Nifi 初體驗 - Join

更新於 發佈於 閱讀時間約 7 分鐘
解決完Inert的問題之後,下一個想在測試的流程就是SQL中的Join,是因為這方式在原本的ETL工具中使用最多功能。

簡單的說明流程:
Join Flow
在測試Join的時候,遇到了很多問題,除了Solution非常的少,大多的建議都是Nifi並不適合使用SQL中的Join。有幸可以找到這篇文章,雖然我們使用的資料來源不一樣,但依照相同的設定與概念建立Nifi,還是可以達到一樣的效果。
Reference : https://medium.com/@surajnagendra/merge-csv-files-apache-nifi-21ba44e1b719

在Nifi的設計如下:
Nifi Join Setting — 1
Nifi Join Setting — 2
Nifi Insert Setting

設定的方式區分為兩個區塊,區塊一是指Process表面的設定,區塊二是指Flow Configure中Reader和Write的設定。
區塊一說明:
1. ExcuteSQL: 從DB倒出資料的Process. 其實在Nifi中,可以導出SQL的Process 很多,除了這個以外,還有先前有提到的QueryDatabaseTable,我並沒有很專研Nifi,但就功能性來說ExcuteSQL還有包含Pre-SQL與Post-SQL,在設計上因為最終是希望可以將資料寫入DB中,因此可以在這Process搭配Delete SQL。另一方面,這Process也可以搭配控制流程的GenerateFlowFile,因此就選擇了這個Process。但相信越是簡單的Process一定也有它的道理。
ExecuteSQL Setting

2. UpdateAttribte for classfing ata : 在流程中有兩段UpdateAttribute的Processes,在這一段主要是區分資料流,各自給予參數與相對的值,此設定是要在流程後利用此參數來區分資料,才能進行Join。
Attribute Description:
  • fragment.index = 0, 1. → 此參數為MergeContent使用。
  • metric = a, b.
UpdateAttribute Setting for classifying

3. UpdateAttribute for Joining style : 流程中的第二段參數設定,主要是要設定Join 的型態。
Attribute Description:
  • fragment.count = 2.
  • fragment.idenifier = X,以上的參數設定,也適用於MergeContent
  • schema.name = metric.
UpdateAttribute for Joining style

4. UpdateRecord:對 record 設定參數。
Attribute Description:
  • /m = ${metric}. ← 此寫法是抓取此流程以上的Processes中有參數設定寫入的參數值。
${metric} value
UpdateRecord Setting

5. MergeContent : 這個Process是讓我在測試Join時,撞到最多牆的一個地方,在merge時,是利用先前設定的參數進行的,相關的參數可以參考官方文件。
MergeContent description
MergeContent Setting
the view after merging
6. QueryRecord: 使用SQL來將已經做分類的資料流,利用參數設定來進行Join。
Attribute Description:
  • query:
attribute query setting
QueryRecord Setting
以上就是Join"表面"的區塊一設定,細節藏在魔鬼中。

關鍵的設定看到在UpdateRecord中的Record Reader和Record Write,這就是Join真正的驅動者。

區塊二設定:
在第一個出現Record Reader and Write的Process是在UpdateRecord.
  • Record Reader : AvroReader.
AvroReader Setting
  • Record Write : AvroRecordSetWriter.
    Setting Description:
    a. Schema Access Strategy (very IMPORTANT): AvroSchemaRegistry.
    b. Schema Name : ${schema.name}. 這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。
AvroRecordSetWriter Setting

第二次出現Reader and Write的設定是在QueryRecord,設定內容如下:
  • Record Reader: AvroReader-Query.
    Setting Description:
    a. Schema Registry: AvroSchemaRegistry.
    b. Schema Name : ${schema.name}.這部分一樣式擷取傳入的FileFlow中參數 schema.name的值。
AvroReader-Query Setting
  • Record Write : AvroRecordSetWriter-Query.
    Setting Description:
    a. Schema Registry: AvroSchemaRegistry.
    b. Schema Name: metric.
AvroRecordSetWriter-Query Setting

設定以上兩組Flow Configuration時,都會設定到Schema Registry,在Join中,就是根據這個設定,來確定最終需要顯示的欄位是什麼,使用的是Apache特有格式Avor.
Schema Registry設定如下:
  • metric =
attribute metric Setting
AvroSchemaRegistry Setting

最後Join的結果如下:
Result

Nifi的測試就到這裡了,在測試的期間,可以感受到Nifi的功能很廣,但測試目的主要是處理DB的基本功能,例如Join, Group by等等,因此Nifi並不適合。
avatar-img
2會員
5內容數
留言0
查看全部
avatar-img
發表第一個留言支持創作者!
你可能也想看
Google News 追蹤
Thumbnail
※ 何時該使用 JOIN? JOIN 使用的時機是:當你需要同時查詢一張以上的資料表的時候。 ※ SQL有哪些TABLE JOIN的方式? INNER JOIN LEFT JOIN RIGHT JOIN SELF JOIN ※ 使用 JOIN 的時候,我們需要考慮到: 我要使用哪一種
Thumbnail
在進行SQL查詢邏輯更改時,需要適當地使用SubQuery和join來達到新的排序需求。本文將介紹原本的撈取邏輯、需求以及如何使用SubQuery來解決新的排序需求。
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
經過前兩篇文章(Notion 是什麼?你適合使用 Notion 嗎、Notion 介面一覽及必備設定)的介紹,相信大家對 Notion 的功能及介面有了大致的瞭解,這次想要與大家分享的是我認為 Notion 中最具代表性的功能,Database。
CI 持續整合 CI目的是建立自動化專案打包。 CD 持續發表 將打包完成的專案,自動發布。
※ 需要做版本備份時: git init:初始化此資料夾,由git 開始追蹤版本控制。 git add:將檔案加入到暫存區。 git commit:把暫存區的內容提交到儲存庫。 git status:查看目前所有檔案的狀態。 git log:查看過去所有commit的記錄。 ※ 需要做修
如何在SQL實踐中EXCEL 常用功能 篩選 和 擷取文字串?需要熟練地使用分組(GROUP BY) 與 排序 (ORDER BY) 以及SUBSTRING_INDEX函數!
Thumbnail
ETL是資料倉儲領域中一個重要的概念,全稱為Extract-Transform-Load,中文可譯為"抽取-轉換-載入"。ETL的作用是將來自不同來源的資料抽取出來,經過清理、轉換、整合等處理後,最終將處理好的資料載入到資料倉儲或其他單一的資料存放區
ERP系統導入,大概分成五個階段: 1。需求訪談 2。作業流程差異分析 3。實作
Thumbnail
※ 何時該使用 JOIN? JOIN 使用的時機是:當你需要同時查詢一張以上的資料表的時候。 ※ SQL有哪些TABLE JOIN的方式? INNER JOIN LEFT JOIN RIGHT JOIN SELF JOIN ※ 使用 JOIN 的時候,我們需要考慮到: 我要使用哪一種
Thumbnail
在進行SQL查詢邏輯更改時,需要適當地使用SubQuery和join來達到新的排序需求。本文將介紹原本的撈取邏輯、需求以及如何使用SubQuery來解決新的排序需求。
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
經過前兩篇文章(Notion 是什麼?你適合使用 Notion 嗎、Notion 介面一覽及必備設定)的介紹,相信大家對 Notion 的功能及介面有了大致的瞭解,這次想要與大家分享的是我認為 Notion 中最具代表性的功能,Database。
CI 持續整合 CI目的是建立自動化專案打包。 CD 持續發表 將打包完成的專案,自動發布。
※ 需要做版本備份時: git init:初始化此資料夾,由git 開始追蹤版本控制。 git add:將檔案加入到暫存區。 git commit:把暫存區的內容提交到儲存庫。 git status:查看目前所有檔案的狀態。 git log:查看過去所有commit的記錄。 ※ 需要做修
如何在SQL實踐中EXCEL 常用功能 篩選 和 擷取文字串?需要熟練地使用分組(GROUP BY) 與 排序 (ORDER BY) 以及SUBSTRING_INDEX函數!
Thumbnail
ETL是資料倉儲領域中一個重要的概念,全稱為Extract-Transform-Load,中文可譯為"抽取-轉換-載入"。ETL的作用是將來自不同來源的資料抽取出來,經過清理、轉換、整合等處理後,最終將處理好的資料載入到資料倉儲或其他單一的資料存放區
ERP系統導入,大概分成五個階段: 1。需求訪談 2。作業流程差異分析 3。實作