diff --git a/hydrogym/core.py b/hydrogym/core.py
index 1f0b499..9ab3770 100644
--- a/hydrogym/core.py
+++ b/hydrogym/core.py
@@ -312,15 +312,15 @@ def solve(
       cb.close()
     return flow
 
-
+
   def solve_multistep(
-      self,
-      num_substeps: int,
-      callbacks: Iterable[CallbackBase] = [],
-      controller: Callable = None,
-      start_iteration_value: int = 0,
-  ) -> PDEBase:
-    """Solve the initial-value problem for the PDE.
+      self,
+      num_substeps: int,
+      callbacks: Iterable[CallbackBase] = [],
+      controller: Callable = None,
+      start_iteration_value: int = 0,
+  ) -> PDEBase:
+    """Solve the initial-value problem for the PDE.
 
     Args:
       t_span (Tuple[float, float]): Tuple of start and end times
@@ -332,22 +332,22 @@ def solve_multistep(
     Returns:
       PDEBase: The state of the PDE at the end of the solve
     """
-    for iter in range(num_substeps):
-      iter = iter + start_iteration_value
-      t = iter * self.dt
-      if controller is not None:
-        y = self.flow.get_observations()
-        u = controller(t, y)
-      else:
-        u = None
-      flow = self.step(iter, control=u)
-      for cb in callbacks:
-        cb(iter, t, flow)
-
-    for cb in callbacks:
-      cb.close()
-
-    return flow
+    for iter in range(num_substeps):
+      iter = iter + start_iteration_value
+      t = iter * self.dt
+      if controller is not None:
+        y = self.flow.get_observations()
+        u = controller(t, y)
+      else:
+        u = None
+      flow = self.step(iter, control=u)
+      for cb in callbacks:
+        cb(iter, t, flow)
+
+    for cb in callbacks:
+      cb.close()
+
+    return flow
 
   def step(self, iter: int, control: Iterable[float] = None, **kwargs):
     """Advance the transient simulation by one time step
@@ -368,37 +368,17 @@ class FlowEnv(gym.Env):
 
   def __init__(self, env_config: dict):
     self.flow: PDEBase = env_config.get("flow")(
         **env_config.get("flow_config", {}))
-
     self.solver: TransientSolver = env_config.get("solver")(
         self.flow, **env_config.get("solver_config", {}))
     self.callbacks: Iterable[CallbackBase] = env_config.get("callbacks", [])
-    self.rewardLogCallback: Iterable[CallbackBase] = env_config.get("actuation_config", {}).get("rewardLogCallback", None)
+    self.rewardLogCallback: Iterable[CallbackBase] = env_config.get(
+        "actuation_config", {}).get("rewardLogCallback", None)
     self.max_steps: int = env_config.get("max_steps", int(1e6))
     self.iter: int = 0
     self.restart_ckpts = env_config.get("flow_config", {}).get("restart", None)
     if self.restart_ckpts is None:
-      self.q0: self.flow.StateType = self.flow.copy_state()
-
-    # if len(env_config.get("flow_config", {}).get("restart", None)) > 1:
-    #   self.dummy_flow: PDEBase = env_config.get("flow")(**env_config.get("flow_config", {}))
-    #   self.restart_ckpts = env_config.get("flow_config", {}).get("restart", None)
-
-    #   print("Restart ckpts:", self.restart_ckpts, flush=True)
-    #   print("len:", len(self.restart_ckpts), flush=True)
-    #   print("0 ckpt:", self.restart_ckpts[0], flush=True)
-
-    #   self.q0s = []
-    #   for ckpt in range(len(self.restart_ckpts)):
-    #     # self.dummy_flow.mesh = self.dummy_flow.load_mesh(name=env_config.get("flow_config", {}).get("mesh", self.dummy_flow.DEFAULT_MESH))
-
-    #     print("ckpt:", ckpt, self.restart_ckpts[ckpt],flush=True)
-    #     self.dummy_flow.load_checkpoint(self.restart_ckpts[ckpt])
-    #     self.q0s.append(self.dummy_flow.copy_state())
-
-    #   print("self.q0s loaded", flush=True)
-    #   # self.q0 = self.q0s[-1]
-    #   self.q0: self.flow.StateType = self.flow.copy_state()
+      self.q0: self.flow.StateType = self.flow.copy_state()
 
     self.observation_space = gym.spaces.Box(
         low=-np.inf,
@@ -416,23 +396,30 @@ def __init__(self, env_config: dict):
     self.t = 0.
 
     self.dt = env_config.get("solver_config", {}).get("dt", None)
     assert self.dt is not None, f"Error: Solver timestep dt ({self.dt}) must not be None"
-    self.num_sim_substeps_per_actuation = env_config.get("actuation_config", {}).get("num_sim_substeps_per_actuation", None)
+    self.num_sim_substeps_per_actuation = env_config.get(
+        "actuation_config", {}).get("num_sim_substeps_per_actuation", None)
     if self.num_sim_substeps_per_actuation is not None and self.num_sim_substeps_per_actuation > 1:
-      assert self.rewardLogCallback is not None, f"Error: If num_sim_substeps_per_actuation ({self.num_sim_substeps_per_actuation}) is set a reward callback function must be given, {self.rewardLogCallback}"
-      self.reward_aggreation_rule = env_config.get("actuation_config", {}).get("reward_aggreation_rule", None)
-      assert self.reward_aggreation_rule in ['mean', 'sum', 'median'], f"Error: reward aggregation rule ({self.reward_aggreation_rule}) is not given or not implemented yet"
+      assert self.rewardLogCallback is not None,\
+          f"Error: If num_sim_substeps_per_actuation ({self.num_sim_substeps_per_actuation}) " \
+          f"is set, a reward callback function must be given, got {self.rewardLogCallback}"
+      self.reward_aggreation_rule = env_config.get("actuation_config", {}).get(
+          "reward_aggreation_rule", None)
+      assert self.reward_aggreation_rule in [
+          'mean', 'sum', 'median'
+      ], f"Error: reward aggregation rule ({self.reward_aggreation_rule}) is not given or not implemented yet"
 
   def constant_action_controller(self, t, y):
-    return self.action
+    return self.action
 
   def set_callbacks(self, callbacks: Iterable[CallbackBase]):
     self.callbacks = callbacks
 
   def step(
-      self, action: Iterable[ArrayLike] = None
-  ) -> Tuple[ArrayLike, float, bool, dict]:
-    """Advance the state of the environment. See gym.Env documentation
+      self,
+      action: Iterable[ArrayLike] = None
+  ) -> Tuple[ArrayLike, float, bool, dict]:
+    """Advance the state of the environment. See gym.Env documentation
 
     Args:
       action (Iterable[ActType], optional): Control inputs. Defaults to None.
@@ -440,43 +427,45 @@ def step(
     Returns:
      Tuple[ObsType, float, bool, dict]: obs, reward, done, info
    """
-    action = action * self.flow.CONTROL_SCALING
-
-    if self.num_sim_substeps_per_actuation is not None and self.num_sim_substeps_per_actuation > 1:
-      self.action = action
-      self.flow = self.solver.solve_multistep(num_substeps=self.num_sim_substeps_per_actuation, callbacks=[self.rewardLogCallback], controller=self.constant_action_controller, start_iteration_value=self.iter)
-      if self.reward_aggreation_rule == "mean":
-        # print('flow_array', self.flow.reward_array,flush=True)
-        # print('mean flow_array', np.mean(self.flow.reward_array, axis=0),flush=True)
-        averaged_objective_values = np.mean(self.flow.reward_array, axis=0)
-      elif self.reward_aggreation_rule == "sum":
-        averaged_objective_values = np.sum(self.flow.reward_array, axis=0)
-      elif self.reward_aggreation_rule == "median":
-        averaged_objective_values = np.median(self.flow.reward_array, axis=0)
-      else:
-        raise NotImplementedError(f"The {self.reward_aggreation_rule} function is not implemented yet.")
-
-      self.iter += self.num_sim_substeps_per_actuation
-      self.t += self.dt * self.num_sim_substeps_per_actuation
-      reward = self.get_reward(averaged_objective_values)
-    else:
-      self.solver.step(self.iter, control=action)
-      self.iter += 1
-      t = self.iter * self.solver.dt
-      self.t += self.dt
-      reward = self.get_reward()
-
-    for cb in self.callbacks:
-      cb(self.iter, self.t, self.flow)
-    obs = self.flow.get_observations()
-
-    done = self.check_complete()
-    # print('max_steps', self.max_steps, 'current iter:', self.iter, 'done', done, flush=True)
-    info = {}
-
-    obs = self.stack_observations(obs)
-
-    return obs, reward, done, info
+    action = action * self.flow.CONTROL_SCALING
+
+    if self.num_sim_substeps_per_actuation is not None and self.num_sim_substeps_per_actuation > 1:
+      self.action = action
+      self.flow = self.solver.solve_multistep(
+          num_substeps=self.num_sim_substeps_per_actuation,
+          callbacks=[self.rewardLogCallback],
+          controller=self.constant_action_controller,
+          start_iteration_value=self.iter)
+      if self.reward_aggreation_rule == "mean":
+        averaged_objective_values = np.mean(self.flow.reward_array, axis=0)
+      elif self.reward_aggreation_rule == "sum":
+        averaged_objective_values = np.sum(self.flow.reward_array, axis=0)
+      elif self.reward_aggreation_rule == "median":
+        averaged_objective_values = np.median(self.flow.reward_array, axis=0)
+      else:
+        raise NotImplementedError(
+            f"The {self.reward_aggreation_rule} function is not implemented yet."
+        )
+
+      self.iter += self.num_sim_substeps_per_actuation
+      self.t += self.dt * self.num_sim_substeps_per_actuation
+      reward = self.get_reward(averaged_objective_values)
+    else:
+      self.solver.step(self.iter, control=action)
+      self.iter += 1
+      t = self.iter * self.solver.dt
+      self.t += self.dt
+      reward = self.get_reward()
+
+    for cb in self.callbacks:
+      cb(self.iter, self.t, self.flow)
+    obs = self.flow.get_observations()
+    done = self.check_complete()
+    info = {}
+
+    obs = self.stack_observations(obs)
+
+    return obs, reward, done, info
 
   # TODO: Use this to allow for arbitrary returns from collect_observations
   # That are then converted to a list/tuple/ndarray here
@@ -485,11 +474,13 @@ def stack_observations(self, obs):
 
   def get_reward(self, averaged_objective_values=None):
     if averaged_objective_values is None:
-      # return -self.solver.dt * self.flow.evaluate_objective()
-      return -self.flow.evaluate_objective()
+      # return -self.solver.dt * self.flow.evaluate_objective()
+      return -self.flow.evaluate_objective()
     else:
-      # return -self.solver.dt * self.num_sim_substeps_per_actuation * self.flow.evaluate_objective(averaged_objective_values=averaged_objective_values)
-      return -self.flow.evaluate_objective(averaged_objective_values=averaged_objective_values)
+      # return -self.solver.dt * self.num_sim_substeps_per_actuation\
+      #     * self.flow.evaluate_objective(averaged_objective_values=averaged_objective_values)
+      return -self.flow.evaluate_objective(
+          averaged_objective_values=averaged_objective_values)
 
   def check_complete(self):
     return self.iter > self.max_steps
@@ -499,7 +490,8 @@ def reset(self, t=0.0) -> Union[ArrayLike, Tuple[ArrayLike, dict]]:
     self.flow.reset(q0=self.q0, t=t)
     self.solver.reset()
 
-    return self.flow.get_observations(), info
+    # Previously: return self.flow.get_observations(), info
+    return self.flow.get_observations()
 
   def render(self, mode="human", **kwargs):
     self.flow.render(mode=mode, **kwargs)
@@ -507,6 +499,3 @@ def render(self, mode="human", **kwargs):
   def close(self):
     for cb in self.callbacks:
       cb.close()
-
-
-
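Below is a minimal, hypothetical usage sketch of the multi-substep actuation path added in this diff. It is not part of the patch: `MyFlow` and `MySolver` are placeholders for a concrete `PDEBase` flow and `TransientSolver` from the package, the environment is assumed to expose the usual gym `action_space`, and the assumption that the reward callback populates `flow.reward_array` with per-substep objective values is inferred from how `FlowEnv.step()` aggregates that attribute. Only configuration keys that appear in the diff are used.

```python
import numpy as np

from hydrogym.core import FlowEnv


class RewardLogCallback:
  """Hypothetical reward logger.

  FlowEnv only requires that it is callable as cb(iter, t, flow), has a
  close() method, and that flow.reward_array ends up holding the
  per-substep objective values that step() aggregates.
  """

  def __call__(self, iter, t, flow):
    values = np.atleast_1d(flow.evaluate_objective())  # per-substep objective(s)
    if not hasattr(flow, "reward_array"):
      flow.reward_array = []
    flow.reward_array.append(values)
    # NOTE: clearing reward_array between actuation periods is omitted here.

  def close(self):
    pass


env_config = {
    "flow": MyFlow,      # placeholder PDEBase subclass
    "solver": MySolver,  # placeholder TransientSolver subclass
    "flow_config": {"restart": None},
    "solver_config": {"dt": 1e-2},
    "callbacks": [],
    "max_steps": 1000,
    "actuation_config": {
        # Each env.step() call advances 10 solver substeps while
        # constant_action_controller holds the action fixed.
        "num_sim_substeps_per_actuation": 10,
        # One of 'mean', 'sum', 'median', applied over flow.reward_array
        # (key spelling matches the code).
        "reward_aggreation_rule": "mean",
        "rewardLogCallback": RewardLogCallback(),
    },
}

env = FlowEnv(env_config)
obs = env.reset()  # reset() now returns only the observations
for _ in range(5):
  action = env.action_space.sample()  # scaled by flow.CONTROL_SCALING in step()
  obs, reward, done, info = env.step(action)
  if done:
    break
env.close()
```

With `num_sim_substeps_per_actuation` left unset (or 1), `step()` falls back to the original single-substep path and `get_reward()` is called without aggregated values.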