股票數據清洗與日K轉換周月年K工具 - 完整功能解析-2

更新 發佈閱讀 57 分鐘
投資理財內容聲明

因為程式碼太常超過篇幅字數限制所以分段

# -*- coding: utf-8 -*-

"""

K → weekK/monthK/yearK.parquet(W-FRI + 玩股口徑 + 本地快取 + 7 QA

+ 包含所有修正與優化 (過濾/統計/QA輸出)

"""



import os, re, glob, time, warnings, random

from math import isfinite

from pathlib import Path

from concurrent.futures import ThreadPoolExecutor, as_completed

import sys # 用於 Colab 環境檢查



import numpy as np

import pandas as pd

import pyarrow as pa

import pyarrow.parquet as pq

from shutil import copy2



# ===== 基本設定 =====

warnings.filterwarnings("ignore", category=DeprecationWarning)

warnings.filterwarnings("ignore", category=FutureWarning)

warnings.filterwarnings("ignore")



WRITE_MODE = 'B'            # 'A' / 'B' / 'C'



# 🌟 優化 1: Row Group 翻倍或四倍

ROW_GROUP_SIZE = 512_000    # 建議 512,000

# 🌟 優化 2: 增加執行緒數,最大化 CPU 利用率

MAX_WORKERS = 16            # 建議 16 或更高

# 🌟 優化 3: 顯著加大批次合併量,減少 I/O 寫入次數

SHARD_SIZE = 10000          # 建議 8,000 - 15,000 之間



# 🌟 快取控制 (保留快取,下次執行跳過複製)

KEEP_LOCAL_CACHE = True



MARKET_LIST = [

    "us-share", "hk-share", "kr-share", "cn-share"

    # "us-share", "hk-share", "jp-share", "kr-share", "cn-share",

]



MARKET_ABBR_MAP = {

    "tw-share": "TW", "us-share": "US", "cn-share": "CN",

    "hk-share": "HK", "jp-share": "KR", "kr-share": "KR"

}



# 根據您的需求,假設 DRIVE_BASELOCAL_BASE 如下

try:

    # 檢查是否在 Colab 環境中

    if 'google.colab' in sys.modules:

        from google.colab import drive

        try:

            # 避免重複 mount 警告

            drive.mount('/content/drive', force_remount=False)

        except Exception:

            pass

        DRIVE_BASE = "/content/drive/MyDrive/各國股票檔案"

    else:

        DRIVE_BASE = os.path.abspath("./data_drive")

except Exception:

    DRIVE_BASE = os.path.abspath("./data_drive")



LOCAL_BASE = "/content/_wmy_tmp"

LOCAL_DAYK_CACHE = f"{LOCAL_BASE}/dayK_cache"   # 本地日K快取(保留)



# ===== 範圍限制(只處理 2024-01-01 ~ 2025-12-31=====

DATE_START_STR = "2000-01-01"

DATE_END_STR   = "2025-12-31"

PAD_DAYS       = 14    # 緩衝天數:保留起始日前 N 天,讓第一期有前收可算



DATE_START = pd.Timestamp(DATE_START_STR)

DATE_END   = pd.Timestamp(DATE_END_STR)

DATE_PAD_START = DATE_START - pd.Timedelta(days=PAD_DAYS)



# ===== 清洗與偵測參數 =====

MIN_YEAR = 1990

ALLOW_ZERO_VOLUME = True



CHECK_RETURN_D = True

# [***修正點 A***] 將日 K 極端報酬門檻調高,確保極端跳空日不會被日 K 預先刪除。

EXTREME_RET_GAP = 100.00

EXTREME_RET_CANDLE = 100.00

EXTREME_RET_MAX_FROMP = 100.00

EXTREME_RET_MIN_FROMP = 100.00

ABS_MAX_REASONABLE = 100.00



# 週級安全閥:上一週無交易且本週大跳空 → 視為拆分/復牌,不計報酬

STOPWEEK_JUMP_THRESHOLD = 0.80   # 80%



# ====== 復牌偵測參數(NEW======

LONG_GAP_DAYS = 5        # 與前一筆有效交易日相隔 ≥ N 天 → 疑似停牌/長假

RESUME_JUMP_THRESHOLD = 0.30     # 復牌第一天:|開盤/前收 - 1|30%

GHOST_MIN_LEN = 2        # 連續 ghost(四價相等且零量)至少幾天算占位段

PRE_RESUME_DAYS = 5      # 在 ghost 段「前面」最多回溯幾天,若大跳空也視為復牌頭

EPS_EQ = 1e-8            # 四價相等容差



SKIP_SUSPECT_STOCK = True

PINGPONG_THRESHOLD = 0.40



# [***新增 M/Y 門檻***] 穩健過濾參數

MIN_DAYS_MONTH = 5      # 月K:至少要有 5 個交易日

MIN_DAYS_YEAR = 120      # 年K:至少要有 120 個交易日 (約半年)

BIG_RET_MONTH_YEAR = 0.50 # 月/年報酬 |Ret_Trad| > 50% 且 HasResume/薄樣本 → 過濾

GAP_BIG_MONTH_YEAR = 0.50 # 月/年跳空 |Ret_Gap| > 50% 且 HasResume/薄樣本 → 過濾



# 欄位候選

DATE_COLS  = ['date','日期','Date']

OPEN_COLS  = ['open','開盤','開盤價','Open']

HIGH_COLS  = ['high','最高','最高價','High']

LOW_COLS   = ['low','低','最低價','Low']

CLOSE_COLS = ['close','收盤','收盤價','Adj Close','adj close','Close']

VOL_COLS   = ['volume','成交量','Volume']

SPLIT_COLS = ['Stock Splits','stock splits','Splits']

DIV_COLS   = ['Dividends','dividends']



def _is_equal4(o, h, l, c, eps=EPS_EQ):

    mx = max(o, h, l, c); mn = min(o, h, l, c)

    return (mx - mn) <= eps



def _pick(df, cands):

    low = {str(c).lower(): c for c in df.columns}

    for k in cands:

        if k.lower() in low:

            return low[k.lower()]

    for k in cands:

        rx = re.compile(k, re.I)

        for c in df.columns:

            if rx.search(str(c)):

                return c

    return None



def _read_csv_fast(path: str) -> pd.DataFrame:

    try:

        return pd.read_csv(path, encoding='utf-8-sig', engine='pyarrow')

    except Exception:

        try:

            return pd.read_csv(path, encoding='utf-8', engine='pyarrow')

        except Exception:

            try:

                return pd.read_csv(path, encoding='utf-8-sig')

            except Exception:

                return pd.read_csv(path, encoding='utf-8')



def _parse_id_name(stem: str):

    m = re.match(r'(?P<id>[^_]+)_(?P<name>.+)$', stem)

    return (m.group('id'), m.group('name')) if m else (stem, "")



def _canonical_id(raw_id: str) -> str:

    return raw_id.split('.')[0] if '.' in raw_id else raw_id



def _prepare_local_dayk(market_key: str) -> str:

    drive_dayk_dir = Path(f"{DRIVE_BASE}/{market_key}/dayK")



    local_dayk_dir = Path(f"{LOCAL_DAYK_CACHE}/{market_key}")

    local_dayk_dir.mkdir(parents=True, exist_ok=True)



    csv_files = sorted(glob.glob(str(drive_dayk_dir / "*.csv")))

    if not csv_files:

        print(f"❌ 快取失敗: Drive 上的 {drive_dayk_dir} 找不到任何 CSV 檔案。")

        return ""



    def _copy_if_newer(src: str, dst_dir: Path):

        dst = dst_dir / Path(src).name

        try:

            if not dst.exists():

                copy2(src, dst)

            else:

                s, d = Path(src).stat(), dst.stat()

                if (s.st_mtime > d.st_mtime) or (s.st_size != d.st_size):

                    copy2(src, dst)

        except Exception as e:

            print(f"⚠️ 快取檔案失敗:{src} -> {dst} ({e})")



    t_start = time.time()

    print(f"\n🚀 增量快取 {len(csv_files)} 檔日K CSV 到本地:{local_dayk_dir}")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:

        list(as_completed([executor.submit(_copy_if_newer, f, local_dayk_dir) for f in csv_files]))

    print(f"✅ 快取完成!耗時 {time.time() - t_start:.2f} 秒。路徑: {local_dayk_dir}")

    return str(local_dayk_dir)



def _basic_price_checks(df: pd.DataFrame, allow_zero_vol=ALLOW_ZERO_VOLUME) -> pd.DataFrame:

    df = df.copy()

    for c in ['開盤','最高','最低','收盤','成交量']:

        if c in df.columns:

            df[c] = pd.to_numeric(df[c], errors='coerce')

    df = df[(df['開盤'] > 0) & (df['最高'] > 0) & (df['最低'] > 0) & (df['收盤'] > 0)]

    df = df[(df['最高'] >= df[['開盤','收盤','最低']].max(axis=1)) &

            (df['最低'] <= df[['開盤','收盤','最高']].min(axis=1)) &

            (df['最高'] >= df['最低'])]

    if not allow_zero_vol and '成交量' in df.columns:

        df = df[df['成交量'] > 0]

    # [***修正點 1/3:這裡暫不處理時區***]

    df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)

    df = df.dropna(subset=['日期'])

    df = df[df['日期'].dt.year >= MIN_YEAR]

    df = df.sort_values('日期').drop_duplicates(subset=['日期']).reset_index(drop=True)

    return df



def _extreme_return_filter_day(df: pd.DataFrame) -> pd.DataFrame:

    if not CHECK_RETURN_D:

        return df

    df = df.copy()

    prev_c = df['收盤'].shift(1).replace(0, np.nan)

    o_safe = df['開盤'].replace(0, np.nan)

    ret_gap = (o_safe / prev_c) - 1

    ret_candle = (df['收盤'] / o_safe) - 1

    ret_max_p = (df['最高'] / prev_c) - 1

    ret_min_p = (df['最低'] / prev_c) - 1

    mask_ok = (

        (ret_gap.abs() < EXTREME_RET_GAP) &

        (ret_candle.abs() < EXTREME_RET_CANDLE) &

        (ret_max_p.abs() < EXTREME_RET_MAX_FROMP) &

        (ret_min_p.abs() < EXTREME_RET_MIN_FROMP) &

        (ret_gap.abs() <= ABS_MAX_REASONABLE) &

        (ret_candle.abs() <= ABS_MAX_REASONABLE) &

        (ret_max_p.abs() <= ABS_MAX_REASONABLE) &

        (ret_min_p.abs() <= ABS_MAX_REASONABLE)

    )

    return df[mask_ok.fillna(False)].copy()



def _clip_by_date_range(df: pd.DataFrame, col='日期',

                         start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START):

    """裁成 2024–2025;起始保留 pad_start 以便第一期有前收。"""

    if df.empty or col not in df.columns:

        return df

    m = (df[col] >= pad_start) & (df[col] <= end)

    return df.loc[m].copy()



# [已修改] 函式名稱: _mark_resume_flags_with_ghost (向量化優化)

def _mark_resume_flags_with_ghost(df_orig: pd.DataFrame) -> pd.Series:

    """在『未刪除 ghost』表上標記復牌日。(向量化優化)"""

    if df_orig.empty:

        return pd.Series(False, index=df_orig.index)



    df = df_orig.copy()

   

    # 🌟 向量化檢查四價相等

    o, h, l, c = df['開盤'].to_numpy(), df['最高'].to_numpy(), df['最低'].to_numpy(), df['收盤'].to_numpy()

   

    # 檢查 NaN/Inf 且四價相等

    eq_ohlc_vec = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \

                  ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)

                 

    ghost_mask = (df['成交量'].fillna(0) == 0) & eq_ohlc_vec



    prev_valid_close = df['收盤'].where(~ghost_mask).shift(1).ffill()

    prev_valid_date  = df['日期'].where(~ghost_mask).shift(1).ffill()



    gap_days = (df['日期'] - prev_valid_date).dt.days

    open_to_prev = (df['開盤'] / prev_valid_close) - 1

    cond_A = (gap_days >= LONG_GAP_DAYS) & (open_to_prev.abs() >= RESUME_JUMP_THRESHOLD)



    g = ghost_mask.to_numpy(); n = len(df)

    resume_idx = set(np.where(cond_A)[0].tolist())



    i = 0

    while i < n:

        if g[i]:

            j = i

            while j < n and g[j]:

                j += 1

            seg_len = j - i

            if seg_len >= GHOST_MIN_LEN:

                for back in range(1, PRE_RESUME_DAYS + 1):

                    k = i - back

                    if k < 0: break

                    if (not g[k]) and (abs(open_to_prev.iloc[k]) >= RESUME_JUMP_THRESHOLD):

                        resume_idx.add(k)

            i = j

        else:

            i += 1



    s = pd.Series(False, index=df.index)

    if resume_idx:

        s.iloc[sorted(list(resume_idx))] = True

    return s



# [已修改] 函式名稱: _load_day_clean_full (配合向量化)

def _load_day_clean_full(path: str) -> pd.DataFrame:

    """正規欄位對齊 → 標記復牌 → 刪 ghost → 價量檢查 → 極端過濾"""

    raw = _read_csv_fast(path)



    dc = _pick(raw, DATE_COLS)

    oc = _pick(raw, OPEN_COLS)

    hc = _pick(raw, HIGH_COLS)

    lc = _pick(raw, LOW_COLS)

    cc = _pick(raw, CLOSE_COLS)

    vc = _pick(raw, VOL_COLS)

    sc = _pick(raw, SPLIT_COLS)  # 可能不存在



    if not all([dc, oc, hc, lc, cc]):

        raise ValueError(f"缺必要欄位:{Path(path).name}")

    if vc is None:

        raw['__vol__'] = np.nan

        vc = '__vol__'



    df0 = raw[[dc, oc, hc, lc, cc, vc]].copy()

    df0.columns = ['日期','開盤','最高','最低','收盤','成交量']

    for c in ['開盤','最高','最低','收盤','成交量']:

        df0[c] = pd.to_numeric(df0[c], errors='coerce')



    # [***修正 1:時區邊界錯誤修正***]

    df0['日期'] = pd.to_datetime(df0['日期'], errors='coerce')

    if df0['日期'].dt.tz is not None:

        df0['日期'] = df0['日期'].dt.tz_convert('Asia/Taipei').dt.tz_localize(None)

    else:

        df0['日期'] = df0['日期'].dt.tz_localize(None)

    # [***修正 1 結束***]



    resume_flag = _mark_resume_flags_with_ghost(df0)

   

    # 🌟 配合向量化邏輯

    o, h, l, c = df0['開盤'].to_numpy(), df0['最高'].to_numpy(), df0['最低'].to_numpy(), df0['收盤'].to_numpy()

    eq_ohlc = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \

              ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)

   

    ghost_mask = (df0['成交量'].fillna(0) == 0) & eq_ohlc

   

    if sc is not None and sc in raw.columns:

        splits = pd.to_numeric(raw[sc], errors='coerce').fillna(0)

        ghost_mask = ghost_mask | ((df0['成交量'].fillna(0) == 0) & (splits > 0))



    df = df0.loc[~ghost_mask].copy()

    df['resume_flag'] = resume_flag.loc[df.index]



    # === [***最終修正點 B***] 強化日 K 復牌標記:捕捉未能被 Ghost 邏輯捕捉的極端跳空 ===

    DAILY_JUMP_THRESHOLD = 1.00 # 100% 日報酬

    prev_c_day = df['收盤'].shift(1).replace(0, np.nan)

    daily_ret = (df['收盤'] / prev_c_day) - 1

    extreme_daily_jump_mask = daily_ret.abs().gt(DAILY_JUMP_THRESHOLD).fillna(False)

    df['resume_flag'] = df['resume_flag'] | extreme_daily_jump_mask

    # ====================================================================================



    df = _basic_price_checks(df)

    if df.empty:

        raise ValueError("清洗後為空")



    df = _extreme_return_filter_day(df)

    if df.empty:

        raise ValueError("極端報酬過濾後為空")



    return df

def _load_day_clean_full(path: str) -> pd.DataFrame:

    """正規欄位對齊 → 標記復牌 → 刪 ghost → 價量檢查 → 極端過濾"""

    raw = _read_csv_fast(path)



    dc = _pick(raw, DATE_COLS)

    oc = _pick(raw, OPEN_COLS)

    hc = _pick(raw, HIGH_COLS)

    lc = _pick(raw, LOW_COLS)

    cc = _pick(raw, CLOSE_COLS)

    vc = _pick(raw, VOL_COLS)

    sc = _pick(raw, SPLIT_COLS)  # 可能不存在



    if not all([dc, oc, hc, lc, cc]):

        raise ValueError(f"缺必要欄位:{Path(path).name}")

    if vc is None:

        raw['__vol__'] = np.nan

        vc = '__vol__'



    df0 = raw[[dc, oc, hc, lc, cc, vc]].copy()

    df0.columns = ['日期','開盤','最高','最低','收盤','成交量']

    for c in ['開盤','最高','最低','收盤','成交量']:

        df0[c] = pd.to_numeric(df0[c], errors='coerce')



    # [***修正 1:時區邊界錯誤修正***]

    df0['日期'] = pd.to_datetime(df0['日期'], errors='coerce')



    # 嘗試將日期時間處理為本地日期(假設源數據是 UTC 或本地 +08:00

    if df0['日期'].dt.tz is not None:

        # 如果有時區資訊,先轉成 +08:00 再移除

        df0['日期'] = df0['日期'].dt.tz_convert('Asia/Taipei').dt.tz_localize(None)

    else:

        # 如果沒有時區資訊,視為無時區的本地時間

        df0['日期'] = df0['日期'].dt.tz_localize(None)

    # [***修正 1 結束***]



    resume_flag = _mark_resume_flags_with_ghost(df0)



    eq_ohlc = df0[['開盤','最高','最低','收盤']].apply(lambda r: np.isfinite(r).all() and _is_equal4(*r), axis=1)

    ghost_mask = (df0['成交量'].fillna(0) == 0) & eq_ohlc

    if sc is not None and sc in raw.columns:

        splits = pd.to_numeric(raw[sc], errors='coerce').fillna(0)

        ghost_mask = ghost_mask | ((df0['成交量'].fillna(0) == 0) & (splits > 0))



    df = df0.loc[~ghost_mask].copy()

    df['resume_flag'] = resume_flag.loc[df.index]



    # === [***最終修正點 B***] 強化日 K 復牌標記:捕捉未能被 Ghost 邏輯捕捉的極端跳空 ===

    DAILY_JUMP_THRESHOLD = 1.00 # 100% 日報酬(即股價翻倍或腰斬)



    prev_c_day = df['收盤'].shift(1).replace(0, np.nan)

    daily_ret = (df['收盤'] / prev_c_day) - 1



    # 如果日報酬絕對值 > 100%,則強制視為復牌/重大事件

    extreme_daily_jump_mask = daily_ret.abs().gt(DAILY_JUMP_THRESHOLD).fillna(False)

    df['resume_flag'] = df['resume_flag'] | extreme_daily_jump_mask

    # ====================================================================================



    df = _basic_price_checks(df)

    if df.empty:

        raise ValueError("清洗後為空")



    df = _extreme_return_filter_day(df)

    if df.empty:

        raise ValueError("極端報酬過濾後為空")



    return df



# [***新函式 2***] 統一處理 W/M/YOHLC 聚合,並帶上 HasResume / Days

def _resample_ohlc_with_flags(df: pd.DataFrame, rule: str) -> pd.DataFrame:

    df = df.copy()



    # [***修正 2:移除多餘的 tz_localize(None)***]

    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')

    g = df.set_index('日期').sort_index()



    if rule == 'W':  # 以週五結算(與玩股網一致)

        rule_code, label, closed = 'W-FRI', 'right', 'right'

        period_name = 'Week'

    elif rule == 'M':

        rule_code, label, closed = 'ME', 'right', 'right'

        period_name = 'Period'

    elif rule == 'Y':

        rule_code, label, closed = 'YE', 'right', 'right'

        period_name = 'Period'

    else:

        raise ValueError("rule must be 'W', 'M', or 'Y'.")



    o = g['開盤'].resample(rule_code, label=label, closed=closed).first()

    h = g['最高'].resample(rule_code, label=label, closed=closed).max()

    l = g['最低'].resample(rule_code, label=label, closed=closed).min()

    c = g['收盤'].resample(rule_code, label=label, closed=closed).last()

    v = g['成交量'].resample(rule_code, label=label, closed=closed).sum(min_count=1)



    # 期間交易天數

    n = g['收盤'].resample(rule_code, label=label, closed=closed).count().rename(f'Cur{period_name}_Days')



    # 期間是否發生過 resume_flag

    has_resume = None

    if 'resume_flag' in g.columns:

        has_resume = g['resume_flag'].resample(rule_code, label=label, closed=closed).max().rename('HasResume')



    parts = [o, h, l, c, v, n]

    if has_resume is not None:

        parts.append(has_resume)



    out = pd.concat(parts, axis=1).dropna(subset=['開盤','收盤'])

    cols = ['開盤','最高','最低','收盤','成交量', f'Cur{period_name}_Days']

    if has_resume is not None:

        cols.append('HasResume')

    out.columns = cols



    # 上期交易天數

    out[f'Prev{period_name}_Days'] = out[f'Cur{period_name}_Days'].shift(1).fillna(0).astype(int)



    if rule == 'W':

        iso = out.index.to_series().dt.isocalendar()

        out['ISO_Week'] = iso.year.astype(str) + '-' + iso.week.astype(str).str.zfill(2)

        out = out.rename(columns={'CurWeek_Days': 'CurWeek_Days', 'PrevWeek_Days': 'PrevWeek_Days'})



    out = out.reset_index().rename(columns={'index':'日期'})

    return _basic_price_checks(out)



# [***修正函式 3***] 整合 W/M/Y 報酬計算與過濾邏輯 + 新增 QA 追蹤

def _add_period_returns(df: pd.DataFrame, freq_code: str) -> pd.DataFrame:

    df = df.sort_values('日期').reset_index(drop=True).copy()

    prev_c = df['收盤'].shift(1).replace(0, np.nan)

    o = df['開盤'].replace(0, np.nan)

    h = df['最高'].replace(0, np.nan)

    l = df['最低'].replace(0, np.nan)

    c = df['收盤'].replace(0, np.nan)



    # 報酬率計算 (與您原代碼一致)

    df[f'PrevC_{freq_code}']          = prev_c

    df[f'Ret_Gap_{freq_code}']        = (o / prev_c) - 1

    df[f'Ret_Trad_{freq_code}']       = (c - prev_c) / prev_c

    df[f'Ret_C_{freq_code}']          = (c - o) / o

    df[f'Ret_Max_H_{freq_code}']      = (h - prev_c) / prev_c

    df[f'Ret_Min_L_{freq_code}']      = (l - prev_c) / prev_c

    df[f'Range_{freq_code}']          = (h - l) / o

    df[f'Ret_Max_H_Pos_{freq_code}']  = df[f'Ret_Max_H_{freq_code}'].clip(lower=0)

    df[f'Ret_H_from_O_{freq_code}']   = (h - o) / o

    df[f'Ret_L_from_O_{freq_code}']   = (l - o) / o

    df[f'Ret_End_RelH_{freq_code}']   = (c - h) / h

    df[f'Ret_End_RelL_{freq_code}']   = (c - l) / l



    # --- 過濾邏輯 ---

    final_clear_mask = pd.Series(False, index=df.index)

    cols_to_nan = [f'Ret_Trad_{freq_code}', f'Ret_Max_H_{freq_code}', f'Ret_Min_L_{freq_code}']



    # 預設 IsFiltered_QA 為 0 (M/Y 需要)

    if freq_code in ('M', 'Y'):

        df['IsFiltered_QA'] = 0



    if freq_code == 'W' and 'PrevWeek_Days' in df.columns:

        # 1.K 玩股口徑過濾:上一週無交易且本週大跳空

        big_jump = (df[f'PrevC_W'] > 0) & (df['開盤'] > 0) & ((df['開盤'] / df[f'PrevC_W'] - 1).abs() > STOPWEEK_JUMP_THRESHOLD)

        prev_w_no_trade = (df['PrevWeek_Days'] == 0)

        jump_mask = big_jump & prev_w_no_trade

        final_clear_mask = final_clear_mask | jump_mask



        # 2.K HasResume + 極端事件過濾 ([***最終修正點 C***])

        EXTREME_RET_USER_REQ = 1.00 # 100%

        BIG_RET_THRESHOLD = 0.50

        has_resume_mask = df.get('HasResume', pd.Series(False, index=df.index)).fillna(0).gt(0)

        is_extreme_event_current_week = has_resume_mask & df[f'Ret_Trad_W'].abs().gt(EXTREME_RET_USER_REQ)

        skip_current_week_from_previous_event = is_extreme_event_current_week.shift(1).fillna(False)

        mask_thin_or_large_jump = has_resume_mask & (df['CurWeek_Days'].lt(3) | df[f'Ret_Trad_W'].abs().gt(BIG_RET_THRESHOLD))

        final_clear_mask = final_clear_mask | is_extreme_event_current_week | skip_current_week_from_previous_event | mask_thin_or_large_jump



    elif freq_code in ('M', 'Y') and 'HasResume' in df.columns:

        # 3./K 穩健過濾:薄樣本 + 停牌復牌 + 大跳空

        period_name = 'Period'



        # [***修正 3:修正 Days 欄位名稱判斷***]

        days_col = 'CurPeriod_Days' if 'CurPeriod_Days' in df.columns else f'Cur{period_name}_Days'

        prev_days_col = 'PrevPeriod_Days' if 'PrevPeriod_Days' in df.columns else f'Prev{period_name}_Days'



        min_days = MIN_DAYS_MONTH if freq_code == 'M' else MIN_DAYS_YEAR



        # 條件 A: 覆蓋率不足 (新掛牌或殘年)

        small_coverage_mask = df[days_col].fillna(0).astype(int) < min_days



        # 條件 B: 停牌/復牌週期極端值

        has_resume = df['HasResume'].fillna(0).gt(0)

        thin_mask  = df[days_col].fillna(0).astype(int) < 3 # 極薄樣本 (3日以下)

        big_gap_mask   = df[f'Ret_Gap_{freq_code}'].abs() > GAP_BIG_MONTH_YEAR

        big_ret_mask   = df[f'Ret_Trad_{freq_code}'].abs() > BIG_RET_MONTH_YEAR

        resume_extreme_mask = has_resume & (thin_mask | big_ret_mask | big_gap_mask)



        # 條件 C: 上期無交易 + 本期大跳空 (類似週K的玩股口徑)

        stop_jump_mask = (df[prev_days_col].fillna(0).astype(int) == 0) & (df[f'Ret_Gap_{freq_code}'].abs() > STOPWEEK_JUMP_THRESHOLD)



        final_clear_mask = final_clear_mask | small_coverage_mask | stop_jump_mask | resume_extreme_mask



        # 🎯 [***新增 QA 統計***]:標記被過濾的行

        df.loc[final_clear_mask, 'IsFiltered_QA'] = 1



    # 應用過濾:將滿足過濾條件的報酬設為 NaN

    if final_clear_mask.any():

        df.loc[final_clear_mask, cols_to_nan] = np.nan



    return df

class ParquetStreamer:

    def __init__(self, path: str, keep_cols=None, row_group_size: int = ROW_GROUP_SIZE):

        self.path = path

        self.writer = None

        self.schema = None

        self.rows = 0

        self.keep_cols = keep_cols

        self.row_group_size = row_group_size



    def _ensure_writer(self, table: pa.Table):

        if self.writer is None:

            Path(self.path).parent.mkdir(parents=True, exist_ok=True)

            self.schema = table.schema

            self.writer = pq.ParquetWriter(self.path, self.schema, compression='snappy')



    def append_df(self, df: pd.DataFrame):

        if df is None or df.empty:

            return

        df = df.copy()

        if self.keep_cols is not None:

            for c in self.keep_cols:

                if c not in df.columns:

                    df[c] = np.nan

            df = df.reindex(columns=self.keep_cols)

        if '日期' in df.columns:

            df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)



        # 確保 IsFiltered_QA 欄位是整數

        if 'IsFiltered_QA' in df.columns:

            df['IsFiltered_QA'] = pd.to_numeric(df['IsFiltered_QA'], errors='coerce').fillna(0).astype('int32')



        for c in df.columns:

            if df[c].dtype in ['int64','float64','int32','float32'] and c not in ['ISO_Week', 'IsFiltered_QA']:

                df[c] = pd.to_numeric(df[c], errors='coerce').astype(np.float64)



        table = pa.Table.from_pandas(df, preserve_index=False)

        self._ensure_writer(table)

        if table.schema != self.schema:

            table = pa.Table.from_pandas(df.reindex(columns=[f.name for f in self.schema]), preserve_index=False).cast(self.schema, safe=False)

        self.writer.write_table(table, row_group_size=self.row_group_size)

        self.rows += len(df)



    def close(self):

        if self.writer is not None:

            try:

                self.writer.close()

            except:

                pass
留言
avatar-img
留言分享你的想法!
avatar-img
《炒股不看周月年K漲幅機率就是耍流氓》
3會員
230內容數
普通上班族,用 AI 與 Python 將炒股量化。我的數據宣言是:《炒股不做量化,都是在耍流氓》。
2025/11/08
📋 程式概述 這是一個企業級股票數據清洗與時間週期轉換系統,專門處理日 K 線數據並轉換為週/月/年 K 線,同時進行多層次的數據品質控管與異常偵測。程式採用玩股網口徑標準,確保數據品質符合量化交易需求。 🎯 核心功能架構 1. 數據來源與處理範圍 時間範圍:2000-01-01
2025/11/08
📋 程式概述 這是一個企業級股票數據清洗與時間週期轉換系統,專門處理日 K 線數據並轉換為週/月/年 K 線,同時進行多層次的數據品質控管與異常偵測。程式採用玩股網口徑標準,確保數據品質符合量化交易需求。 🎯 核心功能架構 1. 數據來源與處理範圍 時間範圍:2000-01-01
2025/11/05
我不是什麼量化高手,也不是什麼金融工程師。 我只是那種會自己寫下載器、跑回測的人。 但說真的,如果沒有 AI 幫我,我根本不知道怎麼處理這些資料。 🧠 問題一:ETF 拆股後,Yahoo Finance 的 Adj Close 竟然沒調整? 理論上: Adj Close = Close
2025/11/05
我不是什麼量化高手,也不是什麼金融工程師。 我只是那種會自己寫下載器、跑回測的人。 但說真的,如果沒有 AI 幫我,我根本不知道怎麼處理這些資料。 🧠 問題一:ETF 拆股後,Yahoo Finance 的 Adj Close 竟然沒調整? 理論上: Adj Close = Close
2025/11/04
我會回答:有沒有可能付費資料也有問題,只是沒被發現? 市面上絕大多數教學都遵循一條「快速上手」的路線: 使用某個 T 開頭的回測平台(你知道是哪個) 安裝 Python 套件如 backtrader、bt、yfinance 丟入策略,跑出回測結果 然後就開始分析報酬率、夏普值、最大回撤
2025/11/04
我會回答:有沒有可能付費資料也有問題,只是沒被發現? 市面上絕大多數教學都遵循一條「快速上手」的路線: 使用某個 T 開頭的回測平台(你知道是哪個) 安裝 Python 套件如 backtrader、bt、yfinance 丟入策略,跑出回測結果 然後就開始分析報酬率、夏普值、最大回撤
看更多
你可能也想看
Thumbnail
吸引力法則是互相的,頻率相近的兩方總會尋到彼此。香氛和你也是。
Thumbnail
吸引力法則是互相的,頻率相近的兩方總會尋到彼此。香氛和你也是。
Thumbnail
本文探討臺灣串流平臺的發展現況、競爭格局,並解析其帶來的經濟效應。透過美國電影協會(MPA)的講座內容,結合業界專家意見與生活觀察,文章揭示串流平臺如何影響內容製作, 同時討論臺灣有利的創作環境,包括自由的風氣和開放的政策,對於提升國家軟實力與國際影響力的重要性。
Thumbnail
本文探討臺灣串流平臺的發展現況、競爭格局,並解析其帶來的經濟效應。透過美國電影協會(MPA)的講座內容,結合業界專家意見與生活觀察,文章揭示串流平臺如何影響內容製作, 同時討論臺灣有利的創作環境,包括自由的風氣和開放的政策,對於提升國家軟實力與國際影響力的重要性。
Thumbnail
歡迎來到Scikit-learn教學系列的第二篇文章!在上篇中,我們介紹了Scikit-learn與機器學習基礎,並探索了Iris資料集。這一篇將聚焦於資料預處理,我們將學習如何使用Scikit-learn清理資料、處理缺失值、進行特徵縮放與類別編碼,並以真實資料集進行實作。
Thumbnail
歡迎來到Scikit-learn教學系列的第二篇文章!在上篇中,我們介紹了Scikit-learn與機器學習基礎,並探索了Iris資料集。這一篇將聚焦於資料預處理,我們將學習如何使用Scikit-learn清理資料、處理缺失值、進行特徵縮放與類別編碼,並以真實資料集進行實作。
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 180 | RoBERTa 預訓練前言:RoBERTa 預訓練前言 AI說書 - 從0開始 - 181 | 預訓
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 180 | RoBERTa 預訓練前言:RoBERTa 預訓練前言 AI說書 - 從0開始 - 181 | 預訓
Thumbnail
為了讓資料更適合進行後續的分析、建立模型,模型的決策準確性,資料探索與清理是資料分析過程中非常重要的步驟,主要目的在於確保資料的品質和可靠性。 因為前幾篇的例子中的資料,並沒有缺失值與重複值的部分,我另外找了一份有包含的資料來做案例分析,由於找到的資料沒有重複值的部分,故本文主要解釋處理缺失值的部
Thumbnail
為了讓資料更適合進行後續的分析、建立模型,模型的決策準確性,資料探索與清理是資料分析過程中非常重要的步驟,主要目的在於確保資料的品質和可靠性。 因為前幾篇的例子中的資料,並沒有缺失值與重複值的部分,我另外找了一份有包含的資料來做案例分析,由於找到的資料沒有重複值的部分,故本文主要解釋處理缺失值的部
Thumbnail
從基本概念開始,然後逐步深入學習 pandas 的各種功能。這是一個非常強大的 Python 資料分析工具,常用於處理結構化數據。 基本概念 pandas 主要有兩個核心資料結構: Series: 一維的資料結構,類似於 Python 中的列表,但它可以帶有標籤(index)。 DataFr
Thumbnail
從基本概念開始,然後逐步深入學習 pandas 的各種功能。這是一個非常強大的 Python 資料分析工具,常用於處理結構化數據。 基本概念 pandas 主要有兩個核心資料結構: Series: 一維的資料結構,類似於 Python 中的列表,但它可以帶有標籤(index)。 DataFr
Thumbnail
繼「【🔒 Python實戰營 - Data Science 必修班】Pandas 資料清洗技 - 填補式」之後,我們已經學會怎麼填補空缺資料了,那這個章節我們來教您如何對某些欄位有條件的整形,有時候我們的資料來源某些欄位資料格式不一,甚至型態都不是正規統一的值,此時我們就需要針對這些值進行一些處理
Thumbnail
繼「【🔒 Python實戰營 - Data Science 必修班】Pandas 資料清洗技 - 填補式」之後,我們已經學會怎麼填補空缺資料了,那這個章節我們來教您如何對某些欄位有條件的整形,有時候我們的資料來源某些欄位資料格式不一,甚至型態都不是正規統一的值,此時我們就需要針對這些值進行一些處理
Thumbnail
繼「【Google Colab Python系列】 資料處理神器 Pandas 起手式」之後,相信對於各位來說已經是小兒科了吧,沒關係! 我們今天來增加一點點小挑戰,你知道嗎? Pandas對於大部分人的第一印象就是「不就表格化而已,有什麼了不起?」、「幫我們整理格式轉換的介接器」...,但其實它不
Thumbnail
繼「【Google Colab Python系列】 資料處理神器 Pandas 起手式」之後,相信對於各位來說已經是小兒科了吧,沒關係! 我們今天來增加一點點小挑戰,你知道嗎? Pandas對於大部分人的第一印象就是「不就表格化而已,有什麼了不起?」、「幫我們整理格式轉換的介接器」...,但其實它不
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News