diff --git a/.gitignore b/.gitignore index 9599c050e..621d97f26 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ dist/ .ipynb_checkpoints/ venv/ my_db_config.yaml +database.yaml diff --git a/docs/sources/experiments/algorithm.md b/docs/sources/experiments/algorithm.md index 4964bfcae..ab260322f 100644 --- a/docs/sources/experiments/algorithm.md +++ b/docs/sources/experiments/algorithm.md @@ -309,7 +309,7 @@ The trained model's prediction probabilities (`predict_proba()`) are computed bo ### Individual Feature Importance Feature importances (of a configurable number of top features, defaulting to 5) for each prediction are computed and written to the `test_results.individual_importances` table. Right now, there are no sophisticated calculation methods integrated into the experiment; simply the top 5 global feature importances for the model are copied to the `individual_importances` table. -#### Metrics +### Metrics Triage allows for the computation of both testing set and training set evaluation metrics. Evaluation metrics, such as precision and recall at various thresholds, are written to either the `train_results.evaluations` table or the `test_results.evaluations`. Triage defines a number of [Evaluation Metrics](https://github.com/dssg/triage/blob/master/src/triage/component/catwalk/evaluation.py#L45-L58) metrics that can be addressed by name in the experiment definition, along with a list of thresholds and/or other parameters (such as the 'beta' value for fbeta) to iterate through. Thresholding is done either via absolute value (top k) or percentile by sorting the predictions and labels by the row's predicted probability score, with ties broken at random (the random seed can be passed in the config file to make this deterministic), and assigning the predicted value as True for those above the threshold. Note that the percentile thresholds are in terms of the population percentage, not a cutoff threshold for the predicted probability. Sometimes test matrices may not have labels for every row, so it's worth mentioning here how that is handled and interacts with thresholding. Rows with missing labels are not considered in the metric calculations, and if some of these rows are in the top k of the test matrix, no more rows are taken from the rest of the list for consideration. So if the experiment is calculating precision at the top 100 rows, and 40 of the top 100 rows are missing a label, the precision will actually be calculated on the 60 of the top 100 rows that do have a label. To make the results of this more transparent for users, a few extra pieces of metadata are written to the evaluations table for each metric score. @@ -319,5 +319,14 @@ Sometimes test matrices may not have labels for every row, so it's worth mention labels * `num_positive_labels` - The number of positive labels in the test matrix +Triage also supports evaluating a model on a subset of the predictions made. +This is done by passing a subset query in the prediction config. The model +evaluator will then subset the predictions on valid entity-date pairs for the +given model and will calculate metrics for the subset, re-applying thresholds +as necessary to the predictions in the subset. Subset definitions are stored in +the `model_metadata.subsets` table, and the evaluations are stored in the +`evaluations` tables. A hash of the subset configuration identifies subset +evaluations and links the `subsets` table. 
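For example, a short sketch along these lines (hypothetical connection string and model id; table and column names as described above) joins a model's subset evaluations back to the stored subset definitions:

```python
# Minimal sketch: list a model's subset evaluations alongside the stored subset
# configuration, joining on the shared subset_hash column.
from sqlalchemy import create_engine

db_engine = create_engine("postgresql:///triage")  # placeholder connection string
model_id = 1  # placeholder model id

rows = db_engine.execute(
    """select e.metric, e.parameter, e.value, s.config
       from test_results.evaluations e
       join model_metadata.subsets s using (subset_hash)
       where e.model_id = %s
         and e.subset_hash != ''  -- overall evaluations are stored with an empty subset_hash
       order by e.metric, e.parameter""",
    (model_id,),
)
for metric, parameter, value, config in rows:
    print(metric, parameter, value, config)
```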
+ ### Recap At this point, the 'model_metadata', 'train_results', and 'test_results' database schemas are fully populated with data about models, model groups, predictions, feature importances, and evaluation metrics for the researcher to query. In addition, the trained model pickle files are saved in the configured project path. The experiment is considered finished. diff --git a/docs/sources/experiments/running.md b/docs/sources/experiments/running.md index 024d491f6..ff0d70b75 100644 --- a/docs/sources/experiments/running.md +++ b/docs/sources/experiments/running.md @@ -266,11 +266,12 @@ After the experiment run, a variety of schemas and tables will be created and po * model_metadata.experiment_models - A many-to-many table between experiments and models. This will have a row if the experiment used the model, regardless of whether or not it had to build it * model_metadata.model_groups - A model groups refers to all models that share parameters like classifier type, hyperparameters, etc, but *have different training windows*. Look at these to see how classifiers perform over different training windows. * model_metadata.matrices - Each matrix that was used for training and testing has metadata written about it such as the matrix hash, length, and time configuration. +* model_metadata.subsets - Each evaluation subset that was used for model scoring has its configuation and a hash written here * train_results.feature_importances - The sklearn feature importances results for each trained model * train_results.predictions - Prediction probabilities for train matrix entities generated against trained models * train_results.evaluations - Metric scores of trained models on the training data. * test_results.predictions - Prediction probabilities for test matrix entities generated against trained models -* test_results.evaluations - Metric scores of trained models over given testing windows +* test_results.evaluations - Metric scores of trained models over given testing windows and subsets * test_results.individual_importances - Individual feature importance scores for test matrix entities. Here's an example query, which returns the top 10 model groups by precision at the top 100 entities: diff --git a/example/config/experiment.yaml b/example/config/experiment.yaml index 6c4f52643..97aa0d3ae 100644 --- a/example/config/experiment.yaml +++ b/example/config/experiment.yaml @@ -316,6 +316,26 @@ grid_config: # sort_seed, if passed, will seed the random number generator for each model's # metric creation phase. This affects how entities with the same probabilities # are sorted +# +# subsets, if passed, will add evaluations for subset(s) of the predictions to +# the subset_evaluations tables, using the same testing and training metric +# groups as used for overall evaluations but with any thresholds reapplied only +# to entities in the subset on the relevant as_of_dates. For example, when +# calculating precision@5_pct for the subset of women, the ModelEvaluator will +# count as positively labeled the top 5% of women, rather than any women in the +# top 5% overall. This is useful if, for example, different interventions will +# be applied to different subsets of entities (e.g., one program will provide +# subsidies to the top 500 women with children and another program will provide +# shelter to the top 150 women without children) and you would like to see +# whether a single model can be used for both applications. 
Subsets can also be +# used to see how a model's performance would be affected if the requirements +# for intervention eligibility became more restricted. +# +# Subsets should be a list of dictionaries with the following keys: +# - "name": a shorthand name for the subset +# - "query": a query that returns distinct entity_ids belonging to the +# subset on a given as_of_date with a placeholder for the +# as_of_date being queried scoring: sort_seed: 5 testing_metric_groups: @@ -336,6 +356,14 @@ scoring: training_metric_groups: - metrics: [accuracy] + subsets: + - + name: women + query: | + select distinct entity_id + from demographics p + where d.gender = 'woman' + and demographic_date < '{as_of_date}'::date # INDIVIDUAL IMPORTANCES diff --git a/src/tests/architect_tests/test_cohort_table_generators.py b/src/tests/architect_tests/test_entity_date_table_generators.py similarity index 80% rename from src/tests/architect_tests/test_cohort_table_generators.py rename to src/tests/architect_tests/test_entity_date_table_generators.py index 6393cec02..21f6b5901 100644 --- a/src/tests/architect_tests/test_cohort_table_generators.py +++ b/src/tests/architect_tests/test_entity_date_table_generators.py @@ -4,7 +4,7 @@ import testing.postgresql from sqlalchemy.engine import create_engine -from triage.component.architect.cohort_table_generators import CohortTableGenerator +from triage.component.architect.entity_date_table_generators import EntityDateTableGenerator from . import utils @@ -18,19 +18,19 @@ def test_empty_output(): with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) utils.create_binary_outcome_events(engine, "events", []) - table_generator = CohortTableGenerator( + table_generator = EntityDateTableGenerator( query="select entity_id from events where outcome_date < '{as_of_date}'::date", db_engine=engine, - cohort_table_name="exp_hash_cohort", + entity_date_table_name="exp_hash_cohort", ) with pytest.raises(ValueError): # Request time outside of available intervals - table_generator.generate_cohort_table([datetime(2015, 12, 31)]) + table_generator.generate_entity_date_table([datetime(2015, 12, 31)]) (cohort_count,) = engine.execute( f"""\ - select count(*) from {table_generator.cohort_table_name} + select count(*) from {table_generator.entity_date_table_name} """ ).first() @@ -39,7 +39,7 @@ def test_empty_output(): engine.dispose() -def test_cohort_table_generator_replace(): +def test_entity_date_table_generator_replace(): input_data = [ (1, datetime(2016, 1, 1), True), (1, datetime(2016, 4, 1), False), @@ -53,10 +53,10 @@ def test_cohort_table_generator_replace(): with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) utils.create_binary_outcome_events(engine, "events", input_data) - table_generator = CohortTableGenerator( + table_generator = EntityDateTableGenerator( query="select entity_id from events where outcome_date < '{as_of_date}'::date", db_engine=engine, - cohort_table_name="exp_hash_cohort", + entity_date_table_name="exp_hash_entity_date", replace=True ) as_of_dates = [ @@ -67,7 +67,7 @@ def test_cohort_table_generator_replace(): datetime(2016, 5, 1), datetime(2016, 6, 1), ] - table_generator.generate_cohort_table(as_of_dates) + table_generator.generate_entity_date_table(as_of_dates) expected_output = [ (1, datetime(2016, 2, 1), True), (1, datetime(2016, 3, 1), True), @@ -91,20 +91,20 @@ def test_cohort_table_generator_replace(): results = list( engine.execute( f""" - select entity_id, as_of_date, active 
from {table_generator.cohort_table_name} + select entity_id, as_of_date, active from {table_generator.entity_date_table_name} order by entity_id, as_of_date """ ) ) assert results == expected_output - utils.assert_index(engine, table_generator.cohort_table_name, "entity_id") - utils.assert_index(engine, table_generator.cohort_table_name, "as_of_date") + utils.assert_index(engine, table_generator.entity_date_table_name, "entity_id") + utils.assert_index(engine, table_generator.entity_date_table_name, "as_of_date") - table_generator.generate_cohort_table(as_of_dates) + table_generator.generate_entity_date_table(as_of_dates) assert results == expected_output -def test_cohort_table_generator_noreplace(): +def test_entity_date_table_generator_noreplace(): input_data = [ (1, datetime(2016, 1, 1), True), (1, datetime(2016, 4, 1), False), @@ -118,10 +118,10 @@ def test_cohort_table_generator_noreplace(): with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) utils.create_binary_outcome_events(engine, "events", input_data) - table_generator = CohortTableGenerator( + table_generator = EntityDateTableGenerator( query="select entity_id from events where outcome_date < '{as_of_date}'::date", db_engine=engine, - cohort_table_name="exp_hash_cohort", + entity_date_table_name="exp_hash_entity_date", replace=False ) @@ -131,7 +131,7 @@ def test_cohort_table_generator_noreplace(): datetime(2016, 2, 1), datetime(2016, 3, 1), ] - table_generator.generate_cohort_table(as_of_dates) + table_generator.generate_entity_date_table(as_of_dates) expected_output = [ (1, datetime(2016, 2, 1), True), (1, datetime(2016, 3, 1), True), @@ -143,16 +143,16 @@ def test_cohort_table_generator_noreplace(): results = list( engine.execute( f""" - select entity_id, as_of_date, active from {table_generator.cohort_table_name} + select entity_id, as_of_date, active from {table_generator.entity_date_table_name} order by entity_id, as_of_date """ ) ) assert results == expected_output - utils.assert_index(engine, table_generator.cohort_table_name, "entity_id") - utils.assert_index(engine, table_generator.cohort_table_name, "as_of_date") + utils.assert_index(engine, table_generator.entity_date_table_name, "entity_id") + utils.assert_index(engine, table_generator.entity_date_table_name, "as_of_date") - table_generator.generate_cohort_table(as_of_dates) + table_generator.generate_entity_date_table(as_of_dates) assert results == expected_output # 2. 
generate a cohort for a different subset of as-of-dates, @@ -163,7 +163,7 @@ def test_cohort_table_generator_noreplace(): datetime(2016, 5, 1), datetime(2016, 6, 1), ] - table_generator.generate_cohort_table(as_of_dates) + table_generator.generate_entity_date_table(as_of_dates) expected_output = [ (1, datetime(2016, 2, 1), True), (1, datetime(2016, 3, 1), True), @@ -187,7 +187,7 @@ def test_cohort_table_generator_noreplace(): results = list( engine.execute( f""" - select entity_id, as_of_date, active from {table_generator.cohort_table_name} + select entity_id, as_of_date, active from {table_generator.entity_date_table_name} order by entity_id, as_of_date """ ) diff --git a/src/tests/architect_tests/test_integration.py b/src/tests/architect_tests/test_integration.py index cb2be100e..174d0427f 100644 --- a/src/tests/architect_tests/test_integration.py +++ b/src/tests/architect_tests/test_integration.py @@ -16,7 +16,7 @@ FeatureGroupMixer, ) from triage.component.architect.label_generators import LabelGenerator -from triage.component.architect.cohort_table_generators import CohortTableGenerator +from triage.component.architect.entity_date_table_generators import EntityDateTableGenerator from triage.component.architect.planner import Planner from triage.component.architect.builders import MatrixBuilder from triage.component.catwalk.storage import ProjectStorage @@ -160,9 +160,9 @@ def basic_integration_test( test_durations=["1months"], ) - cohort_table_generator = CohortTableGenerator( + entity_date_table_generator = EntityDateTableGenerator( db_engine=db_engine, - cohort_table_name="cohort_abcd", + entity_date_table_name="cohort_abcd", query="select distinct(entity_id) from events" ) @@ -217,8 +217,8 @@ def basic_integration_test( all_as_of_times.extend(test_matrix["as_of_times"]) all_as_of_times = list(set(all_as_of_times)) - # generate cohort state table - cohort_table_generator.generate_cohort_table(as_of_dates=all_as_of_times) + # generate entity_date state table + entity_date_table_generator.generate_entity_date_table(as_of_dates=all_as_of_times) # create labels table label_generator.generate_all_labels( @@ -263,7 +263,7 @@ def basic_integration_test( }, ], feature_dates=all_as_of_times, - state_table=cohort_table_generator.cohort_table_name, + state_table=entity_date_table_generator.entity_date_table_name, ) feature_table_agg_tasks = feature_generator.generate_all_table_tasks( aggregations, task_type="aggregation" diff --git a/src/tests/catwalk_tests/test_evaluation.py b/src/tests/catwalk_tests/test_evaluation.py index 45d3ce0ca..196d45641 100644 --- a/src/tests/catwalk_tests/test_evaluation.py +++ b/src/tests/catwalk_tests/test_evaluation.py @@ -1,13 +1,29 @@ -from triage.component.catwalk.evaluation import ModelEvaluator, generate_binary_at_x +from triage.component.catwalk.evaluation import ( + ModelEvaluator, + generate_binary_at_x, + query_subset_table, + subset_labels_and_predictions, +) from triage.component.catwalk.metrics import Metric import testing.postgresql import datetime +import warnings +import re +import factory import numpy -from sqlalchemy import create_engine -from triage.component.catwalk.db import ensure_db +import pandas +from sqlalchemy.sql.expression import text +from triage.component.catwalk.utils import filename_friendly_hash, get_subset_table_name from tests.utils import fake_labels, fake_trained_model, MockMatrixStore -from tests.results_tests.factories import ModelFactory, EvaluationFactory, init_engine, session +from tests.results_tests.factories import ( + 
ModelFactory, + EvaluationFactory, + PredictionFactory, + SubsetFactory, + session, +) +from tests.conftest import db_engine_with_results_schema @Metric(greater_is_better=True) @@ -15,299 +31,584 @@ def always_half(predictions_proba, predictions_binary, labels, parameters): return 0.5 -def test_evaluating_early_warning(): - with testing.postgresql.Postgresql() as postgresql: - db_engine = create_engine(postgresql.url()) - ensure_db(db_engine) - testing_metric_groups = [ - { - "metrics": [ - "precision@", - "recall@", - "true positives@", - "true negatives@", - "false positives@", - "false negatives@", - ], - "thresholds": {"percentiles": [5.0, 10.0], "top_n": [5, 10]}, - }, - { - "metrics": [ - "f1", - "mediocre", - "accuracy", - "roc_auc", - "average precision score", - ] - }, - {"metrics": ["fbeta@"], "parameters": [{"beta": 0.75}, {"beta": 1.25}]}, - ] +SUBSETS = [ + { + "name": "evens", + "query": """ + select distinct entity_id + from events + where entity_id % 2 = 0 + and outcome_date < '{as_of_date}'::date + """, + }, + { + "name": "odds", + "query": """ + select distinct entity_id + from events + where entity_id % 2 = 1 + and outcome_date < '{as_of_date}'::date + """, + }, + { + "name": "empty", + "query": """ + select distinct entity_id + from events + where entity_id = -1 + and outcome_date < '{as_of_date}'::date + """, + }, +] + +TRAIN_END_TIME = datetime.datetime(2016, 1, 1) + + +def populate_subset_data(db_engine, subset, entity_ids, as_of_date=TRAIN_END_TIME): + table_name = get_subset_table_name(subset) + query_where_clause = re.search("where.*[0-9]", subset["query"]).group() + + db_engine.execute( + f""" + create table {table_name} ( + entity_id int, + as_of_date date, + active bool + ) + """ + ) - training_metric_groups = [{"metrics": ["accuracy", "roc_auc"]}] + for entity_id in entity_ids: + insert_query = f""" + with unfiltered_row as ( + select {entity_id} as entity_id, + '{as_of_date}'::date as as_of_date, + true as active + ) + insert into {table_name} + select entity_id, as_of_date, active + from unfiltered_row + {query_where_clause} + """ + db_engine.execute(text(insert_query).execution_options(autocommit=True)) + + +def test_all_same_labels(db_engine_with_results_schema): + num_entities = 5 + trained_model, model_id = fake_trained_model( + db_engine_with_results_schema, + train_end_time=TRAIN_END_TIME, + ) - custom_metrics = {"mediocre": always_half} + for label_value in [0, 1]: + labels = [label_value] * num_entities + # We should be able to calculate accuracy even if all of the labels + # are the same, but ROC_AUC requires some positive and some + # negative labels, so we should get one warning and one NULL value + # for this config + training_metric_groups = [{"metrics": ["accuracy", "roc_auc"]}] + + # Acquire fake data and objects to be used in the tests model_evaluator = ModelEvaluator( - testing_metric_groups, + {}, training_metric_groups, - db_engine, - custom_metrics=custom_metrics, + db_engine_with_results_schema, ) - - labels = fake_labels(5) - fake_train_matrix_store = MockMatrixStore("train", "efgh", 5, db_engine, labels) - fake_test_matrix_store = MockMatrixStore("test", "1234", 5, db_engine, labels) - - trained_model, model_id = fake_trained_model(db_engine) - - # Evaluate the testing metrics and test for all of them. 
- model_evaluator.evaluate( - trained_model.predict_proba(labels)[:, 1], fake_test_matrix_store, model_id + fake_matrix_store = MockMatrixStore( + matrix_type="train", + matrix_uuid=str(labels), + label_count=num_entities, + db_engine=db_engine_with_results_schema, + init_labels=pandas.DataFrame( + { + "label_value": labels, + "entity_id": list(range(num_entities)), + "as_of_date": [TRAIN_END_TIME] * num_entities, + } + ).set_index(["entity_id", "as_of_date"]).label_value, + init_as_of_dates=[TRAIN_END_TIME], ) - records = [ - row[0] - for row in db_engine.execute( - """select distinct(metric || parameter) - from test_results.evaluations - where model_id = %s and - evaluation_start_time = %s - order by 1""", - (model_id, fake_test_matrix_store.as_of_dates[0]), - ) - ] - assert records == [ - "accuracy", - "average precision score", - "f1", - "false negatives@10.0_pct", - "false negatives@10_abs", - "false negatives@5.0_pct", - "false negatives@5_abs", - "false positives@10.0_pct", - "false positives@10_abs", - "false positives@5.0_pct", - "false positives@5_abs", - "fbeta@0.75_beta", - "fbeta@1.25_beta", - "mediocre", - "precision@10.0_pct", - "precision@10_abs", - "precision@5.0_pct", - "precision@5_abs", - "recall@10.0_pct", - "recall@10_abs", - "recall@5.0_pct", - "recall@5_abs", - "roc_auc", - "true negatives@10.0_pct", - "true negatives@10_abs", - "true negatives@5.0_pct", - "true negatives@5_abs", - "true positives@10.0_pct", - "true positives@10_abs", - "true positives@5.0_pct", - "true positives@5_abs", - ] - - # ensure that the matrix uuid is present - matrix_uuids = [ - row[0] - for row in db_engine.execute("select matrix_uuid from test_results.evaluations") - ] - assert all(matrix_uuid == "1234" for matrix_uuid in matrix_uuids) - # Evaluate the training metrics and test - model_evaluator.evaluate( - trained_model.predict_proba(labels)[:, 1], fake_train_matrix_store, model_id - ) - records = [ - row[0] - for row in db_engine.execute( - """select distinct(metric || parameter) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + model_evaluator.evaluate( + trained_model.predict_proba(labels)[:, 1], fake_matrix_store, model_id + ) + assert len(w) == 1 + assert issubclass(w[-1].category, RuntimeWarning) + assert "NULL" in str(w[-1].message) + + for row in db_engine_with_results_schema.execute( + f"""select metric, value from train_results.evaluations where model_id = %s and evaluation_start_time = %s order by 1""", - (model_id, fake_train_matrix_store.as_of_dates[0]), - ) - ] - assert records == ["accuracy", "roc_auc"] - - # ensure that the matrix uuid is present - matrix_uuids = [ - row[0] - for row in db_engine.execute("select matrix_uuid from train_results.evaluations") - ] - assert all(matrix_uuid == "efgh" for matrix_uuid in matrix_uuids) - - -def test_model_scoring_inspections(): - with testing.postgresql.Postgresql() as postgresql: - db_engine = create_engine(postgresql.url()) - ensure_db(db_engine) - testing_metric_groups = [ - { - "metrics": ["precision@", "recall@", "fpr@"], - "thresholds": {"percentiles": [50.0], "top_n": [3]}, - }, + ( + model_id, + fake_matrix_store.as_of_dates[0] + ), + ): + if row[0] == "accuracy": + assert row[1] is not None + else: + assert row[1] is None + + +def test_subset_labels_and_predictions(db_engine_with_results_schema): + num_entities = 5 + labels = [0, 1, 0, 1, 0] + predictions_proba = numpy.array([0.6, 0.4, 0.55, 0.70, 0.3]) + + fake_matrix_store = MockMatrixStore( + matrix_type="test", + 
matrix_uuid="abcde", + label_count=num_entities, + db_engine=db_engine_with_results_schema, + init_labels=pandas.DataFrame( { - # ensure we test a non-thresholded metric as well - "metrics": ["accuracy"] - }, - ] - training_metric_groups = [ - {"metrics": ["accuracy"], "thresholds": {"percentiles": [50.0]}} - ] + "label_value": labels, + "entity_id": list(range(num_entities)), + "as_of_date": [TRAIN_END_TIME] * num_entities, + } + ).set_index(["entity_id", "as_of_date"]).label_value, + init_as_of_dates=[TRAIN_END_TIME], + ) - model_evaluator = ModelEvaluator( - testing_metric_groups, training_metric_groups, db_engine + for subset in SUBSETS: + if subset["name"] == "evens": + expected_result = 3 + elif subset["name"] == "odds": + expected_result = 2 + elif subset["name"] == "empty": + expected_result = 0 + + populate_subset_data(db_engine_with_results_schema, subset, list(range(num_entities))) + subset_labels, subset_predictions = subset_labels_and_predictions( + subset_df=query_subset_table( + db_engine_with_results_schema, + fake_matrix_store.as_of_dates, + get_subset_table_name(subset), + ), + predictions_proba=predictions_proba, + labels=fake_matrix_store.labels, ) - testing_labels = numpy.array([True, False, numpy.nan, True, False]) - testing_prediction_probas = numpy.array([0.56, 0.4, 0.55, 0.5, 0.3]) + assert len(subset_labels) == expected_result + assert len(subset_predictions) == expected_result + + +def test_evaluating_early_warning(db_engine_with_results_schema): + num_entities = 10 + labels = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] + + # Set up testing configuration parameters + testing_metric_groups = [ + { + "metrics": [ + "precision@", + "recall@", + "true positives@", + "true negatives@", + "false positives@", + "false negatives@", + ], + "thresholds": {"percentiles": [5.0, 10.0], "top_n": [5, 10]}, + }, + { + "metrics": [ + "f1", + "mediocre", + "accuracy", + "roc_auc", + "average precision score", + ] + }, + {"metrics": ["fbeta@"], "parameters": [{"beta": 0.75}, {"beta": 1.25}]}, + ] - training_labels = numpy.array( - [False, False, True, True, True, False, True, True] - ) - training_prediction_probas = numpy.array( - [0.6, 0.4, 0.55, 0.70, 0.3, 0.2, 0.8, 0.6] - ) + training_metric_groups = [{"metrics": ["accuracy", "roc_auc"]}] - fake_train_matrix_store = MockMatrixStore( - "train", "efgh", 5, db_engine, training_labels - ) - fake_test_matrix_store = MockMatrixStore( - "test", "1234", 5, db_engine, testing_labels - ) + custom_metrics = {"mediocre": always_half} - trained_model, model_id = fake_trained_model(db_engine) + # Acquire fake data and objects to be used in the tests + model_evaluator = ModelEvaluator( + testing_metric_groups, + training_metric_groups, + db_engine_with_results_schema, + custom_metrics=custom_metrics, + ) - # Evaluate testing matrix and test the results - model_evaluator.evaluate( - testing_prediction_probas, fake_test_matrix_store, model_id - ) - for record in db_engine.execute( - """select * from test_results.evaluations - where model_id = %s and evaluation_start_time = %s - order by 1""", - (model_id, fake_test_matrix_store.as_of_dates[0]), - ): - assert record["num_labeled_examples"] == 4 - assert record["num_positive_labels"] == 2 - if record["parameter"] == "": - assert record["num_labeled_above_threshold"] == 4 - elif "pct" in record["parameter"]: - assert record["num_labeled_above_threshold"] == 1 - else: - assert record["num_labeled_above_threshold"] == 2 - - # Evaluate the training matrix and test the results - model_evaluator.evaluate( - 
training_prediction_probas, fake_train_matrix_store, model_id - ) - for record in db_engine.execute( - """select * from train_results.evaluations - where model_id = %s and evaluation_start_time = %s + fake_test_matrix_store = MockMatrixStore( + matrix_type="test", + matrix_uuid="efgh", + label_count=num_entities, + db_engine=db_engine_with_results_schema, + init_labels=pandas.DataFrame( + { + "label_value": labels, + "entity_id": list(range(num_entities)), + "as_of_date": [TRAIN_END_TIME] * num_entities, + } + ).set_index(["entity_id", "as_of_date"]).label_value, + init_as_of_dates=[TRAIN_END_TIME], + ) + fake_train_matrix_store = MockMatrixStore( + matrix_type="train", + matrix_uuid="1234", + label_count=num_entities, + db_engine=db_engine_with_results_schema, + init_labels=pandas.DataFrame( + { + "label_value": labels, + "entity_id": list(range(num_entities)), + "as_of_date": [TRAIN_END_TIME] * num_entities, + } + ).set_index(["entity_id", "as_of_date"]).label_value, + init_as_of_dates=[TRAIN_END_TIME], + ) + + trained_model, model_id = fake_trained_model( + db_engine_with_results_schema, + train_end_time=TRAIN_END_TIME, + ) + + # ensure that the matrix uuid is present + matrix_uuids = [ + row[0] + for row in db_engine_with_results_schema.execute("select matrix_uuid from test_results.evaluations") + ] + assert all(matrix_uuid == "efgh" for matrix_uuid in matrix_uuids) + + # Evaluate the training metrics and test + model_evaluator.evaluate( + trained_model.predict_proba(labels)[:, 1], fake_train_matrix_store, model_id + ) + records = [ + row[0] + for row in db_engine_with_results_schema.execute( + """select distinct(metric || parameter) + from train_results.evaluations + where model_id = %s and + evaluation_start_time = %s order by 1""", (model_id, fake_train_matrix_store.as_of_dates[0]), - ): - assert record["num_labeled_examples"] == 8 - assert record["num_positive_labels"] == 5 - assert record["value"] == 0.625 + ) + ] + assert records == ["accuracy", "roc_auc"] + + # Run tests for overall and subset evaluations + for subset in [None] + SUBSETS: + if subset is None: + where_hash = "" + else: + populate_subset_data(db_engine_with_results_schema, subset, list(range(num_entities))) + SubsetFactory(subset_hash=filename_friendly_hash(subset)) + session.commit() + where_hash = f"and subset_hash = '{filename_friendly_hash(subset)}'" + # Evaluate the testing metrics and test for all of them. 
+ with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + model_evaluator.evaluate( + trained_model.predict_proba(labels)[:, 1], + fake_test_matrix_store, + model_id, + subset, + ) + records = [ + row[0] + for row in db_engine_with_results_schema.execute( + f"""\ + select distinct(metric || parameter) + from test_results.evaluations + where model_id = %s and + evaluation_start_time = %s + {where_hash} + order by 1 + """, + ( + model_id, + fake_test_matrix_store.as_of_dates[0] + ), + ) + ] + assert records == [ + "accuracy", + "average precision score", + "f1", + "false negatives@10.0_pct", + "false negatives@10_abs", + "false negatives@5.0_pct", + "false negatives@5_abs", + "false positives@10.0_pct", + "false positives@10_abs", + "false positives@5.0_pct", + "false positives@5_abs", + "fbeta@0.75_beta", + "fbeta@1.25_beta", + "mediocre", + "precision@10.0_pct", + "precision@10_abs", + "precision@5.0_pct", + "precision@5_abs", + "recall@10.0_pct", + "recall@10_abs", + "recall@5.0_pct", + "recall@5_abs", + "roc_auc", + "true negatives@10.0_pct", + "true negatives@10_abs", + "true negatives@5.0_pct", + "true negatives@5_abs", + "true positives@10.0_pct", + "true positives@10_abs", + "true positives@5.0_pct", + "true positives@5_abs", + ] + if subset is not None and subset["name"] == "empty": + assert issubclass(w[-1].category, RuntimeWarning) + assert "NULL" in str(w[-1].message) -def test_ModelEvaluator_needs_evaluation(db_engine): - ensure_db(db_engine) - init_engine(db_engine) + # Evaluate the training metrics and test + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + model_evaluator.evaluate( + trained_model.predict_proba(labels)[:, 1], + fake_train_matrix_store, + model_id, + subset, + ) + + records = [ + row[0] + for row in db_engine_with_results_schema.execute( + f"""select distinct(metric || parameter) + from train_results.evaluations + where model_id = %s and + evaluation_start_time = %s + {where_hash} + order by 1""", + ( + model_id, + fake_train_matrix_store.as_of_dates[0] + ), + ) + ] + assert records == ["accuracy", "roc_auc"] + if subset is not None and subset["name"] == "empty": + assert issubclass(w[-1].category, RuntimeWarning) + assert "NULL" in str(w[-1].message) + + # ensure that the matrix uuid is present + matrix_uuids = [ + row[0] + for row in db_engine_with_results_schema.execute("select matrix_uuid from train_results.evaluations") + ] + assert all(matrix_uuid == "1234" for matrix_uuid in matrix_uuids) + + +def test_model_scoring_inspections(db_engine_with_results_schema): + testing_metric_groups = [ + { + "metrics": ["precision@", "recall@", "fpr@"], + "thresholds": {"percentiles": [50.0], "top_n": [3]}, + }, + { + # ensure we test a non-thresholded metric as well + "metrics": ["accuracy"] + }, + ] + training_metric_groups = [ + {"metrics": ["accuracy"], "thresholds": {"percentiles": [50.0]}} + ] + + model_evaluator = ModelEvaluator( + testing_metric_groups, + training_metric_groups, + db_engine_with_results_schema, + ) + + testing_labels = numpy.array([True, False, numpy.nan, True, False]) + testing_prediction_probas = numpy.array([0.56, 0.4, 0.55, 0.5, 0.3]) + + training_labels = numpy.array( + [False, False, True, True, True, False, True, True] + ) + training_prediction_probas = numpy.array( + [0.6, 0.4, 0.55, 0.70, 0.3, 0.2, 0.8, 0.6] + ) + + fake_train_matrix_store = MockMatrixStore( + "train", "efgh", 5, db_engine_with_results_schema, training_labels + ) + fake_test_matrix_store = 
MockMatrixStore( + "test", "1234", 5, db_engine_with_results_schema, testing_labels + ) + + trained_model, model_id = fake_trained_model( + db_engine_with_results_schema, + train_end_time=TRAIN_END_TIME, + ) + + # Evaluate testing matrix and test the results + model_evaluator.evaluate( + testing_prediction_probas, fake_test_matrix_store, model_id + ) + for record in db_engine_with_results_schema.execute( + """select * from test_results.evaluations + where model_id = %s and evaluation_start_time = %s + order by 1""", + (model_id, fake_test_matrix_store.as_of_dates[0]), + ): + assert record["num_labeled_examples"] == 4 + assert record["num_positive_labels"] == 2 + if record["parameter"] == "": + assert record["num_labeled_above_threshold"] == 4 + elif "pct" in record["parameter"]: + assert record["num_labeled_above_threshold"] == 1 + else: + assert record["num_labeled_above_threshold"] == 2 + + # Evaluate the training matrix and test the results + model_evaluator.evaluate( + training_prediction_probas, fake_train_matrix_store, model_id + ) + for record in db_engine_with_results_schema.execute( + """select * from train_results.evaluations + where model_id = %s and evaluation_start_time = %s + order by 1""", + (model_id, fake_train_matrix_store.as_of_dates[0]), + ): + assert record["num_labeled_examples"] == 8 + assert record["num_positive_labels"] == 5 + assert record["value"] == 0.625 + + +def test_ModelEvaluator_needs_evaluation(db_engine_with_results_schema): # TEST SETUP: # create two models: one that has zero evaluations, # one that has an evaluation for precision@100_abs + # both overall and for each subset model_with_evaluations = ModelFactory() model_without_evaluations = ModelFactory() eval_time = datetime.datetime(2016, 1, 1) as_of_date_frequency = "3d" - EvaluationFactory( - model_rel=model_with_evaluations, - evaluation_start_time=eval_time, - evaluation_end_time=eval_time, - as_of_date_frequency=as_of_date_frequency, - metric="precision@", - parameter="100_abs" - ) + for subset_hash in [""] + [filename_friendly_hash(subset) for subset in SUBSETS]: + EvaluationFactory( + model_rel=model_with_evaluations, + evaluation_start_time=eval_time, + evaluation_end_time=eval_time, + as_of_date_frequency=as_of_date_frequency, + metric="precision@", + parameter="100_abs", + subset_hash=subset_hash, + ) session.commit() # make a test matrix to pass in metadata_overrides = { 'as_of_date_frequency': as_of_date_frequency, - 'end_time': eval_time, + 'as_of_times': [eval_time], } test_matrix_store = MockMatrixStore( - "test", "1234", 5, db_engine, metadata_overrides=metadata_overrides + "test", "1234", 5, db_engine_with_results_schema, metadata_overrides=metadata_overrides ) train_matrix_store = MockMatrixStore( - "train", "2345", 5, db_engine, metadata_overrides=metadata_overrides + "train", "2345", 5, db_engine_with_results_schema, metadata_overrides=metadata_overrides ) # the evaluated model has test evaluations for precision, but not recall, # so this needs evaluations - assert ModelEvaluator( - testing_metric_groups=[{ - "metrics": ["precision@", "recall@"], - "thresholds": {"top_n": [100]}, - }], - training_metric_groups=[], - db_engine=db_engine - ).needs_evaluations( - matrix_store=test_matrix_store, - model_id=model_with_evaluations.model_id, - ) + for subset in [None] + SUBSETS: + if not subset: + subset_hash = "" + else: + subset_hash = filename_friendly_hash(subset) + + assert ModelEvaluator( + testing_metric_groups=[{ + "metrics": ["precision@", "recall@"], + "thresholds": {"top_n": 
[100]}, + }], + training_metric_groups=[], + db_engine=db_engine_with_results_schema, + ).needs_evaluations( + matrix_store=test_matrix_store, + model_id=model_with_evaluations.model_id, + subset_hash=subset_hash, + ) # the evaluated model has test evaluations for precision, # so this should not need evaluations - assert not ModelEvaluator( - testing_metric_groups=[{ - "metrics": ["precision@"], - "thresholds": {"top_n": [100]}, - }], - training_metric_groups=[], - db_engine=db_engine - ).needs_evaluations( - matrix_store=test_matrix_store, - model_id=model_with_evaluations.model_id, - ) + for subset in [None] + SUBSETS: + if not subset: + subset_hash = "" + else: + subset_hash = filename_friendly_hash(subset) + + assert not ModelEvaluator( + testing_metric_groups=[{ + "metrics": ["precision@"], + "thresholds": {"top_n": [100]}, + }], + training_metric_groups=[], + db_engine=db_engine_with_results_schema, + ).needs_evaluations( + matrix_store=test_matrix_store, + model_id=model_with_evaluations.model_id, + subset_hash=subset_hash, + ) # the non-evaluated model has no evaluations, # so this should need evaluations - assert ModelEvaluator( - testing_metric_groups=[{ - "metrics": ["precision@"], - "thresholds": {"top_n": [100]}, - }], - training_metric_groups=[], - db_engine=db_engine - ).needs_evaluations( - matrix_store=test_matrix_store, - model_id=model_without_evaluations.model_id, - ) + for subset in [None] + SUBSETS: + if not subset: + subset_hash = "" + else: + subset_hash = filename_friendly_hash(subset) + + assert ModelEvaluator( + testing_metric_groups=[{ + "metrics": ["precision@"], + "thresholds": {"top_n": [100]}, + }], + training_metric_groups=[], + db_engine=db_engine_with_results_schema, + ).needs_evaluations( + matrix_store=test_matrix_store, + model_id=model_without_evaluations.model_id, + subset_hash=subset_hash, + ) # the evaluated model has no *train* evaluations, # so the train matrix should need evaluations - assert ModelEvaluator( - testing_metric_groups=[{ - "metrics": ["precision@"], - "thresholds": {"top_n": [100]}, - }], - training_metric_groups=[{ - "metrics": ["precision@"], - "thresholds": {"top_n": [100]}, - }], - db_engine=db_engine - ).needs_evaluations( - matrix_store=train_matrix_store, - model_id=model_with_evaluations.model_id, - ) + for subset in [None] + SUBSETS: + if not subset: + subset_hash = "" + else: + subset_hash = filename_friendly_hash(subset) + + assert ModelEvaluator( + testing_metric_groups=[{ + "metrics": ["precision@"], + "thresholds": {"top_n": [100]}, + }], + training_metric_groups=[{ + "metrics": ["precision@"], + "thresholds": {"top_n": [100]}, + }], + db_engine=db_engine_with_results_schema, + ).needs_evaluations( + matrix_store=train_matrix_store, + model_id=model_with_evaluations.model_id, + subset_hash=subset_hash, + ) session.close() session.remove() diff --git a/src/tests/results_tests/factories.py b/src/tests/results_tests/factories.py index ce891077e..623845cff 100644 --- a/src/tests/results_tests/factories.py +++ b/src/tests/results_tests/factories.py @@ -90,6 +90,16 @@ class ModelFactory(BaseModelFactory): experiment_association = factory.RelatedFactory(ExperimentModelFactory, 'model_rel') +class SubsetFactory(factory.alchemy.SQLAlchemyModelFactory): + class Meta: + model = schema.Subset + sqlalchemy_session = session + + subset_hash = factory.fuzzy.FuzzyText() + config = {} + created_timestamp = factory.fuzzy.FuzzyNaiveDateTime(datetime(2008, 1, 1)) + + class FeatureImportanceFactory(factory.alchemy.SQLAlchemyModelFactory): 
class Meta: model = schema.FeatureImportance @@ -118,6 +128,7 @@ class Meta: test_label_timespan = "3m" + class ListPredictionFactory(factory.alchemy.SQLAlchemyModelFactory): class Meta: model = schema.ListPrediction @@ -153,6 +164,7 @@ class Meta: sqlalchemy_session = session model_rel = factory.SubFactory(ModelFactory) + subset_hash = '' evaluation_start_time = factory.fuzzy.FuzzyNaiveDateTime(datetime(2008, 1, 1)) evaluation_end_time = factory.fuzzy.FuzzyNaiveDateTime(datetime(2008, 1, 1)) as_of_date_frequency = "3d" @@ -167,6 +179,25 @@ class Meta: matrix_uuid = factory.SelfAttribute("matrix_rel.matrix_uuid") +class SubsetEvaluationFactory(factory.alchemy.SQLAlchemyModelFactory): + class Meta: + model = schema.TestEvaluation + sqlalchemy_session = session + + model_rel = factory.SubFactory(ModelFactory) + subset_rel = factory.SubFactory(SubsetFactory) + evaluation_start_time = factory.fuzzy.FuzzyNaiveDateTime(datetime(2008, 1, 1)) + evaluation_end_time = factory.fuzzy.FuzzyNaiveDateTime(datetime(2008, 1, 1)) + as_of_date_frequency = "3d" + metric = "precision@" + parameter = "100_abs" + value = factory.fuzzy.FuzzyDecimal(0, 1) + num_labeled_examples = 10 + num_labeled_above_threshold = 8 + num_positive_labels = 5 + sort_seed = 8 + + def init_engine(new_engine): global sessionmaker, engine, session engine = new_engine diff --git a/src/tests/results_tests/test_factories.py b/src/tests/results_tests/test_factories.py index 8fc68aa16..f1bef4cca 100644 --- a/src/tests/results_tests/test_factories.py +++ b/src/tests/results_tests/test_factories.py @@ -6,6 +6,7 @@ from .factories import ( ModelGroupFactory, ModelFactory, + SubsetFactory, EvaluationFactory, PredictionFactory, IndividualImportanceFactory, @@ -14,7 +15,7 @@ ) -def test_evaluation_factories(): +def test_evaluation_factories_no_subset(): with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) Base.metadata.create_all(engine) @@ -28,17 +29,55 @@ def test_evaluation_factories(): ) session.commit() results = engine.execute( + """\ + select + model_group_id, + m.model_id, + e.metric, + e.value, + e.subset_hash + from + test_results.evaluations e + join model_metadata.models m using (model_id) """ + ) + for model_group_id, model_id, metric, value, subset_hash in results: + # if the evaluations are created with the model group and model, + # as opposed to an autoprovisioned one, + # the ids in a fresh DB should be 1 + assert model_group_id == 1 + assert model_id == 1 + assert not subset_hash + + +def test_subset_evaluation_factories_with_subset(): + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + Base.metadata.create_all(engine) + init_engine(engine) + + model_group = ModelGroupFactory() + model = ModelFactory(model_group_rel=model_group) + subset = SubsetFactory() + for metric, value in [("precision@", 0.4), ("recall@", 0.3)]: + EvaluationFactory( + model_rel=model, metric=metric, parameter="100_abs", value=value + ) + session.commit() + results = engine.execute( + """\ select model_group_id, m.model_id, + s.subset_hash as s_subset_hash, + e.subset_hash as e_subset_hash, e.metric, e.value from test_results.evaluations e join model_metadata.models m using (model_id) - join model_metadata.matrices test_matrix on (e.matrix_uuid = test_matrix.matrix_uuid) - """ + join model_metadata.subsets s using (subset_hash) + """ ) for model_group_id, model_id, metric, value in results: # if the evaluations are created with the model group and model, @@ -46,6 +85,7 
@@ def test_evaluation_factories(): # the ids in a fresh DB should be 1 assert model_group_id == 1 assert model_id == 1 + assert s_subset_hash == e_subset_hash def test_prediction_factories(): @@ -53,34 +93,41 @@ def test_prediction_factories(): engine = create_engine(postgresql.url()) Base.metadata.create_all(engine) init_engine(engine) - - # create some basic predictions, but with the same model group and - # model to test the factory relationships + model_group = ModelGroupFactory() model = ModelFactory(model_group_rel=model_group) - for entity_id, as_of_date in [ + + entity_dates = [ (1, "2016-01-01"), (1, "2016-04-01"), (2, "2016-01-01"), (2, "2016-04-01"), (3, "2016-01-01"), (3, "2016-04-01"), - ]: - PredictionFactory( + ] + + for entity_id, as_of_date in entity_dates: + IndividualImportanceFactory( model_rel=model, entity_id=entity_id, as_of_date=as_of_date ) - IndividualImportanceFactory( + session.commit() + + # create some basic predictions, but with the same model group and + # model to test the factory relationships + for entity_id, as_of_date in entity_dates: + PredictionFactory( model_rel=model, entity_id=entity_id, as_of_date=as_of_date ) session.commit() + results = engine.execute( - """ + f""" select m.*, p.* from test_results.predictions p join model_metadata.models m using (model_id) join test_results.individual_importances i using (model_id, entity_id, as_of_date) - """ + """ ) assert len([row for row in results]) == 6 # if the predictions are created with the model, diff --git a/src/tests/test_experiments.py b/src/tests/test_experiments.py index cf56a274d..fda05e2ed 100644 --- a/src/tests/test_experiments.py +++ b/src/tests/test_experiments.py @@ -135,8 +135,31 @@ def test_simple_experiment(experiment_class, matrix_storage_class): ] ) assert num_evaluations > 0 + + # 5. subset evaluations linked to subsets and predictions linked to + # models, for training and testing + for set_type in ("train", "test"): + num_evaluations = len( + [ + row + for row in db_engine.execute( + """ + select * from {}_results.evaluations e + join model_metadata.models using (model_id) + join model_metadata.subsets using (subset_hash) + join {}_results.predictions p on ( + e.model_id = p.model_id and + e.evaluation_start_time <= p.as_of_date and + e.evaluation_end_time >= p.as_of_date) + """.format( + set_type, set_type + ) + ) + ] + ) + assert num_evaluations > 0 - # 5. experiment + # 6. experiment num_experiments = len( [ row @@ -145,7 +168,7 @@ def test_simple_experiment(experiment_class, matrix_storage_class): ) assert num_experiments == 1 - # 6. that models are linked to experiments + # 7. that models are linked to experiments num_models_with_experiment = len( [ row @@ -160,7 +183,7 @@ def test_simple_experiment(experiment_class, matrix_storage_class): ) assert num_models == num_models_with_experiment - # 7. that models have the train end date and label timespan + # 8. that models have the train end date and label timespan results = [ (model["train_end_time"], model["training_label_timespan"]) for model in db_engine.execute("select * from model_metadata.models") @@ -170,7 +193,7 @@ def test_simple_experiment(experiment_class, matrix_storage_class): (datetime(2013, 6, 1), timedelta(180)), ] - # 8. that the right number of individual importances are present + # 9. 
that the right number of individual importances are present individual_importances = [ row for row in db_engine.execute( @@ -182,7 +205,7 @@ def test_simple_experiment(experiment_class, matrix_storage_class): ] assert len(individual_importances) == num_predictions * 2 # only 2 features - # 9. Checking the proper matrices created and stored + # 10. Checking the proper matrices created and stored matrices = [ row for row in db_engine.execute( @@ -198,7 +221,7 @@ def test_simple_experiment(experiment_class, matrix_storage_class): assert i > 0 assert len(matrices) == 4 - # 10. Checking that all matrices are associated with the experiment + # 11. Checking that all matrices are associated with the experiment linked_matrices = list(db_engine.execute( """select * from model_metadata.matrices join model_metadata.experiment_matrices using (matrix_uuid) @@ -265,8 +288,8 @@ def test_noload_if_wrong_version(self): @parametrize_experiment_classes @mock.patch( - "triage.component.architect.cohort_table_generators." - "CohortTableGenerator.clean_up", + "triage.component.architect.entity_date_table_generators." + "EntityDateTableGenerator.clean_up", side_effect=lambda: time.sleep(1), ) def test_cleanup_timeout(_clean_up_mock, experiment_class): @@ -307,8 +330,8 @@ def test_build_error(experiment_class): @parametrize_experiment_classes @mock.patch( - "triage.component.architect.cohort_table_generators." - "CohortTableGenerator.clean_up", + "triage.component.architect.entity_date_table_generators." + "EntityDateTableGenerator.clean_up", side_effect=lambda: time.sleep(1), ) def test_build_error_cleanup_timeout(_clean_up_mock, experiment_class): diff --git a/src/tests/utils.py b/src/tests/utils.py index 6cc07c053..e920ccef6 100644 --- a/src/tests/utils.py +++ b/src/tests/utils.py @@ -61,6 +61,7 @@ def __init__( init_labels=None, metadata_overrides=None, matrix=None, + init_as_of_dates=None, ): base_metadata = { "feature_start_time": datetime.date(2014, 1, 1), @@ -71,6 +72,7 @@ def __init__( "label_timespan": "3month", "indices": ["entity_id"], "matrix_type": matrix_type, + "as_of_times": [datetime.date(2014, 10, 1), datetime.date(2014, 7, 1)], } metadata_overrides = metadata_overrides or {} base_metadata.update(metadata_overrides) @@ -91,20 +93,35 @@ def __init__( self.label_count = label_count self.init_labels = init_labels self.matrix_uuid = matrix_uuid + if init_as_of_dates is None: + init_as_of_dates = [] + self.init_as_of_dates = init_as_of_dates session = sessionmaker(db_engine)() session.add(Matrix(matrix_uuid=matrix_uuid)) session.commit() + + @property + def as_of_dates(self): + """The list of as-of-dates in the matrix""" + if len(self.init_as_of_dates) == 0: + return self.metadata["as_of_times"] + else: + return self.init_as_of_dates @property def labels(self): - if self.init_labels == []: + if len(self.init_labels) == 0: return fake_labels(self.label_count) else: return self.init_labels -def fake_trained_model(db_engine, train_matrix_uuid="efgh"): +def fake_trained_model( + db_engine, + train_matrix_uuid="efgh", + train_end_time=datetime.datetime(2016, 1, 1) +): """Creates and stores a trivial trained model and training matrix Args: @@ -118,7 +135,11 @@ def fake_trained_model(db_engine, train_matrix_uuid="efgh"): # Create the fake trained model and store in db trained_model = MockTrainedModel() - db_model = Model(model_hash="abcd", train_matrix_uuid=train_matrix_uuid) + db_model = Model( + model_hash="abcd", + train_matrix_uuid=train_matrix_uuid, + train_end_time=train_end_time, + ) 
session.add(db_model) session.commit() return trained_model, db_model.model_id @@ -345,6 +366,17 @@ def sample_config(): "training_metric_groups": [ {"metrics": ["precision@"], "thresholds": {"top_n": [3]}} ], + "subsets": [ + { + "name": "evens", + "query": """\ + select distinct entity_id + from events + where entity_id %% 2 = 0 + and outcome_date < '{as_of_date}'::date + """, + }, + ] } grid_config = { diff --git a/src/triage/component/architect/cohort_table_generators.py b/src/triage/component/architect/entity_date_table_generators.py similarity index 54% rename from src/triage/component/architect/cohort_table_generators.py rename to src/triage/component/architect/entity_date_table_generators.py index 5157f3f2e..9eca268c2 100644 --- a/src/triage/component/architect/cohort_table_generators.py +++ b/src/triage/component/architect/entity_date_table_generators.py @@ -7,14 +7,14 @@ DEFAULT_ACTIVE_STATE = "active" -class CohortTableGenerator(object): - """Create a table containing cohort membership on different dates +class EntityDateTableGenerator(object): + """Create a table containing state membership on different dates The structure of the output table is: entity_id date active (boolean): Whether or not the entity is considered 'active' - (in the cohort) on that date + (i.e., in the cohort or subset) on that date Args: db_engine (sqlalchemy.engine) @@ -24,85 +24,95 @@ class CohortTableGenerator(object): replace (boolean) Whether or not to overwrite old rows. If false, each as-of-date will query to see if there are existing rows and not run the query if so. - If true, the existing cohort table will be dropped and recreated. + If true, the existing table will be dropped and recreated. """ - def __init__(self, query, db_engine, cohort_table_name, replace=True): + def __init__(self, query, db_engine, entity_date_table_name, replace=True): self.db_engine = db_engine self.query = query - self.cohort_table_name = cohort_table_name + self.entity_date_table_name = entity_date_table_name self.replace = replace - def generate_cohort_table(self, as_of_dates): + def generate_entity_date_table(self, as_of_dates): """Convert the object's input table - into a cohort states table for the given as_of_dates + into a states table for the given as_of_dates Args: - as_of_dates (list of datetime.dates) Dates to include in the cohort + as_of_dates (list of datetime.dates) Dates to include in the state table """ - logging.debug("Generating cohort table using as_of_dates: %s", as_of_dates) - self._create_and_populate_cohort_table(as_of_dates) + logging.debug( + "Generating entity_date table %s using as_of_dates: %s", + self.entity_date_table_name, + as_of_dates, + ) + self._create_and_populate_entity_date_table(as_of_dates) self.db_engine.execute( - "create index on {} (entity_id, as_of_date)".format(self.cohort_table_name) + "create index on {} (entity_id, as_of_date)".format(self.entity_date_table_name) ) logging.info( - "Indices created on entity_id and as_of_date for cohort table" + "Indices created on entity_id and as_of_date for entity_date table %s", + self.entity_date_table_name, ) - if not table_has_data(self.cohort_table_name, self.db_engine): + if not table_has_data(self.entity_date_table_name, self.db_engine): raise ValueError(self._empty_table_message(as_of_dates)) - logging.info("Cohort table generated at %s", self.cohort_table_name) - logging.info("Generating stats on %s", self.cohort_table_name) + logging.info("Entity-date table generated at %s", self.entity_date_table_name) + 
logging.info("Generating stats on %s", self.entity_date_table_name) logging.info( "Row count of %s: %s", - self.cohort_table_name, - table_row_count(self.cohort_table_name, self.db_engine), + self.entity_date_table_name, + table_row_count(self.entity_date_table_name, self.db_engine), ) - def _maybe_create_cohort_table(self): - if self.replace or not table_exists(self.cohort_table_name, self.db_engine): - self.db_engine.execute(f"drop table if exists {self.cohort_table_name}") + def _maybe_create_entity_date_table(self): + if self.replace or not table_exists(self.entity_date_table_name, self.db_engine): + self.db_engine.execute(f"drop table if exists {self.entity_date_table_name}") self.db_engine.execute( - f"""create table {self.cohort_table_name} ( + f"""create table {self.entity_date_table_name} ( entity_id integer, as_of_date timestamp, {DEFAULT_ACTIVE_STATE} boolean ) """ ) - logging.info("Created cohort table") + logging.info("Created entity_date table %s", self.entity_date_table_name) else: - logging.info("Not dropping and recreating cohort table because " - "replace flag was set to False and table was found to exist") + logging.info( + "Not dropping and recreating entity_date %s table because " + "replace flag was set to False and table was found to exist", + self.entity_date_table_name, + ) - def _create_and_populate_cohort_table(self, as_of_dates): - """Create a cohort table by sequentially running a + def _create_and_populate_entity_date_table(self, as_of_dates): + """Create an entity_date table by sequentially running a given date-parameterized query for all known dates. Args: as_of_dates (list of datetime.date): Dates to calculate entity states as of """ - self._maybe_create_cohort_table() - logging.info("Inserting rows into cohort table") + self._maybe_create_entity_date_table() + logging.info("Inserting rows into entity_date table %s", self.entity_date_table_name) for as_of_date in as_of_dates: formatted_date = f"{as_of_date.isoformat()}" - logging.info("Looking for existing cohort rows for as of date %s", as_of_date) + logging.info("Looking for existing entity_date rows for as of date %s", as_of_date) any_existing = list(self.db_engine.execute( - f"""select 1 from {self.cohort_table_name} + f"""select 1 from {self.entity_date_table_name} where as_of_date = '{formatted_date}' limit 1 """ )) if len(any_existing) == 1: - logging.info("Since >0 cohort rows found for date %s, skipping", as_of_date) + logging.info("Since >0 entity_date rows found for date %s, skipping", as_of_date) continue dated_query = self.query.format(as_of_date=formatted_date) - full_query = f"""insert into {self.cohort_table_name} + full_query = f"""insert into {self.entity_date_table_name} select q.entity_id, '{formatted_date}'::timestamp, true from ({dated_query}) q group by 1, 2, 3 """ - logging.info("Running cohort query for date: %s, %s", as_of_date, full_query) + logging.info(type(as_of_date)) + logging.info(type(full_query)) + logging.info("Running entity_date query for date: %s, %s", as_of_date, full_query) self.db_engine.execute(full_query) def _empty_table_message(self, as_of_dates): @@ -119,14 +129,14 @@ def _empty_table_message(self, as_of_dates): ) def clean_up(self): - self.db_engine.execute("drop table if exists {}".format(self.cohort_table_name)) + self.db_engine.execute("drop table if exists {}".format(self.entity_date_table_name)) -class CohortTableGeneratorNoOp(CohortTableGenerator): +class EntityDateTableGeneratorNoOp(EntityDateTableGenerator): def __init__(self): pass - def 
generate_cohort_table(self, as_of_dates): + def generate_entity_date_table(self, as_of_dates): logging.warning( "No cohort configuration is available, so no cohort will be created" ) @@ -137,5 +147,5 @@ def clean_up(self): return @property - def cohort_table_name(self): + def entity_date_table_name(self): return None diff --git a/src/triage/component/architect/planner.py b/src/triage/component/architect/planner.py index 78b7523e2..ee1717d2c 100644 --- a/src/triage/component/architect/planner.py +++ b/src/triage/component/architect/planner.py @@ -4,7 +4,7 @@ from triage.component import metta -from . import utils, cohort_table_generators +from . import utils, entity_date_table_generators class Planner(object): @@ -93,7 +93,7 @@ def _make_metadata( "test_label_timespan", matrix_definition.get("training_label_timespan", "0 days"), ), - "state": cohort_table_generators.DEFAULT_ACTIVE_STATE, + "state": entity_date_table_generators.DEFAULT_ACTIVE_STATE, "cohort_name": cohort_name, "matrix_id": matrix_id, "matrix_type": matrix_type, diff --git a/src/triage/component/catwalk/__init__.py b/src/triage/component/catwalk/__init__.py index 16a006f0e..ce8afd17a 100644 --- a/src/triage/component/catwalk/__init__.py +++ b/src/triage/component/catwalk/__init__.py @@ -4,6 +4,8 @@ from .evaluation import ModelEvaluator from .individual_importance import IndividualImportanceCalculator from .model_grouping import ModelGrouper +from .subsetters import Subsetter +from .utils import filename_friendly_hash import logging @@ -15,13 +17,15 @@ def __init__( model_trainer, model_evaluator, individual_importance_calculator, - predictor + predictor, + subsets, ): self.matrix_storage_engine = matrix_storage_engine self.model_trainer = model_trainer self.model_evaluator = model_evaluator self.individual_importance_calculator = individual_importance_calculator self.predictor = predictor + self.subsets = subsets def generate_tasks(self, split, grid_config, model_comment=None): logging.info("Generating train/test tasks for split %s", split["train_uuid"]) @@ -104,14 +108,16 @@ def process_task(self, test_store, train_store, train_kwargs): # Generate predictions for the testing data then training data for store in (test_store, train_store): - if self.predictor.replace or self.model_evaluator.needs_evaluations(store, model_id): + + if self.predictor.replace: logging.info( - "The evaluations needed for matrix %s-%s and model %s" - "are not all present in db, so predicting and evaluating", + "Replace flag set; generating new predictions and evaluations for" + "matrix %s-%s, and model %s", store.uuid, store.matrix_type, model_id ) + predictions_proba = self.predictor.predict( model_id, store, @@ -119,19 +125,46 @@ def process_task(self, test_store, train_store, train_kwargs): train_matrix_columns=train_store.columns(), ) - self.model_evaluator.evaluate( - predictions_proba=predictions_proba, - matrix_store=store, - model_id=model_id, - ) + for subset in self.subsets: + self.model_evaluator.evaluate( + predictions_proba=predictions_proba, + matrix_store=store, + model_id=model_id, + subset=subset, + ) + else: - logging.info( - "The evaluations needed for matrix %s-%s and model %s are all present" - "in db from a previous run (or none needed at all), so skipping!", - store.uuid, - store.matrix_type, - model_id - ) + for subset in self.subsets: + if subset: + subset_hash = filename_friendly_hash(subset) + else: + subset_hash = "" + + if self.model_evaluator.needs_evaluations(store, model_id, subset_hash): + logging.info( + "The 
evaluations needed for matrix %s-%s, subset %s, and model %s" + "are not all present in db, so evaluating", + store.uuid, + store.matrix_type, + subset_hash, + model_id + ) + self.model_evaluator.evaluate( + predictions_proba=predictions_proba, + matrix_store=store, + model_id=model_id, + subset=subset, + ) + else: + logging.info( + "The evaluations needed for matrix %s-%s, subset %s, and " + "model %s are all present" + "in db from a previous run (or none needed at all), so skipping!", + store.uuid, + store.matrix_type, + subset_hash, + model_id + ) __all__ = ( @@ -140,5 +173,6 @@ def process_task(self, test_store, train_store, train_kwargs): "ModelGrouper" "ModelTrainer", "Predictor", - "ModelTrainTester" + "ModelTrainTester", + "Subsetter", ) diff --git a/src/triage/component/catwalk/evaluation.py b/src/triage/component/catwalk/evaluation.py index 9e59fbca4..211b6e7a8 100644 --- a/src/triage/component/catwalk/evaluation.py +++ b/src/triage/component/catwalk/evaluation.py @@ -1,24 +1,73 @@ import functools +import io import logging import time +import warnings import numpy +import pandas from sqlalchemy.orm import sessionmaker from . import metrics -from .utils import db_retry, sort_predictions_and_labels +from .utils import ( + db_retry, + sort_predictions_and_labels, + get_subset_table_name, + filename_friendly_hash +) + + +def subset_labels_and_predictions( + subset_df, + labels, + predictions_proba, +): + indexed_predictions = pandas.Series(predictions_proba, index=labels.index) + + # The subset isn't specific to the cohort, so inner join to the labels/predictions + labels_subset = labels.align(subset_df, join="inner")[0] + predictions_subset = indexed_predictions.align(subset_df, join="inner")[0].values + + logging.debug(f"{len(labels_subset)} entities in subset out of {len(labels)} in matrix.") + + return (labels_subset, predictions_subset) + + +def query_subset_table(db_engine, as_of_dates, subset_table_name): + as_of_dates = [date.strftime("%Y-%m-%d %H:%M:%S.%f") for date in as_of_dates], + query_string = f""" + with dates as ( + select unnest(array{list(as_of_dates)}::timestamp[]) as as_of_date + ) + select entity_id, as_of_date, active + from {subset_table_name} + join dates using(as_of_date) + """ + copy_sql = f"COPY ({query_string}) TO STDOUT WITH CSV HEADER" + conn = db_engine.raw_connection() + cur = conn.cursor() + out = io.StringIO() + logging.debug(f"Running query {copy_sql} to get subset") + cur.copy_expert(copy_sql, out) + out.seek(0) + df = pandas.read_csv( + out, parse_dates=["as_of_date"], + index_col=["entity_id", "as_of_date"] + ) + + return df def generate_binary_at_x(test_predictions, x_value, unit="top_n"): - """Generate subset of predictions based on top% or absolute + """Assign predicted classes based based on top% or absolute rank of score Args: test_predictions (list) A list of predictions, sorted by risk desc x_value (int) The percentile or absolute value desired - unit (string, default 'top_n') The subsetting method desired, + unit (string, default 'top_n') The thresholding method desired, either percentile or top_n - Returns: (list) The predictions subset + Returns: (list) The predicted classes """ if unit == "percentile": cutoff_index = int(len(test_predictions) * (x_value / 100.00)) @@ -88,6 +137,8 @@ def __init__( training_metric_groups (list) metrics to be calculated on training set, in the same form as testing_metric_groups db_engine (sqlalchemy.engine) + sort_seed (int) the seed to set in random.set_seed() to break ties + when sorting predictions 
custom_metrics (dict) Functions to generate metrics not available by default Each function is expected take in the following params: @@ -184,8 +235,8 @@ def _evaluations_for_threshold( threshold_value (int) the numeric threshold, threshold_specified_by_user (bool) Whether or not there was any threshold specified by the user. Defaults to True - evaluation_table_obj (schema.TestEvaluation or TrainEvaluation) - specifies to which table to add the evaluations + evaluation_table_obj (schema.TestEvaluation or TrainEvaluation) specifies to + which table to add the evaluations Returns: (list) results_schema.TrainEvaluation or TestEvaluation objects Raises: UnknownMetricError if a given metric is not present in @@ -209,12 +260,20 @@ def _evaluations_for_threshold( raise metrics.UnknownMetricError() for parameter_combination in parameters: - value = self.available_metrics[metric]( - predictions_proba, - predicted_classes_with_labels, - present_labels, - parameter_combination, - ) + try: + value = self.available_metrics[metric]( + predictions_proba, + predicted_classes_with_labels, + present_labels, + parameter_combination, + ) + except (ValueError, IndexError) as e: + warnings.warn( + f"{metric} not defined for parameter " + "{parameter_combination}. Inserting NULL for value.", + RuntimeWarning + ) + value = None # convert the thresholds/parameters into something # more readable @@ -250,7 +309,11 @@ def _evaluations_for_threshold( return evaluations def _evaluations_for_group( - self, group, predictions_proba_sorted, labels_sorted, evaluation_table_obj + self, + group, + predictions_proba_sorted, + labels_sorted, + evaluation_table_obj, ): """Generate evaluations for a given metric group, and create ORM objects to hold them @@ -259,7 +322,9 @@ def _evaluations_for_group( Should contain the key 'metrics', and optionally 'parameters' or 'thresholds' predictions_proba (list) Probability predictions labels (list) True labels (may have NaNs) - + evaluation_table_obj (schema.TestEvaluation or TrainEvaluation) specifies to + which table to add the evaluations + Returns: (list) results_schema.Evaluation objects """ logging.info("Creating evaluations for metric group %s", group) @@ -296,13 +361,14 @@ def _evaluations_for_group( ) return evaluations - def needs_evaluations(self, matrix_store, model_id): + def needs_evaluations(self, matrix_store, model_id, subset_hash=""): """Returns whether or not all the configured metrics are present in the database for the given matrix and model. 
Args: matrix_store (triage.component.catwalk.storage.MatrixStore) model_id (int) A model id + subset_hash (str) An identifier for the subset to be evaluated Returns: (bool) whether or not this matrix and model are missing any evaluations in the db @@ -330,6 +396,7 @@ def needs_evaluations(self, matrix_store, model_id): evaluation_start_time=matrix_store.as_of_dates[0], evaluation_end_time=matrix_store.as_of_dates[-1], as_of_date_frequency=matrix_store.metadata["as_of_date_frequency"], + subset_hash=subset_hash, ).distinct(eval_obj.metric, eval_obj.parameter).all() # The list of needed metrics and parameters are all the unique metric/params from the config @@ -341,7 +408,13 @@ def needs_evaluations(self, matrix_store, model_id): session.close() return needed - def evaluate(self, predictions_proba, matrix_store, model_id): + def evaluate( + self, + predictions_proba, + matrix_store, + model_id, + subset=None, + ): """Evaluate a model based on predictions, and save the results Args: @@ -349,28 +422,47 @@ def evaluate(self, predictions_proba, matrix_store, model_id): matrix_store (catwalk.storage.MatrixStore) a wrapper for the prediction matrix and metadata model_id (int) The database identifier of the model + subset (dict) A dictionary containing a query and a + name for the subset to evaluate on, if any """ - labels = matrix_store.labels + evaluation_table_obj = matrix_store.matrix_type.evaluation_obj + + # If we are evaluating on a subset, we want to get just the labels and + # predictions for the included entity-date pairs + if subset: + logging.info("Subsetting labels and predictions") + labels, predictions_proba = subset_labels_and_predictions( + subset_df=query_subset_table( + self.db_engine, + matrix_store.as_of_dates, + get_subset_table_name(subset), + ), + predictions_proba=predictions_proba, + labels=matrix_store.labels, + ) + subset_hash = filename_friendly_hash(subset) + else: + labels = matrix_store.labels + subset_hash = "" + matrix_type = matrix_store.matrix_type.string_name evaluation_start_time = matrix_store.as_of_dates[0] evaluation_end_time = matrix_store.as_of_dates[-1] as_of_date_frequency = matrix_store.metadata["as_of_date_frequency"] - # Specifies which evaluation table to write to: TestEvaluation or TrainEvaluation - evaluation_table_obj = matrix_store.matrix_type.evaluation_obj - logging.info( "Generating evaluations for model id %s, evaluation range %s-%s, " - "as_of_date frequency %s", + "as_of_date frequency %s, subset %s", model_id, evaluation_start_time, evaluation_end_time, as_of_date_frequency, + subset_hash ) + predictions_proba_sorted, labels_sorted = sort_predictions_and_labels( predictions_proba, labels, self.sort_seed ) - evaluations = [] matrix_type = matrix_store.matrix_type if matrix_type.is_test: @@ -382,12 +474,17 @@ def evaluate(self, predictions_proba, matrix_store, model_id): group, predictions_proba_sorted, labels_sorted, - matrix_type.evaluation_obj, + evaluation_table_obj ) - logging.info("Writing metrics to db: %s table", matrix_type) + logging.info( + "Writing metrics to db: %s table for subset %s", + matrix_type, + subset_hash + ) self._write_to_db( model_id, + subset_hash, evaluation_start_time, evaluation_end_time, as_of_date_frequency, @@ -395,12 +492,17 @@ def evaluate(self, predictions_proba, matrix_store, model_id): evaluations, evaluation_table_obj, ) - logging.info("Done writing metrics to db: %s table", matrix_type) + logging.info( + "Done writing metrics to db: %s table for subset %s", + matrix_type, + subset_hash + ) @db_retry 
def _write_to_db( self, model_id, + subset_hash, evaluation_start_time, evaluation_end_time, as_of_date_frequency, @@ -415,10 +517,18 @@ def _write_to_db( Args: model_id (int) primary key of the model - as_of_date (datetime.date) Date the predictions were made as of - evaluations (list) results_schema.TestEvaluation or TrainEvaluation objects + evaluation_start_time (pandas._libs.tslibs.timestamps.Timestamp) + first as_of_date included in the evaluation period + evaluation_end_time (pandas._libs.tslibs.timestamps.Timestamp) last + as_of_date included in the evaluation period + as_of_date_frequency (str) the frequency with which as_of_dates + occur between the evaluation_start_time and evaluation_end_time + evaluations (list) results_schema.TestEvaluation or TrainEvaluation + objects evaluation_table_obj (schema.TestEvaluation or TrainEvaluation) specifies to which table to add the evaluations + subset_hash (str) the hash of the subset, if any, that the + evaluation is made on """ session = self.sessionmaker() @@ -427,14 +537,19 @@ def _write_to_db( evaluation_start_time=evaluation_start_time, evaluation_end_time=evaluation_end_time, as_of_date_frequency=as_of_date_frequency, + subset_hash=subset_hash ).delete() for evaluation in evaluations: evaluation.model_id = model_id + evaluation.as_of_date_frequency = as_of_date_frequency + evaluation.subset_hash = subset_hash evaluation.evaluation_start_time = evaluation_start_time evaluation.evaluation_end_time = evaluation_end_time evaluation.as_of_date_frequency = as_of_date_frequency evaluation.matrix_uuid = matrix_uuid + if subset_hash: + evaluation.subset_hash = subset_hash session.add(evaluation) session.commit() session.close() diff --git a/src/triage/component/catwalk/model_testers.py b/src/triage/component/catwalk/model_testers.py index 8f65b22f3..7822a4111 100644 --- a/src/triage/component/catwalk/model_testers.py +++ b/src/triage/component/catwalk/model_testers.py @@ -41,6 +41,9 @@ def __init__( training_metric_groups=evaluator_config.get("training_metric_groups", []), ) + self.subsets = evaluator_config.get("subsets") + logging.info("ModelTester: self.subsets = %s", self.subsets) + def generate_model_test_tasks(self, split, train_store, model_ids): test_tasks = [] for test_matrix_def, test_uuid in zip( @@ -51,8 +54,8 @@ def generate_model_test_tasks(self, split, train_store, model_ids): if test_store.empty: logging.warning( """Test matrix for uuid %s - was empty, no point in generating predictions. Not creating test task. - """, + was empty, no point in generating predictions. Not creating test task. 
+ """, test_uuid, ) continue @@ -85,12 +88,19 @@ def process_model_test_task(self, test_store, train_store, model_ids): # Generate predictions for the testing data then training data for store in (test_store, train_store): + for subset in [None] + self.subsets: + + logging.info("ModelTester: evaluating subset %s", subset) + if self.replace: should_run = True reason = "Replace flag was set" - elif self.evaluator.needs_evaluations(store, model_id): + elif not subset and self.evaluator.needs_evaluations(store, model_id): + should_run = True + reason = "Needed overall evaluations are not all present in DB" + elif subset and self.evaluator.needs_evaluations(store, model_id, subset["hash"]): should_run = True - reason = "Needed evaluations are not all present in DB" + reason = f"Needed evaluations for {subset['name']}-{subset['hash']} are not all present in DB" elif self.predictor.needs_predictions(store, model_id): should_run = True reason = "Needed predictions are not all present in DB" @@ -100,7 +110,7 @@ def process_model_test_task(self, test_store, train_store, model_ids): if should_run: logging.info( - "Predicting and evaluating for matrix %s-%s and model %s . Reason? %s", + "Predicting and evaluating for matrix %s-%s, and model %s . Reason? %s", store.uuid, store.matrix_type, model_id, @@ -117,6 +127,7 @@ def process_model_test_task(self, test_store, train_store, model_ids): predictions_proba=predictions_proba, matrix_store=store, model_id=model_id, + subset, ) else: logging.info( diff --git a/src/triage/component/catwalk/subsetters.py b/src/triage/component/catwalk/subsetters.py new file mode 100644 index 000000000..db8d290c5 --- /dev/null +++ b/src/triage/component/catwalk/subsetters.py @@ -0,0 +1,61 @@ +import logging + +import pandas +from sqlalchemy.orm import sessionmaker + +from triage.component.architect.entity_date_table_generators import EntityDateTableGenerator +from triage.component.catwalk.utils import (filename_friendly_hash, get_subset_table_name) +from triage.component.results_schema import Subset + +class Subsetter(object): + def __init__( + self, + db_engine, + replace, + as_of_times, + ): + self.db_engine = db_engine + self.replace = replace + self.as_of_times = as_of_times + + def generate_tasks(self, subset_configs): + logging.info("Generating subset table creation tasks") + subset_tasks = [] + for subset_config in subset_configs: + if subset_config: + subset_hash = filename_friendly_hash(subset_config) + subset_table_generator = EntityDateTableGenerator( + entity_date_table_name=get_subset_table_name(subset_config), + db_engine=self.db_engine, + query=subset_config["query"], + replace=self.replace + ) + subset_tasks.append( + { + "subset_config": subset_config, + "subset_hash": subset_hash, + "subset_table_generator": subset_table_generator, + } + ) + return subset_tasks + + def process_all_tasks(self, tasks): + for task in tasks: + self.process_task(**task) + + def process_task(self, subset_config, subset_hash, subset_table_generator): + logging.info( + "Beginning subset creation for %s-%s", + subset_config["name"], + subset_hash + ) + subset_table_generator.generate_entity_date_table( + as_of_dates=self.as_of_times + ) + self.save_subset_to_db(subset_hash, subset_config) + + def save_subset_to_db(self, subset_hash, subset_config): + session = sessionmaker(bind=self.db_engine)() + session.merge(Subset(subset_hash=subset_hash, config=subset_config)) + session.commit() + session.close() diff --git a/src/triage/component/catwalk/utils.py 
b/src/triage/component/catwalk/utils.py index f2ddef961..2fcea53ad 100644 --- a/src/triage/component/catwalk/utils.py +++ b/src/triage/component/catwalk/utils.py @@ -16,7 +16,8 @@ Matrix, Model, ExperimentMatrix, - ExperimentModel + ExperimentModel, + Subset, ) @@ -31,6 +32,13 @@ def dt_handler(x): ).hexdigest() +def get_subset_table_name(subset_config): + return "subset_{}_{}".format( + subset_config.get("name", "default"), + filename_friendly_hash(subset_config), + ) + + def retry_if_db_error(exception): return isinstance(exception, sqlalchemy.exc.OperationalError) @@ -141,15 +149,19 @@ def __iter__(self): def sort_predictions_and_labels(predictions_proba, labels, sort_seed): - random.seed(sort_seed) - predictions_proba_sorted, labels_sorted = zip( - *sorted( - zip(predictions_proba, labels), - key=lambda pair: (pair[0], random.random()), - reverse=True, + if len(labels) == 0: + logging.debug("No labels present, skipping sorting.") + return predictions_proba, labels + else: + random.seed(sort_seed) + predictions_proba_sorted, labels_sorted = zip( + *sorted( + zip(predictions_proba, labels), + key=lambda pair: (pair[0], random.random()), + reverse=True, + ) ) - ) - return predictions_proba_sorted, labels_sorted + return predictions_proba_sorted, labels_sorted @db_retry diff --git a/src/triage/component/results_schema/README.md b/src/triage/component/results_schema/README.md index 37c1e4129..ec9f13df8 100644 --- a/src/triage/component/results_schema/README.md +++ b/src/triage/component/results_schema/README.md @@ -32,7 +32,7 @@ To make modifications to the schema, you should be working in a cloned version o 2. Make the desired modifications to [results_schema.schema](schema.py). -3. From within the results schema directory, autogenerate a migration: `manage alembic revision --autogenerate` - This will look at the difference between your schema definition and the database, and generate a new file in results_schema/alembic/versions/. +3. From the repo root, autogenerate a migration: `manage alembic revision --autogenerate` - This will look at the difference between your schema definition and the database, and generate a new file in results_schema/alembic/versions/. 4. Inspect the file generated in step 3 and make sure that the changes it is suggesting make sense. Make any modifications you want; the autogenerate functionality is just meant as a guideline. diff --git a/src/triage/component/results_schema/__init__.py b/src/triage/component/results_schema/__init__.py index bdea9c4d4..5af5a32fa 100644 --- a/src/triage/component/results_schema/__init__.py +++ b/src/triage/component/results_schema/__init__.py @@ -17,6 +17,7 @@ ExperimentModel, Model, ModelGroup, + Subset, TestEvaluation, TrainEvaluation, TestPrediction, @@ -35,6 +36,7 @@ "ExperimentModel", "Model", "ModelGroup", + "Subset", "TestEvaluation", "TrainEvaluation", "TestPrediction", diff --git a/src/triage/component/results_schema/alembic/versions/50e1f1bc2cac_add_subsets.py b/src/triage/component/results_schema/alembic/versions/50e1f1bc2cac_add_subsets.py new file mode 100644 index 000000000..53912f48b --- /dev/null +++ b/src/triage/component/results_schema/alembic/versions/50e1f1bc2cac_add_subsets.py @@ -0,0 +1,155 @@ +"""empty message + +Revision ID: 50e1f1bc2cac +Revises: 0bca1ba9706e +Create Date: 2019-02-19 17:14:31.702012 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. 
+revision = '50e1f1bc2cac' +down_revision = '0bca1ba9706e' +branch_labels = None +depends_on = None + +def upgrade(): + """ + This upgrade: + 1. adds the model_metadata.subsets table to track evaluation subsets + 2. adds the subset_hash column to the evaluations table, defaulting to + '' for existing evaluations (on the assumption that they were over + the whole cohort) + 3. alters (really, drops and re-adds) the primary key for the + evaluations tables to include the subset_hash + """ + # 1. Add subsets table + op.create_table( + "subsets", + sa.Column("subset_hash", sa.String(), nullable=False), + sa.Column("config", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column( + "created_timestamp", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True + ), + sa.PrimaryKeyConstraint("subset_hash"), + schema="model_metadata", + ) + + # 2. Add subset_hash column + op.add_column( + "evaluations", + sa.Column("subset_hash", sa.String(), nullable=False, server_default=""), + schema="test_results" + ) + op.add_column( + "evaluations", + sa.Column("subset_hash", sa.String(), nullable=False, server_default=""), + schema="train_results" + ) + + # 3. Alter primary keys + # Actual triage databases have been observed with different variats of the + # primary key name in the train_results schema. To ensure that all + # databases can be appropriately updated, procedural is used to look up + # the name of the primary key before droppint it. + op.drop_constraint("evaluations_pkey", "evaluations", schema="test_results") + op.execute( + """ + DO + $body$ + DECLARE _pkey_name varchar(100) := ( + SELECT conname + FROM pg_catalog.pg_constraint con + INNER JOIN pg_catalog.pg_class rel ON rel.oid = con.conrelid + INNER JOIN pg_catalog.pg_namespace nsp ON nsp.oid = con.connamespace + WHERE rel.relname = 'evaluations' + AND nspname = 'train_results' + AND contype = 'p' + ); + BEGIN + EXECUTE('ALTER TABLE train_results.evaluations DROP CONSTRAINT ' || _pkey_name); + END + $body$ + """ + ) + + op.create_primary_key( + constraint_name="evaluations_pkey", + table_name="evaluations", + columns=[ + "model_id", + "subset_hash", + "evaluation_start_time", + "evaluation_end_time", + "as_of_date_frequency", + "metric", + "parameter" + ], + schema="test_results", + ) + op.create_primary_key( + constraint_name="train_evaluations_pkey", + table_name="evaluations", + columns=[ + "model_id", + "subset_hash", + "evaluation_start_time", + "evaluation_end_time", + "as_of_date_frequency", + "metric", + "parameter" + ], + schema="train_results", + ) + + +def downgrade(): + """ + This downgrade revereses the steps of the upgrade: + 1. Alters the primary key on the evaluations tables to exclude + subset_hash + 2. Drops the subset hash columns from the evaluations tables + 3. Drops the model_metadata.subsets table + """ + # 1. 
Alter primary keys + op.drop_constraint("evaluations_pkey", "evaluations", schema="test_results") + op.drop_constraint("train_evaluations_pkey", "evaluations", schema="train_results") + + op.create_primary_key( + name="evaluations_pkey", + table_name="evaluations", + columns=[ + "model_id", + "evaluation_start_time", + "evaluation_end_time", + "as_of_date_frequency", + "metric", + "parameter" + ], + schema="test_results", + ) + op.create_primary_key( + name="train_evaluations_pkey", + table_name="evaluations", + columns=[ + "model_id", + "evaluation_start_time", + "evaluation_end_time", + "as_of_date_frequency", + "metric", + "parameter" + ], + schema="train_results", + ) + + # 2. Drop subset_hash columns + op.drop_column("evaluations", "subset_hash", schema="train_results") + op.drop_column("evaluations", "subset_hash", schema="test_results") + + # 3. Drop subsets table + op.drop_table("subsets", schema="model_metadata") diff --git a/src/triage/component/results_schema/schema.py b/src/triage/component/results_schema/schema.py index b43acdf53..d71dd81d9 100644 --- a/src/triage/component/results_schema/schema.py +++ b/src/triage/component/results_schema/schema.py @@ -51,6 +51,16 @@ class Experiment(Base): config = Column(JSONB) +class Subset(Base): + + __tablename__ = "subsets" + __table_args__ = {"schema": "model_metadata"} + + subset_hash = Column(String, primary_key=True) + config = Column(JSONB) + created_timestamp = Column(DateTime(timezone=True), server_default=func.now()) + + class ModelGroup(Base): __tablename__ = "model_groups" @@ -249,6 +259,7 @@ class TestEvaluation(Base): model_id = Column( Integer, ForeignKey("model_metadata.models.model_id"), primary_key=True ) + subset_hash = Column(String, primary_key=True, default='') evaluation_start_time = Column(DateTime, primary_key=True) evaluation_end_time = Column(DateTime, primary_key=True) as_of_date_frequency = Column(Interval, primary_key=True) @@ -273,6 +284,7 @@ class TrainEvaluation(Base): model_id = Column( Integer, ForeignKey("model_metadata.models.model_id"), primary_key=True ) + subset_hash = Column(String, primary_key=True, default='') evaluation_start_time = Column(DateTime, primary_key=True) evaluation_end_time = Column(DateTime, primary_key=True) as_of_date_frequency = Column(Interval, primary_key=True) diff --git a/src/triage/experiments/base.py b/src/triage/experiments/base.py index 5418c3d1a..b07844e93 100644 --- a/src/triage/experiments/base.py +++ b/src/triage/experiments/base.py @@ -21,9 +21,9 @@ ) from triage.component.architect.planner import Planner from triage.component.architect.builders import MatrixBuilder -from triage.component.architect.cohort_table_generators import ( - CohortTableGenerator, - CohortTableGeneratorNoOp, +from triage.component.architect.entity_date_table_generators import ( + EntityDateTableGenerator, + EntityDateTableGeneratorNoOp, ) from triage.component.timechop import Timechop from triage.component.results_schema import upgrade_db @@ -33,7 +33,8 @@ Predictor, IndividualImportanceCalculator, ModelGrouper, - ModelTrainTester + ModelTrainTester, + Subsetter ) from triage.component.catwalk.utils import ( save_experiment_and_get_hash, @@ -41,7 +42,7 @@ associate_matrices_with_experiment, missing_matrix_uuids, missing_model_hashes, - filename_friendly_hash + filename_friendly_hash, ) from triage.component.catwalk.storage import ( CSVMatrixStore, @@ -63,8 +64,8 @@ class ExperimentBase(ABC): Subclasses must implement the following four methods: process_query_tasks process_matrix_build_tasks - 
process_train_tasks - process_model_test_tasks + process_subset_tasks + process_train_test_tasks Look at singlethreaded.py for reference implementation of each. @@ -119,13 +120,14 @@ def __init__( self.cleanup = cleanup if self.cleanup: logging.info( - "cleanup is set to True, so intermediate tables (labels and states) " - "will be removed after matrix creation" + "cleanup is set to True, so intermediate tables (labels and cohort) " + "will be removed after matrix creation and subset tables will be " + "removed after model training and testing" ) else: logging.info( - "cleanup is set to False, so intermediate tables (labels and states) " - "will not be removed after matrix creation" + "cleanup is set to False, so intermediate tables (labels, cohort, and subsets) " + "will not be removed" ) self.cleanup_timeout = ( self.cleanup_timeout if cleanup_timeout is None else cleanup_timeout @@ -160,8 +162,8 @@ def initialize_components(self): cohort_config.get('name', 'default'), filename_friendly_hash(cohort_config['query']) ) - self.cohort_table_generator = CohortTableGenerator( - cohort_table_name=self.cohort_table_name, + self.cohort_table_generator = EntityDateTableGenerator( + entity_date_table_name=self.cohort_table_name, db_engine=self.db_engine, query=cohort_config["query"], replace=self.replace @@ -174,7 +176,9 @@ def initialize_components(self): ) self.features_ignore_cohort = True self.cohort_table_name = "cohort_{}".format(self.experiment_hash) - self.cohort_table_generator = CohortTableGeneratorNoOp() + self.cohort_table_generator = EntityDateTableGeneratorNoOp() + + self.subsets = [None] + self.config.get("scoring", {}).get("subsets", []) if "label_config" in self.config: label_config = self.config["label_config"] @@ -243,6 +247,12 @@ def initialize_components(self): replace=self.replace, ) + self.subsetter = Subsetter( + db_engine=self.db_engine, + replace=self.replace, + as_of_times=self.all_as_of_times + ) + self.trainer = ModelTrainer( experiment_hash=self.experiment_hash, model_storage_engine=self.model_storage_engine, @@ -277,7 +287,8 @@ def initialize_components(self): model_evaluator=self.evaluator, model_trainer=self.trainer, individual_importance_calculator=self.individual_importance_calculator, - predictor=self.predictor + predictor=self.predictor, + subsets=self.subsets, ) @cachedproperty @@ -509,6 +520,10 @@ def all_label_timespans(self): ) ) + @cachedproperty + def subset_tasks(self): + return self.subsetter.generate_tasks(self.subsets) + def generate_labels(self): """Generate labels based on experiment configuration @@ -519,7 +534,12 @@ def generate_labels(self): ) def generate_cohort(self): - self.cohort_table_generator.generate_cohort_table( + self.cohort_table_generator.generate_entity_date_table( + as_of_dates=self.all_as_of_times + ) + + def generate_subset(self, subset_hash): + self.subsets["subset_hash"].subset_table_generator.generate_entity_date_table( as_of_dates=self.all_as_of_times ) @@ -532,6 +552,10 @@ def log_split(self, split_num, split): split["train_matrix"]["matrix_info_end_time"], ) + @abstractmethod + def process_subset_tasks(self, subset_tasks): + pass + @abstractmethod def process_train_test_tasks(self, train_tasks): pass @@ -580,6 +604,13 @@ def generate_matrices(self): logging.info("Building all matrices") self.build_matrices() + def generate_subsets(self): + if self.subsets: + logging.info("Beginning subset generation") + self.process_subset_tasks(self.subset_tasks) + else: + logging.info("No subsets found. 
Proceeding to training and testing models") + def _all_train_test_tasks(self): if "grid_config" not in self.config: logging.warning( @@ -599,6 +630,7 @@ def _all_train_test_tasks(self): return train_test_tasks def train_and_test_models(self): + self.generate_subsets() tasks = self._all_train_test_tasks() if not tasks: logging.warning("No train/test tasks found, so no training to do") @@ -621,11 +653,15 @@ def _run(self): self.generate_matrices() finally: if self.cleanup: - self.clean_up_tables() + self.clean_up_matrix_building_tables() - self.train_and_test_models() - logging.info("Experiment complete") - self._log_end_of_run_report() + try: + self.train_and_test_models() + finally: + if self.cleanup: + self.clean_up_subset_tables() + logging.info("Experiment complete") + self._log_end_of_run_report() def _log_end_of_run_report(self): missing_models = missing_model_hashes(self.experiment_hash, self.db_engine) @@ -652,12 +688,18 @@ def _log_end_of_run_report(self): else: logging.info("All matrices that were supposed to be build were built. Awesome!") - def clean_up_tables(self): - logging.info("Cleaning up state and labels tables") + def clean_up_matrix_building_tables(self): + logging.info("Cleaning up cohort and labels tables") with timeout(self.cleanup_timeout): self.cohort_table_generator.clean_up() self.label_generator.clean_up(self.labels_table_name) + def clean_up_subset_tables(self): + logging.info("Cleaning up cohort and labels tables") + with timeout(self.cleanup_timeout): + for subset_task in self.subset_tasks: + subset_task["subset_table_generator"].clean_up() + def _run_profile(self): cp = cProfile.Profile() cp.runcall(self._run) diff --git a/src/triage/experiments/multicore.py b/src/triage/experiments/multicore.py index 74602a7be..0c286c95c 100644 --- a/src/triage/experiments/multicore.py +++ b/src/triage/experiments/multicore.py @@ -89,6 +89,20 @@ def process_matrix_build_tasks(self, matrix_build_tasks): partial_build_matrix, self.matrix_build_tasks.values(), self.n_processes ) + def process_subset_tasks(self, subset_tasks): + partial_subset = partial( + run_task_with_splatted_arguments, self.subsetter.process_task + ) + + logging.info( + "Starting parallel subset creation: %s subsets, %s processes", + len(subset_tasks), + self.n_db_processes, + ) + parallelize( + partial_subset, subset_tasks, self.n_db_processes + ) + def insert_into_table(insert_statements, feature_generator): try: diff --git a/src/triage/experiments/rq.py b/src/triage/experiments/rq.py index cf56a5f13..d78b3f163 100644 --- a/src/triage/experiments/rq.py +++ b/src/triage/experiments/rq.py @@ -161,3 +161,23 @@ def process_train_test_tasks(self, train_test_tasks): for task in train_test_tasks ] return self.wait_for(jobs) + + def process_subset_tasks(self, subset_tasks): + """Run subset tasks using RQ + + Args: + subset_tasks (list) of dictionaries, each representing kwargs suitable + for self.subsetter.process_task + Returns: (list) of job results for each given task + """ + jobs = [ + self.queue.enqueue( + self.subsetter.process_task, + timeout=DEFAULT_TIMEOUT, + result_ttl=DEFAULT_TIMEOUT, + ttl=DEFAULT_TIMEOUT, + **task + ) + for task in subset_tasks + ] + return self.wait_for(jobs) diff --git a/src/triage/experiments/singlethreaded.py b/src/triage/experiments/singlethreaded.py index e414cef92..d046260c2 100644 --- a/src/triage/experiments/singlethreaded.py +++ b/src/triage/experiments/singlethreaded.py @@ -10,3 +10,6 @@ def process_matrix_build_tasks(self, matrix_build_tasks): def 
process_train_test_tasks(self, tasks): self.model_train_tester.process_all_tasks(tasks) + + def process_subset_tasks(self, subset_tasks): + self.subsetter.process_all_tasks(subset_tasks) diff --git a/src/triage/experiments/validate.py b/src/triage/experiments/validate.py index 4a69a484b..090b371de 100644 --- a/src/triage/experiments/validate.py +++ b/src/triage/experiments/validate.py @@ -768,6 +768,52 @@ def _run(self, scoring_config): ) ) + if "subsets" in scoring_config: + for subset in scoring_config["subsets"]: + # 1. Validate that all required keys are present + if "query" not in subset: + raise ValueError( + dedent( + f"""Section: subsets - + The subset {subset} does not have a query key. + To run evaluations on a subset, you must + include a query that returns a list of distinct + entity_ids and has a placeholder for an + as_of_date + """ + ) + ) + if "name" not in subset: + raise ValueError( + dedent( + f"""Section: subsets - + The subset {subset} does not have a name key. + Please give a name to your subset. This is used + in the namespacing of subset tables created by + triage. + """ + ) + ) + + # 2. Validate that query conforms to the expectations + if "{as_of_date}" not in subset["query"]: + raise ValueError( + dedent( + f"""Section: subsets - + The subset query {subset["query"]} must + include a placeholder for the as_of_date + """ + ) + ) + if "entity_id" not in subset["query"]: + raise ValueError( + dedent( + f"""The subset query {subset["query"]} must + return a list of distinct entity_ids + """ + ) + ) + class ExperimentValidator(Validator): def run(self, experiment_config):
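
To tie the validation, hashing, and storage pieces together, here is a small illustrative sketch (not part of the patch) of a subset definition that satisfies the checks above, and of how the helpers added to catwalk.utils derive its identifying hash and backing table name. The demographics table and its columns are hypothetical.

```python
from triage.component.catwalk.utils import (
    filename_friendly_hash,
    get_subset_table_name,
)

# A subset definition that passes the validation above: it has a name, and its
# query returns distinct entity_ids with an {as_of_date} placeholder.
subset_config = {
    "name": "women_with_children",  # hypothetical subset
    "query": """
        select distinct entity_id
        from demographics  -- hypothetical table
        where gender = 'female'
          and num_children > 0
          and record_date < '{as_of_date}'::date
    """,
}

# The hash identifies the subset in model_metadata.subsets and in the new
# subset_hash column of the evaluations tables; the table name is what the
# Subsetter asks the EntityDateTableGenerator to build.
subset_hash = filename_friendly_hash(subset_config)
table_name = get_subset_table_name(subset_config)  # "subset_women_with_children_<hash>"

print(subset_hash, table_name)
```

Subset-level scores can then be retrieved by filtering test_results.evaluations (or train_results.evaluations) on that subset_hash, joining to model_metadata.subsets when the stored subset configuration is needed.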