2023-08-14|閱讀時間 ‧ 約 9 分鐘

如何運用批次輸入、多處理技術加速特徵工程

How to utilize batch input and multi-processing techniques to accelerate feature engineering?

Image Creator from Microsoft Bing

Image Creator from Microsoft Bing

問題

在進行特徵工程的過程中,我們通常需要處理各種各樣的數據,並轉換它們成有意義的特徵,以供後續的模型訓練和預測使用。然而,當數據集過於龐大,尤其是當記憶體限制造成RAM不足時,這個過程變得困難重重。這不僅影響效能,也可能阻礙我們進行更深入的特徵工程實驗。

解決方案:分割與多處理技術

為了克服這個效能問題,我們需要尋找解決方法,讓特徵工程能在有限的資源下順利進行。以下是我所採取的解決方案:

1. 數據分割與存儲

在進行 feature engineering 之前,我首先將原始數據集進行分割,按照特定的 ID 進行切割,將每個子集分別存儲成 pickle 檔案。這個步驟也須考量後續的feature engineering 內容,這次遇到的處理是以 group by ID 的方式進行,因此在這個步驟也依 ID 進行分割。這樣做的好處不僅有助於減少單次操作時的內存使用量,還能夠提高之後多處理的效能。

2. 批次處理(batch)與多處理(multiprocess)

在分割好數據後,我們使用批次(batch)的方式進行feature engineering。這意味著我們只處理一小部分數據,而不是一次性處理整個資料集。這有助於控制內存的使用,避免RAM不足的問題。此外,我們還運用了多處理(multiprocess)的技術,同時在多個處理單元中執行特徵工程,進一步提高效能。

實作細節

Data

這次選用 kaggle dataset : Bank Transaction Data ,他是一個銀行的交易數據,數據量不大,僅以次作為 sample 以實作後續內容(讀者可以找任何的資料取代,資料在本篇不是重點)。

import pandas as pd 

data_path = './data/bank.xlsx'
df = pd.read_excel(data_path, sheet_name='Sheet1')
df.rename(columns = {'Account No':'ID'}, inplace = True)
df.head(5)

df.head(5)

數據分割與存儲

我將原始的資料集按照特定的標識(ID)進行分割,並將每個子集分別存儲成pickle檔案。這使得每個子集都可以獨立地載入和處理。

import os

def split_by_id_save_to_pickle(df: pd.DataFrame, output_path:str) -> list:
unique_accounts = df['ID'].unique()
pickle_filename_list = []

for account_id in unique_accounts:
account_data = df[df['ID'] == account_id]
account_id = account_id[:-1]
pickle_filename = f'{account_id}.pickle'

account_data = account_data.reset_index(drop=True)

account_data.to_pickle(os.path.join(output_path,pickle_filename))
pickle_filename_list.append(pickle_filename)

return pickle_filename_list

pickle_filename_list = split_by_id_save_to_pickle(df,'process')


範例 feature engineering

這裡以 “calculate_transaction_percentage“ 計算交易佔該ID的總交易比例,也是只是一個 sample ,可以換任何的 feature engineering 。

並且再用一個 preprocess function 把要做的事情都包起來,從讀取 pickle 到特徵工程以及最後再存回去 pickle。

def calculate_transaction_percentage(data):

data.sort_values(by=['ID', 'DATE'], inplace=True)
data['Accumulated Deposit'] = data.groupby('ID')['DEPOSIT AMT'].cumsum()
data['Transaction Percentage'] = data['DEPOSIT AMT'] / (data['Accumulated Deposit'] )
return data


def preporcess(args):

file_name = args[0]
input_path = args[1]
output_path = args[2]

try:
data = pd.read_pickle(os.path.join(input_path,file_name))
data = calculate_transaction_percentage(data)
data.to_pickle(os.path.join(output_path,file_name))
return f"success: {file_name}"
except Exception as e:
print(e)
return f"error: {file_name}"


多處理 multiprocess

對每個分割後的子集,我使用批次的方式進行特徵工程。這意味著我們每次只處理數據的一部分,從而有效控制了記憶體的使用。同時,我們使用多處理技術,將特徵工程任務分配給多個處理單元,並且同步地進行處理,從而極大地提高了效能。

def concurrent_multi_process(list_:list, function_:Callable, *para):
""" Implement multi-process to speed up process time. Args: Input: list, function output: list of function's output """
args = ((element, *para) for element in list_)
with concurrent.futures.ProcessPoolExecutor() as executor:
result_list = list(tqdm(executor.map(function_, args), total = len(list_)))

return result_list

file_list = concurrent_multi_process(pickle_filename_list,
preporcess,
'process',
'output')

結論

在這篇文章中,我們討論在特徵工程過程中因RAM不足而導致的效能問題。為了解決這個問題,我們引入了數據分割、批次處理和多處理等技術,從而成功地突破了效能瓶頸。這些方法不僅幫助我們克服了記憶體限制,還大幅提高了特徵工程的效能,使我們能夠更深入地進行資料分析和建模工作。希望本篇文章能夠啟發更多的資料科學從業者,針對效能問題尋找更多的解決方案,我們下次見!

分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.