diff --git a/src/triage/component/catwalk/__init__.py b/src/triage/component/catwalk/__init__.py index b85f821fd..999a87be2 100644 --- a/src/triage/component/catwalk/__init__.py +++ b/src/triage/component/catwalk/__init__.py @@ -2,5 +2,141 @@ from .model_trainers import ModelTrainer from .predictors import Predictor from .evaluation import ModelEvaluator +from .individual_importance import IndividualImportanceCalculator +from .model_grouping import ModelGrouper -__all__ = ("ModelTrainer", "Predictor", "ModelEvaluator") +import logging + + +class ModelTrainTester(object): + def __init__( + self, + matrix_storage_engine, + model_trainer, + model_evaluator, + individual_importance_calculator, + predictor + ): + self.matrix_storage_engine = matrix_storage_engine + self.model_trainer = model_trainer + self.model_evaluator = model_evaluator + self.individual_importance_calculator = individual_importance_calculator + self.predictor = predictor + + def generate_tasks(self, split, grid_config, model_comment=None): + logging.info("Generating train/test tasks for split %s", split["train_uuid"]) + train_store = self.matrix_storage_engine.get_store(split["train_uuid"]) + if train_store.empty: + logging.warning( + """Train matrix for split %s was empty, + no point in training this model. Skipping + """, + split["train_uuid"], + ) + return [] + if len(train_store.labels().unique()) == 1: + logging.warning( + """Train Matrix for split %s had only one + unique value, no point in training this model. Skipping + """, + split["train_uuid"], + ) + return [] + train_tasks = self.model_trainer.generate_train_tasks( + grid_config=grid_config, + misc_db_parameters=dict(test=False, model_comment=model_comment), + matrix_store=train_store + ) + + train_test_tasks = [] + for test_matrix_def, test_uuid in zip( + split["test_matrices"], split["test_uuids"] + ): + test_store = self.matrix_storage_engine.get_store(test_uuid) + + if test_store.empty: + logging.warning( + """Test matrix for uuid %s + was empty, no point in generating predictions. Not creating train/test task. + """, + test_uuid, + ) + continue + for train_task in train_tasks: + train_test_tasks.append( + { + "test_store": test_store, + "train_store": train_store, + "train_kwargs": train_task, + } + ) + return train_test_tasks + + def process_all_tasks(self, tasks): + for task in tasks: + self.process_task(**task) + + def process_task(self, test_store, train_store, train_kwargs): + logging.info("Beginning train task %s", train_kwargs) + model_id = self.model_trainer.process_train_task(**train_kwargs) + if not model_id: + logging.warning("No model id returned from ModelTrainer.process_train_task, " + "training unsuccessful. Not attempting to test") + return + logging.info("Trained task %s and got model id %s", train_kwargs, model_id) + as_of_times = test_store.metadata["as_of_times"] + logging.info( + "Testing and scoring model id %s with test matrix %s. " + "as_of_times min: %s max: %s num: %s", + model_id, + test_store.uuid, + min(as_of_times), + max(as_of_times), + len(as_of_times), + ) + + self.individual_importance_calculator.calculate_and_save_all_methods_and_dates( + model_id, test_store + ) + + # Generate predictions for the testing data then training data + for store in (test_store, train_store): + if self.predictor.replace or self.model_evaluator.needs_evaluations(store, model_id): + logging.info( + "The evaluations needed for matrix %s-%s and model %s" + "are not all present in db, so predicting and evaluating", + store.uuid, + store.matrix_type, + model_id + ) + predictions_proba = self.predictor.predict( + model_id, + store, + misc_db_parameters=dict(), + train_matrix_columns=train_store.columns(), + ) + + self.model_evaluator.evaluate( + predictions_proba=predictions_proba, + matrix_store=store, + model_id=model_id, + ) + else: + logging.info( + "The evaluations needed for matrix %s-%s and model %s are all present" + "in db from a previous run (or none needed at all), so skipping!", + store.uuid, + store.matrix_type, + model_id + ) + self.model_trainer.uncache_model(model_id) + + +__all__ = ( + "IndividualImportanceCalculator", + "ModelEvaluator", + "ModelGrouper" + "ModelTrainer", + "Predictor", + "ModelTrainTester" +) diff --git a/src/triage/component/catwalk/individual_importance/__init__.py b/src/triage/component/catwalk/individual_importance/__init__.py index 1f8838b1d..20f404b12 100644 --- a/src/triage/component/catwalk/individual_importance/__init__.py +++ b/src/triage/component/catwalk/individual_importance/__init__.py @@ -151,4 +151,5 @@ def save(self, importance_records, model_id, as_of_date, method_name): importance_score=float(importance_record["score"]), ) db_objects.append(db_object) + print(len(db_objects)) save_db_objects(self.db_engine, db_objects) diff --git a/src/triage/component/catwalk/model_trainers.py b/src/triage/component/catwalk/model_trainers.py index 0a86225f0..3be9c201d 100644 --- a/src/triage/component/catwalk/model_trainers.py +++ b/src/triage/component/catwalk/model_trainers.py @@ -17,6 +17,7 @@ from .utils import ( filename_friendly_hash, retrieve_model_id_from_hash, + retrieve_model_hash_from_id, db_retry, save_db_objects, ) @@ -420,3 +421,6 @@ def generate_train_tasks(self, grid_config, misc_db_parameters, matrix_store=Non ) logging.info("Found %s unique model training tasks", len(tasks)) return tasks + + def uncache_model(self, model_id): + self.model_storage_engine.uncache(retrieve_model_hash_from_id(self.db_engine, model_id)) diff --git a/src/triage/component/catwalk/predictors.py b/src/triage/component/catwalk/predictors.py index 41a061c05..12ed7fba3 100644 --- a/src/triage/component/catwalk/predictors.py +++ b/src/triage/component/catwalk/predictors.py @@ -10,7 +10,7 @@ from triage.component.results_schema import Model -from .utils import db_retry +from .utils import db_retry, retrieve_model_hash_from_id class ModelNotFoundError(ValueError): @@ -37,22 +37,6 @@ def __init__(self, model_storage_engine, db_engine, replace=True): def sessionmaker(self): return sessionmaker(bind=self.db_engine) - @db_retry - def _retrieve_model_hash(self, model_id): - """Retrieves the model hash associated with a given model id - - Args: - model_id (int) The id of a given model in the database - - Returns: (str) the stored hash of the model - """ - try: - session = self.sessionmaker() - model_hash = session.query(Model).get(model_id).model_hash - finally: - session.close() - return model_hash - @db_retry def load_model(self, model_id): """Downloads the cached model associated with a given model id @@ -64,7 +48,7 @@ def load_model(self, model_id): A python object which implements .predict() """ - model_hash = self._retrieve_model_hash(model_id) + model_hash = retrieve_model_hash_from_id(self.db_engine, model_id) logging.info("Checking for model_hash %s in store", model_hash) if self.model_storage_engine.exists(model_hash): return self.model_storage_engine.load(model_hash) @@ -76,7 +60,7 @@ def delete_model(self, model_id): Args: model_id (int) The id of a given model in the database """ - model_hash = self._retrieve_model_hash(model_id) + model_hash = retrieve_model_hash_from_id(self.db_engine, model_id) self.model_storage_engine.delete(model_hash) @db_retry diff --git a/src/triage/component/catwalk/storage.py b/src/triage/component/catwalk/storage.py index 5fa6bec1c..b47a6eb7b 100644 --- a/src/triage/component/catwalk/storage.py +++ b/src/triage/component/catwalk/storage.py @@ -205,11 +205,15 @@ class ModelStorageEngine(object): A project file storage engine model_directory (string, optional) A directory name for models. Defaults to 'trained_models' + should_cache (bool, optional) Whether or not the engine should cache written models + in memory in addition to persisting. Defaults to True """ - def __init__(self, project_storage, model_directory=None): + def __init__(self, project_storage, model_directory=None, should_cache=True): self.project_storage = project_storage self.directories = [model_directory or "trained_models"] + self.should_cache = should_cache + self.cache = {} def write(self, obj, model_hash): """Persist a model object using joblib. Also performs compression @@ -218,6 +222,9 @@ def write(self, obj, model_hash): obj (object) A picklable model object model_hash (string) An identifier, unique within this project, for the model """ + if self.should_cache: + logging.info("Caching model %s", model_hash) + self.cache[model_hash] = obj with self._get_store(model_hash).open("wb") as fd: joblib.dump(obj, fd, compress=True) @@ -229,6 +236,9 @@ def load(self, model_hash): Returns: (object) A model object """ + if self.should_cache and model_hash in self.cache: + logging.info("Returning model %s from cache", model_hash) + return self.cache[model_hash] with self._get_store(model_hash).open("rb") as fd: return joblib.load(fd) @@ -250,6 +260,20 @@ def delete(self, model_hash): """ return self._get_store(model_hash).delete() + def uncache(self, model_hash): + """Remove the model identified by this hash from memory + + Args: + model_hash (string) An identifier, unique within this project, for the model + """ + if model_hash in self.cache: + logging.info("Removing model %s from cache", model_hash) + del self.cache[model_hash] + else: + logging.info("Model %s not in cache (likely was trained in another run)," + "so no need to remove", + model_hash) + def _get_store(self, model_hash): return self.project_storage.get_store(self.directories, model_hash) diff --git a/src/triage/component/catwalk/utils.py b/src/triage/component/catwalk/utils.py index caf88f462..f2ddef961 100644 --- a/src/triage/component/catwalk/utils.py +++ b/src/triage/component/catwalk/utils.py @@ -170,6 +170,22 @@ def retrieve_model_id_from_hash(db_engine, model_hash): session.close() +@db_retry +def retrieve_model_hash_from_id(db_engine, model_id): + """Retrieves the model hash associated with a given model id + + Args: + model_id (int) The id of a given model in the database + + Returns: (str) the stored hash of the model + """ + session = sessionmaker(bind=db_engine)() + try: + return session.query(Model).get(model_id).model_hash + finally: + session.close() + + @db_retry def save_db_objects(db_engine, db_objects): """Saves a collection of SQLAlchemy model objects to the database using a COPY command diff --git a/src/triage/experiments/base.py b/src/triage/experiments/base.py index db6ddd9ea..dae6baba7 100644 --- a/src/triage/experiments/base.py +++ b/src/triage/experiments/base.py @@ -27,9 +27,14 @@ ) from triage.component.timechop import Timechop from triage.component.results_schema import upgrade_db -from triage.component.catwalk.model_grouping import ModelGrouper -from triage.component.catwalk.model_trainers import ModelTrainer -from triage.component.catwalk.model_testers import ModelTester +from triage.component.catwalk import ( + ModelTrainer, + ModelEvaluator, + Predictor, + IndividualImportanceCalculator, + ModelGrouper, + ModelTrainTester +) from triage.component.catwalk.utils import ( save_experiment_and_get_hash, associate_models_with_experiment, @@ -229,13 +234,32 @@ def initialize_components(self): replace=self.replace, ) - self.tester = ModelTester( + self.predictor = Predictor( + db_engine=self.db_engine, model_storage_engine=self.model_storage_engine, - matrix_storage_engine=self.matrix_storage_engine, replace=self.replace, + ) + + self.individual_importance_calculator = IndividualImportanceCalculator( + db_engine=self.db_engine, + n_ranks=self.config.get("individual_importance", {}).get("n_ranks", 5), + methods=self.config.get("individual_importance", {}).get("methods", ["uniform"]), + replace=self.replace, + ) + + self.evaluator = ModelEvaluator( db_engine=self.db_engine, - individual_importance_config=self.config.get("individual_importance", {}), - evaluator_config=self.config.get("scoring", {}), + sort_seed=self.config.get("scoring", {}).get("sort_seed", None), + testing_metric_groups=self.config.get("scoring", {}).get("testing_metric_groups", []), + training_metric_groups=self.config.get("scoring", {}).get("training_metric_groups", []), + ) + + self.model_train_tester = ModelTrainTester( + matrix_storage_engine=self.matrix_storage_engine, + model_evaluator=self.evaluator, + model_trainer=self.trainer, + individual_importance_calculator=self.individual_importance_calculator, + predictor=self.predictor ) @cachedproperty @@ -491,7 +515,7 @@ def log_split(self, split_num, split): ) @abstractmethod - def process_train_tasks(self, train_tasks): + def process_train_test_tasks(self, train_tasks): pass @abstractmethod @@ -538,67 +562,44 @@ def generate_matrices(self): logging.info("Building all matrices") self.build_matrices() - def train_and_test_models(self): + def _all_train_test_tasks(self): if "grid_config" not in self.config: logging.warning( "No grid_config was passed in the experiment config. No models will be trained" ) return + train_test_tasks = [] for split_num, split in enumerate(self.full_matrix_definitions): self.log_split(split_num, split) - train_store = self.matrix_storage_engine.get_store(split["train_uuid"]) - if train_store.empty: - logging.warning( - """Train matrix for split %s was empty, - no point in training this model. Skipping - """, - split["train_uuid"], - ) - continue - if len(train_store.labels().unique()) == 1: - logging.warning( - """Train Matrix for split %s had only one - unique value, no point in training this model. Skipping - """, - split["train_uuid"], - ) - continue - - logging.info("Training models") - - train_tasks = self.trainer.generate_train_tasks( - grid_config=self.config["grid_config"], - misc_db_parameters=dict( - test=False, model_comment=self.config.get("model_comment", None) - ), - matrix_store=train_store, - ) - - associate_models_with_experiment( - self.experiment_hash, - [train_task['model_hash'] for train_task in train_tasks], - self.db_engine - ) - model_ids = self.process_train_tasks(train_tasks) + for task in self.model_train_tester.generate_tasks( + split=split, + grid_config=self.config.get('grid_config'), + model_comment=self.config.get('model_comment', None) + ): + train_test_tasks.append(task) + return train_test_tasks - logging.info("Done training models for split %s", split_num) - - test_tasks = self.tester.generate_model_test_tasks( - split=split, train_store=train_store, model_ids=model_ids - ) - logging.info( - "Found %s non-empty test matrices for split %s", - len(test_tasks), - split_num, - ) + def train_and_test_models(self): + tasks = self._all_train_test_tasks() + if not tasks: + logging.warning("No train/test tasks found, so no training to do") + return - self.process_model_test_tasks(test_tasks) + logging.info("%s train/test tasks found. Beginning training.", len(tasks)) + associate_models_with_experiment( + self.experiment_hash, + set(task['train_kwargs']['model_hash'] for task in tasks), + self.db_engine + ) + self.process_train_test_tasks(tasks) def validate(self, strict=True): ExperimentValidator(self.db_engine, strict=strict).run(self.config) def _run(self): + cp = cProfile.Profile() + cp.enable() try: logging.info("Generating matrices") self.generate_matrices() @@ -608,6 +609,12 @@ def _run(self): self.train_and_test_models() logging.info("Experiment complete") + cp.disable() + store = self.project_storage.get_store( + ["profiling_stats"], + f"{int(time.time())}.profile" + ) + cp.dump_stats(store.path) self._log_end_of_run_report() def _log_end_of_run_report(self): diff --git a/src/triage/experiments/multicore.py b/src/triage/experiments/multicore.py index 5c64e0ec2..74602a7be 100644 --- a/src/triage/experiments/multicore.py +++ b/src/triage/experiments/multicore.py @@ -51,29 +51,13 @@ def generated_chunked_parallelized_results( except Exception: logging.exception('Child failure') - def process_train_tasks(self, train_tasks): - partial_train_models = partial( - run_task_with_splatted_arguments, self.trainer.process_train_task - ) - logging.info( - "Starting parallel training: %s tasks, %s processes", - len(train_tasks), - self.n_processes, - ) - model_ids = [] - for model_id in parallelize( - partial_train_models, train_tasks, self.n_processes - ): - model_ids.append(model_id) - return model_ids - - def process_model_test_tasks(self, test_tasks): + def process_train_test_tasks(self, tasks): partial_test = partial( - run_task_with_splatted_arguments, self.tester.process_model_test_task + run_task_with_splatted_arguments, self.model_train_tester.process_task ) - logging.info("Starting parallel testing with %s processes", self.n_db_processes) - parallelize(partial_test, test_tasks, self.n_db_processes) + logging.info("Starting parallel testing with %s processes", self.n_processes) + parallelize(partial_test, tasks, self.n_processes) logging.info("Cleaned up concurrent pool") def process_query_tasks(self, query_tasks): diff --git a/src/triage/experiments/rq.py b/src/triage/experiments/rq.py index 979d72ae3..cf56a5f13 100644 --- a/src/triage/experiments/rq.py +++ b/src/triage/experiments/rq.py @@ -142,7 +142,7 @@ def process_matrix_build_tasks(self, matrix_build_tasks): ] return self.wait_for(jobs) - def process_train_tasks(self, train_tasks): + def process_train_test_tasks(self, train_test_tasks): """Run train tasks using RQ Args: @@ -152,32 +152,12 @@ def process_train_tasks(self, train_tasks): """ jobs = [ self.queue.enqueue( - self.trainer.process_train_task, + self.model_train_tester.process_task, timeout=DEFAULT_TIMEOUT, result_ttl=DEFAULT_TIMEOUT, ttl=DEFAULT_TIMEOUT, - **train_task + **task ) - for train_task in train_tasks - ] - return self.wait_for(jobs) - - def process_model_test_tasks(self, test_tasks): - """Run test tasks using RQ - - Args: - test_tasks (list) of dictionaries, each representing kwargs suitable - for self.tester.process_model_test_task - Returns: (list) of job results for each given task - """ - jobs = [ - self.queue.enqueue( - self.tester.process_model_test_task, - timeout=DEFAULT_TIMEOUT, - result_ttl=DEFAULT_TIMEOUT, - ttl=DEFAULT_TIMEOUT, - **test_task - ) - for test_task in test_tasks + for task in train_test_tasks ] return self.wait_for(jobs) diff --git a/src/triage/experiments/singlethreaded.py b/src/triage/experiments/singlethreaded.py index d2b942935..e414cef92 100644 --- a/src/triage/experiments/singlethreaded.py +++ b/src/triage/experiments/singlethreaded.py @@ -8,12 +8,5 @@ def process_query_tasks(self, query_tasks): def process_matrix_build_tasks(self, matrix_build_tasks): self.matrix_builder.build_all_matrices(matrix_build_tasks) - def process_train_tasks(self, train_tasks): - return [ - self.trainer.process_train_task(**train_task) for train_task in train_tasks - ] - - def process_model_test_tasks(self, test_tasks): - return [ - self.tester.process_model_test_task(**test_task) for test_task in test_tasks - ] + def process_train_test_tasks(self, tasks): + self.model_train_tester.process_all_tasks(tasks)