Skip to content

Commit

Permalink
candidate grouping and group model
Browse files Browse the repository at this point in the history
  • Loading branch information
georgievgeorgi committed May 26, 2022
1 parent abc432f commit 5f9ee1e
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 92 deletions.
4 changes: 3 additions & 1 deletion src/ramanchada2/misc/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env python3

from .spectrum import * # noqa
from .spectrum import * # noqa
from .peak_candidates import (PeakCandidatesListModel,
PeakCandidateModel)
57 changes: 33 additions & 24 deletions src/ramanchada2/misc/types/spectrum/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
import numpy as np
import numpy.typing as npt

from ..pydantic_base_model import PydBaseModel


SpeMetadataFieldTyping = Union[
npt.NDArray, pydantic.StrictBool,
npt.NDArray, PydBaseModel,
pydantic.StrictBool,
pydantic.StrictInt, float,
datetime.datetime,
List[Any], Dict[str, Any],
Expand All @@ -19,42 +22,48 @@
SpeMetadataTyping = Dict[str, SpeMetadataFieldTyping]


class SpeMetaBaseModel(pydantic.BaseModel):
class Config:
arbitrary_types_allowed = True


class SpeMetadataFieldModel(SpeMetaBaseModel):
class SpeMetadataFieldModel(PydBaseModel):
__root__: SpeMetadataFieldTyping

@pydantic.validator('__root__', pre=True)
def pre_validate(cls, val):
if isinstance(val, np.ndarray):
return val
elif (isinstance(val, str) and (
val.startswith('[') and val.endswith(']') or
val.startswith('{') and val.endswith('}'))):
return json.loads(val.replace("'", '"'))
else:
return val

if isinstance(val, str):
if val.startswith('ramanchada2_model@'):
# The format is:
# ramanchada2_model@ModelName#<DATA>
pos_at = val.index('@')
pos_hash = val.index('#')
model_name = val[pos_at+1:pos_hash]
from ramanchada2.misc import types
model = getattr(types, model_name)
return model.validate(val[pos_hash+1:])
if(val.startswith('[') and val.endswith(']') or
val.startswith('{') and val.endswith('}')):
return json.loads(val.replace("'", '"').replace(r'b"', '"'))
return val

class SpeMetadataModel(SpeMetaBaseModel):
def serialize(self):
if isinstance(self.__root__, list) or isinstance(self.__root__, dict):
return json.dumps(self.__root__)
if isinstance(self.__root__, PydBaseModel):
return f'ramanchada2_model@{type(self.__root__).__name__}#' + self.json()
if isinstance(self.__root__, datetime.datetime):
return self.__root__.isoformat()
if isinstance(self.__root__, PydBaseModel):
return self.__root__.serialize()
return self.__root__


class SpeMetadataModel(PydBaseModel):
__root__: Dict[str, SpeMetadataFieldModel]

def __str__(self):
return str(self.serialize())

def serialize(self):
ret = dict()
for key, val in self.__root__.items():
if isinstance(val.__root__, list) or isinstance(val.__root__, dict):
ret.update({key: json.dumps(val.__root__)})
elif isinstance(val.__root__, datetime.datetime):
ret.update({key: val.__root__.isoformat()})
else:
ret.update({key: val.__root__})
return ret
return {k: v.serialize() for k, v in self.__root__.items()}

def __getitem__(self, key: str) -> SpeMetadataFieldTyping:
return self.__root__[key].__root__
Expand Down
34 changes: 21 additions & 13 deletions src/ramanchada2/spectrum/peaks/find_peaks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
#!/usr/bin/env python3

import numpy as np
from typing import Union, Tuple, List
from scipy import signal
from pydantic import validate_arguments
from pydantic import validate_arguments, Field

from ..spectrum import Spectrum
from ramanchada2.misc.spectrum_deco import (add_spectrum_method,
add_spectrum_filter)
from ramanchada2.misc.types import PeakCandidatesListModel


@add_spectrum_method
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def find_peaks(
spe: Spectrum, /,
prominence: float = 1e-2,
width: int = 1):
wlen=None,
width: Union[int, Tuple[int, int]] = 1
) -> PeakCandidatesListModel:
"""
Find peaks in spectrum.
Expand All @@ -30,16 +33,21 @@ def find_peaks(
_type_
_description_
"""
loc, data = signal.find_peaks(spe.y, prominence=prominence, width=width)
w = data['widths']
bounds = np.stack((data['left_ips'] - 2*w,
data['right_ips'] + 2*w
), axis=-1).astype(int)
ampl = data['prominences']
return dict(amplitudes=ampl,
locations=loc,
widths=w,
bounds=bounds)
res = signal.find_peaks(spe.y, prominence=prominence, width=width, wlen=wlen)
return PeakCandidatesListModel.from_find_peaks(res)


@add_spectrum_method
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def find_peak_groups(
spe: Spectrum, /,
prominence: float = 1e-2,
wlen=None,
width: Union[int, Tuple[int, int]] = 1,
n_sigma_group: float = Field(5, ge=0)
) -> List[PeakCandidatesListModel]:
res = signal.find_peaks(spe.y, prominence=prominence, width=width, wlen=wlen)
return PeakCandidatesListModel.from_find_peaks(res).group_neighbours(n_sigma=n_sigma_group)


@add_spectrum_filter
Expand Down
136 changes: 84 additions & 52 deletions src/ramanchada2/spectrum/peaks/fit_peaks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#!/usr/bin/env python3

from typing import Literal
from typing import Literal, List, Union
from collections import UserList
import json

import numpy as np
from pydantic import validate_arguments
from pydantic import validate_arguments, Field
from lmfit.models import lmfit_models
from lmfit import models

from ramanchada2.misc.spectrum_deco import (add_spectrum_method,
add_spectrum_filter)
from ramanchada2.misc.types import (PeakCandidatesListModel,
PeakCandidateModel)
from ..spectrum import Spectrum


Expand All @@ -23,64 +24,95 @@ def valuesdict(self):

@property
def locations(self):
return [v for peak in self for k, v in peak.values.items() if '_center' in k]
return [v for peak in self for k, v in peak.values.items() if k.endswith('center')]

def to_json(self):
mod = [peak.model.dumps() for peak in self]
par = [peak.params.dumps() for peak in self]
return json.dumps(dict(models=mod, params=par))


@add_spectrum_method
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def fit_peaks(
spe, /,
model: Literal['Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'PseudoVoigt',
],
bounds,
amplitudes=None,
locations=None,
widths=None,
intercept=True,
slope=False,
):
results = FitPeaksResult()
if locations is None:
locations = np.average(bounds, axis=1).astype(int)
if amplitudes is None:
amplitudes = spe.y[locations]
if widths is None:
widths = np.diff(bounds/3, axis=1).astype(int)
widths.shape = (-1)
locations = spe.x[locations]
bounds = np.array(bounds).astype(int)
for i, (b, a, x0, w) in enumerate(zip(bounds, amplitudes, locations, widths)):
x = spe.x[b[0]:b[1]]
y = spe.y[b[0]:b[1]]
mod = lmfit_models[model](name=f'p{i}', prefix=f'p{i}_')
if intercept or slope:
mod += models.LinearModel(name=f'p{i}_pedestal', prefix=f'p{i}_pedestal_')
params = mod.make_params()
if intercept:
params[f'p{i}_pedestal_intercept'].set(value=np.min(y), min=0)
else:
params[f'p{i}_pedestal_intercept'].set(value=0, vary=False, min=0)
if slope:
params[f'p{i}_pedestal_slope'].set(value=0)
else:
params[f'p{i}_pedestal_slope'].set(value=0, vary=False)
params[f'p{i}_amplitude'].set(value=a*10, min=0)
params[f'p{i}_center'].set(value=x0)
params[f'p{i}_sigma'].set(value=w)
def fit_peaks_model(spe: Spectrum, /, *,
model: Literal['Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'PseudoVoigt'],
peak_candidates: Union[PeakCandidateModel, PeakCandidatesListModel],
n_sigma_trim: float = Field(5, gt=0),
):
lb, rb = peak_candidates.boundaries_idx(n_sigma=n_sigma_trim, x_arr=spe.x)
fitx = spe.x[lb:rb]
fity = spe.y[lb:rb]

if isinstance(peak_candidates, PeakCandidateModel):
x0, a, w, p = peak_candidates.pos_ampl_sigma_base(x_arr=spe.x)
fit_model = lmfit_models[model]()
fit_params = fit_model.make_params()
fit_params['amplitude'].set(value=a*3, min=0, max=a*50)
fit_params['center'].set(value=x0, min=spe.x[lb], max=spe.x[rb])
fit_params['sigma'].set(value=w, min=.01, max=w*5)
if model == 'Moffat':
params[f'p{i}_beta'].set(value=1)
if model == 'Voigt':
tmpres = mod.fit(y, params=params, x=x)
params = tmpres.params
params[f'p{i}_gamma'].set(vary=True)
results.append(mod.fit(y, params=params, x=x))
return results
fit_params['beta'].set(value=1)
else:
mod_list = list()
pos_ampl_sigma_base = peak_candidates.pos_ampl_sigma_base(x_arr=spe.x)
for i, (x0, a, w, p) in enumerate(pos_ampl_sigma_base):
mod = lmfit_models[model](name=f'p{i}', prefix=f'p{i}_')
mod_list.append(mod)
fit_model = np.sum(mod_list)

fit_params = fit_model.make_params()
for i, (x0, a, w, p) in enumerate(pos_ampl_sigma_base):
fit_params[f'p{i}_amplitude'].set(value=a*3, min=0, max=a*50)
fit_params[f'p{i}_center'].set(value=x0, min=spe.x[lb], max=spe.x[rb])
fit_params[f'p{i}_sigma'].set(value=w, min=.01, max=w*5)
if model == 'Moffat':
fit_params[f'p{i}_beta'].set(value=1)

if model == 'Voigt':
tmpres = fit_model.fit(fity, params=fit_params, x=fitx)
fit_params = tmpres.params
if isinstance(peak_candidates, PeakCandidateModel):
fit_params['gamma'].set(vary=True)
else:
for i in range(len(peak_candidates)):
fit_params[f'p{i}_gamma'].set(vary=True)
return fit_model.fit(fity, params=fit_params, x=fitx)


@add_spectrum_method
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def fit_peak_groups(spe, /, *,
model: Literal['Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'PseudoVoigt',
],
peak_candidate_groups: List[PeakCandidatesListModel],
n_sigma_trim: float = 3,
):
fit_res = FitPeaksResult()
for group in peak_candidate_groups:
fit_res.append(fit_peaks_model(spe,
peak_candidates=group,
model=model,
n_sigma_trim=n_sigma_trim))
return fit_res


@add_spectrum_method
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def fit_peaks(spe, /, *,
model: Literal['Gaussian', 'Lorentzian', 'Moffat',
'Voigt', 'PseudoVoigt',
],
peak_candidates: PeakCandidatesListModel,
n_sigma_trim: float = 3,
):
fit_res = FitPeaksResult()
for peak in peak_candidates:
fit_res.append(fit_peaks_model(spe,
peak_candidates=peak,
model=model,
n_sigma_trim=n_sigma_trim))
return fit_res


@add_spectrum_filter
Expand Down
4 changes: 2 additions & 2 deletions tests/end_to_end/test_generate_and_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def test_generate_and_fit():
found_peaks = spe.find_peaks(prominence=.1)

true_pos = np.array(list(lines.keys()))
calc_pos = found_peaks['locations']
fit_peaks = spe.fit_peaks('Voigt', **found_peaks)
calc_pos = found_peaks.peaks
fit_peaks = spe.fit_peaks(model='Voigt', peak_candidates=found_peaks)
fit_pos = fit_peaks.locations

assert len(true_pos) == len(calc_pos), 'wrong number of peaks found'
Expand Down

0 comments on commit 5f9ee1e

Please sign in to comment.