forked from kstueb/MoraineAge_GaussianMixture
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoraine_age_calculator.py
343 lines (282 loc) · 12.1 KB
/
moraine_age_calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Code developed by Konstanze Stuebner ([email protected]) and Bodo Bookhagen ([email protected])
#
# Python 3, using some additional python packages. If you have not setup an environment,
# you may want to add the following packages via conda
# conda install scipy numpy scikit-learn lmfit
# or generate a separate environment
import numpy as np
from scipy.stats import norm
from scipy.signal import find_peaks
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV
from lmfit.models import GaussianModel
import matplotlib.pyplot as plt
VALID_KDE_Weight = ['adf','1/error','1/error**2','none']
VALID_ADF_Sigma = ['error','sqrtError']
class BoulderAges():
"""Age density function as sum of Gaussians.
parameters:
Ages, Errors : 1D array
KDEweight : str ['adf'|'1/error'|'1/error**2'|'none']
KDE can be weighted by the age density function ('adf'), 1/error,
or not weighted at all. If KDEweight='adf' then ADFsigma should be
specified. Default is 'adf'.
ADFsigma : str ['error'|'sqrtError']
The age density function is calculated as sum of Gaussians for each
boulder age, where the width of the Gaussians is specified by
ADFsigma. Default is 'error'.
example:
test = BoulderAges(Ages, Errors)
x = np.arange(0,100)
plt.plot(x,test.ADF(x),'-b',label='Age DF')
plt.plot(Ages,test.ADF(Ages),'.b')
kde = test.KDE()
plt.plot(x,scoreKDE(kde,x),'r',label='KDE, bw={:.0f}'.format(kde.bandwidth))
plt.legend()
"""
def __init__(self, Ages, Errors, *, KDEweight='adf', ADFsigma='sqrtError'):
self.Ages = Ages
self.Errors = Errors
self.KDEweight = KDEweight
self.ADFsigma = ADFsigma
if Ages.shape!=Errors.shape:
raise ValueError("Ages and Errors must be 1D arrays of the same length")
if KDEweight not in VALID_KDE_Weight:
raise ValueError("Invalid KDEweight: {:s}".format(KDEweight))
if ADFsigma not in VALID_ADF_Sigma:
raise ValueError("Invalid ADFsigma: {:s}".format(ADFsigma))
return
def ADF(self, xx):
"""Age density function as sum of Gaussians."""
if self.ADFsigma=='error':
Sigma=self.Errors
elif self.ADFsigma=='sqrtError':
Sigma=np.sqrt(self.Errors)
else:
raise ValueError("Invalid ADFsigma: {:s}".format(ADFsigma))
yy = 0
for Age_i,Sigma_i in zip(self.Ages,Sigma):
yy += norm.pdf(xx,loc=Age_i,scale=Sigma_i)
return yy/len(self.Ages)
def KDE(self, bandwidth=None):
"""Kernel Density Estimation."""
if self.KDEweight=='adf':
Weights = self.ADF(self.Ages)
elif self.KDEweight=='1/error':
Weights = 1/self.Errors
elif self.KDEweight=='1/error**2':
Weights = 1/(self.Errors**2)
elif self.KDEweight=='none':
Weights = None
else:
raise ValueError("Invalid KDEweight: {:s}".format(KDEweight))
if bandwidth is None:
grid = GridSearchCV(KernelDensity(kernel='gaussian'),
{'bandwidth': np.linspace(5., 50., 46)},
cv=len(self.Ages)) #cross-validation
grid.fit(self.Ages.reshape(-1,1), sample_weight=Weights)
kde = grid.best_estimator_
bandwidth = grid.best_params_['bandwidth']
else:
kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth).fit(
self.Ages.reshape(-1,1), sample_weight=Weights)
return kde
def scoreKDE(kde, xx):
"""Values of the kde at positions xx."""
if not isinstance(kde,KernelDensity):
raise TypeError("'kde' must be a KernelDensity")
return np.exp(kde.score_samples(xx.reshape(-1, 1)))
def fit_Gaussians_to_curve(x_data, y_data, peaks):
"""Perform Gaussian curve fitting.
Return: lmfit.model.ModelResult
Note that peaks older than max(x_data) will be ignored.
parameters:
x_data, y_data : 1D arrays of the smoothed boulder age distribution (kde)
peaks: ndarray of approximate peak locations
example:
peaks = get_auto_peaks(x, scoreKDE(kde,x))
result = fit_Gaussians_to_curve(x, scoreKDE(kde,x), peaks=peaks)
"""
x_data = x_data.reshape(-1); y_data = y_data.reshape(-1)
prefix = ['G'+str(i)+'_' for i in range(100)]
prefix = prefix[:len(peaks)]
model = [None]*len(prefix)
for i,p in enumerate(prefix):
model[i] = GaussianModel(prefix=p)
#initialize parameters of first Gaussian component
params = model[0].make_params(center=peaks[0], sigma=1, amplitude=1)
params[prefix[0]+'amplitude'].min = 0.
params[prefix[0]+'center'].min = 0.
params[prefix[0]+'center'].max = max(x_data)
if len(peaks>1): #add parameters of remaining Gaussian components
for i in range(1,len(peaks)):
params += model[i].make_params(center=peaks[i], sigma=1, amplitude=1)
params[prefix[i]+'amplitude'].min = 0.
params[prefix[i]+'center'].min = 0.
params[prefix[i]+'center'].max = max(x_data)
compositeModel = model[0]
for i in range(1,len(peaks)):
compositeModel += model[i]
return compositeModel.fit(y_data, params, x=x_data)
def iterate_Gaussian_fitting(x_data, y_data, nA, verbose=False):
"""Repeat fit_Gaussians_to_curve() with n+1 peaks until the misfit <1%
(see Gaussian_misfit() for definition).
Note that peaks older than max(x_data) will be ignored.
parameters:
x_data, y_data : 1D arrays of the smoothed boulder age distribution (kde)
nA : len(Ages); no more than ceil(sqrt(nA)) peaks are allowed.
verbose : boolean; if 'True' report progress in the terminal
example:
result = iterate_Gaussian_fitting(x, scoreKDE(kde,x), len(Ages))
"""
if verbose:
print('\nIterations:')
peaks = get_auto_peaks(x_data, y_data)
A = 100
i = 0
while A>1: #misfit >1%
i += 1
result = fit_Gaussians_to_curve(x_data, y_data, peaks)
A = Gaussian_misfit(x_data, y_data, result.best_fit)
if verbose:
print('#{:d}'.format(i))
report = Gaussian_fitting_report(result)
for r in report:
print('{:.1f} +/- {:.1f} ky ({:.0f}%)'.format(r['Mu'], r['Sigma'], r['Area']))
print('misfit {:.1f}%\n'.format(A))
peaks = get_peaks_from_lmfitresult(result)
sorted = np.argsort(result.residual)
peaks = np.append(peaks,x_data[sorted[0]])
#if len(peaks)>np.ceil(np.sqrt(nA)):
# if verbose:
# print('max. number of peaks ({:d})'.format(len(peaks)-1))
# break
#remove Gaussians with less than 1% Area and recalculate
peaks = []
report = Gaussian_fitting_report(result)
for r in report:
if r['Area']>1:
peaks = np.append(peaks,r['Mu'])
result = fit_Gaussians_to_curve(x_data, y_data, peaks)
return result
def get_auto_peaks(x_data, y_data):
"""Return age peaks as ndarray."""
peaks,_ = find_peaks(y_data)
return x_data[peaks]
def get_peaks_from_lmfitresult(result):
"""Return age peaks from lmfit.model.ModelResult as ndarray."""
prefix = [r.prefix for r in result.components]
return np.array([result.best_values[p+'center'] for p in prefix])
def Gaussian_misfit(x_data,y_data,y_fit):
"""Estimate misfit between KDE and sum-of-Gaussian model.
Return area of misfit to the KDE in percent.
parameters:
x_data, y_data, y_fit : 1D arrays of KDE (data) and model fit (fit)
example:
A = Gaussian_misfit(x_data, y_data, result.best_fit)
"""
return np.sum(np.abs(y_fit-y_data))/np.sum(y_data)*100
def Gaussian_fitting_report(result):
"""Return list of dictionaries with keys prefix, Mu, Sigma, Apc for each
component in the Gaussian composite model sorted by decending age (Mu).
example:
report = Gaussian_fitting_report(result)
for r in report:
print('{:.1f} +/- {:.1f} ky ({:.0f}%)'.format(r['Mu'], r['Sigma'], r['Area']))
"""
prefix = [r.prefix for r in result.components]
Mu = np.array([result.best_values[p+'center'] for p in prefix])
Sigma = np.array([result.best_values[p+'sigma'] for p in prefix])
sorted = np.flip(np.argsort(Mu))
Mu = Mu[sorted]
Sigma = Sigma[sorted]
prefix = [prefix[i] for i in sorted]
Atotal = np.sum(result.best_fit) #model total area
report = []
for p,m,s in zip(prefix, Mu, Sigma):
Apc = np.sum(result.eval_components()[p])/Atotal*100
report.append({'prefix':p, 'Mu':m, 'Sigma':s, 'Area':Apc})
return report
def plot_lmfit_model(xx, result, misfit, axis=None, thresholdArea=5):
"""Plot the lmfit.model.ModelResult results
parameters:
xx : 1D array
result : output of fit_Gaussians_to_curve() or iterate_Gaussian_fitting()
misfit :
axis : axis object to add the plot to (optional)
thresholdArea : the oldest peak with an area > thresholdArea is the
preferred moraine age and plotted in red
example:
x = np.arange(0,100)
result = iterate_Gaussian_fitting(x, scoreKDE(kde,x), len(Ages))
A = Gaussian_misfit(x, scoreKDE(kde,x), result.best_fit)
plot_lmfit_model(x, result, A)
"""
if axis: ax = axis
else: _,ax = plt.subplots()
#total model:
label = 'Model, misfit {:.1f}%'.format(misfit)
ax.plot(xx, result.best_fit, 'k-', label=label)
report = Gaussian_fitting_report(result)
prefix = [r['prefix'] for r in report] #prefix sorted by decending Mu
#find oldest peak with a an area of >5%
for r in report:
if r['Area']>thresholdArea:
best_p = r['prefix']
break
#Gaussian components:
Atotal = np.sum(result.best_fit) #model total area
Mu = np.empty_like(prefix,dtype=float) #mean of each Gaussian
Sigma = np.empty_like(prefix,dtype=float) #sigma of each Gaussian
Apc = np.empty_like(prefix,dtype=float) #area of each Gaussian in percent
for i, p in enumerate(reversed(prefix)):
Mu[i] = result.best_values[p+'center']
Sigma[i] = result.best_values[p+'sigma']
Apc[i] = np.sum(result.eval_components()[p])/Atotal*100
label = '{:s}: {:.1f}$\pm${:.1f} ky, {:.1f}%'.format(p[:-1], Mu[i], Sigma[i], Apc[i])
if p==best_p:
color='red'
else:
color='grey'
ax.plot(xx, result.eval_components()[p], color=color, label=label)
ax.legend()
return ax
def main():
plt.close('all')
np.random.seed(12345)
#Generating a random age distribution with mean age of 50 and deviation of 15. 8 samples
Ages = np.random.normal(loc=50,scale=15,size=8)
Errors = 0.1 * Ages.reshape(len(Ages)) + np.random.rand(1) * 5
test = BoulderAges(Ages, Errors)
#By default, the KDE is weighted with the age density function (ADF), which is
#the sum of Gaussians computed with Mu=boulder age and Sigma=sqrt(boulder error).
#This weighting can be changed, e.g.:
#test.ADFsigma='error'
#test.KDEweight='none'
_,ax = plt.subplots()
x = np.arange(0,100)
ax.plot(x,test.ADF(x),'-b',label='Age DF')
ax.plot(Ages,test.ADF(Ages),'.r')
kde = test.KDE()
ax.plot(x,scoreKDE(kde,x),'r',label='KDE, bw={:.0f}'.format(kde.bandwidth))
#data points and errorbars at convenient y-scaling
y = np.arange(1,len(Ages)+1)/2/len(Ages)*ax.get_ylim()[1]
ax.plot(Ages, y, 'k+')
ax.errorbar(x=Ages, y=y, xerr=Errors, fmt='none', ecolor='k')
plt.legend()
plt.title('Random test sample, n={:d}'.format(len(Ages)))
plt.xlabel('Age (ky)')
plt.ylabel('Probability')
plt.grid()
result = iterate_Gaussian_fitting(x, scoreKDE(kde,x), len(Ages), verbose=True)
A = Gaussian_misfit(x, scoreKDE(kde,x), result.best_fit)
ax = plot_lmfit_model(x, result, A)
ax.plot(Ages, y, 'k+')
ax.errorbar(x=Ages, y=y, xerr=Errors, fmt='none', ecolor='k')
plt.title('Random test sample, Gaussian fitting')
plt.xlabel('Age (ky)')
plt.ylabel('Probability')
plt.grid()
plt.show()
if __name__ == "__main__":
main()