日K → 週/月/年K 清洗與報酬計算模組說明-2

更新 發佈閱讀 32 分鐘
投資理財內容聲明

由於方格子單篇文章有字數限制,因此本次完整的日K → 週/月/年K 清洗與報酬計算程式碼,將分成兩篇文章提供:




def detect_pingpong_patterns(day_df: pd.DataFrame, price_col='收盤', threshold=0.4) -> pd.DataFrame:

    # ... (函式內容保持不變) ...

    df = day_df.sort_values('日期').reset_index(drop=True).copy()

    if price_col not in df.columns:

        return pd.DataFrame()

    df['pct_change'] = df[price_col].pct_change()

    rows = []

    for i in range(1, len(df)-1):

        prev_change = df.loc[i, 'pct_change']

        next_change = df.loc[i+1, 'pct_change']

        if pd.notna(prev_change) and pd.notna(next_change):

            if abs(prev_change) > threshold and abs(next_change) > threshold and prev_change * next_change < 0:

                rows.append({

                    '日期(異常日)': df.loc[i, '日期'],

                    '前日收盤': df.loc[i-1, price_col],

                    '當日收盤': df.loc[i, price_col],

                    '次日收盤': df.loc[i+1, price_col],

                    '前日變動(%)': round(prev_change * 100, 2),

                    '次日變動(%)': round(next_change * 100, 2),

                })

    return pd.DataFrame(rows)



def _process_one_file(path: str):

    try:

        df_day = _load_day_clean_full(path)

        # 僅保留 20242025(含起始緩衝)

        df_day = _clip_by_date_range(df_day, col='日期',

                                     start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START)



        stem = Path(path).stem

        raw_id, _ = _parse_id_name(stem)

        cid = _canonical_id(raw_id) # StockID 必須在這裡定義



        if df_day.empty or df_day['日期'].max() < DATE_PAD_START:

            return "SKIP", {'StockID': cid, 'file': Path(path).name,

                            'count': 0, 'reason': 'out_of_range'}, f"{Path(path).name}: out_of_range"



        if SKIP_SUSPECT_STOCK:

            suspects = detect_pingpong_patterns(df_day, threshold=PINGPONG_THRESHOLD)

            if not suspects.empty:

                msg = f"{Path(path).name}: [SKIP_PINGPONG] 命中疑似除權錯位 {len(suspects)} 筆"

                # 將 cid 傳入 payload

                return "SKIP", {'StockID': cid, 'file': Path(path).name, 'count': len(suspects), 'reason': 'suspect_pingpong'}, msg



        # 週//K 轉換

        dfw = _resample_ohlc_with_flags(df_day, 'W'); dfw = _add_period_returns(dfw, 'W'); dfw['成交量'] = dfw['成交量'].astype('float64'); dfw['StockID'] = cid

        dfm = _resample_ohlc_with_flags(df_day, 'M'); dfm = _add_period_returns(dfm, 'M'); dfm['成交量'] = dfm['成交量'].astype('float64'); dfm['StockID'] = cid

        dfy = _resample_ohlc_with_flags(df_day, 'Y'); dfy = _add_period_returns(dfy, 'Y'); dfy['成交量'] = dfy['成交量'].astype('float64'); dfy['StockID'] = cid



        # 週//K 只留 20242025(不需要 pad)

        for _df in (dfw, dfm, dfy):

            _df.dropna(subset=['日期'], inplace=True)

            _df['日期'] = pd.to_datetime(_df['日期']).dt.tz_localize(None)

        dfw = dfw[(dfw['日期'] >= DATE_START) & (dfw['日期'] <= DATE_END)]

        dfm = dfm[(dfm['日期'] >= DATE_START) & (dfm['日期'] <= DATE_END)]

        dfy = dfy[(dfy['日期'] >= DATE_START) & (dfy['日期'] <= DATE_END)]



        return True, (dfw, dfm, dfy), None

    except Exception as e:

        return False, None, f"{Path(path).name}: {e}"



def _split_into_shards(items, shard_size):

    return [items[i:i+shard_size] for i in range(0, len(items), shard_size)] if shard_size > 0 else [items]



def build_wmy_parquets(market_key: str):

    base_dir = LOCAL_BASE if WRITE_MODE in ('A','B') else DRIVE_BASE

    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())



    # 1) 本地快取

    local_dayk_dir = _prepare_local_dayk(market_key)

    if not local_dayk_dir:

        return None, None, None



    # 2) 輸出路徑

    out_week  = f"{base_dir}/{market_key}/weekK_{market_abbr}.parquet"

    out_month = f"{base_dir}/{market_key}/monthK_{market_abbr}.parquet"

    out_year  = f"{base_dir}/{market_key}/yearK_{market_abbr}.parquet"

    Path(f"{base_dir}/{market_key}").mkdir(parents=True, exist_ok=True)



    # 3) 檔案清單

    files = sorted(glob.glob(f"{local_dayk_dir}/*.csv"))

    if not files:

        print(f"❌ 快取路徑 {local_dayk_dir} 內無檔案,程式終止。")

        return None, None, None



    shards = _split_into_shards(files, SHARD_SIZE)

    print(f"\n📦 [{market_key}] 共 {len(files)} 檔,分 {len(shards)} 批處理")



    # 調整欄位列表以包含 IsFiltered_QA

    base_front = ['StockID','日期','開盤','最高','最低','收盤','成交量']

    base_common_wmy = ['CurPeriod_Days','PrevPeriod_Days', 'HasResume', 'IsFiltered_QA']

    base_common_w = ['CurWeek_Days','PrevWeek_Days', 'HasResume']



    ret_cols = [f'PrevC_{f}' for f in ['W','M','Y']] + [f'Ret_Gap_{f}' for f in ['W','M','Y']] + \

               [f'Ret_Trad_{f}' for f in ['W','M','Y']] + [f'Ret_C_{f}' for f in ['W','M','Y']] + \

               [f'Ret_Max_H_{f}' for f in ['W','M','Y']] + [f'Ret_Min_L_{f}' for f in ['W','M','Y']] + \

               [f'Range_{f}' for f in ['W','M','Y']] + [f'Ret_Max_H_Pos_{f}' for f in ['W','M','Y']] + \

               [f'Ret_H_from_O_{f}' for f in ['W','M','Y']] + [f'Ret_L_from_O_{f}' for f in ['W','M','Y']] + \

               [f'Ret_End_RelH_{f}' for f in ['W','M','Y']] + [f'Ret_End_RelL_{f}' for f in ['W','M','Y']]

    ret_cols = sorted(list(set(ret_cols)))



    wk_cols = base_front + base_common_w + [c for c in ret_cols if c.endswith('_W')] + ['ISO_Week']

    mo_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_M')]

    yr_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_Y')]



    # 移除重複的 Period/Week 欄位

    mo_cols = [c for c in mo_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]

    yr_cols = [c for c in yr_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]

    wk_cols = [c for c in wk_cols if not c.startswith('CurPeriod') and not c.startswith('PrevPeriod') and c != 'IsFiltered_QA']



    w_writer = ParquetStreamer(out_week,  keep_cols=wk_cols)

    m_writer = ParquetStreamer(out_month, keep_cols=mo_cols)

    y_writer = ParquetStreamer(out_year,  keep_cols=yr_cols)



    total_ok = total_fail = 0

    skip_records = []



    t0 = time.time()

    for si, shard in enumerate(shards, 1):

        print(f"🧩 Shard {si}/{len(shards)}: {len(shard)} 檔")

        shard_w, shard_m, shard_y = [], [], []



        # [***修正點: 將 futs 和 as_completed 移入 with 區塊***]

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:

            # 建立 futures 字典,用於追蹤檔案路徑

            futs = {ex.submit(_process_one_file, p): p for p in shard}



            # 遍歷 futures

            for f in as_completed(futs):

                path = futs[f]  # 從字典中獲取當前檔案路徑

                try:

                    status, payload, err = f.result()

                except Exception as e:

                    # 處理執行緒層級的極端錯誤

                    total_fail += 1

                    err = f"{Path(path).name}: Thread execution failed ({e})"

                    if total_fail <= 20:

                        print(" ❌", err)

                    continue



                if status is True:

                    dfw, dfm, dfy = payload

                    shard_w.append(dfw); shard_m.append(dfm); shard_y.append(dfy)

                    total_ok += 1

                elif status == "SKIP":

                    info = payload or {}

                    # 從 payload 中安全獲取 StockID

                    stock_id = info.get('StockID', Path(path).stem.split("_")[0])

                    skip_records.append({'StockID': stock_id, 'file': Path(path).name,

                                         'reason': info.get('reason', 'suspect_pingpong'), 'count': info.get('count', 0)})

                    if total_fail + total_ok + len(skip_records) <= 20 and err:

                        print(" ⏭️", err)

                else:

                    total_fail += 1

                    if total_fail <= 20 and err:

                        print(" ⚠️", err)



        if shard_w: w_writer.append_df(pd.concat(shard_w, ignore_index=True))

        if shard_m: m_writer.append_df(pd.concat(shard_m, ignore_index=True))

        if shard_y: y_writer.append_df(pd.concat(shard_y, ignore_index=True))

        time.sleep(0.5 + random.random()*0.5)



    w_writer.close(); m_writer.close(); y_writer.close()

    print(f"⏱️ 批次耗時:{time.time() - t0:.1f} 秒")



    if skip_records:

        qa_dir = Path(f"{DRIVE_BASE}/{market_key}/_qa")

        qa_dir.mkdir(parents=True, exist_ok=True)

        pd.DataFrame(skip_records).to_csv(qa_dir / "skip_pingpong.csv", index=False, encoding='utf-8-sig')

        print(f"🧹 已輸出略過名單:{qa_dir / 'skip_pingpong.csv'}({len(skip_records)} 檔)")



    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())

    final_week  = f"{DRIVE_BASE}/{market_key}/weekK_{market_abbr}.parquet"

    final_month = f"{DRIVE_BASE}/{market_key}/monthK_{market_abbr}.parquet"

    final_year  = f"{DRIVE_BASE}/{market_key}/yearK_{market_abbr}.parquet"



    if WRITE_MODE == 'A':

        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:

            if Path(src).exists():

                Path(dst).parent.mkdir(parents=True, exist_ok=True)

                Path(dst).unlink(missing_ok=True)

                Path(src).replace(dst)

        print("📤 A 模式:已將本機 Parquet 移動到 Google Drive")



    elif WRITE_MODE == 'B':

        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:

            if Path(src).exists():

                Path(dst).parent.mkdir(parents=True, exist_ok=True)

                copy2(str(src), str(dst))

        print("📤 B 模式:已將本機 Parquet 複製到 Google Drive(本機仍保留)")



    else:   # WRITE_MODE == 'C'

        final_week, final_month, final_year = out_week, out_month, out_year

        print("☁️ C 模式:直接寫入 Google Drive")



    return final_week, final_month, final_year



# --------------------------------------------------------------------------------

# Colab Cell 4: QA 稽核函式 (Part 4) - QA 僅作為定義,不會被呼叫

# --------------------------------------------------------------------------------



def _geom_chain_return(day_df: pd.DataFrame, start, end) -> float:

    # ... (函式內容保持不變) ...

    sub = day_df[(day_df['日期'] >= start) & (day_df['日期'] <= end)].copy().sort_values('日期')

    c = sub['收盤'].to_numpy(dtype='float64')

    if len(c) < 2 or not np.all(np.isfinite(c)) or np.any(c <= 0):

        return np.nan

    return float(np.prod(c[1:] / c[:-1]) - 1.0)



def _week_bins():

    # ... (函式內容保持不變) ...

    edges = [-1000, -200, -100, -90, -80, -70, -60, -50, -40, -30, -20, -10, 0,

             10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 1000, 10000]

    labels = [f"{edges[i]}~{edges[i+1]}%" for i in range(len(edges)-1)]

    return edges, labels



def audit_weekly_parquet(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_vs_daily(market_key: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_high_consistency(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_high_tail(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_nan_filter(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_monthly_yearly_filter(market_key: str, month_path: str, year_path: str):

    # 此函式不會被呼叫

    pass



# --------------------------------------------------------------------------------

# Colab Cell 5: 主流程執行 (Part 5) - 已移除 QA 呼叫

# --------------------------------------------------------------------------------



if __name__ == "__main__":

    print(f"⏳ 僅處理期間:{DATE_START_STR} ~ {DATE_END_STR}(含起始緩衝 {PAD_DAYS} 天)")

    print(f"⚠️ 月K/年K 報酬過濾已啟用:小於 {MIN_DAYS_MONTH} / {MIN_DAYS_YEAR} 交易日,或遇停牌/極端跳空報酬,該期報酬將被設為 NaN。")

    print(f"🛠️ [時區修正已啟用] 修正月份邊界錯誤(如 9/1 誤算進 8 月)。")



    for MK in MARKET_LIST:

        print(f"\n{'='*20} 處理市場:{MK}(W-FRI + 玩股口徑 + 本地快取 + 僅核心清洗) {'='*20}")



        # 1. 生成新的 Parquet 檔案 (核心轉換)

        w, m, y = build_wmy_parquets(MK)



        if w:

            print(f"\n✅ 市場 {MK} 核心轉換成功!Parquet 檔案已輸出至 Google Drive (B 模式)。")

           

            # --- QA 呼叫已移除以節省時間 ---

            # audit_weekly_parquet(MK, w)

            # audit_weekly_vs_daily(MK)

            # ... (所有 audit_ 呼叫都已刪除)

            # --------------------------------



        qa_dir = Path(f"{DRIVE_BASE}/{MK}/_qa")

        if qa_dir.exists():

            print("\n🌟 核心 QA 報表輸出確認 (僅 Pingpong 略過清單):")

            # 僅確認 skip_pingpong.csv 是否存在

            f = qa_dir / "skip_pingpong.csv"

            status = "✅" if f.exists() and f.stat().st_size > 0 else ("✅ (空表)" if f.exists() else "❌")

            print(f"   {status} skip_pingpong.csv")

           

        print(f"\n🗃️ 本地日K快取保留:/content/_wmy_tmp/dayK_cache")
留言
avatar-img
《炒股不看周月年K漲幅機率就是耍流氓》
12會員
278內容數
普通上班族,用 AI 與 Python 將炒股量化。我的數據宣言是:《炒股不做量化,都是在耍流氓》。
2025/11/09
本模組負責將各市場的日K資料轉換為週K(W-FRI)、月K、年K格式,並進行一系列清洗與報酬計算。整體流程已涵蓋: 極端報酬過濾(跳空、燭身、最高/最低) ghost 段偵測與復牌標記 停牌後復牌的週期性跳空處理 薄樣本與極端報酬的 QA 標記 pingpong 模式偵測(疑似除權錯位
2025/11/09
本模組負責將各市場的日K資料轉換為週K(W-FRI)、月K、年K格式,並進行一系列清洗與報酬計算。整體流程已涵蓋: 極端報酬過濾(跳空、燭身、最高/最低) ghost 段偵測與復牌標記 停牌後復牌的週期性跳空處理 薄樣本與極端報酬的 QA 標記 pingpong 模式偵測(疑似除權錯位
2025/11/08
# [已修改] 函式名稱: audit_weekly_high_consistency def audit_weekly_high_consistency(market_key: str, week_path: str):     """     [🌟 修正路徑] QA 稽核路徑:改為讀
2025/11/08
# [已修改] 函式名稱: audit_weekly_high_consistency def audit_weekly_high_consistency(market_key: str, week_path: str):     """     [🌟 修正路徑] QA 稽核路徑:改為讀
2025/11/08
# -------------------------------------------------------------------------------- # Colab Cell 4: QA 稽核函式 (Part 4) # ----------------------------
2025/11/08
# -------------------------------------------------------------------------------- # Colab Cell 4: QA 稽核函式 (Part 4) # ----------------------------
看更多
你可能也想看
Thumbnail
vocus 慶祝推出 App,舉辦 2026 全站慶。推出精選內容與數位商品折扣,訂單免費與紅包抽獎、新註冊會員專屬活動、Boba Boost 贊助抽紅包,以及全站徵文,並邀請你一起來回顧過去的一年, vocus 與創作者共同留下了哪些精彩創作。
Thumbnail
vocus 慶祝推出 App,舉辦 2026 全站慶。推出精選內容與數位商品折扣,訂單免費與紅包抽獎、新註冊會員專屬活動、Boba Boost 贊助抽紅包,以及全站徵文,並邀請你一起來回顧過去的一年, vocus 與創作者共同留下了哪些精彩創作。
Thumbnail
髒數據是 AI 最大敵人!本單元教你用 Pandas 快速搞定缺失值、異常值、格式錯誤與重複資料,循序示範 dropna、fillna、IQR、astype 等技巧,確保資料純淨,模型準確度瞬間飆升。學完立即用乾淨數據武裝你的機器學習專案,讓 AI 更聰明、更可靠!
Thumbnail
髒數據是 AI 最大敵人!本單元教你用 Pandas 快速搞定缺失值、異常值、格式錯誤與重複資料,循序示範 dropna、fillna、IQR、astype 等技巧,確保資料純淨,模型準確度瞬間飆升。學完立即用乾淨數據武裝你的機器學習專案,讓 AI 更聰明、更可靠!
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 338 | Embedding Based Search 資料集描述 AI說書 - 從0開始 - 339 | E
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 338 | Embedding Based Search 資料集描述 AI說書 - 從0開始 - 339 | E
Thumbnail
在當今快速變化的數位時代,企業面臨著前所未有的數據處理需求。為了應對這些挑戰,企業紛紛建立自己的大型語言模型(LLM),利用大量數據進行訓練,讓模型能夠理解並生成自然語言,從而實現人機協作,優化業務流程並提升客戶體驗。然而,資料清理在這個過程中顯得至關重要。
Thumbnail
在當今快速變化的數位時代,企業面臨著前所未有的數據處理需求。為了應對這些挑戰,企業紛紛建立自己的大型語言模型(LLM),利用大量數據進行訓練,讓模型能夠理解並生成自然語言,從而實現人機協作,優化業務流程並提升客戶體驗。然而,資料清理在這個過程中顯得至關重要。
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 180 | RoBERTa 預訓練前言:RoBERTa 預訓練前言 AI說書 - 從0開始 - 181 | 預訓
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 整理目前手上有的素材: AI說書 - 從0開始 - 180 | RoBERTa 預訓練前言:RoBERTa 預訓練前言 AI說書 - 從0開始 - 181 | 預訓
Thumbnail
在當今快速變化的數位時代,企業面臨著前所未有的數據處理需求。為了應對這些挑戰,企業紛紛建立自己的大型語言模型(LLM),利用大量數據進行訓練,讓模型能夠理解並生成自然語言,從而實現人機協作,優化業務流程並提升客戶體驗。
Thumbnail
在當今快速變化的數位時代,企業面臨著前所未有的數據處理需求。為了應對這些挑戰,企業紛紛建立自己的大型語言模型(LLM),利用大量數據進行訓練,讓模型能夠理解並生成自然語言,從而實現人機協作,優化業務流程並提升客戶體驗。
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 總結一下目前有的素材: AI說書 - 從0開始 - 103:資料集載入 AI說書 - 從0開始 - 104:定義資料清洗的函數 AI說書 - 從0開始 - 105
Thumbnail
我想要一天分享一點「LLM從底層堆疊的技術」,並且每篇文章長度控制在三分鐘以內,讓大家不會壓力太大,但是又能夠每天成長一點。 總結一下目前有的素材: AI說書 - 從0開始 - 103:資料集載入 AI說書 - 從0開始 - 104:定義資料清洗的函數 AI說書 - 從0開始 - 105
Thumbnail
繼「【🔒 Python實戰營 - Data Science 必修班】Pandas 資料清洗技 - 填補式」之後,我們已經學會怎麼填補空缺資料了,那這個章節我們來教您如何對某些欄位有條件的整形,有時候我們的資料來源某些欄位資料格式不一,甚至型態都不是正規統一的值,此時我們就需要針對這些值進行一些處理
Thumbnail
繼「【🔒 Python實戰營 - Data Science 必修班】Pandas 資料清洗技 - 填補式」之後,我們已經學會怎麼填補空缺資料了,那這個章節我們來教您如何對某些欄位有條件的整形,有時候我們的資料來源某些欄位資料格式不一,甚至型態都不是正規統一的值,此時我們就需要針對這些值進行一些處理
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News