
Fixes issue for saving model files with missing keys #1151

Merged: 12 commits, Nov 10, 2023
16 changes: 12 additions & 4 deletions watertap/tools/analysis_tools/loop_tool/data_merging_tool.py
@@ -47,11 +47,19 @@
         # check if there is a back up
         if isinstance(backup_file_name, str):
             f_old_solutions = h5py.File(backup_file_name, "r")
-            solved_values = sum(
-                np.array(
-                    f_old_solutions[directory]["solve_successful"]["solve_successful"][()]
-                )
-            )
+            if (
+                directory in f_old_solutions
+                and "solve_successful" in f_old_solutions[directory]
+            ):
+                solved_values = sum(
+                    np.array(
+                        f_old_solutions[directory]["solve_successful"]["solve_successful"][
+                            ()
+                        ]
+                    )
+                )
+            else:
+                solved_values = None

Codecov / codecov/patch: check warning on line 62 in watertap/tools/analysis_tools/loop_tool/data_merging_tool.py. Added line #L62 was not covered by tests.
         else:
             solved_values = None
         if force_rerun:
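The guard above is a common defensive pattern when reading HDF5 backups whose layout may vary between runs: check group membership before indexing, and fall back to `None`. A minimal standalone sketch of the same idea (`count_solved` is a hypothetical helper, not code from this PR):

```python
import h5py
import numpy as np

def count_solved(backup_file_name, directory):
    # Open the backup and only read the solve flags if the expected
    # group hierarchy is actually present; otherwise report None.
    with h5py.File(backup_file_name, "r") as f:
        if directory in f and "solve_successful" in f[directory]:
            flags = f[directory]["solve_successful"]["solve_successful"][()]
            return int(np.sum(np.array(flags)))
    return None
```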
6 changes: 4 additions & 2 deletions watertap/tools/parameter_sweep/model_manager.py
@@ -40,12 +40,14 @@ def build_and_init(self, sweep_params=None, local_value_k=None):
         # update parameters before init if enabled by user
         if (
             self.ps_conf.update_sweep_params_before_init
-            and sweep_params != None
-            and local_value_k != None
+            and sweep_params is not None
+            and local_value_k is not None
         ):
             self.update_model_params(sweep_params, local_value_k)
             # init

             self.init_model()

         # raise error if user sets to init before sweep, but does not provide
         # initialize function
         elif self.ps_conf.update_sweep_params_before_init:
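Switching from `!= None` to `is not None` is more than style (PEP 8 recommends identity comparison with `None`): `!=` dispatches to the operand's `__eq__`, which some types override. A quick illustration of the failure mode with a NumPy array (the sweep values here are ordinarily plain dicts; the array case just makes the hazard concrete):

```python
import numpy as np

arr = np.array([1.0, 2.0, 3.0])

# Identity check: always a single bool, regardless of type.
print(arr is not None)  # True

# Equality check: NumPy broadcasts elementwise and returns an array,
# so using this in an `if` raises "truth value ... is ambiguous".
print(arr != None)      # [ True  True  True ]
```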
39 changes: 34 additions & 5 deletions watertap/tools/parameter_sweep/parameter_sweep.py
@@ -600,7 +600,7 @@ def _param_sweep_kernel(self, sweep_params, local_value_k):
             self.model_manager.build_and_init(sweep_params, local_value_k)
             # try to solve our model
             self.model_manager.update_model_params(sweep_params, local_value_k)
-            results = self.model_manager.solve_model()
+            self.model_manager.solve_model()

        # if the model failed to solve from a prior parameter's solved state,
        # let's try to re-init and solve again
@@ -610,7 +610,7 @@
         ):
             self.model_manager.build_and_init(sweep_params, local_value_k)
             self.model_manager.update_model_params(sweep_params, local_value_k)
-            results = self.model_manager.solve_model()
+            self.model_manager.solve_model()
         # return model solved state
         return self.model_manager.is_solved

@@ -745,8 +745,13 @@ def _combine_gather_results(self, all_results):

         # for each result, concat the "value" array of results into the
         # gathered results to combine them all
-        for result in all_results[1:]:
+
+        # get the length of the data in the first result for finding missing keys
+        total_chunk_length = len(all_results[0].results["solve_successful"])
+
+        for i, result in enumerate(all_results[1:]):
             results = result.results
+
             for key, val in results.items():
                 if key == "solve_successful":
                     combined_results[key] = np.append(
@@ -755,13 +760,38 @@
                     continue

                 for subkey, subval in val.items():
+                    # let's catch any keys that don't exist in result[0] and
+                    # create an empty array with the expected length, after which
+                    # we will add additional values, or add nan's instead
+                    if subkey not in combined_results[key]:
+                        # create an empty array, as none of the results so far had this key
+                        combined_results[key][subkey] = {}
+                        for sub_subkey, value in subval.items():
+                            if sub_subkey == "value":
+                                combined_results[key][subkey]["value"] = (
+                                    np.zeros(total_chunk_length) * np.nan
+                                )
+                            else:
+                                combined_results[key][subkey][sub_subkey] = value
                     combined_results[key][subkey]["value"] = np.append(
                         combined_results[key][subkey]["value"],
                         copy.deepcopy(
                             subval["value"],
                         ),
                     )

+                # keep track of our sub-chunk length
+                sub_chunk_length = len(subval["value"])
+
+                # make sure we append an empty value to any missing keys
+                for subkey in combined_results[key]:
+                    if subkey not in val.keys():
+                        empty_chunk = np.zeros(sub_chunk_length) * np.nan
+                        combined_results[key][subkey]["value"] = np.append(
+                            combined_results[key][subkey]["value"], empty_chunk
+                        )
+            total_chunk_length += sub_chunk_length
         return combined_results

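The merge logic above pads any output that is absent from a given chunk with NaNs, so every "value" array stays aligned with the total number of sweep samples. A stripped-down sketch of the same idea using plain dicts of NumPy arrays rather than WaterTAP's result objects (names here are illustrative, not the actual API):

```python
import numpy as np

def merge_chunks(chunks):
    """Merge {name: np.ndarray} chunks, NaN-padding names a chunk lacks."""
    merged = {}
    seen = 0  # samples merged so far
    for chunk in chunks:
        n = len(next(iter(chunk.values())))  # samples in this chunk
        for name, values in chunk.items():
            if name not in merged:
                # first sighting: back-fill earlier samples with NaN
                merged[name] = np.full(seen, np.nan)
            merged[name] = np.append(merged[name], values)
        for name in merged:
            if name not in chunk:
                # key missing from this chunk: pad its slot with NaN
                merged[name] = np.append(merged[name], np.full(n, np.nan))
        seen += n
    return merged

# merge_chunks([{"a": np.ones(2)}, {"a": np.zeros(1), "b": np.array([5.0])}])
# -> {"a": [1, 1, 0], "b": [nan, nan, 5.0]}
```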
"""
@@ -782,7 +812,6 @@ def _combine_output_array(self, gathered_results):
         for _, output in outputs.items():
             for i in range(len(output["value"])):
                 combined_outputs[i] = np.append(combined_outputs[i], output["value"][i])
-
         return np.asarray(combined_outputs)

"""
121 changes: 118 additions & 3 deletions watertap/tools/parameter_sweep/tests/test_parameter_sweep.py
@@ -99,7 +99,7 @@ def dummy_kernel_logic(solution_succesful):
             # kernel reinits model and then tries solving again
             init_state.append(True)
             solved_state.append(False)
-            # but it fails as again
+            # but it fails again
             init_state.append(False)
             solved_state.append(False)
         else:
else:
Expand Down Expand Up @@ -965,6 +965,109 @@ def test_parameter_sweep_optimize(self, model, tmp_path):
_assert_dictionary_correctness(truth_dict, read_dict)
_assert_h5_csv_agreement(csv_results_file_name, read_dict)

@pytest.mark.component
def test_parameter_sweep_optimize_with_added_var(self, model, tmp_path):
        # this runs an initialize function that adds a variable, but only in
        # some of the solves
"""THIS TEST IS DESIGNED FOR 2 Parallel workers!!!!!!"""
Contributor comment:

> What would be needed to make this test work with an arbitrary number of workers? This could be dealt with in a subsequent PR.

Reply from @avdudchenko (Contributor, Author), Oct 19, 2023:

> It's a minimum number: if you have 1 worker, then we don't test for missing keys. I think we should really just change how we store results from each solve. E.g., instead of storing all results in arrays inside a result dictionary, we could store the value for each parameter in a dict and keep an array of dicts (not sure how MPI-safe that is, but it would solve this issue). This would be the same as an async multiprocessing or Ray implementation, where we split params into chunks of size 1...

comm = MPI.COMM_WORLD

tmp_path = _get_rank0_path(comm, tmp_path)
results_fname = os.path.join(tmp_path, "global_results")
csv_results_file_name = str(results_fname) + ".csv"
h5_results_file_name = str(results_fname) + ".h5"

ps = ParameterSweep(
optimize_function=_optimization,
initialize_function=_initialize_with_added_var,
update_sweep_params_before_init=True,
initialize_before_sweep=True,
optimize_kwargs={"relax_feasibility": True},
probe_function=_good_test_function,
csv_results_file_name=csv_results_file_name,
h5_results_file_name=h5_results_file_name,
debugging_data_dir=tmp_path,
interpolate_nan_outputs=False,
number_of_subprocesses=2,
parallel_back_end="MultiProcessing",
)

# Call the parameter_sweep function
ps.parameter_sweep(
build_model_for_tps,
build_sweep_params_for_tps,
)

# NOTE: rank 0 "owns" tmp_path, so it needs to be
# responsible for doing any output file checking
# tmp_path can be deleted as soon as this method
# returns
# Check that test var array was created
if ps.parallel_manager.is_root_process():
truth_dict = {
"outputs": {
"fs.test_var": {
"lower bound": 0,
"units": "None",
"upper bound": 10,
"value": np.array(
[
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
5,
np.nan,
np.nan,
np.nan,
]
),
},
"objective": {
"value": np.array(
[
0.2,
9.50000020e-01,
-4.98799990e02,
1.0,
1.75,
-4.97999990e02,
-7.98999990e02,
-7.98249990e02,
2.0 - 1000.0 * ((2.0 * 0.9 - 1.0) + (3.0 * 0.5 - 1.0)),
]
)
},
},
"solve_successful": [True] * 9,
"sweep_params": {
"fs.input[a]": {
"lower bound": 0,
"units": "None",
"upper bound": 1,
"value": np.array(
[0.1, 0.1, 0.1, 0.5, 0.5, 0.5, 0.9, 0.9, 0.9]
),
},
"fs.input[b]": {
"lower bound": 0,
"units": "None",
"upper bound": 1,
"value": np.array(
[0.0, 0.25, 0.5, 0.0, 0.25, 0.5, 0.0, 0.25, 0.5]
),
},
},
}

read_dict = _read_output_h5(h5_results_file_name)
_assert_dictionary_correctness(truth_dict, read_dict, rtol=1e-2)

@pytest.mark.component
def test_parameter_sweep_bad_initialize_call_2(self, model, tmp_path):
comm = MPI.COMM_WORLD
@@ -2004,6 +2107,14 @@ def _optimization(m, relax_feasibility=False):
return results


def _initialize_with_added_var(m):
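    # only the sweep point (a, b) == (0.5, 0.5) adds fs.test_var, so result
    # chunks from workers that never hit that point will be missing this key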
if (abs(m.fs.input["a"].value - 0.5) < 1e-6) and abs(
m.fs.input["b"].value - 0.5
) < 1e-6:
m.fs.test_var = pyo.Var(initialize=5, bounds=(0, 10))
m.fs.test_var.fix()


def _reinitialize(m, slack_penalty=10.0):
m.fs.slack.setub(None)
m.fs.slack_penalty.value = slack_penalty
@@ -2027,7 +2138,7 @@ def _bad_test_function(m):
     return False


-def _assert_dictionary_correctness(truth_dict, test_dict):
+def _assert_dictionary_correctness(truth_dict, test_dict, rtol=1e-05, atol=1e-08):
     assert truth_dict.keys() == test_dict.keys()

     for key, item in truth_dict.items():
@@ -2039,13 +2150,17 @@ def _assert_dictionary_correctness(truth_dict, test_dict):
                         test_dict[key][subkey]["value"],
                         subitem["value"],
                         equal_nan=True,
+                        rtol=rtol,
+                        atol=atol,
                     )
                 else:
                     assert subsubitem == test_dict[key][subkey][subsubkey]
         elif key == "solve_successful":
             assert item == test_dict[key]
         elif key in ["nominal_idx", "differential_idx"]:
-            assert np.allclose(test_dict[key], item, equal_nan=True)
+            assert np.allclose(
+                test_dict[key], item, equal_nan=True, rtol=rtol, atol=atol
+            )

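The rtol/atol pass-through matters because the added-variable test compares arrays containing NaNs against values read back from an H5 file, where round-tripping can introduce small differences. For reference, NumPy's tolerance test is |a - b| <= atol + rtol * |b|, and equal_nan=True makes NaN compare equal to NaN:

```python
import numpy as np

a = np.array([np.nan, 5.000001])
b = np.array([np.nan, 5.0])

# Fails: the numeric difference is within default tolerances,
# but NaN != NaN unless equal_nan=True.
print(np.allclose(a, b))                             # False
# Passes once NaNs are treated as equal and tolerance is loosened.
print(np.allclose(a, b, equal_nan=True, rtol=1e-2))  # True
```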

def _assert_h5_csv_agreement(csv_filename, h5_dict):