Skip to content

Commit

Permalink
Refactor ropt event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
verveerpj committed Feb 27, 2025
1 parent b35e66a commit 9c06e6d
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 109 deletions.
15 changes: 6 additions & 9 deletions src/ert/run_models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,20 @@ class RunModelStatusEvent(RunModelEvent):
class EverestStatusEvent(BaseModel):
batch: int | None
event_type: Literal["EverestStatusEvent"] = "EverestStatusEvent"

# Reflects what is currently in ROPT,
# changes in ROPT should appear here accordingly
everest_event: Literal[
"START_EVALUATION",
"START_OPTIMIZER_EVALUATION",
"START_SAMPLING_EVALUATION",
"START_OPTIMIZER_STEP",
"FINISHED_OPTIMIZER_STEP",
"START_EVALUATOR_STEP",
"FINISHED_EVALUATOR_STEP",
]


class EverestBatchResultEvent(BaseModel):
batch: int
event_type: Literal["EverestBatchResultEvent"] = "EverestBatchResultEvent"
everest_event: Literal["FINISHED_EVALUATION", "FINISHED_SAMPLING_EVALUATION"]
everest_event: Literal[
"OPTIMIZATION_RESULT",
"FINISHED_OPTIMIZER_EVALUATION",
"FINISHED_SAMPLING_EVALUATION",
]
result_type: Literal["FunctionResult", "GradientResult"]


Expand Down
65 changes: 32 additions & 33 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def _init_domain_transforms(

self.send_event(
EverestStatusEvent(
batch=None, # Always 0, but omitting it for consistency
batch=self._batch_id,
everest_event="START_SAMPLING_EVALUATION",
)
)
Expand All @@ -365,6 +365,9 @@ def _init_domain_transforms(
)
)

# Increase the batch ID for the next evaluation:
self._batch_id += 1

if transforms.objectives.has_auto_scale:
transforms.objectives.calculate_auto_scales(objectives)
if (
Expand Down Expand Up @@ -399,45 +402,29 @@ def _create_optimizer(self) -> BasicOptimizer:
),
)

def _forward_ropt_event(everest_event: OptimizerEvent) -> None:
def _forward_results(everest_event: OptimizerEvent) -> None:
has_results = bool(everest_event.data and everest_event.data.get("results"))

# The batch these results pertain to
# If the event has results, they usually pertain to the
# batch before self._batch_id, i.e., self._batch_id - 1

if has_results:
# A ROPT event may contain multiple results, here we send one
# event per result
results = everest_event.data["results"]
batch_id = results[0].batch_id

for r in results:
self.send_event(
EverestBatchResultEvent(
batch=batch_id,
everest_event=everest_event.event_type.name,
batch=r.batch_id, # From the event, the batch that produced it.
everest_event="OPTIMIZATION_RESULT",
result_type=(
"FunctionResult"
if isinstance(r, FunctionResults)
else "GradientResult"
),
)
)
else:
# Events indicating the start of an evaluation,
# start of optimizer step holds no results
# but may still be relevant to the subscriber
self.send_event(
EverestStatusEvent(
batch=None,
everest_event=everest_event.event_type.name,
)
)

# Forward ROPT events to queue
for event_type in EventType:
optimizer.add_observer(event_type, _forward_ropt_event)
# Send events when results are received.
optimizer.add_observer(EventType.FINISHED_EVALUATION, _forward_results)

return optimizer

Expand Down Expand Up @@ -513,9 +500,6 @@ def _run_forward_model(
# Gather the results
objectives, constraints = self._gather_simulation_results(ensemble)

# Increase the batch ID for the next evaluation:
self._batch_id += 1

# Return the results, together with the indices of the evaluated controls:
return objectives, constraints

Expand Down Expand Up @@ -705,8 +689,6 @@ def _forward_model_evaluator(
all_results=all_results,
)

batch_id = self._batch_id # Save the batch ID, it will be modified.

control_values_to_simulate = np.array(
[
c.control_vector
Expand All @@ -720,19 +702,31 @@ def _forward_model_evaluator(
sim_infos = [
c for c in evaluation_infos if c.status == _EvaluationStatus.TO_SIMULATE
]

self.send_event(
EverestStatusEvent(
batch=self._batch_id,
everest_event="START_OPTIMIZER_EVALUATION",
)
)

sim_objectives, sim_constraints = self._run_forward_model(
control_values=control_values_to_simulate,
model_realizations=[c.model_realization for c in sim_infos],
perturbations=[c.perturbation for c in sim_infos],
)

self.send_event(
EverestBatchResultEvent(
batch=self._batch_id,
everest_event="FINISHED_OPTIMIZER_EVALUATION",
result_type="FunctionResult",
)
)
else:
sim_objectives = np.array([], dtype=np.float64)
sim_constraints = np.array([], dtype=np.float64)

# Note: removing this causes some tests to fail, but this behavior
# should maybe be revised?
self._batch_id += 1

# Assign simulated results to corresponding evaluation infos
for ei in evaluation_infos:
if ei.simulation_id is not None:
Expand Down Expand Up @@ -774,13 +768,18 @@ def _forward_model_evaluator(
dtype=np.int32,
)

return EvaluatorResult(
evaluator_result = EvaluatorResult(
objectives=objectives,
constraints=constraints,
batch_id=batch_id,
batch_id=self._batch_id,
evaluation_ids=sim_ids,
)

# increase the batch ID for the next evaluation:
self._batch_id += 1

return evaluator_result

def _create_simulation_controls(
self,
control_values: NDArray[np.float64],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,116 +1,142 @@
{
"everest_events": [
{
"batch": null,
"batch": 0,
"event_type": "EverestStatusEvent",
"everest_event": "START_OPTIMIZER_STEP"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": null,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"batch": 0,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 0,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": 0,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
},
{
"batch": null,
"batch": 1,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": 1,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 1,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": 1,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
},
{
"batch": null,
"batch": 2,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": 2,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 2,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": 2,
"event_type": "EverestBatchResultEvent",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
},
{
"batch": null,
"batch": 3,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": 3,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 3,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": 3,
"event_type": "EverestBatchResultEvent",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
},
{
"batch": null,
"batch": 4,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": 4,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 4,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": 4,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
},
{
"batch": null,
"batch": 5,
"event_type": "EverestStatusEvent",
"everest_event": "START_EVALUATION"
"everest_event": "START_OPTIMIZER_EVALUATION"
},
{
"batch": 5,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"everest_event": "FINISHED_OPTIMIZER_EVALUATION",
"result_type": "FunctionResult"
},
{
"batch": 5,
"event_type": "EverestBatchResultEvent",
"everest_event": "FINISHED_EVALUATION",
"result_type": "GradientResult"
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "FunctionResult"
},
{
"batch": null,
"event_type": "EverestStatusEvent",
"everest_event": "FINISHED_OPTIMIZER_STEP"
"batch": 5,
"event_type": "EverestBatchResultEvent",
"everest_event": "OPTIMIZATION_RESULT",
"result_type": "GradientResult"
}
],
"num_full_snapshots": 6
Expand Down
Loading

0 comments on commit 9c06e6d

Please sign in to comment.