-
Notifications
You must be signed in to change notification settings - Fork 0
/
priors_atm.py
282 lines (219 loc) · 8.07 KB
/
priors_atm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
""" Main module for MCMC priors
All priors are objects which should expose:
- A `params` attribute, which holds the list of parameter names that this prior handles
- A `ln_prior()` method, which evaluates the log-prior on a given dictionary of parameter values
- A `rvs()` method, which draws a random variate sample according to the prior (returns a dict)
"""
import numpy as np
from scipy import stats
class FullPrior(object):
    """Composite prior built from a collection of non-overlapping sub-priors.

    The joint log-prior is the sum of the sub-priors' log-priors, and a
    random variate is the union of independent draws from each sub-prior.
    Each sub-prior must expose `params`, `ln_prior()`, and `rvs()`.
    """

    def __init__(self, priors):
        self.priors = priors
        # Flatten the per-prior parameter lists; duplicates would make the
        # joint density ambiguous, so they are rejected up front.
        combined = tuple(name for prior in priors for name in prior.params)
        assert len(combined) == len(
            set(combined)
        ), "sub-priors of FullPrior should not have overlapping parameters"
        self.params = combined

    def ln_prior(self, param_dict):
        """Sum the sub-priors' log-densities; -inf as soon as one rejects."""
        total = 0.0
        for sub_prior in self.priors:
            contribution = sub_prior.ln_prior(param_dict)
            if not np.isfinite(contribution):
                # Short-circuit: one rejection makes the joint density zero.
                return -np.inf
            total += contribution
        return total

    def rvs(self):
        """Draw one sample from every sub-prior and merge into a single dict."""
        sample = {}
        for sub_prior in self.priors:
            sample.update(sub_prior.rvs())
        return sample
def ln_prior_beta(x, a, b):
    """Unnormalized log-density of a Beta(a, b) distribution evaluated at x.

    Returns -inf outside the open support (0, 1).  The normalizing constant
    (the log Beta function) is deliberately omitted, which is fine for MCMC.
    """
    if 0.0 < x < 1.0:
        return (a - 1.0) * np.log(x) + (b - 1.0) * np.log(1.0 - x)
    return -np.inf
class SimplexZonesPrior(object):
    """Prior over the four ordered zone-boundary parameters.

    The support is the ordered simplex
    0 < core_fraction < lambda_diffuse < lambda_min_layered
      < lambda_max_layered < 1.
    lambda_max_layered carries a Beta(a, b) marginal; conditional on it,
    the three inner parameters are uniform on (0, lambda_max_layered)
    subject to the ordering constraint.
    """

    params = [
        "core_fraction",
        "lambda_diffuse",
        "lambda_min_layered",
        "lambda_max_layered",
    ]

    def __init__(self, a=18.0, b=8.0):
        self.a, self.b = a, b
        # Frozen Beta distribution used by rvs() for the outermost boundary.
        self.beta = stats.beta(a=a, b=b)

    def _is_inside_simplex(self, d):
        """True iff all four parameters satisfy the strict zone ordering."""
        return (
            0.0 < d["core_fraction"] < d["lambda_diffuse"]
            < d["lambda_min_layered"] < d["lambda_max_layered"] < 1.0
        )

    def ln_prior(self, param_dict):
        """Unnormalized log-density on the ordered simplex; -inf outside."""
        if not self._is_inside_simplex(param_dict):
            return -np.inf
        lam_max = param_dict["lambda_max_layered"]
        # Beta marginal on the outermost boundary.
        ln_p = ln_prior_beta(lam_max, a=self.a, b=self.b)
        # The three inner parameters are each uniform on (0, lam_max), so
        # the conditional density scales as lam_max**-3 (simplex volume).
        return ln_p - 3.0 * np.log(lam_max)

    def rvs(self):
        """Sample lambda_max from its Beta marginal, then the inner three
        by rejection until the ordering constraint is met."""
        lam_max = self.beta.rvs()
        while True:
            candidate = {
                "core_fraction": np.random.uniform(low=0, high=lam_max),
                "lambda_diffuse": np.random.uniform(low=0, high=lam_max),
                "lambda_min_layered": np.random.uniform(low=0, high=lam_max),
                "lambda_max_layered": lam_max,
            }
            if self._is_inside_simplex(candidate):
                return candidate
class SimplexUniformT_Priors(object):
    """Independent uniform (box) priors on the temperature-profile parameters.

    Each parameter is uniform on an open interval; the joint log-prior is
    0.0 inside the box and -inf outside (normalization omitted, which is
    fine for MCMC).  Judging by the default ranges, kappa_IR and gamma are
    presumably in log10 units — TODO confirm with callers.
    """

    params = [
        "kappa_IR",
        "gamma",
        "T_int",
        "T_eq",
    ]

    def __init__(self, kappa_min=-5.0, kappa_max=-2.0, gamma_min=-2.0,
                 gamma_max=2.0, T_int_min=0.0, T_int_max=500.0,
                 T_eq_min=800.0, T_eq_max=2500.0):
        self.kappa_min = kappa_min
        self.kappa_max = kappa_max
        self.gamma_min = gamma_min
        self.gamma_max = gamma_max
        self.T_int_min = T_int_min
        self.T_int_max = T_int_max
        self.T_eq_min = T_eq_min
        self.T_eq_max = T_eq_max

    def _is_inside_simplex(self, d):
        """True iff every parameter lies strictly inside its interval."""
        return (
            self.kappa_min < d["kappa_IR"] < self.kappa_max
            and self.gamma_min < d["gamma"] < self.gamma_max
            and self.T_int_min < d["T_int"] < self.T_int_max
            and self.T_eq_min < d["T_eq"] < self.T_eq_max
        )

    def ln_prior(self, param_dict):
        """Flat (0.0) inside the box, -inf outside."""
        if self._is_inside_simplex(param_dict):
            return 0.0
        return -np.inf

    def rvs(self):
        """Draw independent uniforms; reject boundary draws (open intervals)."""
        while True:
            d = {
                "kappa_IR": np.random.uniform(low=self.kappa_min, high=self.kappa_max),
                "gamma": np.random.uniform(low=self.gamma_min, high=self.gamma_max),
                "T_int": np.random.uniform(low=self.T_int_min, high=self.T_int_max),
                "T_eq": np.random.uniform(low=self.T_eq_min, high=self.T_eq_max),
            }
            if self._is_inside_simplex(d):
                # BUGFIX: removed stray debug print of T_int that polluted
                # stdout on every sample.
                return d
class SimplexUniformAbund_Priors(object):
    """Uniform box prior on the water mass-mixing-ratio parameter.

    Flat on the open interval (H2O_min, H2O_max), -inf outside.
    NOTE: CO and CO2 priors were sketched in an earlier revision but are
    currently disabled; only MMR_H2O is handled.
    """

    params = ["MMR_H2O"]

    def __init__(self, H2O_min, H2O_max):
        self.H2O_min = H2O_min
        self.H2O_max = H2O_max

    def _is_inside_simplex(self, d):
        """True iff MMR_H2O lies strictly inside (H2O_min, H2O_max)."""
        return self.H2O_min < d["MMR_H2O"] < self.H2O_max

    def ln_prior(self, param_dict):
        """Flat (0.0) inside the interval, -inf outside."""
        return 0.0 if self._is_inside_simplex(param_dict) else -np.inf

    def rvs(self):
        """Rejection-sample a value strictly inside the open interval."""
        while True:
            sample = {
                "MMR_H2O": np.random.uniform(low=self.H2O_min, high=self.H2O_max),
            }
            if self._is_inside_simplex(sample):
                return sample
class SimplexUniformTransit_Priors(object):
    """Uniform box priors on the transit velocity parameters Kp and Vsys.

    Flat inside the open rectangle (Kp_min, Kp_max) x (Vsys_min, Vsys_max),
    -inf outside.
    """

    params = [
        "Kp",
        "Vsys"
    ]

    def __init__(self, Kp_min, Kp_max, Vsys_min, Vsys_max):
        self.Kp_min = Kp_min
        self.Kp_max = Kp_max
        self.Vsys_min = Vsys_min
        self.Vsys_max = Vsys_max

    def _is_inside_simplex(self, d):
        """True iff both velocities lie strictly inside their intervals."""
        kp_ok = self.Kp_min < d["Kp"] < self.Kp_max
        vsys_ok = self.Vsys_min < d["Vsys"] < self.Vsys_max
        return kp_ok and vsys_ok

    def ln_prior(self, param_dict):
        """Flat (0.0) inside the box, -inf outside."""
        return 0.0 if self._is_inside_simplex(param_dict) else -np.inf

    def rvs(self):
        """Rejection-sample a point strictly inside the open rectangle."""
        while True:
            sample = {
                "Kp": np.random.uniform(low=self.Kp_min, high=self.Kp_max),
                "Vsys": np.random.uniform(low=self.Vsys_min, high=self.Vsys_max),
            }
            if self._is_inside_simplex(sample):
                return sample
class EntropyJumpExpPrior(object):
    """Truncated exponential prior on the layered-zone entropy jump.

    The density is proportional to exp(-delta_S / scale) on the open
    interval (0, cutoff) and zero elsewhere; ln_prior omits the
    normalizing constant (fine for MCMC).
    """

    params = ["delta_S_layered"]

    def __init__(self, delta_S_exp_scale=3.0e-4, delta_S_cutoff=3e-1):
        self.delta_S_exp_scale = delta_S_exp_scale
        self.delta_S_cutoff = delta_S_cutoff
        # Frozen (untruncated) exponential used by rvs().
        self.expon = stats.expon(scale=self.delta_S_exp_scale)

    def ln_prior(self, param_dict):
        """Unnormalized log-density; -inf outside (0, cutoff)."""
        delta_S = param_dict["delta_S_layered"]
        if 0.0 < delta_S < self.delta_S_cutoff:
            return -delta_S / self.delta_S_exp_scale
        return -np.inf

    def rvs(self):
        """Rejection-sample the exponential until a draw falls below the cutoff."""
        while True:
            draw = self.expon.rvs()
            if draw < self.delta_S_cutoff:
                return {"delta_S_layered": draw}
class DifferentialRotationPrior(object):
    """Prior on the differential-rotation parameters.

    ln_prior only bounds |omega_surface|; the two slope parameters are
    left flat (improper), even though rvs() initializes them from a
    standard normal — presumably intentional for walker initialization,
    TODO confirm with callers.
    """

    params = ["omega_slope_l0", "omega_slope_surface", "omega_surface"]

    def __init__(self, omega_surface_absmax=0.2):
        assert omega_surface_absmax > 0.0
        self.omega_surface_absmax = omega_surface_absmax

    def ln_prior(self, param_dict):
        """0.0 while |omega_surface| < absmax, else -inf; slopes unconstrained."""
        if np.abs(param_dict["omega_surface"]) < self.omega_surface_absmax:
            return 0.0
        return -np.inf

    def rvs(self):
        """Draw omega_surface uniformly within its bounds; slopes from N(0, 1)."""
        bound = self.omega_surface_absmax
        return {
            "omega_surface": np.random.uniform(low=-bound, high=bound),
            "omega_slope_l0": np.random.normal(),
            "omega_slope_surface": np.random.normal(),
        }