-
Notifications
You must be signed in to change notification settings - Fork 0
/
support_functions.py
102 lines (81 loc) · 3.49 KB
/
support_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import numpy as np
import scipy.signal as ssi
from scipy.optimize import minimize
from scipy.signal import find_peaks, fftconvolve
def corr_fun(X, Y, dt, norm=True, biased=True, method="auto"):
    """
    Estimates the correlation function between X and Y using ssi.correlate.
    For now, we require both signals to be of equal length.
    Input:
        X: First array to be correlated. ............................. (Nx1) np.array
        Y: Second array to be correlated. ............................ (Nx1) np.array
        dt: Time step of the time series. ............................ float
        norm: Normalizes the correlation function to a maxima of 1 ... bool
        biased: Trigger estimator biasing. ........................... bool
        method: 'direct', 'fft' or 'auto'. Passed to ssi.correlate ... string
    Output:
        tb: Time-lag axis, k*dt for lags k in [-(N-1), N-1]. ......... (2N-1,) np.array
        R: Correlation function estimate at each lag. ................ (2N-1,) np.array
    For biased=True, the result is divided by X.size.
    For biased=False, the estimator is unbiased and returns the result
    divided by X.size-|k|, where k is the lag.
    The unbiased estimator diverges for large lags, and
    for small lags and large X.size, the difference is trivial.
    """
    assert X.size == Y.size
    if norm:
        # Standardize to zero mean, unit (population) std: the estimate becomes a
        # correlation coefficient, equal to 1 at zero lag for biased autocorrelation.
        Xn = (X - X.mean()) / X.std()
        Yn = (Y - Y.mean()) / Y.std()
    else:
        Xn = X
        Yn = Y
    R = ssi.correlate(Xn, Yn, mode="full", method=method)
    # Lags k run from -(N-1) to N-1 inclusive, matching mode="full".
    k = np.arange(-(X.size - 1), X.size)
    tb = k * dt
    # Out-of-place division: in-place `/=` raises TypeError on integer-dtype
    # results (e.g. norm=False with integer input arrays).
    if biased:
        R = R / X.size
    else:
        # Denominator X.size - |k| is >= 1, so no division by zero.
        R = R / (X.size - np.abs(k))
    return tb, R
def sample_asymm_laplace(alpha=1.0, kappa=0.5, size=None, seed=None):
    """
    Use:
        sample_asymm_laplace(alpha=1., kappa=0.5, size=None)
    Random samples drawn from the asymmetric Laplace distribution
    using inverse sampling. The distribution is given by
    F(A;alpha,kappa) = 1-(1-kappa)*Exp(-A/(2*alpha*(1-kappa))), A>0
                       kappa*Exp[A/(2*alpha*kappa)], A<0
    where F is the CDF of A, alpha is a scale parameter and
    kappa is the asymmetry parameter.
    Input:
        alpha: scale parameter. ......................... float, alpha>0
        kappa: shape (asymmetry) parameter .............. float, 0<=kappa<=1
        size: number of points to draw. 1 by default. ... int, size>0
        seed: specify a random seed. .................... int
    Output:
        X: Array of randomly distributed values. ........ (size,) np array
    """
    assert alpha > 0.0
    assert (kappa >= 0.0) & (kappa <= 1.0)
    if size is None:
        # Honor the documented default: draw a single sample as a (1,) array.
        # (size=None would otherwise break np.zeros and boolean indexing below.)
        size = 1
    # Validate unconditionally — `if size:` would skip this check for size=0.
    assert size > 0
    prng = np.random.RandomState(seed=seed)
    U = prng.uniform(size=size)
    X = np.zeros(size)
    pos = U > kappa  # upper branch of the CDF (A > 0)
    neg = U < kappa  # lower branch of the CDF (A < 0)
    # Invert each CDF branch; empty masks make the kappa=0 and kappa=1
    # endpoint divisions harmless.
    X[pos] = -2 * alpha * (1 - kappa) * np.log((1 - U[pos]) / (1 - kappa))
    X[neg] = 2 * alpha * kappa * np.log(U[neg] / kappa)
    return X
def create_fit(dt, normalized_data, T, td, lam=0.5, distance=200):
    """Fit a time series as peaks convolved with a two-sided exponential pulse.

    Peaks of the (normalized) signal above 1.0 are placed on a zero forcing
    array, convolved with a double-exponential kernel of duration td and
    asymmetry lam, and the result is returned standardized to zero mean and
    unit standard deviation.
    """
    half_width = 2**18
    tkern = np.arange(-half_width, half_width + 1) * dt

    # Double-exponential pulse: rise for t < 0, decay for t >= 0.
    # Masked assignment avoids evaluating the growing exponential at t > 0.
    kern = np.empty_like(tkern)
    rising = tkern < 0
    kern[rising] = np.exp(tkern[rising] / (lam * td))
    kern[~rising] = np.exp(-tkern[~rising] / ((1 - lam) * td))

    # Forcing: delta spikes at detected peaks, weighted by the signal there.
    peak_idx, _ = find_peaks(normalized_data, height=1.0, distance=distance)
    forcing = np.zeros(T.size)
    forcing[peak_idx] = normalized_data[peak_idx]

    fit = fftconvolve(forcing, kern, "same")
    return (fit - fit.mean()) / fit.std()