How to utilize batch input and multi-processing techniques to accelerate feature engineering?
在進行特徵工程的過程中,我們通常需要處理各種各樣的數據,並轉換它們成有意義的特徵,以供後續的模型訓練和預測使用。然而,當數據集過於龐大,尤其是當記憶體限制造成RAM不足時,這個過程變得困難重重。這不僅影響效能,也可能阻礙我們進行更深入的特徵工程實驗。
為了克服這個效能問題,我們需要尋找解決方法,讓特徵工程能在有限的資源下順利進行。以下是我所採取的解決方案:
在進行 feature engineering 之前,我首先將原始數據集進行分割,按照特定的 ID 進行切割,將每個子集分別存儲成 pickle 檔案。這個步驟也須考量後續的feature engineering 內容,這次遇到的處理是以 group by ID 的方式進行,因此在這個步驟也依 ID 進行分割。這樣做的好處不僅有助於減少單次操作時的內存使用量,還能夠提高之後多處理的效能。
在分割好數據後,我們使用批次(batch)的方式進行feature engineering。這意味著我們只處理一小部分數據,而不是一次性處理整個資料集。這有助於控制內存的使用,避免RAM不足的問題。此外,我們還運用了多處理(multiprocess)的技術,同時在多個處理單元中執行特徵工程,進一步提高效能。
這次選用 kaggle dataset : Bank Transaction Data ,他是一個銀行的交易數據,數據量不大,僅以次作為 sample 以實作後續內容(讀者可以找任何的資料取代,資料在本篇不是重點)。
import pandas as pd
data_path = './data/bank.xlsx'
df = pd.read_excel(data_path, sheet_name='Sheet1')
df.rename(columns = {'Account No':'ID'}, inplace = True)
df.head(5)
我將原始的資料集按照特定的標識(ID)進行分割,並將每個子集分別存儲成pickle檔案。這使得每個子集都可以獨立地載入和處理。
import os
def split_by_id_save_to_pickle(df: pd.DataFrame, output_path:str) -> list:
unique_accounts = df['ID'].unique()
pickle_filename_list = []
for account_id in unique_accounts:
account_data = df[df['ID'] == account_id]
account_id = account_id[:-1]
pickle_filename = f'{account_id}.pickle'
account_data = account_data.reset_index(drop=True)
account_data.to_pickle(os.path.join(output_path,pickle_filename))
pickle_filename_list.append(pickle_filename)
return pickle_filename_list
pickle_filename_list = split_by_id_save_to_pickle(df,'process')
這裡以 “calculate_transaction_percentage“ 計算交易佔該ID的總交易比例,也是只是一個 sample ,可以換任何的 feature engineering 。
並且再用一個 preprocess function 把要做的事情都包起來,從讀取 pickle 到特徵工程以及最後再存回去 pickle。
def calculate_transaction_percentage(data):
data.sort_values(by=['ID', 'DATE'], inplace=True)
data['Accumulated Deposit'] = data.groupby('ID')['DEPOSIT AMT'].cumsum()
data['Transaction Percentage'] = data['DEPOSIT AMT'] / (data['Accumulated Deposit'] )
return data
def preporcess(args):
file_name = args[0]
input_path = args[1]
output_path = args[2]
try:
data = pd.read_pickle(os.path.join(input_path,file_name))
data = calculate_transaction_percentage(data)
data.to_pickle(os.path.join(output_path,file_name))
return f"success: {file_name}"
except Exception as e:
print(e)
return f"error: {file_name}"
對每個分割後的子集,我使用批次的方式進行特徵工程。這意味著我們每次只處理數據的一部分,從而有效控制了記憶體的使用。同時,我們使用多處理技術,將特徵工程任務分配給多個處理單元,並且同步地進行處理,從而極大地提高了效能。
def concurrent_multi_process(list_:list, function_:Callable, *para):
""" Implement multi-process to speed up process time. Args: Input: list, function output: list of function's output """
args = ((element, *para) for element in list_)
with concurrent.futures.ProcessPoolExecutor() as executor:
result_list = list(tqdm(executor.map(function_, args), total = len(list_)))
return result_list
file_list = concurrent_multi_process(pickle_filename_list,
preporcess,
'process',
'output')
在這篇文章中,我們討論在特徵工程過程中因RAM不足而導致的效能問題。為了解決這個問題,我們引入了數據分割、批次處理和多處理等技術,從而成功地突破了效能瓶頸。這些方法不僅幫助我們克服了記憶體限制,還大幅提高了特徵工程的效能,使我們能夠更深入地進行資料分析和建模工作。希望本篇文章能夠啟發更多的資料科學從業者,針對效能問題尋找更多的解決方案,我們下次見!