-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulation.py
More file actions
174 lines (143 loc) · 5.95 KB
/
Copy pathsimulation.py
File metadata and controls
174 lines (143 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
simulation.py (v2)
===================
요인모형 copula로 상관 부도를 생성하고, 분기별 폭포구조를 경로마다 돌려
트랜치 손실분포를 추정한다. v2 추가: t-copula, 분기 장벽, 재투자 연동.
잠재변수 (2-요인)
-----------------
G_i = sqrt(rho_sys)*M_glob + sqrt(rho_ind)*M_ind(i) + sqrt(1-rho_sys-rho_ind)*Z_i
- Gaussian copula: A_i = G_i, marginal N(0,1)
- t-copula: A_i = G_i / sqrt(W/nu), W~chi2(nu) (경로 공통), marginal t_nu
→ 공통 W가 꼬리 의존성(동시 극단부도)을 만든다.
부도 장벽 (분기)
----------------
누적부도율 F(tau) = 1 - (1-p_annual*mult)^(tau/ppy), tau=1..Q
Gaussian: c = Phi^{-1}(F)
t: c = t_nu^{-1}(F)
부도분기 = min{ tau : A_i < c_tau }
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List
import numpy as np
from scipy import stats
import config as C
import waterfall as wf
def factor_loadings():
rho_sys = C.CORR.inter_industry
rho_ind = C.CORR.intra_industry - rho_sys
if rho_ind < 0:
raise ValueError("intra<inter")
residual = 1.0 - rho_sys - rho_ind
if residual <= 0:
raise ValueError("corr sum>1")
return rho_sys, rho_ind, residual
def beta_params(mean, std):
var = std ** 2
max_var = mean * (1 - mean)
if var >= max_var:
var = 0.99 * max_var
common = mean * (1 - mean) / var - 1
return mean * common, (1 - mean) * common
def default_barriers(pd_annual, n_periods, ppy, pd_multiplier=1.0,
copula="gaussian", t_df=5.0):
"""(n, Q) 분기 부도장벽."""
dt = 1.0 / ppy
p = np.clip(pd_annual * pd_multiplier, 1e-6, 0.999) # (n,)
tau = np.arange(1, n_periods + 1) # (Q,)
years = tau * dt
cum_pd = 1.0 - np.power(1.0 - p[:, None], years[None, :]) # (n, Q)
if copula == "t":
return stats.t.ppf(cum_pd, df=t_df)
return stats.norm.ppf(cum_pd)
def asset_to_default_period(A, barriers):
below = A[:, None] < barriers
has = below.any(axis=1)
first = np.argmax(below, axis=1) + 1
return np.where(has, first, 0).astype(int)
@dataclass
class SimResult:
tranche_names: List[str]
tranche_ratings: List[str]
loss_matrix: np.ndarray
scenario: str
n_paths: int
copula: str = "gaussian"
def expected_loss(self):
return {n: float(self.loss_matrix[:, j].mean())
for j, n in enumerate(self.tranche_names)}
def prob_loss(self):
return {n: float((self.loss_matrix[:, j] > 1e-9).mean())
for j, n in enumerate(self.tranche_names)}
def percentile(self, q):
return {n: float(np.percentile(self.loss_matrix[:, j], q))
for j, n in enumerate(self.tranche_names)}
def run_simulation(df, scenario="Base", n_paths=None, seed=None,
copula=None, enable_coverage_tests=True,
enable_reinvest=True):
if n_paths is None:
n_paths = C.SIM.n_paths
if seed is None:
seed = C.SIM.seed
if copula is None:
copula = C.SIM.copula
ppy = C.SIM.periods_per_year
Q = C.SIM.n_periods
reinv_q = C.SIM.reinvest_periods
t_df = C.SIM.t_df
rng = np.random.default_rng(seed)
par = df["par"].values.astype(float)
spread = df["spread_bps"].values.astype(float)
pd_annual = df["pd_annual"].values.astype(float)
ind_id = df["industry_id"].values.astype(int)
n = len(df)
n_ind = int(ind_id.max()) + 1
base = C.POOL.base_rate
coupon_rate = base + spread / 1e4
pool_avg_pd = float(np.average(pd_annual, weights=par))
pool_avg_coupon = float(np.average(coupon_rate, weights=par))
rho_sys, rho_ind, residual = factor_loadings()
b_sys, b_ind, b_res = np.sqrt(rho_sys), np.sqrt(rho_ind), np.sqrt(residual)
mult = C.SIM.scenarios.get(scenario, 1.0)
barriers = default_barriers(pd_annual, Q, ppy, mult, copula=copula, t_df=t_df)
a_rec, b_rec = beta_params(C.RECOVERY.mean, C.RECOVERY.std)
states0 = wf.build_tranche_states(par.sum())
tranche_names = [s.name for s in states0]
tranche_ratings = [s.rating for s in states0]
loss_matrix = np.zeros((n_paths, len(tranche_names)))
for p in range(n_paths):
M_glob = rng.standard_normal()
M_ind = rng.standard_normal(n_ind)
Z = rng.standard_normal(n)
G = b_sys * M_glob + b_ind * M_ind[ind_id] + b_res * Z
if copula == "t":
W = rng.chisquare(t_df)
A = G / np.sqrt(W / t_df)
else:
A = G
dq = asset_to_default_period(A, barriers)
rec = rng.beta(a_rec, b_rec, size=n)
res = wf.run_waterfall(par, coupon_rate, dq, rec,
M_glob=M_glob, pool_avg_pd=pool_avg_pd,
pool_avg_coupon=pool_avg_coupon,
pd_multiplier=mult, horizon=Q,
reinvest_periods=reinv_q, periods_per_year=ppy,
enable_coverage_tests=enable_coverage_tests,
enable_reinvest=enable_reinvest)
for j, s in enumerate(res.tranche_states):
loss_matrix[p, j] = s.loss_pct
return SimResult(tranche_names, tranche_ratings, loss_matrix, scenario, n_paths, copula)
if __name__ == "__main__":
import time, loan_pool as lp
df = lp.generate_loan_pool()
rho_sys, rho_ind, residual = factor_loadings()
print(f"적재: rho_sys={rho_sys:.3f} rho_ind={rho_ind:.3f} residual={residual:.3f}")
print(f"분기수 Q={C.SIM.n_periods}, 재투자분기={C.SIM.reinvest_periods}")
for cop in ["gaussian", "t"]:
t0 = time.time()
r = run_simulation(df, "Base", n_paths=500, seed=1, copula=cop)
dt = time.time() - t0
el = r.expected_loss()
print(f"\n[{cop}] 500경로 {dt:.2f}s → 10k 추정 {dt*20:.0f}s")
for nm in r.tranche_names:
print(f" {nm:16s} EL {el[nm]:7.3%}")