在數據科學和機器學習中,特徵工程是提高模型性能的關鍵步驟之一,通過創建新特徵或轉換現有特徵,我們可以更好地捕捉數據中的信息,提高模型的預測能力。然而,當處理大數據集時,特徵工程可能變得耗時,而且若是在研發階段,特徵其實是需要快速迭代去產生並且做後續的實驗,這也是近期遇到的問題,因此想在這篇文章實作多進程的優化。那要執行特徵工程的流程,其實有很多個選擇,在本次文章中,想分享最近實做的一種方法,雖然他不是最好的,但它是可行的,如果有比較建議的方法歡迎分享交流,一起學習成長!
使用Python的多進程功能來加速特徵工程,並將結果存儲在SQLite數據庫中,這可以達到快速產生特徵,並且在RAM資源有限的情況下,可以如預期執行。
num : 表示帳戶號碼,建立100個帳戶
date : 表示交易日期,建立交易日期在 2023-10-20 到 2023-10-30日之間
time : 表示交易時間,一天24 小時內都可以
amount : 這筆交易的金額,金額範圍 1- 1000000
sample 如下:
import pandas as pd
import numpy as np
import random
from faker import Faker
from datetime import datetime, timedelta
np.random.seed(0)
random.seed(0)
fake = Faker()
data = pd.DataFrame(columns=['num', 'date', 'time', 'amount'])
account_numbers = [fake.unique.random_number(digits=6) for _ in range(100)]
for _ in range(1000):
acct_num = random.choice(account_numbers)
date = (datetime(2023, 10, 20) + timedelta(days=random.randint(0, 10))).date()
time = fake.time(pattern='%H:%M')
amt = random.randint(1, 1000000)
data = pd.concat([data, pd.DataFrame({'num': [num], 'date': [date], 'time': [time],
'amount': [amount]})], ignore_index=True)
data.to_csv('data/transaction.csv', index=False)
import multiprocessing
import sqlite3
import pandas as pd
def create_db_connection():
return sqlite3.connect('mydatabase.db')
def calculate_average_amount(data, num):
average_amount = data.groupby('num')['amount'].mean().reset_index()
print(average_amount)
return average_amount
def calculate_variance(data, num ):
variance_data = data.groupby('num')['amount'].var().reset_index()
variance_data.rename(columns={'amount': 'amount_variance'}, inplace=True)
return variance_data
def calculate_time_diff(data):
account_data = data.copy()
account_data['time'] = pd.to_datetime(account_data['time'], format='%H:%M')
account_data = account_data.sort_values(by=['date', 'time'])
account_data['time_diff'] = account_data['time'].diff().dt.total_seconds() / 60.0
return account_data[['num', 'time_diff']]
def calculate_average_time_diff(time_df):
average_time_diff = time_df.groupby('num')['time_diff'].mean().reset_index()
average_time_diff.rename(columns={'time_diff': 'time_diff_mean'}, inplace=True)
return average_time_diff
def process_feature(data,num):
print('in function process_feature: ',data.shape)
data = data[data['num'] == num]
ori_df = calculate_average_amount(data, num)
output_df = calculate_variance(data,num )
output_df = ori_df.merge(output_df, on='num')
print(output_df.columns)
time_df = calculate_time_diff(data)
time_df_o = calculate_average_time_diff(time_df)
print(time_df_o.columns)
output_df = output_df.merge(time_df_o, on='num')
return output_df
import multiprocessing
import sqlite3
import pandas as pd
from feature import calculate_average_amount, calculate_variance, process_feature
from functools import partial
def create_db_connection():
return sqlite3.connect('feature.db')
def process_acct_num(num, data):
output_df = process_feature(data,num)
conn = create_db_connection()
output_df.to_sql('feature', conn, if_exists='append', index=False)
conn.close()
if __name__ == '__main__':
data = pd.read_csv('./data/transaction.csv')
nums = data['num'].unique()
start_date, end_date = '2023-10-20' , '2023-10-30'
num_processes = multiprocessing.cpu_count()
print(num_processes)
num_processes = num_processes-2
process_acct_num_partial = partial(process_acct_num, data=data)
with multiprocessing.Pool(num_processes) as pool:
pool.map(process_acct_num_partial, nums)
我們使用 Python 和多進程來進行特徵工程的例子,並將結果存儲在 SQLite 數據庫中。這個例子展示了對於一個交易數據集,如何計算每個帳戶的平均交易金額,特徵可以根據現實的情況擴增,這裡只展示一個框架。
過程中利用了 Python 的 multiprocessing
模塊,有效地利用了多核 CPU 資源,加快了大量數據的處理速度,特別是在處理大型數據集時,使用多進程可以顯著提高效率。
最後,我們將計算完的特徵通過 SQLite 數據庫進行了存儲,這樣不僅便於管理和存取這些數據,也為後續的數據分析和機器學習建模提供了方便。透過這篇文章,記錄下前些日子在資源有限的情況,可以 work 的 solution 。
歡迎交流~分享~一起成長!