-
Notifications
You must be signed in to change notification settings - Fork 0
/
timescale_development_hf.py
103 lines (86 loc) · 3.42 KB
/
timescale_development_hf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from timescales.autoreg import compute_ar_spectrum
from neurodsp.spectral import compute_spectrum
import numpy as np
import warnings
warnings.filterwarnings('once')
warnings.filterwarnings("ignore", category=DeprecationWarning)
def get_dpdf(time_object, day_diff):
    """Convert a recording timestamp into days post differentiation.

    Parameters
    ----------
    time_object : datetime
        Timestamp of the recording.
    day_diff : datetime
        Reference timestamp (time of the first recording).

    Returns
    -------
    int
        Whole days elapsed between ``day_diff`` and ``time_object``, plus
        56 days (8 weeks) — the first recording was done at 8 weeks post
        differentiation.
    """
    duration = time_object - day_diff
    # timedelta.days already floors to whole days, which is exactly what the
    # previous divmod(total_seconds, 86400) computed (it also left an unused
    # `days` local behind).
    return duration.days + 56
def bin_spikes(spike_times, bin_size, bins_per_window=None, fs=20000, n_recording_bins=None):
    """Bin spike times (in ms) into fixed-width spike-count bins.

    Parameters
    ----------
    spike_times : 1d array
        Spike times in milliseconds. NOTE(review): the early ``break`` below
        assumes these are sorted ascending — any spike after the first
        out-of-range one is discarded; confirm callers pass sorted times.
    bin_size : float
        Bin width in milliseconds.
    bins_per_window : int, optional
        If given, reshape the counts into windows of this many bins,
        truncating any incomplete trailing window.
    fs : int, optional
        Sampling rate (Hz) used to convert times to sample indices.
    n_recording_bins : int, optional
        Total number of bins in the recording. If None, inferred from the
        last spike time.

    Returns
    -------
    ndarray of int
        1d spike counts, or 2d (n_windows, bins_per_window) when
        ``bins_per_window`` is set.
    """
    # Convert bin width from ms to samples (kept in a new name rather than
    # silently rebinding the `bin_size` parameter to different units)
    bin_size_samples = round((bin_size / 1000) * fs)
    samples = (spike_times / 1000 * fs).astype(int)
    # Handle empty input explicitly instead of raising IndexError on samples[-1]
    if samples.size == 0 and n_recording_bins is None:
        spikes = np.zeros(0, dtype=int)
    elif n_recording_bins is None:
        spikes = np.zeros(samples[-1] // bin_size_samples, dtype=int)
    else:
        spikes = np.zeros(n_recording_bins, dtype=int)
    # Bin spikes; out-of-range spikes terminate the loop (assumes sorted input)
    for sample in samples:
        bin_ind = sample // bin_size_samples
        if bin_ind > len(spikes) - 1:
            break
        spikes[bin_ind] += 1
    # Create non-overlapping windows from the binned counts
    if bins_per_window is not None:
        max_bin = int((len(spikes) // bins_per_window) * bins_per_window)
        spikes = spikes[:max_bin].reshape(-1, bins_per_window)
    return spikes
# def trial_average_spectrum_welch(trial_data, fs, f_range=None):
# power_all = []
# for trial in trial_data:
# print(np.array(trial))
# freq, power = compute_spectrum(
# np.array(trial), fs, method='welch', f_range=f_range)
# power_all.append(power)
# power_all_np = np.array(power_all)
# power_avg = np.mean(power_all_np, axis=0)
# return freq, power_avg
def trial_average_spectrum_welch(trial_data, fs, f_range=None):
    """Compute a median-Welch power spectrum and average it across trials.

    Parameters
    ----------
    trial_data : array-like, 2d
        Trials as rows; each row is one signal.
    fs : float
        Sampling rate in Hz.
    f_range : tuple of (float, float), optional
        Frequency range passed through to ``compute_spectrum``.

    Returns
    -------
    freq : 1d array
        Frequency values.
    power_avg : 1d array
        Power at each frequency, NaN-mean averaged over trials.

    Raises
    ------
    ValueError
        If the trial-averaged spectrum contains NaNs.
    """
    trial_data_np = np.array(trial_data)
    freq, power = compute_spectrum(
        trial_data_np, fs, method='welch', avg_type='median', f_range=f_range)
    power_avg = np.nanmean(power, axis=0)
    # The original used an undefined name (`breakme`) as a deliberate
    # NameError crash after printing; raise an informative error instead.
    if np.any(np.isnan(power_avg)):
        raise ValueError(
            f"NaNs present in trial-averaged Welch spectrum: {power_avg}")
    return freq, power_avg
# def trial_average_spectrum_welch(trial_data, fs, f_range=None):
# trial_data_np = np.array(trial_data)
# if trial_data_np.size > 0:
# freq, power = compute_spectrum(
# trial_data_np, fs, method='welch', avg_type='median', f_range=f_range)
# power_avg = np.mean(power, axis=0)
# else:
# freq,power_avg = np.array(np.nan),np.array(np.nan)
# return freq, power_avg
def compute_ar_spectrum_(spikes, fs, ar_order, f_range=None):
    """Thin wrapper around ``compute_ar_spectrum`` that suppresses the
    warnings it emits during fitting."""
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        return compute_ar_spectrum(spikes, fs, ar_order, f_range=f_range)
def trial_average_spectrum_ar(trial_data, fs, ar_order, f_range=None):
    """Compute per-trial autoregressive spectra and average across trials.

    Parameters
    ----------
    trial_data : iterable of 1d arrays
        Trials; each element is one signal.
    fs : float
        Sampling rate in Hz.
    ar_order : int
        Order of the autoregressive model.
    f_range : tuple of (float, float), optional
        Frequency range passed through to ``compute_ar_spectrum_``.

    Returns
    -------
    freq : 1d array
        Frequency values (from the last trial; all trials share the grid).
    power_avg : 1d array
        Power at each frequency, NaN-mean averaged over trials.

    Raises
    ------
    ValueError
        If the trial-averaged spectrum contains NaNs.
    """
    power_all = []
    for trial in trial_data:
        freq, power = compute_ar_spectrum_(
            trial, fs, ar_order, f_range=f_range)
        power_all.append(power)
    power_avg = np.nanmean(np.array(power_all), axis=0)
    # The original used an undefined name (`breakme`) as a deliberate
    # NameError crash after printing; raise an informative error instead.
    if np.any(np.isnan(power_avg)):
        raise ValueError(
            f"NaNs present in trial-averaged AR spectrum: {power_avg}")
    return freq, power_avg