-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline.py
More file actions
77 lines (69 loc) · 3.2 KB
/
Copy pathpipeline.py
File metadata and controls
77 lines (69 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# pipeline.py — FINAL, BULLETPROOF
import polars as pl
import os
import numpy as np
from scipy import stats
import pandas as pd # For temp conversions
dfs = []
for file in os.listdir("data/raw"):
if file.endswith(".csv"):
path = f"data/raw/{file}"
df = pl.read_csv(path, infer_schema_length=0)
date_col = next((c for c in df.columns if "date" in c.lower()), None)
numeric_cols = [c for c in df.columns if any(x in c.lower() for x in ["price", "open", "high", "low", "vol", "change"])]
if date_col and numeric_cols:
select_exprs = [
pl.col(date_col).str.to_date().alias("Date"),
pl.lit(file.split(".")[0].replace("_", " ").title()).alias("Asset")
]
for col in numeric_cols:
clean_col = (
pl.col(col)
.str.replace_all(",", "")
.str.replace_all("%", "")
.cast(pl.Float64, strict=False)
.fill_null(0) # Fill nulls with 0
.alias(col)
)
select_exprs.append(clean_col)
clean = df.select(select_exprs)
dfs.append(clean)
# Combine
data = pl.concat(dfs).sort("Date")
os.makedirs("data", exist_ok=True)
data.write_csv("data/combined_commodities.csv")
print(f"DONE: {len(data)} rows from {len(dfs)} files → data/combined_commodities.csv")
# ————————————————————————
# DAY 1 BLOOMBERG UPGRADES (DQ + Anomaly Detection)
# ————————————————————————
print("\nRunning Day-1 Bloomberg checks (DQ + Anomaly Detection)")
# Convert to pandas only once
data_pd = data.to_pandas()
# 1. SIMPLE DQ: price > 0 (NO Great Expectations — avoids typing bug)
price_col = next((c for c in data_pd.columns if "price" in c.lower() or "close" in c.lower()), None)
if price_col:
invalid = data_pd[price_col] <= 0
print(f"Data Quality (price > 0): {'PASS' if not invalid.any() else 'FAIL'}")
if invalid.any():
print(f" → Removing {invalid.sum()} invalid rows with {price_col} <= 0")
data_pd = data_pd[~invalid]
else:
print("No price/close column found for DQ — skipping")
# 2. Z-Score Anomaly Detection (3σ on log-price) — unchanged & perfect
if price_col:
log_prices = np.log(data_pd[price_col] + 1e-8)
z_scores = np.abs(stats.zscore(log_prices, nan_policy="omit"))
data_pd["is_anomaly"] = z_scores > 3
anomalies = data_pd[data_pd["is_anomaly"]][["Date", "Asset", price_col]].round(4)
print(f"Z-Score Anomaly Detection: {len(anomalies)} anomalies found (3σ rule)")
if len(anomalies) > 0:
print(anomalies)
else:
anomalies = pd.DataFrame()
print("No price column found for anomaly detection")
# 3. Final save (CSV instead of Parquet — zero deps, full compatibility)
final = pl.from_pandas(data_pd.drop(columns=["is_anomaly"], errors="ignore"))
final.write_csv("data/combined_commodities_clean.csv") # Changed to CSV — no fastparquet needed
print(f"\nPipeline complete → {len(final)} clean rows saved to CSV")
print(f"→ {len(anomalies)} anomalies flagged")
print("DAY 1 100% DONE — Bloomberg-ready")