import numpy as np
from scipy import stats
from scipy.spatial.distance import cdist


def minmax(arr, axis=None):
    """Return (nanmin, nanmax) of an array along the given axis."""
    return np.nanmin(arr, axis=axis), np.nanmax(arr, axis=axis)
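
# `rolling_window` and `bin_center` are used throughout this module but are
# not defined in this file. The two definitions below are minimal sketches of
# the usual implementations (a stride-trick window over the last axis, and
# midpoint bin centers); they are assumptions, not the author's originals.
def rolling_window(arr, window):
    """View of `arr` with a trailing window axis of length `window`."""
    arr = np.asarray(arr)
    shape = arr.shape[:-1] + (arr.shape[-1] - window + 1, window)
    strides = arr.strides + (arr.strides[-1],)
    return np.lib.stride_tricks.as_strided(arr, shape=shape, strides=strides)


def bin_center(bins):
    """Midpoints of an array of bin edges."""
    bins = np.asarray(bins, dtype=np.float64)
    return 0.5 * (bins[1:] + bins[:-1])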

def weighted_generic_moment(x, k, w=None):
    """k-th raw moment of `x`, optionally weighted by `w`."""
    x = np.asarray(x, dtype=np.float64)
    if w is not None:
        w = np.asarray(w, dtype=np.float64)
    else:
        w = np.ones_like(x)
    return np.sum(x ** k * w) / np.sum(w)


def weighted_mean(x, w):
    """Weighted arithmetic mean of `x` with weights `w`."""
    return np.sum(x * w) / np.sum(w)


def weighted_std(x, w):
    """Weighted (biased) standard deviation of `x` with weights `w`."""
    x = np.asarray(x, dtype=np.float64)
    w = np.asarray(w, dtype=np.float64)
    SS = np.sum(w * (x - weighted_mean(x, w)) ** 2) / np.sum(w)
    return np.sqrt(SS)

def weighted_percentile(x, w, percentile):
    """Weighted percentile(s) of `x` with weights `w`.

    `percentile` is given on [0, 100]. Non-finite (x, w) pairs are dropped.
    """
    x = np.asarray(x, dtype=np.float64)
    w = np.asarray(w, dtype=np.float64)
    k = np.isfinite(x + w)
    clean_x = x[k]
    clean_w = w[k]
    srt = np.argsort(clean_x)
    sorted_w = clean_w[srt]
    sorted_x = clean_x[srt]
    Sn = np.cumsum(sorted_w)
    Pn = (Sn - 0.5 * sorted_w) / Sn[-1]
    return np.interp(np.asarray(percentile) / 100.0, Pn, sorted_x)


def weighted_median(x, w):
    """Weighted median (50th weighted percentile) of `x`."""
    return weighted_percentile(x, w, 50)

def weighted_mad(x, w, stddev=True):
    """Weighted median absolute deviation of `x`.

    If `stddev`, scale by 1.4826 so the result estimates the standard
    deviation of normally distributed data.
    """
    x = np.asarray(x, dtype=np.float64)
    w = np.asarray(w, dtype=np.float64)
    if stddev:
        return 1.4826 * weighted_median(np.abs(x - weighted_median(x, w)), w)
    else:
        return weighted_median(np.abs(x - weighted_median(x, w)), w)
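
# Quick sanity check (hypothetical data): with uniform weights the weighted
# statistics reduce to their unweighted counterparts, e.g.
#   >>> x = np.array([1.0, 2.0, 3.0, 4.0])
#   >>> w = np.ones_like(x)
#   >>> weighted_mean(x, w)    # 2.5, same as np.mean(x)
#   >>> weighted_median(x, w)  # 2.5, same as np.median(x)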

def mavg(arr, n=2, axis=-1):
    """Moving average of an array.

    The returned array is shorter by (n - 1). Applied along the last
    axis by default.
    """
    return np.mean(rolling_window(arr, n), axis=axis)

def mgeo(arr, n=2, axis=-1):
    """Rolling geometric mean of an array.

    Arguments:
        arr {np.array} -- array
    Keyword Arguments:
        n {int} -- window size (default: {2})
        axis {int} -- axis to roll over (default: {-1})
    Returns:
        np.array -- rolling geometric mean, shorter by (n - 1)
    """
    return stats.gmean(rolling_window(arr, n), axis=axis)
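
# Example (hypothetical data): both rolling statistics shorten the array by
# n - 1, e.g.
#   >>> mavg(np.array([1.0, 2.0, 4.0]), n=2)   # array([1.5, 3. ])
#   >>> mgeo(np.array([1.0, 4.0, 16.0]), n=2)  # array([2., 8.])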

def pdf(values, bins=None, range=None):
    """(Statistical) probability density function.

    Normalized so that the integral over the binned range is 1; the
    integral over a sub-range is the probability that the value falls
    within that range.

    Returns an array of size len(bins) - 1. Plot versus the returned
    bin centers.
    """
    if hasattr(bins, "__getitem__") and (range is None):
        range = (np.nanmin(bins), np.nanmax(bins))
    h, x = np.histogram(values, bins=bins, range=range, density=False)
    # From the definition Pr(x) = dF(x)/dx this is the correct form.
    # It returns the correct probabilities when tested.
    pdf = h / (np.sum(h, dtype=float) * np.diff(x))
    return pdf, bin_center(x)
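
# Sanity check (hypothetical data): the returned densities integrate to 1
# over the binned range, e.g.
#   >>> p, ctr = pdf(np.array([1.0, 2.0, 3.0, 4.0]), np.array([0.0, 2.5, 5.0]))
#   >>> p                # array([0.2, 0.2])
#   >>> np.sum(p * 2.5)  # 1.0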

def pdf2(values, bins=None, range=None):
    """N * PDF(x).

    The PDF normalized so that the integral equals the total amount of
    the quantity; the integral over a sub-range is the total amount
    within that range.

    Returns an array of size len(bins) - 1. Plot versus the returned
    bin centers.
    """
    if hasattr(bins, "__getitem__") and (range is None):
        range = (np.nanmin(bins), np.nanmax(bins))
    pdf, x = np.histogram(values, bins=bins, range=range, density=False)
    pdf = pdf.astype(float) / np.diff(x)
    return pdf, bin_center(x)

def edf(data, pdf=False):
    """Empirical distribution function of `data`.

    Returns (y, x) where x is the sorted data and y is the fraction of
    points at or below x. (The `pdf` flag is accepted but unused.)
    """
    x = np.sort(data).astype(float)
    y = np.arange(1, len(data) + 1, dtype=float) / len(data)
    return y, x

def cdf(values, bins):
    """(Statistical) cumulative distribution function CDF(x).

    The integral on [-inf, b] is the fraction below b. The CDF is
    invariant to binning; this assumes you are using the entire range
    in the binning.

    Returns an array of size len(bins). Plot versus `bins`.
    """
    if hasattr(bins, "__getitem__"):
        range = (np.nanmin(bins), np.nanmax(bins))
    else:
        range = None
    h, bins = np.histogram(values, bins=bins, range=range, density=False)  # returns int
    # cumulative fraction below bin_k
    c = np.cumsum(h / np.sum(h, dtype=float))
    # prepend 0 because P(X < min(x)) = 0
    return np.append(0, c), bins
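
# Sanity check (hypothetical data): the CDF runs from 0 to 1 with one value
# per bin edge, e.g.
#   >>> c, edges = cdf(np.array([1.0, 2.0, 3.0, 4.0]), np.array([0.0, 2.5, 5.0]))
#   >>> c                # array([0. , 0.5, 1. ])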

def cdf2(values, bins):
    """Unnormalized cumulative distribution function.

    Exclusively for area_function, which needs it unnormalized: the
    value at b is the total amount below b. Invariant to binning; not
    normalized to 1.

    Returns an array of size len(bins). Plot versus `bins`.
    """
    if hasattr(bins, "__getitem__"):
        range = (np.nanmin(bins), np.nanmax(bins))
    else:
        range = None
    h, bins = np.histogram(values, bins=bins, range=range, density=False)
    c = np.cumsum(h).astype(float)
    return np.append(0.0, c), bins

def area_function(extmap, bins, scale=1):
    """Complementary CDF for cdf2 (not normalized to 1).

    The value at b is the total amount above b.
    """
    c, bins = cdf2(extmap, bins)
    return scale * (c.max() - c), bins
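
# Example (hypothetical data): the value at each edge is the total amount
# above that edge, e.g.
#   >>> s, edges = area_function(np.array([1.0, 2.0, 3.0, 4.0]),
#   ...                          np.array([0.0, 2.5, 5.0]))
#   >>> s                # array([4., 2., 0.])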

def diff_area_function(extmap, bins, scale=1):
    """Negative derivative of area_function; see pdf2."""
    s, bins = area_function(extmap, bins)
    dsdx = -np.diff(s) / np.diff(bins)
    return dsdx * scale, bin_center(bins)

def log_diff_area_function(extmap, bins):
    """Logarithmic slope of diff_area_function; see pdf2."""
    s, bins = diff_area_function(extmap, bins)
    g = s > 0
    dlnsdlnx = np.diff(np.log(s[g])) / np.diff(np.log(bins[g]))
    return dlnsdlnx, bin_center(bins[g])

def mass_function(values, bins, scale=1, aktomassd=183):
    """M(> Ak): mass-weighted complementary CDF."""
    if hasattr(bins, "__getitem__"):
        range = (np.nanmin(bins), np.nanmax(bins))
    else:
        range = None
    if scale != 1:
        # allow `scale` to override the Ak-to-mass conversion factor
        # without applying it twice in the weights below
        aktomassd = scale
        scale = 1
    h, bins = np.histogram(
        values,
        bins=bins,
        range=range,
        density=False,
        weights=values * aktomassd * scale,
    )
    c = np.cumsum(h).astype(float)
    return c.max() - c, bins

def ortho_dist(x, y, m, b):
    """Signed orthogonal distance from the point (x, y) to the line y = m*x + b."""
    return (y - m * x - b) / np.sqrt(1 + m ** 2)
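
# Example: the point (0, 1) lies at signed distance 1 above the line y = 0,
#   >>> ortho_dist(0.0, 1.0, 0.0, 0.0)   # 1.0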

def mad(X, stddev=True, axis=None):
    """Median absolute deviation from the median (ignoring NaNs).

    If `stddev`, scale by 1.4826 so the result estimates the standard
    deviation of normally distributed data.
    """
    med = np.nanmedian(np.abs(X - np.nanmedian(X, axis=axis, keepdims=True)), axis=axis)
    return 1.4826 * med if stddev else med


def mean_mad(X, stddev=True, axis=None):
    """Median absolute deviation from the mean (ignoring NaNs)."""
    dev = np.nanmedian(np.abs(X - np.nanmean(X, axis=axis, keepdims=True)), axis=axis)
    return 1.4826 * dev if stddev else dev

def rms(X, axis=None):
    """Root mean square of X (ignoring NaNs)."""
    return np.sqrt(np.nanmean(X ** 2, axis=axis))

def standardize(X, remove_mean=True, remove_std=True):
    """Standardize X to zero mean and/or unit standard deviation (NaN-aware)."""
    mean = np.nanmean(X) if remove_mean else 0
    std = np.nanstd(X) if remove_std else 1
    return (X - mean) / std
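
# Example: standardized data have zero mean and unit (population) standard
# deviation, e.g.
#   >>> standardize(np.array([1.0, 2.0, 3.0]))
#   array([-1.22474487,  0.        ,  1.22474487])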

def pdf_pareto(t, a, k, xmax=None):
    """PDF of the Pareto distribution.

    Parameters
    ----------
    t : array
        input values
    a : array
        power-law exponent (a = alpha - 1 of the usual Pareto form)
    k : array
        minimum value for the power law
    xmax : float, optional
        maximum value for the power law, by default None

    Returns
    -------
    numpy array
        PDF(t | a, k, xmax)
    """
    if xmax is None:
        out = ((a - 1) / k) * (t / k) ** (-a)
        out[t < k] = 0
        return out
    else:
        out = ((a - 1) / (k ** (1 - a) - xmax ** (1 - a))) * t ** (-a)
        out[(t <= k) | (t > xmax)] = 0
        return out
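
# Sanity check: for a = 2, k = 1 the unbounded form is t**-2 above k and
# zero below, e.g.
#   >>> pdf_pareto(np.array([0.5, 1.0, 2.0]), 2.0, 1.0)
#   array([0.  , 1.  , 0.25])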

def cdf_pareto(t, a, k, xmax=None):
    """CDF of the Pareto distribution.

    Parameters
    ----------
    t : array
        input values
    a : array
        power-law exponent (a = alpha - 1 of the usual Pareto form)
    k : array
        minimum value for the power law
    xmax : float, optional
        maximum value for the power law, by default None

    Returns
    -------
    numpy array
        CDF(t | a, k, xmax)
    """
    if xmax is None:
        out = 1 - (k / t) ** (a - 1)
        out[t < k] = 0
        return out
    else:
        out = (1 - (t / k) ** (1 - a)) / (1 - (xmax / k) ** (1 - a))
        out[t <= k] = 0
        out[t > xmax] = 1
        return out
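
# Sanity check: for a = 2, k = 1 the unbounded form is 1 - 1/t above k, e.g.
#   >>> cdf_pareto(np.array([1.0, 2.0, 4.0]), 2.0, 1.0)
#   array([0.  , 0.5 , 0.75])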

def mahalanobis(X, X2=None):
    """Squared Mahalanobis distance for data X = np.array([x1, x2, x3, ...]).

    Parameters
    ----------
    X : np.array (N x M)
        N x M array with N observations (rows) of M variables (columns),
        i.e. each row is one point [x_i, y_i, z_i, ...].

    Returns
    -------
    md : np.array
        the squared Mahalanobis distance; it follows a chi2 distribution
        for normally distributed data
    """
    # let scipy do all the lifting
    # but this is a nice way anyways
    # C = np.cov(X.T)
    # P, D, T = eigen_decomp(C)
    # mu = np.mean(X, axis=0)
    # X = (X - mu)
    # wX = X @ np.linalg.inv(T.T)  # whitened data
    # md = np.linalg.norm(wX, axis=1) ** 2  # norm spanning [xi, yi, zi]
    # # wXT = np.linalg.inv(T) @ X.T
    # # md = wX @ wX.T
    # # md = np.sqrt(md.diagonal())
    # # md is distributed as chi2 with d.o.f. = # independent axes
    if X2 is None:
        return cdist(X, np.atleast_2d(X.mean(axis=0)), metric="mahalanobis")[:, 0] ** 2
    else:
        # use the covariance of X as the metric; distances are measured
        # from the mean of X2, as in the original implementation
        VI = np.linalg.inv(np.cov(X.T))
        mu = np.mean(X2, axis=0)
        return cdist(np.atleast_2d(X2), np.atleast_2d(mu), metric="mahalanobis", VI=VI)[:, 0] ** 2
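
# Usage sketch (hypothetical data): for multivariate-normal rows the squared
# distances follow a chi2 distribution with d.o.f. equal to the number of
# columns, e.g.
#   >>> rng = np.random.default_rng(0)
#   >>> X = rng.normal(size=(500, 3))   # 500 observations of 3 variables
#   >>> md = mahalanobis(X)             # shape (500,)
#   >>> np.mean(md)                     # ~3, the chi2 mean for 3 d.o.f.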