Skip to content

Commit

Permalink
feat: show peak info
Browse files Browse the repository at this point in the history
  • Loading branch information
Lan Le committed May 8, 2024
1 parent 469314e commit ed60e15
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 45 deletions.
222 changes: 177 additions & 45 deletions chem_spectra/lib/composer/ni.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re # noqa: E402
import numpy as np # noqa: E402
from matplotlib import ticker # noqa: E402
from matplotlib.patches import FancyArrowPatch
import csv

from chem_spectra.lib.composer.base import ( # noqa: E402, F401
Expand Down Expand Up @@ -263,6 +264,7 @@ def __fakto(self):

def tf_img(self):
plt.rcParams['figure.figsize'] = [16, 9]
plt.rcParams['figure.dpi'] = 200
plt.rcParams['font.size'] = 14

# PLOT data
Expand All @@ -273,10 +275,10 @@ def tf_img(self):
plt.xlim(xlim_left, xlim_right)
y_max, y_min = np.max(self.core.ys), np.min(self.core.ys)
h = y_max - y_min
plt.ylim(
y_min - h * 0.2,
y_max + h * 0.2,
)
w = x_max - x_min
y_boundary_min = y_min - h * 0.2
y_boundary_max = y_max + h * 0.5

# PLOT peaks
faktor = self.__fakto()
path_data = [
Expand Down Expand Up @@ -364,9 +366,8 @@ def tf_img(self):
markersize=50,
)

# ----- PLOT integration -----
# ----- Calculate integration -----
refShift, refArea = self.refShift, self.refArea
itg_h = y_max + h * 0.1
if (len(self.all_itgs) == 0 and len(self.core.itg_table) > 0 and not self.core.params['integration'].get('edited') and ('originStack' not in self.core.params['integration'])):
core_itg_table = self.core.itg_table[0]
itg_table = core_itg_table.split('\n')
Expand All @@ -379,41 +380,9 @@ def tf_img(self):
xUStr = split_itg[1].strip()
areaStr = split_itg[2].strip()
self.all_itgs.append({'xL': float(xLStr), 'xU': float(xUStr), 'area': float(areaStr)}) # noqa: E501
for itg in self.all_itgs:
# integration marker
xL, xU, area = itg['xL'] - refShift, itg['xU'] - refShift, itg['area'] * refArea # noqa: E501
# integration curve
ks = calc_ks(self.core.ys, y_max, h)
iL, iU = get_curve_endpoint(self.core.xs, self.core.ys, xL, xU)
ref = ks[iL]
cxs = self.core.xs[iL:iU]

if self.core.is_hplc_uv_vis:
# fill area under curve
cys = self.core.ys[iL:iU]
slope = cal_slope(cxs[0], cys[0], cxs[len(cxs)-1], cys[len(cys)-1]) # noqa: E501
last_y = cys[0]
last_x = cxs[0]
aucys = [last_y]
for i in range(1, len(cys)):
curr_x = cxs[i]
curr_y = slope*(curr_x-last_x) + last_y
aucys.append(curr_y)
last_x = curr_x
last_y = curr_y
plt.fill_between(cxs, y1=cys, y2=aucys, alpha=0.2, color='#FF0000') # noqa: E501
else:
# display integration
plt.plot([xL, xU], [itg_h, itg_h], color='#228B22')
plt.plot([xL, xL], [itg_h + h * 0.01, itg_h - h * 0.01], color='#228B22') # noqa: E501
plt.plot([xU, xU], [itg_h + h * 0.01, itg_h - h * 0.01], color='#228B22') # noqa: E501
plt.text((xL + xU) / 2, itg_h + h * 0.015, '{:0.2f}'.format(area), color='#228B22', ha='center', size=12) # noqa: E501
cys = (ks[iL:iU] - ref) * 1.5 + (y_max - h * 0.4)
# if self.core.is_uv_vis:
# cys = (ref - ks[iL:iU]) * 0.5 + (y_max - h * 0.4)
plt.plot(cxs, cys, color='#228B22')


# ----- PLOT multiplicity -----
# ----- Calculate multiplicity -----
if (len(self.mpys) == 0 and len(self.core.mpy_itg_table) > 0 and not self.core.params['integration'].get('edited') and ('originStack' not in self.core.params['integration'])):
core_mpy_pks_table = self.core.mpy_pks_table[0]
mpy_pks_table = core_mpy_pks_table.split('\n')
Expand Down Expand Up @@ -451,17 +420,27 @@ def tf_img(self):
mpy_item['mpyType'] = typeStr
mpy_item['peaks'] = tmp_dic_mpy_peaks[idxStr]
self.mpys.append(mpy_item)
mpy_h = y_min - h * 0.08

# ----- PLOT integration -----
itg_h = y_max + h * 0.6
itg_h = itg_h + itg_h * 0.1
itg_value_position_y = y_min - h * 0.25
if (len(self.mpys) == 0):
itg_value_position_y = y_min - h * 0.05
y_boundary_max = self.__draw_integrals(plt, refShift, refArea, y_max, h, itg_value_position_y, itg_h)
y_boundary_min = itg_value_position_y - h * 0.1

# ----- PLOT multiplicity -----
mpy_h = y_min - h * 0.03
for mpy in self.mpys:
xL, xU, area, typ, peaks = mpy['xExtent']['xL'] - refShift, mpy['xExtent']['xU'] - refShift, mpy['area'] * refArea, mpy['mpyType'], mpy['peaks'] # noqa: E501
plt.plot([xL, xU], [mpy_h, mpy_h], color='#DA70D6')
plt.plot([xL, xL], [mpy_h + h * 0.01, mpy_h - h * 0.01], color='#DA70D6') # noqa: E501
plt.plot([xU, xU], [mpy_h + h * 0.01, mpy_h - h * 0.01], color='#DA70D6') # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.1, '({})'.format(typ), color='#DA70D6', ha='center', size=12) # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.06, '{:0.3f}'.format(calc_mpy_center(mpy['peaks'], refShift, mpy['mpyType'])), color='#DA70D6', ha='center', size=12) # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.01, '{:0.3f} ({})'.format(calc_mpy_center(mpy['peaks'], refShift, mpy['mpyType']), typ), color='#DA70D6', size=7, rotation=90., ha='right', va='top', rotation_mode='anchor') # noqa: E501
for p in peaks:
x = p['x']
plt.plot([x - refShift, x - refShift], [mpy_h, mpy_h + h * 0.05], color='#DA70D6') # noqa: E501
plt.plot([x - refShift, x - refShift], [mpy_h, mpy_h + h * 0.02], color='#DA70D6') # noqa: E501

# PLOT label
if (self.core.is_xrd):
Expand All @@ -470,22 +449,176 @@ def tf_img(self):
plt.xlabel((label), fontsize=18)
elif (self.core.is_cyclic_volta):
plt.xlabel("{}".format(self.core.label['x']), fontsize=18)
elif (self.core.non_nmr == False):
plt.xlabel("Chemical shift ({})".format(self.core.label['x'].lower()), fontsize=18)
else:
plt.xlabel("X ({})".format(self.core.label['x']), fontsize=18)

if (self.core.is_cyclic_volta):
plt.ylabel("{}".format(self.core.label['y']), fontsize=18)
elif (self.core.non_nmr == False):
plt.ylabel("Intensity ({})".format(self.core.label['y'].lower()), fontsize=18)
else:
plt.ylabel("Y ({})".format(self.core.label['y']), fontsize=18)
plt.locator_params(nbins=self.__plt_nbins())
plt.grid(False)

y_boundary_max = self.__draw_peaks(plt, x_peaks, y_peaks, h, w, y_boundary_max*1.5)


plt.ylim(
y_boundary_min,
y_boundary_max,
)

# Save
tf_img = tempfile.NamedTemporaryFile(suffix='.png')
plt.savefig(tf_img, format='png')
tf_img.seek(0)
plt.clf()
plt.cla()
return tf_img


def __draw_integrals(self, plt, refShift, refArea, y_max, h, itg_value_position_y, itg_h):
y_boundary_max = y_max
for itg in self.all_itgs:
# integration marker
xL, xU, area = itg['xL'] - refShift, itg['xU'] - refShift, itg['area'] * refArea # noqa: E501
# integration curve
ks = calc_ks(self.core.ys, y_max, h)
iL, iU = get_curve_endpoint(self.core.xs, self.core.ys, xL, xU)
ref = ks[iL]
cxs = self.core.xs[iL:iU]

if self.core.is_hplc_uv_vis:
# fill area under curve
cys = self.core.ys[iL:iU]
slope = cal_slope(cxs[0], cys[0], cxs[len(cxs)-1], cys[len(cys)-1]) # noqa: E501
last_y = cys[0]
last_x = cxs[0]
aucys = [last_y]
for i in range(1, len(cys)):
curr_x = cxs[i]
curr_y = slope*(curr_x-last_x) + last_y
aucys.append(curr_y)
last_x = curr_x
last_y = curr_y
plt.fill_between(cxs, y1=cys, y2=aucys, alpha=0.2, color='#FF0000') # noqa: E501
else:
# display integration
plt.plot([xL, xU], [itg_value_position_y, itg_value_position_y], color='#228B22')
plt.plot([xL, xL], [itg_value_position_y + h * 0.01, itg_value_position_y - h * 0.01], color='#228B22') # noqa: E501
plt.plot([xU, xU], [itg_value_position_y + h * 0.01, itg_value_position_y - h * 0.01], color='#228B22') # noqa: E501
plt.text((xL + xU) / 2, itg_value_position_y - h * 0.01, '{:0.2f}'.format(area), color='#228B22', ha='right', rotation_mode='anchor', size=7, rotation=90.) # noqa: E501
cys = (ks[iL:iU] - ref) * 1.5 + h * 0.15
plt.plot(cxs, cys, color='#228B22')
try:
cys_max = np.max(cys)
y_boundary_max = max(y_boundary_max, cys_max + h*0.1)
except:
pass

return y_boundary_max


def __draw_peaks(self, plt, x_peaks, y_peaks, h, w, y_boundary_max):
if self.core.non_nmr == True or len(x_peaks) == 0:
return y_boundary_max

params = self.core.params
if params['ref_name'] is None or params['peaks_str'] is None:
return y_boundary_max

ax = plt.gca()
differences = np.diff(x_peaks)
grouping_threshold = np.mean(differences)
groups_x = []
groups_y = []
current_group_x = [x_peaks[0]]
current_group_y = [y_peaks[0]]

for i in range(1, len(x_peaks)):
if (x_peaks[i] - current_group_x[-1] < grouping_threshold):
current_group_x.append(x_peaks[i])
current_group_y.append(y_peaks[i])
else:
groups_x.append(current_group_x)
current_group_x = [x_peaks[i]]
groups_y.append(current_group_y)
current_group_y = [y_peaks[i]]

groups_x.append(current_group_x)
groups_y.append(current_group_y)
max_values = [np.max(items) for items in groups_y]
x_boundary_min, x_boundary_max = np.min(x_peaks), np.max(x_peaks)

highest_peak_anotation = 0

for i in range(len(groups_x)):
mygroup_x, mygroup_y = groups_x[i], groups_y[i]
len_group_x = len(mygroup_x)
if len_group_x > 1:
max_current_group = max_values[i] + h * 0.25
if (i > 0):
prev_max_group = max_values[i-1] + h * 0.25
my_gap = abs(max_current_group - prev_max_group)
if my_gap < h*0.1:
max_current_group = max_current_group + h * 0.45
middle_idx = int(len_group_x/2)
gap_value = np.mean(mygroup_x)
x_text = 0
for j in range(len_group_x):
x_pos = mygroup_x[j]
y_pos = mygroup_y[j] + h * 0.5
x_float = '{:.2f}'.format(x_pos)
peak_label = '{x}'.format(x=x_float)
if j >= middle_idx:
x_text = -(w * 0.01) * (middle_idx - j)
elif j < middle_idx:
x_text = w * 0.01 * (j-middle_idx)

ax.add_patch(FancyArrowPatch((x_pos, max_current_group), (x_pos, max_current_group + h * 0.05), linewidth=0.1))
ax.add_patch(FancyArrowPatch((x_pos, max_current_group + h * 0.05), (gap_value + x_text, max_current_group + h * 0.11), linewidth=0.1))

x_boundary_min = min(gap_value + x_text, x_boundary_min)
x_boundary_max = max(gap_value + x_text, x_boundary_max)

ax.annotate(peak_label,
xy=(gap_value + x_text, max_current_group + h * 0.11), xycoords='data',
xytext=(0, 12), textcoords='offset points',
arrowprops=dict(arrowstyle="-", linewidth=0.2),
rotation=90, size=6)

highest_peak_anotation = max(highest_peak_anotation, max_current_group + h * 0.25)
else:
x_pos = mygroup_x[0]
y_pos = mygroup_y[0] + h * 0.18
x_float = '{:.2f}'.format(x_pos)
peak_label = '{x}'.format(x=x_float)
ax.annotate(peak_label,
xy=(x_pos, y_pos), xycoords='data',
xytext=(0, 20), textcoords='offset points',
arrowprops=dict(arrowstyle="-", linewidth=0.2),
rotation=90, size=6)

highest_peak_anotation = max(highest_peak_anotation, y_pos)

x_boundary_min = x_boundary_min - w * 0.01
x_boundary_max = x_boundary_max + w * 0.02
if self.core.ncl == '13C':
x_boundary_min = min(x_boundary_min, -10.0)
x_boundary_max = max(x_boundary_max, 195.0)
elif self.core.ncl == '1H':
x_boundary_min = min(x_boundary_min, -0.2)
x_boundary_max = max(x_boundary_max, 9.0)
plt.xlim(
x_boundary_max,
x_boundary_min,
)

y_boundary_max = min(y_boundary_max, highest_peak_anotation)
return y_boundary_max

def __prepare_metadata_info_for_csv(self, csv_writer: csv.DictWriter):
csv_writer.writerow({
Expand All @@ -512,7 +645,6 @@ def __prepare_metadata_info_for_csv(self, csv_writer: csv.DictWriter):
})
csv_writer.writerow({
})


def tf_csv(self):
if self.core.is_cyclic_volta == False:
Expand Down
6 changes: 6 additions & 0 deletions tests/lib/converter/jcamp/test_jcamp_ni_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ def test_init_jcamp_ni_success(jcamp_file_1h):

assert ni_converter is not None
assert ni_converter.base == base_converter

def test_init_jcamp_ni_nmr_label(jcamp_file_1h):
base_converter = JcampBaseConverter(jcamp_file_1h)
ni_converter = JcampNIConverter(base=base_converter)

assert ni_converter.label == {'x': 'PPM', 'y': 'ARBITRARY'}

0 comments on commit ed60e15

Please sign in to comment.