Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOPY-1700: Benchmark parallelization between Azure (spot) and DUG #64

Open
wants to merge 15 commits into
base: release/0.21.2.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions simpeg/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
try:
import simpeg.dask.simulation
import simpeg.dask.electromagnetics.frequency_domain.simulation
import simpeg.dask.electromagnetics.natural_source.receivers
import simpeg.dask.electromagnetics.static.resistivity.simulation
import simpeg.dask.electromagnetics.static.resistivity.simulation_2d
import simpeg.dask.electromagnetics.static.induced_polarization.simulation
Expand Down
92 changes: 57 additions & 35 deletions simpeg/dask/electromagnetics/frequency_domain/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
import scipy.sparse as sp
from multiprocessing import cpu_count
from dask import array, compute, delayed
from simpeg.utils import sdiag
from time import time

# from dask.distributed import get_client, Client, performance_report
from simpeg.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag
from simpeg.dask.utils import get_parallel_blocks
from simpeg.dask.utils import get_block_survey
from simpeg.electromagnetics.natural_source.sources import PlanewaveXYPrimary
from simpeg.electromagnetics.natural_source.receivers import (
PointNaturalSource,
Point3DTipper,
)
import zarr
from tqdm import tqdm

Expand Down Expand Up @@ -199,34 +205,54 @@ def compute_J(self, f=None):
Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32)

compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6))
blocks = get_parallel_blocks(
self.survey.source_list, compute_row_size, optimize=False
)

if getattr(self, "source_list_blocks", None) is None:
self.source_list_blocks, self.addresses = get_block_survey(
self.survey.source_list, compute_row_size, optimize=True
)

fields_array = delayed(f[:, self._solutionType])
fields = delayed(f)
survey = delayed(self.survey)

e = delayed(f[self.survey.source_list[0], "e"])
h = delayed(f[self.survey.source_list[0], "h"])
mesh = delayed(self.mesh)
blocks_receiver_derivs = []

for block in blocks:
blocks_receiver_derivs.append(
receiver_derivs(
survey,
mesh,
fields,
block,

parallel_blocks = []

for block in self.source_list_blocks:
deriv_blocks = []
for source in block:
deriv_blocks.append(
array.from_delayed(
receiver_derivs(
source.frequency,
source.receiver_list[0], # Always a list of one
mesh,
e,
h,
self,
),
dtype=np.complex128,
shape=(A_i.A.shape[0], source.receiver_list[0].nD * 2),
)
)
)

parallel_blocks.append(deriv_blocks)

# with Client(processes=False) as client:
# with performance_report(filename="dask-report.html"):

# Dask process for all derivatives
blocks_receiver_derivs = compute(blocks_receiver_derivs)[0]
# blocks_receiver_derivs = []
# tc = time()
# print("Computing receiver derivatives")
# parallel_blocks = compute(parallel_blocks)[0]
# print("Runtime:", time() - tc)

for block_derivs_chunks, addresses_chunks in tqdm(
zip(blocks_receiver_derivs, blocks),
ncols=len(blocks_receiver_derivs),
zip(parallel_blocks, self.addresses),
ncols=len(parallel_blocks),
desc=f"Sensitivities at {list(self.Ainv)[0]} Hz",
):
Jmatrix = parallel_block_compute(
Expand All @@ -250,7 +276,10 @@ def parallel_block_compute(
self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses
):
m_size = self.model.size
block_stack = sp.hstack(blocks_receiver_derivs).toarray()
print("Computing block")
tc = time()
block_stack = array.hstack(blocks_receiver_derivs).compute().toarray()
print("Runtime:", time() - tc)
ATinvdf_duT = delayed(A_i * block_stack)
count = 0
rows = []
Expand Down Expand Up @@ -291,24 +320,17 @@ def parallel_block_compute(


@delayed
def receiver_derivs(survey, mesh, fields, blocks):
field_derivatives = []
for address in blocks:
source = survey.source_list[address[0][0]]
receiver = source.receiver_list[address[0][1]]

if isinstance(source, PlanewaveXYPrimary):
v = np.eye(receiver.nD, dtype=float)
else:
v = sp.csr_matrix(np.ones(receiver.nD), dtype=float)
def receiver_derivs(frequency, receiver, mesh, e, h, simulation):
v = sdiag(np.ones(receiver.nD))

# Assume the derivatives in terms of model are Zero (seems to always be case)
dfduT, _ = receiver.evalDeriv(
source, mesh, fields, v=v[:, address[1][0]], adjoint=True
)
field_derivatives.append(dfduT)
if isinstance(receiver, Point3DTipper):
if receiver.component == "imag":
v = -1j * v
dfduT = receiver._eval_tipper_deriv(frequency, mesh, h, simulation, v=v, adjoint=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
dfduT = receiver._eval_tipper_deriv(frequency, mesh, h, simulation, v=v, adjoint=True)
dfduT = receiver._eval_tipper_deriv(
frequency, mesh, h, simulation, v=v, adjoint=True
)

elif isinstance(receiver, PointNaturalSource):
dfduT, _ = receiver._eval_impedance_deriv(frequency, mesh, e, h, simulation, v=v, adjoint=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
dfduT, _ = receiver._eval_impedance_deriv(frequency, mesh, e, h, simulation, v=v, adjoint=True)
dfduT, _ = receiver._eval_impedance_deriv(
frequency, mesh, e, h, simulation, v=v, adjoint=True
)


return field_derivatives
return dfduT


@delayed
Expand Down
244 changes: 244 additions & 0 deletions simpeg/dask/electromagnetics/natural_source/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import scipy.sparse as sp
import numpy as np
from simpeg.electromagnetics.natural_source.receivers import (
PointNaturalSource,
Point3DTipper,
)
from simpeg.utils import sdiag


def _eval_impedance_deriv(self, frequency, mesh, e, h, simulation, du_dm_v=None, v=None, adjoint=False):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
def _eval_impedance_deriv(self, frequency, mesh, e, h, simulation, du_dm_v=None, v=None, adjoint=False):
def _eval_impedance_deriv(
self, frequency, mesh, e, h, simulation, du_dm_v=None, v=None, adjoint=False
):

if mesh.dim < 3 and self.orientation in ["xx", "yy"]:
if adjoint:
return 0 * v
else:
return 0 * du_dm_v
# e = f[src, "e"]
# h = f[src, "h"]
if mesh.dim == 3:
if self.orientation[0] == "x":
Pe = self.getP(mesh, "Ex", "e")
e = Pe @ e
else:
Pe = self.getP(mesh, "Ey", "e")
e = Pe @ e

Phx = self.getP(mesh, "Fx", "h")
Phy = self.getP(mesh, "Fy", "h")
hx = Phx @ h
hy = Phy @ h
if self.orientation[1] == "x":
h = hy
else:
h = -hx

top = e[:, 0] * h[:, 1] - e[:, 1] * h[:, 0]
bot = hx[:, 0] * hy[:, 1] - hx[:, 1] * hy[:, 0]
imp = top / bot
else:
if mesh.dim == 1:
e_loc = f.aliasFields["e"][1]
h_loc = f.aliasFields["h"][1]
PE = self.getP(mesh, e_loc)
PH = self.getP(mesh, h_loc)
elif mesh.dim == 2:
if self.orientation == "xy":
PE = self.getP(mesh, "Ex")
PH = self.getP(mesh, "CC")
elif self.orientation == "yx":
PE = self.getP(mesh, "CC")
PH = self.getP(mesh, "Ex")

top = PE @ e[:, 0]
bot = PH @ h[:, 0]

if mesh.dim == 1 and self.orientation != f.field_directions:
bot = -bot

imp = top / bot

if adjoint:
if self.component == "phase":
# gradient of arctan2(y, x) is (-y/(x**2 + y**2), x/(x**2 + y**2))
v = 180 / np.pi * imp / (imp.real**2 + imp.imag**2) * v
# switch real and imaginary, and negate real part of output
v = -v.imag - 1j * v.real
# imaginary part gets extra (-) due to conjugate transpose
elif self.component == "apparent_resistivity":
v = 2 * _alpha(src) * imp * v
v = v.real - 1j * v.imag
elif self.component == "imag":
v = -1j * v

# Work backwards!
gtop_v = sdiag(bot**-1) @ v
gbot_v = sdiag(-imp / bot) @ v

if mesh.dim == 3:
block_a = sp.kron(sdiag(hy[:, 1]) @ gbot_v, [1, 0])
block_b = sp.kron(sdiag(-hy[:, 0]) @ gbot_v, [0, 1])
ghx_v = block_a + block_b

block_a = sp.kron(sdiag(-hx[:, 1]) @ gbot_v, [1, 0])
block_b = sp.kron(sdiag(hx[:, 0]) @ gbot_v, [0, 1])
ghy_v = block_a + block_b

block_a = sp.kron(sdiag(h[:, 1]) @ gtop_v, [1, 0])
block_b = sp.kron(sdiag(-h[:, 0]) @ gtop_v, [0, 1])
ge_v = block_a + block_b

block_a = sp.kron(sdiag(-e[:, 1]) @ gtop_v, [1, 0])
block_b = sp.kron(sdiag(e[:, 0]) @ gtop_v, [0, 1])
gh_v = block_a + block_b

if self.orientation[1] == "x":
ghy_v += gh_v
else:
ghx_v -= gh_v

gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v
ge_v = Pe.T @ ge_v
else:
if mesh.dim == 1 and self.orientation != f.field_directions:
gbot_v = -gbot_v

gh_v = PH.T @ gbot_v
ge_v = PE.T @ gtop_v

gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * (mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * (mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))
gfu_h_v = (
-1.0
/ (1j * 2 * np.pi * frequency)
* (mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))
)


return gfu_h_v + ge_v, None

if mesh.dim == 3:
de_v = Pe @ f._eDeriv(src, du_dm_v, v, adjoint=False)
dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False)
dhx_v = Phx @ dh_v
dhy_v = Phy @ dh_v
if self.orientation[1] == "x":
dh_dm_v = dhy_v
else:
dh_dm_v = -dhx_v

dtop_v = (
e[:, 0] * dh_dm_v[:, 1]
+ de_v[:, 0] * h[:, 1]
- e[:, 1] * dh_dm_v[:, 0]
- de_v[:, 1] * h[:, 0]
)
dbot_v = (
hx[:, 0] * dhy_v[:, 1]
+ dhx_v[:, 0] * hy[:, 1]
- hx[:, 1] * dhy_v[:, 0]
- dhx_v[:, 1] * hy[:, 0]
)
imp_deriv = (bot * dtop_v - top * dbot_v) / (bot * bot)
else:
de_v = PE @ f._eDeriv(src, du_dm_v, v, adjoint=False)
dh_v = PH @ f._hDeriv(src, du_dm_v, v, adjoint=False)

if mesh.dim == 1 and self.orientation != f.field_directions:
dh_v = -dh_v

imp_deriv = (de_v - imp * dh_v) / bot

if self.component == "apparent_resistivity":
rx_deriv = (
2 * _alpha(src) * (imp.real * imp_deriv.real + imp.imag * imp_deriv.imag)
)
elif self.component == "phase":
amp2 = imp.imag**2 + imp.real**2
deriv_re = -imp.imag / amp2 * imp_deriv.real
deriv_im = imp.real / amp2 * imp_deriv.imag

rx_deriv = (180 / np.pi) * (deriv_re + deriv_im)
else:
rx_deriv = getattr(imp_deriv, self.component)
return rx_deriv


PointNaturalSource._eval_impedance_deriv = _eval_impedance_deriv


def _eval_tipper_deriv(self, frequency, mesh, h, simulation, du_dm_v=None, v=None, adjoint=False):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
def _eval_tipper_deriv(self, frequency, mesh, h, simulation, du_dm_v=None, v=None, adjoint=False):
def _eval_tipper_deriv(
self, frequency, mesh, h, simulation, du_dm_v=None, v=None, adjoint=False
):

# will grab both primary and secondary and sum them!

# if not isinstance(f, np.ndarray):
# h = f[src, "h"]
# else:
# h = f

Phx = self.getP(mesh, "Fx", "h")
Phy = self.getP(mesh, "Fy", "h")
Phz = self.getP(mesh, "Fz", "h")
hx = Phx @ h
hy = Phy @ h
hz = Phz @ h

if self.orientation[1] == "x":
h = -hy
else:
h = hx

top = h[:, 0] * hz[:, 1] - h[:, 1] * hz[:, 0]
bot = hx[:, 0] * hy[:, 1] - hx[:, 1] * hy[:, 0]
imp = top / bot

if adjoint:

gtop_v = sdiag(bot**-1) @ v
gbot_v = sdiag(-imp / bot) @ v

block_a = sp.kron(sdiag(hy[:, 1]) @ gbot_v, [1, 0])
block_b = sp.kron(sdiag(-hy[:, 0]) @ gbot_v, [0, 1])
ghx_v = block_a + block_b

block_a = sp.kron(sdiag(-hx[:, 1]) @ gbot_v, [1, 0])
block_b = sp.kron(sdiag(hx[:, 0]) @ gbot_v, [0, 1])
ghy_v = block_a + block_b

block_a = sp.kron(sdiag(-h[:, 1]) @ gtop_v, [1, 0])
block_b = sp.kron(sdiag(h[:, 0]) @ gtop_v, [0, 1])
ghz_v = block_a + block_b

block_a = sp.kron(sdiag(hz[:, 1]) @ gtop_v, [1, 0])
block_b = sp.kron(sdiag(-hz[:, 0]) @ gtop_v, [0, 1])
gh_v = block_a + block_b

if self.orientation[1] == "x":
ghy_v -= gh_v
else:
ghx_v += gh_v

gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v + Phz.T @ ghz_v

gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * (
mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))
Comment on lines +214 to +215

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[black-format] reported by reviewdog 🐶

Suggested change
gfu_h_v = -1.0 / (1j * 2 * np.pi * frequency) * (
mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))
gfu_h_v = (
-1.0
/ (1j * 2 * np.pi * frequency)
* (mesh.edge_curl.T * (simulation.MfMui.T * (simulation.MfI.T * gh_v)))
)


return gfu_h_v

dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False)
dhx_v = Phx @ dh_v
dhy_v = Phy @ dh_v
dhz_v = Phz @ dh_v
if self.orientation[1] == "x":
dh_v = -dhy_v
else:
dh_v = dhx_v

dtop_v = (
h[:, 0] * dhz_v[:, 1]
+ dh_v[:, 0] * hz[:, 1]
- h[:, 1] * dhz_v[:, 0]
- dh_v[:, 1] * hz[:, 0]
)
dbot_v = (
hx[:, 0] * dhy_v[:, 1]
+ dhx_v[:, 0] * hy[:, 1]
- hx[:, 1] * dhy_v[:, 0]
- dhx_v[:, 1] * hy[:, 0]
)

return (bot * dtop_v - top * dbot_v) / (bot * bot)


Point3DTipper._eval_tipper_deriv = _eval_tipper_deriv
Loading