diff --git a/simpeg/dask/__init__.py b/simpeg/dask/__init__.py index f5a00b7334..6de861ddb1 100644 --- a/simpeg/dask/__init__.py +++ b/simpeg/dask/__init__.py @@ -1,6 +1,7 @@ try: import simpeg.dask.simulation import simpeg.dask.electromagnetics.frequency_domain.simulation + import simpeg.dask.electromagnetics.natural_source.receivers import simpeg.dask.electromagnetics.static.resistivity.simulation import simpeg.dask.electromagnetics.static.resistivity.simulation_2d import simpeg.dask.electromagnetics.static.induced_polarization.simulation diff --git a/simpeg/dask/electromagnetics/frequency_domain/simulation.py b/simpeg/dask/electromagnetics/frequency_domain/simulation.py index eb8527ed8a..538f2ffaa4 100644 --- a/simpeg/dask/electromagnetics/frequency_domain/simulation.py +++ b/simpeg/dask/electromagnetics/frequency_domain/simulation.py @@ -4,11 +4,17 @@ import scipy.sparse as sp from multiprocessing import cpu_count from dask import array, compute, delayed +from simpeg.utils import sdiag +from time import time # from dask.distributed import get_client, Client, performance_report from simpeg.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag -from simpeg.dask.utils import get_parallel_blocks +from simpeg.dask.utils import get_block_survey from simpeg.electromagnetics.natural_source.sources import PlanewaveXYPrimary +from simpeg.electromagnetics.natural_source.receivers import ( + PointNaturalSource, + Point3DTipper, +) import zarr from tqdm import tqdm @@ -199,34 +205,54 @@ def compute_J(self, f=None): Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6)) - blocks = get_parallel_blocks( - self.survey.source_list, compute_row_size, optimize=False - ) + + if getattr(self, "source_list_blocks", None) is None: + self.source_list_blocks, self.addresses = get_block_survey( + self.survey.source_list, compute_row_size, optimize=True + ) + fields_array = delayed(f[:, self._solutionType]) fields = delayed(f) - survey = delayed(self.survey) + + e = delayed(f[self.survey.source_list[0], "e"]) + h = delayed(f[self.survey.source_list[0], "h"]) mesh = delayed(self.mesh) - blocks_receiver_derivs = [] - - for block in blocks: - blocks_receiver_derivs.append( - receiver_derivs( - survey, - mesh, - fields, - block, + + parallel_blocks = [] + + for block in self.source_list_blocks: + deriv_blocks = [] + for source in block: + deriv_blocks.append( + array.from_delayed( + receiver_derivs( + source.frequency, + source.receiver_list[0], # Always a list of one + mesh, + e, + h, + self, + ), + dtype=np.complex128, + shape=(A_i.A.shape[0], source.receiver_list[0].nD * 2), + ) ) - ) + + parallel_blocks.append(deriv_blocks) # with Client(processes=False) as client: # with performance_report(filename="dask-report.html"): # Dask process for all derivatives - blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + # blocks_receiver_derivs = [] + # tc = time() + # print("Computing receiver derivatives") + # parallel_blocks = compute(parallel_blocks)[0] + # print("Runtime:", time() - tc) for block_derivs_chunks, addresses_chunks in tqdm( - zip(blocks_receiver_derivs, blocks), - ncols=len(blocks_receiver_derivs), + zip(parallel_blocks, self.addresses), + ncols=len(parallel_blocks), desc=f"Sensitivities at {list(self.Ainv)[0]} Hz", ): Jmatrix = parallel_block_compute( @@ -250,7 +276,10 @@ def parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ): m_size = self.model.size - block_stack = sp.hstack(blocks_receiver_derivs).toarray() + print("Computing block") + tc = time() + block_stack = array.hstack(blocks_receiver_derivs).compute().toarray() + print("Runtime:", time() - tc) ATinvdf_duT = delayed(A_i * block_stack) count = 0 rows = [] @@ -291,24 +320,17 @@ def parallel_block_compute( @delayed -def receiver_derivs(survey, mesh, fields, blocks): - field_derivatives = [] - for address in blocks: - source = survey.source_list[address[0][0]] - receiver = source.receiver_list[address[0][1]] - - if isinstance(source, PlanewaveXYPrimary): - v = np.eye(receiver.nD, dtype=float) - else: - v = sp.csr_matrix(np.ones(receiver.nD), dtype=float) +def receiver_derivs(frequency, receiver, mesh, e, h, simulation): + v = sdiag(np.ones(receiver.nD)) - # Assume the derivatives in terms of model are Zero (seems to always be case) - dfduT, _ = receiver.evalDeriv( - source, mesh, fields, v=v[:, address[1][0]], adjoint=True - ) - field_derivatives.append(dfduT) + if isinstance(receiver, Point3DTipper): + if receiver.component == "imag": + v = -1j * v + dfduT = receiver._eval_tipper_deriv(frequency, mesh, h, simulation, v=v, adjoint=True) + elif isinstance(receiver, PointNaturalSource): + dfduT, _ = receiver._eval_impedance_deriv(frequency, mesh, e, h, simulation, v=v, adjoint=True) - return field_derivatives + return dfduT @delayed diff --git a/simpeg/dask/electromagnetics/natural_source/receivers.py b/simpeg/dask/electromagnetics/natural_source/receivers.py new file mode 100644 index 0000000000..16fcaf1c16 --- /dev/null +++ b/simpeg/dask/electromagnetics/natural_source/receivers.py @@ -0,0 +1,244 @@ +import scipy.sparse as sp +import numpy as np +from simpeg.electromagnetics.natural_source.receivers import ( + PointNaturalSource, + Point3DTipper, +) +from simpeg.utils import sdiag + + +def _eval_impedance_deriv(self, frequency, mesh, e, h, simulation, du_dm_v=None, v=None, adjoint=False): + if mesh.dim < 3 and self.orientation in ["xx", "yy"]: + if adjoint: + return 0 * v + else: + return 0 * du_dm_v + # e = f[src, "e"] + # h = f[src, "h"] + if mesh.dim == 3: + if self.orientation[0] == "x": + Pe = self.getP(mesh, "Ex", "e") + e = Pe @ e + else: + Pe = self.getP(mesh, "Ey", "e") + e = Pe @ e + + Phx = self.getP(mesh, "Fx", "h") + Phy = self.getP(mesh, "Fy", "h") + hx = Phx @ h + hy = Phy @ h + if self.orientation[1] == "x": + h = hy + else: + h = -hx + + top = e[:, 0] * h[:, 1] - e[:, 1] * h[:, 0] + bot = hx[:, 0] * hy[:, 1] - hx[:, 1] * hy[:, 0] + imp = top / bot + else: + if mesh.dim == 1: + e_loc = f.aliasFields["e"][1] + h_loc = f.aliasFields["h"][1] + PE = self.getP(mesh, e_loc) + PH = self.getP(mesh, h_loc) + elif mesh.dim == 2: + if self.orientation == "xy": + PE = self.getP(mesh, "Ex") + PH = self.getP(mesh, "CC") + elif self.orientation == "yx": + PE = self.getP(mesh, "CC") + PH = self.getP(mesh, "Ex") + + top = PE @ e[:, 0] + bot = PH @ h[:, 0] + + if mesh.dim == 1 and self.orientation != f.field_directions: + bot = -bot + + imp = top / bot + + if adjoint: + if self.component == "phase": + # gradient of arctan2(y, x) is (-y/(x**2 + y**2), x/(x**2 + y**2)) + v = 180 / np.pi * imp / (imp.real**2 + imp.imag**2) * v + # switch real and imaginary, and negate real part of output + v = -v.imag - 1j * v.real + # imaginary part gets extra (-) due to conjugate transpose + elif self.component == "apparent_resistivity": + v = 2 * _alpha(src) * imp * v + v = v.real - 1j * v.imag + elif self.component == "imag": + v = -1j * v + + # Work backwards! + gtop_v = sdiag(bot**-1) @ v + gbot_v = sdiag(-imp / bot) @ v + + if mesh.dim == 3: + block_a = sp.kron(sdiag(hy[:, 1]) @ gbot_v, [1, 0]) + block_b = sp.kron(sdiag(-hy[:, 0]) @ gbot_v, [0, 1]) + ghx_v = block_a + block_b + + block_a = sp.kron(sdiag(-hx[:, 1]) @ gbot_v, [1, 0]) + block_b = sp.kron(sdiag(hx[:, 0]) @ gbot_v, [0, 1]) + ghy_v = block_a + block_b + + block_a = sp.kron(sdiag(h[:, 1]) @ gtop_v, [1, 0]) + block_b = sp.kron(sdiag(-h[:, 0]) @ gtop_v, [0, 1]) + ge_v = block_a + block_b + + block_a = sp.kron(sdiag(-e[:, 1]) @ gtop_v, [1, 0]) + block_b = sp.kron(sdiag(e[:, 0]) @ gtop_v, [0, 1]) + gh_v = block_a + block_b + + if self.orientation[1] == "x": + ghy_v += gh_v + else: + ghx_v -= gh_v + + gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v + ge_v = Pe.T @ ge_v + else: + if mesh.dim == 1 and self.orientation != f.field_directions: + gbot_v = -gbot_v + + gh_v = PH.T @ gbot_v + ge_v = PE.T @ gtop_v + + gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * (mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v))) + + return gfu_h_v + ge_v, None + + if mesh.dim == 3: + de_v = Pe @ f._eDeriv(src, du_dm_v, v, adjoint=False) + dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False) + dhx_v = Phx @ dh_v + dhy_v = Phy @ dh_v + if self.orientation[1] == "x": + dh_dm_v = dhy_v + else: + dh_dm_v = -dhx_v + + dtop_v = ( + e[:, 0] * dh_dm_v[:, 1] + + de_v[:, 0] * h[:, 1] + - e[:, 1] * dh_dm_v[:, 0] + - de_v[:, 1] * h[:, 0] + ) + dbot_v = ( + hx[:, 0] * dhy_v[:, 1] + + dhx_v[:, 0] * hy[:, 1] + - hx[:, 1] * dhy_v[:, 0] + - dhx_v[:, 1] * hy[:, 0] + ) + imp_deriv = (bot * dtop_v - top * dbot_v) / (bot * bot) + else: + de_v = PE @ f._eDeriv(src, du_dm_v, v, adjoint=False) + dh_v = PH @ f._hDeriv(src, du_dm_v, v, adjoint=False) + + if mesh.dim == 1 and self.orientation != f.field_directions: + dh_v = -dh_v + + imp_deriv = (de_v - imp * dh_v) / bot + + if self.component == "apparent_resistivity": + rx_deriv = ( + 2 * _alpha(src) * (imp.real * imp_deriv.real + imp.imag * imp_deriv.imag) + ) + elif self.component == "phase": + amp2 = imp.imag**2 + imp.real**2 + deriv_re = -imp.imag / amp2 * imp_deriv.real + deriv_im = imp.real / amp2 * imp_deriv.imag + + rx_deriv = (180 / np.pi) * (deriv_re + deriv_im) + else: + rx_deriv = getattr(imp_deriv, self.component) + return rx_deriv + + +PointNaturalSource._eval_impedance_deriv = _eval_impedance_deriv + + +def _eval_tipper_deriv(self, frequency, mesh, h, simulation, du_dm_v=None, v=None, adjoint=False): + # will grab both primary and secondary and sum them! + + # if not isinstance(f, np.ndarray): + # h = f[src, "h"] + # else: + # h = f + + Phx = self.getP(mesh, "Fx", "h") + Phy = self.getP(mesh, "Fy", "h") + Phz = self.getP(mesh, "Fz", "h") + hx = Phx @ h + hy = Phy @ h + hz = Phz @ h + + if self.orientation[1] == "x": + h = -hy + else: + h = hx + + top = h[:, 0] * hz[:, 1] - h[:, 1] * hz[:, 0] + bot = hx[:, 0] * hy[:, 1] - hx[:, 1] * hy[:, 0] + imp = top / bot + + if adjoint: + + gtop_v = sdiag(bot**-1) @ v + gbot_v = sdiag(-imp / bot) @ v + + block_a = sp.kron(sdiag(hy[:, 1]) @ gbot_v, [1, 0]) + block_b = sp.kron(sdiag(-hy[:, 0]) @ gbot_v, [0, 1]) + ghx_v = block_a + block_b + + block_a = sp.kron(sdiag(-hx[:, 1]) @ gbot_v, [1, 0]) + block_b = sp.kron(sdiag(hx[:, 0]) @ gbot_v, [0, 1]) + ghy_v = block_a + block_b + + block_a = sp.kron(sdiag(-h[:, 1]) @ gtop_v, [1, 0]) + block_b = sp.kron(sdiag(h[:, 0]) @ gtop_v, [0, 1]) + ghz_v = block_a + block_b + + block_a = sp.kron(sdiag(hz[:, 1]) @ gtop_v, [1, 0]) + block_b = sp.kron(sdiag(-hz[:, 0]) @ gtop_v, [0, 1]) + gh_v = block_a + block_b + + if self.orientation[1] == "x": + ghy_v -= gh_v + else: + ghx_v += gh_v + + gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v + Phz.T @ ghz_v + + gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * ( + mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v))) + + return gfu_h_v + + dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False) + dhx_v = Phx @ dh_v + dhy_v = Phy @ dh_v + dhz_v = Phz @ dh_v + if self.orientation[1] == "x": + dh_v = -dhy_v + else: + dh_v = dhx_v + + dtop_v = ( + h[:, 0] * dhz_v[:, 1] + + dh_v[:, 0] * hz[:, 1] + - h[:, 1] * dhz_v[:, 0] + - dh_v[:, 1] * hz[:, 0] + ) + dbot_v = ( + hx[:, 0] * dhy_v[:, 1] + + dhx_v[:, 0] * hy[:, 1] + - hx[:, 1] * dhy_v[:, 0] + - dhx_v[:, 1] * hy[:, 0] + ) + + return (bot * dtop_v - top * dbot_v) / (bot * bot) + + +Point3DTipper._eval_tipper_deriv = _eval_tipper_deriv diff --git a/simpeg/dask/utils.py b/simpeg/dask/utils.py index ad292dc9a2..8a76d5fa4d 100644 --- a/simpeg/dask/utils.py +++ b/simpeg/dask/utils.py @@ -1,4 +1,5 @@ import numpy as np +from copy import deepcopy from dask.distributed import get_client from multiprocessing import cpu_count @@ -40,6 +41,56 @@ def compute(self, job): return job.compute() +def get_block_survey( + source_list: list, data_block_size, optimize=True +) -> tuple[list, list]: + row_count = 0 + row_index = 0 + block_count = 0 + blocks = [[]] + parallel_blocks_list = [[]] + for s_id, src in enumerate(source_list): + for r_id, rx in enumerate(src.receiver_list): + indices = np.arange(rx.nD).astype(int) + chunks = np.array_split( + indices, int(np.ceil(len(indices) / data_block_size)) + ) + + for ind, chunk in enumerate(chunks): + chunk_size = len(chunk) + new_receiver = type(rx)( + locations=rx.locations[chunk, :], + orientation=rx.orientation, + component=rx.component, + storeProjections=rx.storeProjections, + ) + new_source = deepcopy(src) + new_source.receiver_list = [new_receiver] + + # Condition to start a new block + if (row_count + chunk_size) > (data_block_size * cpu_count()): + row_count = 0 + block_count += 1 + blocks.append([]) + parallel_blocks_list.append([]) + + parallel_blocks_list[block_count].append(new_source) + blocks[block_count].append( + ( + (s_id, r_id, ind), + ( + chunk, + np.arange(row_index, row_index + chunk_size).astype(int), + rx.locations.shape[0], + ), + ) + ) + row_index += chunk_size + row_count += chunk_size + + return parallel_blocks_list, blocks + + def get_parallel_blocks(source_list: list, data_block_size, optimize=True) -> list: """ Get the blocks of sources and receivers to be computed in parallel. diff --git a/simpeg/electromagnetics/natural_source/receivers.py b/simpeg/electromagnetics/natural_source/receivers.py index 930fff879e..2683ee53ce 100644 --- a/simpeg/electromagnetics/natural_source/receivers.py +++ b/simpeg/electromagnetics/natural_source/receivers.py @@ -555,11 +555,12 @@ def _eval_tipper_deriv(self, src, mesh, f, du_dm_v=None, v=None, adjoint=False): else: ghx_v += gh_v - gh_v = ( - Phx.T @ sp.csr_matrix(ghx_v) - + Phy.T @ sp.csr_matrix(ghy_v) - + Phz.T @ sp.csr_matrix(ghz_v) + gh_v = sp.csr_matrix( + Phx.T @ ghx_v + + Phy.T @ ghy_v + + Phz.T @ ghz_v ) + return f._hDeriv(src, None, gh_v, adjoint=True) dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False)