Skip to content

Commit

Permalink
Extended transpose, and tensor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lucas-wilkins committed Oct 21, 2024
1 parent b202e21 commit 76a5626
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 21 deletions.
152 changes: 152 additions & 0 deletions sasdata/quantities/math_operations_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
""" Tests for math operations """

import pytest

import numpy as np
from sasdata.quantities.quantity import NamedQuantity, tensordot, transpose
from sasdata.quantities import units

order_list = [
[0, 1, 2, 3],
[0, 2, 1],
[1, 0],
[0, 1],
[2, 0, 1],
[3, 1, 2, 0]
]

@pytest.mark.parametrize("order", order_list)
def test_transpose_raw(order: list[int]):
""" Check that the transpose operation changes the order of indices correctly - uses sizes as way of tracking"""

input_shape = tuple([i+1 for i in range(len(order))])
expected_shape = tuple([i+1 for i in order])

input_mat = np.zeros(input_shape)

measured_mat = transpose(input_mat, axes=tuple(order))

assert measured_mat.shape == expected_shape


@pytest.mark.parametrize("order", order_list)
def test_transpose_raw(order: list[int]):
""" Check that the transpose operation changes the order of indices correctly - uses sizes as way of tracking"""
input_shape = tuple([i + 1 for i in range(len(order))])
expected_shape = tuple([i + 1 for i in order])

input_mat = NamedQuantity("testmat", np.zeros(input_shape), units=units.none)

measured_mat = transpose(input_mat, axes=tuple(order))

assert measured_mat.value.shape == expected_shape


rng_seed = 1979
tensor_product_with_identity_sizes = (4,6,5)

@pytest.mark.parametrize("index, size", [tup for tup in enumerate(tensor_product_with_identity_sizes)])
def test_tensor_product_with_identity_quantities(index, size):
""" Check the correctness of the tensor product by multiplying by the identity (quantity, quantity)"""
np.random.seed(rng_seed)

x = NamedQuantity("x", np.random.rand(*tensor_product_with_identity_sizes), units=units.meters)
y = NamedQuantity("y", np.eye(size), units.seconds)

z = tensordot(x, y, index, 0)

# Check units
assert z.units == units.meters * units.seconds

# Expected sizes - last index gets moved to end
output_order = [i for i in (0, 1, 2) if i != index] + [index]
output_sizes = [tensor_product_with_identity_sizes[i] for i in output_order]

assert z.value.shape == tuple(output_sizes)

# Restore original order and check
reverse_order = [-1, -1, -1]
for to_index, from_index in enumerate(output_order):
reverse_order[from_index] = to_index

z_reordered = transpose(z, axes = tuple(reverse_order))

assert z_reordered.value.shape == tensor_product_with_identity_sizes

# Check values

mat_in = x.in_si()
mat_out = transpose(z, axes=tuple(reverse_order)).in_si()

assert np.all(np.abs(mat_in - mat_out) < 1e-10)


@pytest.mark.parametrize("index, size", [tup for tup in enumerate(tensor_product_with_identity_sizes)])
def test_tensor_product_with_identity_quantity_matrix(index, size):
""" Check the correctness of the tensor product by multiplying by the identity (quantity, matrix)"""
np.random.seed(rng_seed)

x = NamedQuantity("x", np.random.rand(*tensor_product_with_identity_sizes), units.meters)
y = np.eye(size)

z = tensordot(x, y, index, 0)

assert z.units == units.meters

# Expected sizes - last index gets moved to end
output_order = [i for i in (0, 1, 2) if i != index] + [index]
output_sizes = [tensor_product_with_identity_sizes[i] for i in output_order]

assert z.value.shape == tuple(output_sizes)

# Restore original order and check
reverse_order = [-1, -1, -1]
for to_index, from_index in enumerate(output_order):
reverse_order[from_index] = to_index

z_reordered = transpose(z, axes = tuple(reverse_order))

assert z_reordered.value.shape == tensor_product_with_identity_sizes

# Check values

mat_in = x.in_si()
mat_out = transpose(z, axes=tuple(reverse_order)).in_si()

assert np.all(np.abs(mat_in - mat_out) < 1e-10)


@pytest.mark.parametrize("index, size", [tup for tup in enumerate(tensor_product_with_identity_sizes)])
def test_tensor_product_with_identity_matrix_quantity(index, size):
""" Check the correctness of the tensor product by multiplying by the identity (matrix, quantity)"""
np.random.seed(rng_seed)

x = np.random.rand(*tensor_product_with_identity_sizes)
y = NamedQuantity("y", np.eye(size), units.seconds)

z = tensordot(x, y, index, 0)

assert z.units == units.seconds


# Expected sizes - last index gets moved to end
output_order = [i for i in (0, 1, 2) if i != index] + [index]
output_sizes = [tensor_product_with_identity_sizes[i] for i in output_order]

assert z.value.shape == tuple(output_sizes)

# Restore original order and check
reverse_order = [-1, -1, -1]
for to_index, from_index in enumerate(output_order):
reverse_order[from_index] = to_index

z_reordered = transpose(z, axes = tuple(reverse_order))

assert z_reordered.value.shape == tensor_product_with_identity_sizes

# Check values

mat_in = x
mat_out = transpose(z, axes=tuple(reverse_order)).in_si()

assert np.all(np.abs(mat_in - mat_out) < 1e-10)
80 changes: 59 additions & 21 deletions sasdata/quantities/quantity.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@

################### Quantity based operations, need to be here to avoid cyclic dependencies #####################


def transpose(a: Union["Quantity[ArrayLike]", ArrayLike]):
""" Transpose an array or an array based quantity """
def transpose(a: Union["Quantity[ArrayLike]", ArrayLike], axes: tuple | None = None):
""" Transpose an array or an array based quantity, can also do reordering of axes"""
if isinstance(a, Quantity):
return DerivedQuantity(value=np.transpose(a.value),
units=a.units,
history=QuantityHistory.apply_operation(Transpose, a.history))

if axes is None:
return DerivedQuantity(value=np.transpose(a.value, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Transpose, a.history))

else:
return DerivedQuantity(value=np.transpose(a.value, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Transpose, a.history, axes=axes))

else:
return np.transpose(a)
return np.transpose(a, axes=axes)


def dot(a: Union["Quantity[ArrayLike]", ArrayLike], b: Union["Quantity[ArrayLike]", ArrayLike]):
""" Dot product of two arrays or two array based quantities """
Expand All @@ -43,10 +51,10 @@ def dot(a: Union["Quantity[ArrayLike]", ArrayLike], b: Union["Quantity[ArrayLike
# If its only one of them that is a quantity, convert the other one

if not a_is_quantity:
a = Quantity(a, units.dimensionless)
a = Quantity(a, units.none)

if not b_is_quantity:
b = Quantity(b, units.dimensionless)
b = Quantity(b, units.none)

return DerivedQuantity(
value=np.dot(a.value, b.value),
Expand All @@ -57,6 +65,18 @@ def dot(a: Union["Quantity[ArrayLike]", ArrayLike], b: Union["Quantity[ArrayLike
return np.dot(a, b)

def tensordot(a: Union["Quantity[ArrayLike]", ArrayLike] | ArrayLike, b: Union["Quantity[ArrayLike]", ArrayLike], a_index: int, b_index: int):
""" Tensor dot product - equivalent to contracting two tensors, such as
A_{i0, i1, i2, i3...} and B_{j0, j1, j2...}
e.g. if a_index is 1 and b_index is zero, it will be the sum
C_{i0, i2, i3 ..., j1, j2 ...} = sum_k A_{i0, k, i2, i3 ...} B_{k, j1, j2 ...}
(I think, have to check what happens with indices TODO!)
"""

a_is_quantity = isinstance(a, Quantity)
b_is_quantity = isinstance(b, Quantity)

Expand All @@ -65,10 +85,10 @@ def tensordot(a: Union["Quantity[ArrayLike]", ArrayLike] | ArrayLike, b: Union["
# If its only one of them that is a quantity, convert the other one

if not a_is_quantity:
a = Quantity(a, units.dimensionless)
a = Quantity(a, units.none)

if not b_is_quantity:
b = Quantity(b, units.dimensionless)
b = Quantity(b, units.none)

return DerivedQuantity(
value=np.tensordot(a.value, b.value, axes=(a_index, b_index)),
Expand Down Expand Up @@ -791,11 +811,15 @@ def __eq__(self, other):
# Matrix operations
#

class Transpose(UnaryOperation):
class Transpose(Operation):
""" Transpose operation - as per numpy"""

serialisation_name = "transpose"

def __init__(self, a: Operation, axes: tuple[int] | None = None):
self.a = a
self.axes = axes

def evaluate(self, variables: dict[int, T]) -> T:
return np.transpose(self.a.evaluate(variables))

Expand All @@ -806,9 +830,27 @@ def _clean(self):
clean_a = self.a._clean()
return Transpose(clean_a)


def _serialise_parameters(self) -> dict[str, Any]:
if self.axes is None:
return { "a": self.a._serialise_json() }
else:
return {
"a": self.a._serialise_json(),
"axes": list(self.axes)
}


@staticmethod
def _deserialise(parameters: dict) -> "Operation":
return Transpose(Operation.deserialise_json(parameters["a"]))
if "axes" in parameters:
return Transpose(
a=Operation.deserialise_json(parameters["a"]),
axes=tuple(parameters["axes"]))
else:
return Transpose(
a=Operation.deserialise_json(parameters["a"]))


def _summary_open(self):
return "Transpose"
Expand Down Expand Up @@ -974,6 +1016,10 @@ def jacobian(self) -> list[Operation]:
# Use the hash value to specify the variable of differentiation
return [self.operation_tree.derivative(key) for key in self.reference_key_list]

def _recalculate(self):
""" Recalculate the value of this object - primary use case is for testing """
return self.operation_tree.evaluate(self.references)

def variance_propagate(self, quantity_units: Unit, covariances: dict[tuple[int, int]: "Quantity"] = {}):
""" Do standard error propagation to calculate the uncertainties associated with this quantity
Expand All @@ -985,14 +1031,6 @@ def variance_propagate(self, quantity_units: Unit, covariances: dict[tuple[int,
raise NotImplementedError("User specified covariances not currently implemented")

jacobian = self.jacobian()
# jacobian_units = [quantity_units / self.references[key].units for key in self.reference_key_list]
#
# # Evaluate the jacobian
# # TODO: should we use quantities here, does that work automatically?
# evaluated_jacobian = [Quantity(
# value=entry.evaluate(self.si_reference_values),
# units=unit.si_equivalent())
# for entry, unit in zip(jacobian, jacobian_units)]

evaluated_jacobian = [entry.evaluate(self.references) for entry in jacobian]

Expand Down

0 comments on commit 76a5626

Please sign in to comment.