diff --git a/PINNICLE/modeldata/__init__.py b/PINNICLE/modeldata/__init__.py index df22d8b..51a5012 100644 --- a/PINNICLE/modeldata/__init__.py +++ b/PINNICLE/modeldata/__init__.py @@ -1 +1,3 @@ -from .data import * +from .data import DataBase, Data +from .issm_data import ISSMmdData +from .general_mat_data import MatData diff --git a/PINNICLE/modeldata/data.py b/PINNICLE/modeldata/data.py index 4a8259d..bf5c193 100644 --- a/PINNICLE/modeldata/data.py +++ b/PINNICLE/modeldata/data.py @@ -1,8 +1,6 @@ from abc import ABC, abstractmethod from ..parameter import DataParameter, SingleDataParameter from ..physics import Constants -from ..utils import plot_dict_data -import mat73 import numpy as np @@ -41,19 +39,19 @@ def get_ice_coordinates(self, mask_name=""): @abstractmethod def load_data(self): - """ load data from self.path + """ load data from `self.path` """ pass @abstractmethod def prepare_training_data(self): - """ prepare training data according to the data_size + """ prepare training data according to the `data_size` """ pass class Data(Constants): - """ class of data with all data used + """ class of data with all data used """ def __init__(self, parameters=DataParameter()): super().__init__() @@ -66,20 +64,19 @@ def __init__(self, parameters=DataParameter()): # reference solution of the output of PINN self.sol = {} - def get_ice_coordinates(self, mask_name=""): """ get the coordinates of ice covered region from all the data, put them in one array """ return np.vstack([self.data[k].get_ice_coordinates(mask_name=mask_name) for k in self.data]) def load_data(self): - """ laod all the data in self.data + """ laod all the data in `self.data` """ for k in self.data: self.data[k].load_data() def prepare_training_data(self): - """ merge all X and sol in self.data to self.X and self.sol with the keys + """ merge all `X` and `sol` in `self.data` to `self.X` and `self.sol` with the keys """ # prepare the training data according to data_size for key in self.data: @@ -96,121 +93,3 @@ def prepare_training_data(self): self.sol[xkey] = self.data[key].sol[xkey] else: self.sol[xkey] = np.vstack((self.sol[xkey], self.data[key].sol[xkey])) - - -class ISSMmdData(DataBase, Constants): - """ data loaded from model in ISSM - """ - _DATA_TYPE = "ISSM" - def __init__(self, parameters=SingleDataParameter()): - Constants.__init__(self) - super().__init__(parameters) - - def get_ice_indices(self, mask_name=""): - """ get the indices of ice covered region for X_dict and data_dict - """ - if (not mask_name) or (mask_name not in self.mask_dict): - mask_name = "icemask" - - # get ice mask - icemask = self.mask_dict[mask_name] - iice = np.asarray(icemask<0).nonzero() - return iice - - def get_ice_coordinates(self, mask_name=""): - """ get the coordinates of ice covered region for X_dict and data_dict - """ - iice = self.get_ice_indices(mask_name=mask_name) - # get the coordinates - X_mask = np.hstack((self.X_dict['x'][iice].flatten()[:,None], - self.X_dict['y'][iice].flatten()[:,None])) - return X_mask - - def load_data(self): - """ load ISSM model from a .mat file, return a dict with the required data - """ - # Reading matlab data - data = mat73.loadmat(self.parameters.data_path) - # get the model - md = data['md'] - # create the output dict - # x,y coordinates - self.X_dict['x'] = md['mesh']['x'] - self.X_dict['y'] = md['mesh']['y'] - # data - self.data_dict['u'] = md['inversion']['vx_obs']/self.yts - self.data_dict['v'] = md['inversion']['vy_obs']/self.yts - self.data_dict['s'] = md['geometry']['surface'] - 
self.data_dict['H'] = md['geometry']['thickness'] - self.data_dict['C'] = md['friction']['C'] - self.data_dict['B'] = md['materials']['rheology_B'] - self.data_dict['vel'] = np.sqrt(self.data_dict['u']**2.0+self.data_dict['v']**2.0) - # ice mask - self.mask_dict['icemask'] = md['mask']['ice_levelset'] - # B.C. - self.mask_dict['DBC_mask'] = md['mesh']['vertexonboundary'] - - def plot(self, data_names=[], vranges={}, axs=None, resolution=200, **kwargs): - """ use utils.plot_dict_data to plot the ISSM data - Args: - data_names (list): Names of the variables. if not specified, plot all variables in data_dict - vranges (dict): range of the data - axs (array of AxesSubplot): axes to plot each data, if not given, then generate a subplot according to the size of data_names - resolution (int): number of pixels in horizontal and vertical direction - return: - X (np.array): x-coordinates of the 2D plot - Y (np.array): y-coordinates of the 2D plot - im_data (dict): Dict of data for the 2D plot, each element has the same size as X and Y - axs (array of AxesSubplot): axes of the subplots - """ - if not data_names: - # default value of data_names - data_names = list(self.data_dict.keys()) - else: - # compare with data_dict, find all avaliable - data_names = [k for k in data_names if k in self.data_dict] - - # get the subdict of the data to plot - data_dict = {k:self.data_dict[k] for k in data_names} - - # call the function in utils - X, Y, im_data, axs = plot_dict_data(self.X_dict, data_dict, vranges=vranges, axs=axs, resolution=resolution, **kwargs) - - return X, Y, im_data, axs - - def prepare_training_data(self, data_size=None): - """ prepare data for PINNs according to the settings in datasize - """ - if data_size is None: - data_size = self.parameters.data_size - - # initialize - self.X = {} - self.sol = {} - - # prepare x,y coordinates - iice = self.get_ice_indices() - X_temp = np.hstack((self.X_dict['x'][iice].flatten()[:,None], self.X_dict['y'][iice].flatten()[:,None])) - max_data_size = X_temp.shape[0] - - # prepare boundary coordinates - DBC = self.mask_dict['DBC_mask'] - idbc = np.asarray(DBC>0).nonzero() - X_bc = np.hstack((self.X_dict['x'][idbc].flatten()[:,None], self.X_dict['y'][idbc].flatten()[:,None])) - - # go through all keys in data_dict - for k in self.data_dict: - # if datasize has the key, then add to X and sol - if k in data_size: - if data_size[k] is not None: - # apply ice mask - sol_temp = self.data_dict[k][iice].flatten()[:,None] - # randomly choose, replace=False for no repeat data - idx = np.random.choice(max_data_size, min(data_size[k],max_data_size), replace=False) - self.X[k] = X_temp[idx, :] - self.sol[k] = sol_temp[idx, :] - else: - # if the size is None, then only use boundary conditions - self.X[k] = X_bc - self.sol[k] = self.data_dict[k][idbc].flatten()[:,None] - diff --git a/PINNICLE/modeldata/general_mat_data.py b/PINNICLE/modeldata/general_mat_data.py new file mode 100644 index 0000000..58d39ba --- /dev/null +++ b/PINNICLE/modeldata/general_mat_data.py @@ -0,0 +1,74 @@ +from . 
import DataBase +from ..parameter import SingleDataParameter +from ..physics import Constants +from ..utils import plot_dict_data, load_mat +import numpy as np + + +class MatData(DataBase, Constants): + """ data loaded from a `.mat` file + """ + _DATA_TYPE = "mat" + def __init__(self, parameters=SingleDataParameter()): + Constants.__init__(self) + super().__init__(parameters) + + def get_ice_coordinates(self, mask_name=""): + """ stack the coordinates `x` and `y`, assuming all the data in .mat + are in the ice covered region. This function is currently only + called by plotting to generate ice covered region. + """ + # get the coordinates + X_mask = np.hstack((self.X_dict['x'].flatten()[:,None], + self.X_dict['y'].flatten()[:,None])) + return X_mask + + def load_data(self): + """ load scatter data from a `.mat` file, return a dict with the required data + """ + # Reading matlab data + data = load_mat(self.parameters.data_path) + + # x,y coordinates + self.X_dict['x'] = data['x'] + self.X_dict['y'] = data['y'] + + # load all variables from parameters.name_map + for k in self.parameters.name_map: + self.data_dict[k] = data[self.parameters.name_map[k]] + + def plot(self, data_names=[], vranges={}, axs=None, **kwargs): + """ TODO: scatter plot of the selected data from data_names + """ + pass + + def prepare_training_data(self, data_size=None): + """ prepare data for PINNs according to the settings in `data_size` + """ + if data_size is None: + data_size = self.parameters.data_size + + # initialize + self.X = {} + self.sol = {} + + # prepare x,y coordinates + X_temp = self.get_ice_coordinates() + max_data_size = X_temp.shape[0] + + # go through all keys in data_dict + for k in self.data_dict: + # if datasize has the key, then add to X and sol + if k in data_size: + if data_size[k] is not None: + # apply ice mask + sol_temp = self.data_dict[k].flatten()[:,None] + # randomly choose, replace=False for no repeat data + idx = np.random.choice(max_data_size, min(data_size[k],max_data_size), replace=False) + self.X[k] = X_temp[idx, :] + self.sol[k] = sol_temp[idx, :] + else: + # if the size is None, then only use boundary conditions + raise ValueError(f"{k} can not be set to None in .mat data. \ + If {k} is not needed in training, please remove it from `data_size`") + diff --git a/PINNICLE/modeldata/issm_data.py b/PINNICLE/modeldata/issm_data.py new file mode 100644 index 0000000..d6ed91c --- /dev/null +++ b/PINNICLE/modeldata/issm_data.py @@ -0,0 +1,126 @@ +from . import DataBase +from ..parameter import SingleDataParameter +from ..physics import Constants +from ..utils import plot_dict_data, load_mat +import numpy as np + + +class ISSMmdData(DataBase, Constants): + """ data loaded from model in ISSM + """ + _DATA_TYPE = "ISSM" + def __init__(self, parameters=SingleDataParameter()): + Constants.__init__(self) + super().__init__(parameters) + + def get_ice_coordinates(self, mask_name=""): + """ Use `get_ice_indices` defined by each individual class, + get the coordinates `(x,y)` of ice covered region from `X_dict`. + This function is currently only called by plotting to generate + ice covered region. 
+ """ + iice = self.get_ice_indices(mask_name=mask_name) + # get the coordinates + X_mask = np.hstack((self.X_dict['x'][iice].flatten()[:,None], + self.X_dict['y'][iice].flatten()[:,None])) + return X_mask + + def get_ice_indices(self, mask_name=""): + """ get the indices of ice covered region for `X_dict` and `data_dict` + """ + if (not mask_name) or (mask_name not in self.mask_dict): + mask_name = "icemask" + + # get ice mask + icemask = self.mask_dict[mask_name] + iice = np.asarray(icemask<0).nonzero() + return iice + + def load_data(self): + """ load ISSM model from a `.mat` file + """ + # Reading matlab data + data = load_mat(self.parameters.data_path) + # get the model + md = data['md'] + # create the output dict + # x,y coordinates + self.X_dict['x'] = md['mesh']['x'] + self.X_dict['y'] = md['mesh']['y'] + # data + self.data_dict['u'] = md['inversion']['vx_obs']/self.yts + self.data_dict['v'] = md['inversion']['vy_obs']/self.yts + self.data_dict['s'] = md['geometry']['surface'] + self.data_dict['H'] = md['geometry']['thickness'] + self.data_dict['C'] = md['friction']['C'] + self.data_dict['B'] = md['materials']['rheology_B'] + self.data_dict['vel'] = np.sqrt(self.data_dict['u']**2.0+self.data_dict['v']**2.0) + # ice mask + self.mask_dict['icemask'] = md['mask']['ice_levelset'] + # B.C. + self.mask_dict['DBC_mask'] = md['mesh']['vertexonboundary'] + + def plot(self, data_names=[], vranges={}, axs=None, resolution=200, **kwargs): + """ use `utils.plot_dict_data` to plot the ISSM data + Args: + data_names (list): Names of the variables. if not specified, plot all variables in data_dict + vranges (dict): range of the data + axs (array of AxesSubplot): axes to plot each data, if not given, then generate a subplot according to the size of data_names + resolution (int): number of pixels in horizontal and vertical direction + return: + X (np.array): x-coordinates of the 2D plot + Y (np.array): y-coordinates of the 2D plot + im_data (dict): Dict of data for the 2D plot, each element has the same size as X and Y + axs (array of AxesSubplot): axes of the subplots + """ + if not data_names: + # default value of data_names + data_names = list(self.data_dict.keys()) + else: + # compare with data_dict, find all avaliable + data_names = [k for k in data_names if k in self.data_dict] + + # get the subdict of the data to plot + data_dict = {k:self.data_dict[k] for k in data_names} + + # call the function in utils + X, Y, im_data, axs = plot_dict_data(self.X_dict, data_dict, vranges=vranges, axs=axs, resolution=resolution, **kwargs) + + return X, Y, im_data, axs + + def prepare_training_data(self, data_size=None): + """ prepare data for PINNs according to the settings in `data_size` + """ + if data_size is None: + data_size = self.parameters.data_size + + # initialize + self.X = {} + self.sol = {} + + # prepare x,y coordinates + iice = self.get_ice_indices() + X_temp = np.hstack((self.X_dict['x'][iice].flatten()[:,None], self.X_dict['y'][iice].flatten()[:,None])) + max_data_size = X_temp.shape[0] + + # prepare boundary coordinates + DBC = self.mask_dict['DBC_mask'] + idbc = np.asarray(DBC>0).nonzero() + X_bc = np.hstack((self.X_dict['x'][idbc].flatten()[:,None], self.X_dict['y'][idbc].flatten()[:,None])) + + # go through all keys in data_dict + for k in self.data_dict: + # if datasize has the key, then add to X and sol + if k in data_size: + if data_size[k] is not None: + # apply ice mask + sol_temp = self.data_dict[k][iice].flatten()[:,None] + # randomly choose, replace=False for no repeat data + idx 
= np.random.choice(max_data_size, min(data_size[k],max_data_size), replace=False) + self.X[k] = X_temp[idx, :] + self.sol[k] = sol_temp[idx, :] + else: + # if the size is None, then only use boundary conditions + self.X[k] = X_bc + self.sol[k] = self.data_dict[k][idbc].flatten()[:,None] + diff --git a/PINNICLE/parameter.py b/PINNICLE/parameter.py index 5b3a0c2..d028ab2 100644 --- a/PINNICLE/parameter.py +++ b/PINNICLE/parameter.py @@ -8,20 +8,17 @@ class ParameterBase(ABC): def __init__(self, param_dict): self.param_dict = param_dict - # default + # set default parameters self.set_default() - # update parameters + # set parameters from param_dict if given self.set_parameters(self.param_dict) - # check consistency - self.check_consisteny() + # make some necessary update of the parameters after loading from param_dict + self.update() - def __str__(self): - """ display all attributes except 'param_dict' - """ - return "\t" + type(self).__name__ + ": \n" + \ - ("\n".join(["\t\t" + k + ":\t" + str(self.__dict__[k]) for k in self.__dict__ if k != "param_dict"]))+"\n" + # check consistency + self.check_consistency() @abstractmethod def set_default(self): @@ -30,7 +27,7 @@ def set_default(self): pass @abstractmethod - def check_consisteny(self): + def check_consistency(self): """ check consistency of the parameter data """ pass @@ -42,6 +39,14 @@ def _add_parameters(self, pdict: dict): for key, value in pdict.items(): setattr(self, key, value) + def has_keys(self, keys): + """ if all the keys are in the class, return true, otherwise return false + """ + if isinstance(keys, dict) or isinstance(keys, list): + return all([hasattr(self, k) for k in keys]) + else: + return False + def set_parameters(self, pdict: dict): """ find all the keys from pdict which are avalible in the class, update the values """ @@ -51,13 +56,16 @@ def set_parameters(self, pdict: dict): if hasattr(self, key): setattr(self, key, value) - def has_keys(self, keys): - """ if all the keys are in the class, return true, otherwise return false + def __str__(self): + """ display all attributes except 'param_dict' """ - if isinstance(keys, dict) or isinstance(keys, list): - return all([hasattr(self, k) for k in keys]) - else: - return False + return "\t" + type(self).__name__ + ": \n" + \ + ("\n".join(["\t\t" + k + ":\t" + str(self.__dict__[k]) for k in self.__dict__ if k != "param_dict"]))+"\n" + + def update(self): + """ after set_parameter, make some necessary update of the parameters + """ + pass class DomainParameter(ParameterBase): @@ -73,31 +81,36 @@ def set_default(self): # number of collocation points used in the domain self.num_collocation_points = 0 - def check_consisteny(self): + def check_consistency(self): pass + class DataParameter(ParameterBase): """ list of all data used """ def __init__(self, param_dict={}): super().__init__(param_dict) - # convert data from dict of dict to a dict of SingleDataParameter - if self.data: - self.update_data() - - def check_consisteny(self): - pass - def set_default(self): """ default parameters """ self.data = {} - def update_data(self): + def check_consistency(self): + pass + + def __str__(self): + """ + display all data + """ + return "\t" + type(self).__name__ + ": \n" + \ + ("\n".join(["\t\t" + k + ":\n" + str(self.data[k]) for k in self.data]))+"\n" + + def update(self): """ convert dict to class SingleDataParameter """ - self.data = {k:SingleDataParameter(self.data[k]) for k in self.data} + if self.data: + self.data = {k:SingleDataParameter(self.data[k]) for k in self.data} class 
SingleDataParameter(ParameterBase): @@ -113,13 +126,30 @@ def set_default(self): self.data_path = "" # length of each data in used, leave no data variable(sol) empty or set to None self.data_size = {} + # name map k->v, k is the variable name in the PINN, v is the variable name in the data file + self.name_map = {} # source of the data self.source = "ISSM" - def check_consisteny(self): - if self.source != "ISSM": + def check_consistency(self): + if self.source not in ["ISSM", "mat"]: raise ValueError(f"Data loader of {self.source} is not implemented") + def __str__(self): + """ + display all attributes except 'param_dict' + """ + return ("\n".join(["\t\t\t" + k + ":\t" + str(self.__dict__[k]) for k in self.__dict__ if k != "param_dict"]))+"\n" + + def update(self): + """ update name_map according to data_size + """ + # every variable in data_size need to be loaded from the data loader + for k in self.data_size: + # names in data_size, if not given in name_map, then use the same name for key and value + if k not in self.name_map: + self.name_map[k] = k + class NNParameter(ParameterBase): """ @@ -146,12 +176,7 @@ def set_default(self): self.output_lb = None self.output_ub = None - def set_parameters(self, pdict: dict): - super().set_parameters(pdict) - self.input_size = len(self.input_variables) - self.output_size = len(self.output_variables) - - def check_consisteny(self): + def check_consistency(self): # input size of nn equals to dependent in physics if self.input_size != len(self.input_variables): raise ValueError("'input_size' does not match the number of 'input_variables'") @@ -178,6 +203,11 @@ def is_output_scaling(self): else: return False + def set_parameters(self, pdict: dict): + super().set_parameters(pdict) + self.input_size = len(self.input_variables) + self.output_size = len(self.output_variables) + class PhysicsParameter(ParameterBase): """ parameter of physics @@ -190,7 +220,7 @@ def set_default(self): # name(s) and parameters of the equations self.equations = {} - def check_consisteny(self): + def check_consistency(self): pass def setup_equations(self): @@ -205,6 +235,7 @@ def __str__(self): return "\t" + type(self).__name__ + ": \n" + \ ("\n".join(["\t\t" + k + ":\n" + str(self.equations[k]) for k in self.equations]))+"\n" + class EquationParameter(ParameterBase): """ parameter of equations """ @@ -239,7 +270,7 @@ def set_default(self): # scalar variables: name:value self.scalar_variables = {} - def check_consisteny(self): + def check_consistency(self): if (len(self.output)) != (len(self.output_lb)): raise ValueError("Size of 'output' does not match the size of 'output_lb'") if (len(self.output)) != (len(self.output_ub)): @@ -284,12 +315,6 @@ class TrainingParameter(ParameterBase): def __init__(self, param_dict={}): super().__init__(param_dict) - # update additional loss - if self.additional_loss: - self.update_additional_loss() - - # add callback setttings if given any of them - self.has_callbacks = self.check_callbacks() def set_default(self): # number of epochs @@ -320,6 +345,9 @@ def set_default(self): # if plot the results and history, and save figures self.is_plot = False + def check_consistency(self): + pass + def check_callbacks(self): """ check if any of the following variable is given from setting """ @@ -335,9 +363,6 @@ def check_callbacks(self): # otherwise return False - def check_consisteny(self): - pass - def has_EarlyStopping(self): """ check if param has the min_delta or patience for early stopping """ @@ -370,11 +395,16 @@ def has_PDEPointResampler(self): 
else: return True - def update_additional_loss(self): + def update(self): """ convert dict to class LossFunctionParameter """ - self.additional_loss = {k:LossFunctionParameter(self.additional_loss[k]) for k in self.additional_loss} - + # update additional loss + if self.additional_loss: + self.additional_loss = {k:LossFunctionParameter(self.additional_loss[k]) for k in self.additional_loss} + + # add callback setttings if given any of them + self.has_callbacks = self.check_callbacks() + class LossFunctionParameter(ParameterBase): """ parameter of customize loss function @@ -391,17 +421,16 @@ def set_default(self): # weight of this loss function self.weight = 1.0 - def check_consisteny(self): + def check_consistency(self): data_misfit.get(self.function) + class Parameters(ParameterBase): """ parameters of the pinn, including domain, data, nn, and physics """ def __init__(self, param_dict={}): super().__init__(param_dict) - self.update_parameters() - def __str__(self): return "Parameters: \n" + str(self.training) + str(self.domain) + str(self.data) + str(self.nn) + str(self.physics) @@ -419,13 +448,13 @@ def set_parameters(self, param_dict): self.nn = NNParameter(param_dict) self.physics = PhysicsParameter(param_dict) - def check_consisteny(self): + def check_consistency(self): # length of training.loss_weights equals to equations+datasize #if (any(x not in self.nn.output_variables for x in self.data.datasize)): # TODO: raise ValueError("names in 'datasize' does not match the name in 'output_variables'") pass - def update_parameters(self): + def update(self): """ update parameters according to the input """ diff --git a/PINNICLE/utils/helper.py b/PINNICLE/utils/helper.py index bc39b5d..8065011 100644 --- a/PINNICLE/utils/helper.py +++ b/PINNICLE/utils/helper.py @@ -1,6 +1,7 @@ import json import os - +import mat73 +import scipy.io def is_file_ext(path, ext): """ check if a given path is ended by ext @@ -39,3 +40,12 @@ def load_dict_from_json(path, filename): else: data = {} return data + +def load_mat(file): + """ load .mat file, if the file is in MATLAB 7.3 format use mat73.loadmat, otherwise use scipy.io.loadmat() + """ + try: + data = mat73.loadmat(file) + except TypeError: + data = scipy.io.loadmat(file) + return data diff --git a/examples/dataset/flightTracks.mat b/examples/dataset/flightTracks.mat new file mode 100644 index 0000000..a352504 Binary files /dev/null and b/examples/dataset/flightTracks.mat differ diff --git a/examples/dataset/flightTracks73.mat b/examples/dataset/flightTracks73.mat new file mode 100644 index 0000000..cb2a421 Binary files /dev/null and b/examples/dataset/flightTracks73.mat differ diff --git a/tests/test_data.py b/tests/test_data.py index d6089a2..a7d56f8 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,5 +1,5 @@ import os -from PINNICLE.modeldata import ISSMmdData, Data +from PINNICLE.modeldata import ISSMmdData, MatData, Data from PINNICLE.parameter import DataParameter, SingleDataParameter def test_ISSMmdData(): @@ -100,3 +100,24 @@ def test_Data_multiple(): icoord = data_loader.get_ice_coordinates() assert icoord.shape == (23049*2, 2) + +def test_MatData(): + filename = "flightTracks.mat" + repoPath = os.path.dirname(__file__) + "/../examples/" + appDataPath = os.path.join(repoPath, "dataset") + path = os.path.join(appDataPath, filename) + + hp = {} + hp["data_path"] = path + hp["data_size"] = {"thickness":100} + hp["source"] = "mat" + p = SingleDataParameter(hp) + data_loader = MatData(p) + data_loader.load_data() + 
data_loader.prepare_training_data() + + assert(data_loader.sol['thickness'].shape == (100,1)) + assert(data_loader.X['thickness'].shape == (100,2)) + + icoord = data_loader.get_ice_coordinates() + assert icoord.shape == (3192, 2) diff --git a/tests/test_parameters.py b/tests/test_parameters.py index 09eff6c..a985b36 100644 --- a/tests/test_parameters.py +++ b/tests/test_parameters.py @@ -16,13 +16,35 @@ def test_domain_parameter(): d._add_parameters(newat) assert d.has_keys(newat) -def test_data_parameter(): - d = SingleDataParameter({"dataname":['u', 'v'], "datasize":[4000, 4000]}) +def test_single_data_parameter(): + issm = {"data_path":"./", "data_size":{"u":4000, "v":None}} + d = SingleDataParameter(issm) assert hasattr(d, "param_dict"), "Default attribute 'param_dict' not found" - issm = {"dataname":['u', 'v'], "datasize":[4000, 4000]} - d = DataParameter({"ISSM":issm}) + assert d.name_map["u"] == "u" + assert d.name_map["v"] == "v" + assert d.source == "ISSM" + + mat = {"data_path":"./", "data_size":{"u":4000, "v":None}, "source":"mat"} + d = SingleDataParameter(mat) + assert d.name_map["u"] == "u" + assert d.name_map["v"] == "v" + assert d.source == "mat" + + with pytest.raises(Exception): + unknown = {"source": "unknown"} + d = SingleDataParameter(unknown) + +def test_data_parameter(): + hp = {} + issm = {"data_path":"./", "data_size":{"u":4000, "v":None}} + mat = {"data_path":"./", "data_size":{"u":4000, "v":None}, "source":"mat"} + hp["data"] = {"mymat": mat, "myISSM": issm} + + d = DataParameter(hp) assert hasattr(d, "param_dict"), "Default attribute 'param_dict' not found" assert hasattr(d, "data"), "attribute 'data' not found" + assert d.data["myISSM"].source == "ISSM" + assert d.data["mymat"].source == "mat" def test_nn_parameter(): d = NNParameter() @@ -129,3 +151,15 @@ def test_training_callbacks_Checkpoint(): p = TrainingParameter(hp) assert p.has_callbacks == True assert p.has_ModelCheckpoint() == True + +def test_print_parameters(capsys): + hp = {} + p = Parameters(hp) + print(p) + captured = capsys.readouterr() + assert "TrainingParameter" in captured.out + assert "DomainParameter" in captured.out + assert "DataParameter" in captured.out + assert "NNParameter" in captured.out + assert "PhysicsParameter" in captured.out + diff --git a/tests/test_pinn.py b/tests/test_pinn.py index e1f6fd5..c216b9d 100644 --- a/tests/test_pinn.py +++ b/tests/test_pinn.py @@ -98,37 +98,38 @@ def test_save_and_load_setting(tmp_path): experiment2 = pinn.PINN(loadFrom=tmp_path) assert experiment.params.param_dict == experiment2.params.param_dict -def test_train(tmp_path): +#def test_train(tmp_path): +# hp["save_path"] = str(tmp_path) +# hp["is_save"] = True +# hp["num_collocation_points"] = 100 +# issm["data_size"] = {"u":100, "v":100, "s":100, "H":100, "C":None, "vel":100} +# hp["data"] = {"ISSM": issm} +# experiment = pinn.PINN(params=hp) +# experiment.compile() +# experiment.train() +# assert experiment.loss_names == ['fSSA1', 'fSSA2', 'u', 'v', 's', 'H', 'C', "vel log"] +# assert os.path.isfile(f"{tmp_path}/pinn/model-{hp['epochs']}.ckpt.index") +# +#def test_train_with_callbacks(tmp_path): +# hp["save_path"] = str(tmp_path) +# hp["is_save"] = True +# hp["num_collocation_points"] = 100 +# issm["data_size"] = {"u":100, "v":100, "s":100, "H":100, "C":None, "vel":100} +# hp["data"] = {"ISSM": issm} +# hp["min_delta"] = 1e10 +# hp["period"] = 5 +# hp["patience"] = 8 +# hp["checkpoint"] = True +# experiment = pinn.PINN(params=hp) +# experiment.compile() +# experiment.train() +# assert 
experiment.loss_names == ['fSSA1', 'fSSA2', 'u', 'v', 's', 'H', 'C', "vel log"] +# assert os.path.isfile(f"{tmp_path}/pinn/model-1.ckpt.index") +# assert os.path.isfile(f"{tmp_path}/pinn/model-9.ckpt.index") +# assert not os.path.isfile(f"{tmp_path}/pinn/model-{hp['epochs']}.ckpt.index") + +def test_only_callbacks(tmp_path): hp["save_path"] = str(tmp_path) - hp["is_save"] = True - hp["num_collocation_points"] = 100 - issm["data_size"] = {"u":100, "v":100, "s":100, "H":100, "C":None, "vel":100} - hp["data"] = {"ISSM": issm} - experiment = pinn.PINN(params=hp) - experiment.compile() - experiment.train() - assert experiment.loss_names == ['fSSA1', 'fSSA2', 'u', 'v', 's', 'H', 'C', "vel log"] - assert os.path.isfile(f"{tmp_path}/pinn/model-{hp['epochs']}.ckpt.index") - -def test_train_with_callbacks(tmp_path): - hp["save_path"] = str(tmp_path) - hp["is_save"] = True - hp["num_collocation_points"] = 100 - issm["data_size"] = {"u":100, "v":100, "s":100, "H":100, "C":None, "vel":100} - hp["data"] = {"ISSM": issm} - hp["min_delta"] = 1e10 - hp["period"] = 5 - hp["patience"] = 8 - hp["checkpoint"] = True - experiment = pinn.PINN(params=hp) - experiment.compile() - experiment.train() - assert experiment.loss_names == ['fSSA1', 'fSSA2', 'u', 'v', 's', 'H', 'C', "vel log"] - assert os.path.isfile(f"{tmp_path}/pinn/model-1.ckpt.index") - assert os.path.isfile(f"{tmp_path}/pinn/model-9.ckpt.index") - assert not os.path.isfile(f"{tmp_path}/pinn/model-{hp['epochs']}.ckpt.index") - -def test_only_callbacks(): hp["num_collocation_points"] = 100 issm["data_size"] = {"u":100, "v":100, "s":100, "H":100, "C":None, "vel":100} hp["data"] = {"ISSM": issm} diff --git a/tests/test_utils.py b/tests/test_utils.py index 9af330b..b6be206 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,27 +1,41 @@ -import PINNICLE as pinn import pytest import tensorflow as tf +import os +from PINNICLE.utils import save_dict_to_json, load_dict_from_json, data_misfit, load_mat data = {"s":1, "v":[1, 2, 3]} def test_save_and_load_dict(tmp_path): - pinn.utils.save_dict_to_json(data, tmp_path, "temp.json") - pinn.utils.save_dict_to_json(data, tmp_path, "noextension") - assert data == pinn.utils.load_dict_from_json(tmp_path, "temp.json") - assert data == pinn.utils.load_dict_from_json(tmp_path, "temp") - assert data == pinn.utils.load_dict_from_json(tmp_path, "noextension.json") + save_dict_to_json(data, tmp_path, "temp.json") + save_dict_to_json(data, tmp_path, "noextension") + assert data == load_dict_from_json(tmp_path, "temp.json") + assert data == load_dict_from_json(tmp_path, "temp") + assert data == load_dict_from_json(tmp_path, "noextension.json") def test_data_misfit(): with pytest.raises(Exception): - pinn.utils.data_misfit.get("not defined") - assert pinn.utils.data_misfit.get("MSE") != None - assert pinn.utils.data_misfit.get("VEL_LOG") != None - assert pinn.utils.data_misfit.get("MEAN_SQUARE_LOG") != None - assert pinn.utils.data_misfit.get("MAPE") != None + data_misfit.get("not defined") + assert data_misfit.get("MSE") != None + assert data_misfit.get("VEL_LOG") != None + assert data_misfit.get("MEAN_SQUARE_LOG") != None + assert data_misfit.get("MAPE") != None def test_data_misfit_functions(): - assert pinn.utils.data_misfit.get("MSE")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 - assert pinn.utils.data_misfit.get("VEL_LOG")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 - assert 
pinn.utils.data_misfit.get("MEAN_SQUARE_LOG")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 - assert pinn.utils.data_misfit.get("MAPE")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 + assert data_misfit.get("MSE")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 + assert data_misfit.get("VEL_LOG")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 + assert data_misfit.get("MEAN_SQUARE_LOG")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 + assert data_misfit.get("MAPE")(tf.convert_to_tensor([1.0]),tf.convert_to_tensor([1.0])) == 0.0 + +def test_loadmat(): + filename = "flightTracks.mat" + repoPath = os.path.dirname(__file__) + "/../examples/" + appDataPath = os.path.join(repoPath, "dataset") + path = os.path.join(appDataPath, filename) + assert load_mat(path) + + filename = "flightTracks73.mat" + repoPath = os.path.dirname(__file__) + "/../examples/" + appDataPath = os.path.join(repoPath, "dataset") + path = os.path.join(appDataPath, filename) + assert load_mat(path)
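Illustrative usage sketch of the new pieces introduced by this patch (the `load_mat` fallback helper, the `MatData` loader, and the `name_map`/`source` parameters). The file path and the "thickness" variable follow the new `test_MatData` and `test_loadmat` tests; the rest is plain usage of the classes added above and is not part of the patch itself.

    from PINNICLE.modeldata import MatData
    from PINNICLE.parameter import SingleDataParameter
    from PINNICLE.utils import load_mat

    # load_mat tries mat73 (MATLAB 7.3 / HDF5) first, then falls back to scipy.io.loadmat
    raw = load_mat("examples/dataset/flightTracks.mat")

    # configure a single .mat data source; keys in data_size are copied into name_map
    # by SingleDataParameter.update() when no explicit mapping is given
    hp = {
        "data_path": "examples/dataset/flightTracks.mat",
        "data_size": {"thickness": 100},   # number of scattered points to sample
        "source": "mat",                   # selects MatData instead of the ISSM loader
    }
    data_loader = MatData(SingleDataParameter(hp))
    data_loader.load_data()                # fills X_dict and data_dict from the .mat file
    data_loader.prepare_training_data()    # randomly samples data_size points per variable
    print(data_loader.X["thickness"].shape,    # (100, 2)
          data_loader.sol["thickness"].shape)  # (100, 1)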