Only Build Features for Cohort [Resolves #513] #567

Merged · 4 commits · Jan 24, 2019
5 changes: 5 additions & 0 deletions docs/sources/experiments/algorithm.md
@@ -110,11 +110,16 @@ a column or SQL expression representing a numeric quantity present in the `from_obj`
of aggregate functions we want to use. The aggregate function is applied to the quantity.
* Each `group` is a column applied to the GROUP BY clause. Generally this is 'entity_id', but higher-level groupings
(for instance, 'zip_code') can be used as long as they can be rolled up to 'entity_id'.
* By default the query is joined with the cohort table (see 'state table' above) to remove unnecessary rows. If `features_ignore_cohort` is passed to the Experiment this is not done.

@saleiro (Member) commented on Jan 24, 2019:

Personally, save_all_features makes me think of columns and not rows... Maybe something like 'features_ignore_cohort', 'features_besides_cohort', or 'features_for_all' :) ?

The PR author (Contributor) replied:

I agree. My preference is features_ignore_cohort, I'll change that

So a simplified version of a typical query would look like:
```
SELECT {group}, {metric}({quantity})
FROM {from_obj}
JOIN {cohort_table} ON (
{cohort_table.entity_id} = {from_obj.entity_id}
AND {cohort_table.date} = {as_of_time}
)
WHERE {knowledge_date_column} >= {as_of_time} - interval {interval}
GROUP BY {group}
```
A reviewer (Member) commented:

Yes! I tested with explain analyze and everything looks good!
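
For illustration, here is a minimal sketch of that kind of check, assuming a local Postgres database with `events` and `cohort` tables like the ones used in this PR's tests; the rendered SQL is an approximation, not the exact query collate generates:

```
# Fill in the simplified template above with concrete values and inspect
# the query plan with EXPLAIN ANALYZE. All names here are illustrative.
from sqlalchemy import create_engine

engine = create_engine("postgresql:///triage_test")  # assumed database

query = """
SELECT entity_id, sum(outcome::int)
FROM events
JOIN cohort ON (
    cohort.entity_id = events.entity_id
    AND cohort.date = '2016-01-01'::date
)
WHERE events.date >= '2016-01-01'::date - interval '1 year'
GROUP BY entity_id
"""

# Print the plan to confirm the cohort join behaves as expected
for line in engine.execute("EXPLAIN ANALYZE " + query):
    print(line[0])
```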

7 changes: 6 additions & 1 deletion docs/sources/experiments/running.md
@@ -190,7 +190,7 @@ By default, all work will be recreated. This includes label queries, feature queries

- Cohort Table: The Experiment keeps a cohort table namespaced by its experiment hash, and within that will check on a per-as-of-date level whether or not there are any existing rows, and skip the cohort query for that date if so. For this reason, it is *not* aware of specific entities or source events so if the source data has changed, you will not want to set `replace` to False. Don't expect too much reuse from this, however, as the table is experiment-namespaced. Essentially, this will only reuse data if the same experiment was run prior and failed part of the way through.
- Labels Table: The Experiment keeps a labels table namespaced by its experiment hash, and within that will check on a per-`as_of_date`/`label timespan` level whether or not there are *any* existing rows, and skip the label query if so. For this reason, it is *not* aware of specific entities or source events so if the label query has changed or the source data has changed, you will not want to set `replace` to False. Don't expect too much reuse from this, however, as the table is experiment-namespaced. Essentially, this will only reuse data if the same experiment was run prior and failed part of the way through label generation.
- Features Tables: The Experiment will check on a per-table basis whether or not it exists, and skip the feature generation if so. Each 'table' maps to a feature aggregation in your experiment config, so if you have modified any source data that affects that feature aggregation, added any features to that aggregation, or changed any `temporal_config` so there are more `as_of_dates`, you won't want to set `replace` to False.
- Features Tables: The Experiment will check on a per-table basis whether or not it exists and contains rows for the entire cohort, and skip the feature generation if so. It does not look at the column list for the feature table or inspect the feature data itself. So, if you have modified any source data that affects a feature aggregation, or added any columns to that aggregation, you won't want to set `replace` to False. However, it is cohort-and-date aware so you can change around your cohort and temporal configuration safely.
- Matrix Building: Each matrix's metadata is hashed to create a unique id. If a file exists in storage with that hash, it will be reused.
- Model Training: Each model's metadata (which includes its train matrix's hash) is hashed to create a unique id. If a file exists in storage with that hash, it will be reused.
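
To make the reuse rules above concrete, here is a minimal sketch of rerunning an experiment with `replace=False`, assuming `config` and `db_engine` are set up as in the rest of these docs:

```
# Rerun with replace=False so completed cohort, label, feature, matrix,
# and model work from a prior (possibly failed) run is reused according
# to the rules listed above.
from triage.experiments import SingleThreadedExperiment

experiment = SingleThreadedExperiment(
    config=config,                    # same experiment config as the earlier run
    db_engine=db_engine,              # engine pointed at the same database
    project_path="/path/to/project",  # where matrices and models are stored
    replace=False,                    # skip work whose outputs already exist
)
experiment.run()
```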

@@ -321,6 +321,11 @@ By default, experiments will inspect the `from_obj` of every feature aggregation

You can turn this off if you'd like, which you may want to do if the `from_obj` subqueries return a lot of data and you want to save as much disk space as possible. The option is turned off by passing `materialize_subquery_fromobjs=False` to the Experiment.
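
Following the same sketch as above, this is a single constructor argument:

```
# Skip materializing from_obj subqueries to save disk space, at the cost
# of re-running those subqueries inside each feature query.
experiment = SingleThreadedExperiment(
    config=config,
    db_engine=db_engine,
    project_path="/path/to/project",
    materialize_subquery_fromobjs=False,
)
```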

### Build Features Independently of Cohort

By default, the feature queries generated by your feature configuration for any given date are joined with the cohort table on that date, so no features are saved for entities outside the cohort. This saves time and database disk space when your cohort on any given date is not very large, and lets you iterate on feature building quickly. However, it means that any time you change your cohort, you have to rebuild all of your features. Depending on your experiment setup (for instance, multiple large cohorts that you experiment with), this may be time-consuming. Change this by passing `features_ignore_cohort=True` to the Experiment constructor, or `--features-ignore-cohort` on the command line.
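
A minimal sketch, again assuming `config` and `db_engine` from above:

```
# Build features for every entity in the from_obj, not just the cohort.
# Uses more time and disk space, but the feature tables can be reused
# even if the cohort definition changes later.
experiment = SingleThreadedExperiment(
    config=config,
    db_engine=db_engine,
    project_path="/path/to/project",
    features_ignore_cohort=True,
)
```

Or equivalently on the command line (assuming the standard `triage experiment` entry point): `triage experiment example_config.yaml --features-ignore-cohort`.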


## Experiment Classes

- *SingleThreadedExperiment*: An experiment that performs all tasks serially in a single thread. Good for simple use on small datasets, or for understanding the general flow of data through a pipeline.
24 changes: 22 additions & 2 deletions src/tests/architect_tests/test_feature_generators.py
@@ -646,6 +646,8 @@ def test_aggregations(test_engine):


def test_replace(test_engine):
# test the replace=False functionality, wherein we see if the cohort is fully represented
# in the imputed table and reuse the features if so
aggregate_config = [
{
"prefix": "aprefix",
@@ -672,21 +674,23 @@
features_schema_name=features_schema_name,
replace=False,
).create_all_tables(
feature_dates=["2013-09-30", "2014-09-30"],
feature_dates=["2013-09-30", "2014-09-30", "2015-01-01"],
feature_aggregation_config=aggregate_config,
state_table="states",
)

assert len(feature_tables) == 1
assert list(feature_tables)[0] == "aprefix_aggregation_imputed"

# now try and run feature generation with replace=False. We should
# be able to see that the entire cohort is there and reuse the features
feature_generator = FeatureGenerator(
db_engine=test_engine,
features_schema_name=features_schema_name,
replace=False,
)
aggregations = feature_generator.aggregations(
feature_dates=["2013-09-30", "2014-09-30"],
feature_dates=["2013-09-30", "2014-09-30", "2015-01-01"],
feature_aggregation_config=aggregate_config,
state_table="states",
)
@@ -696,6 +700,7 @@
)

assert len(table_tasks["aprefix_entity_id"]) == 0
assert len(table_tasks["aprefix_aggregation"]) == 0

imp_tasks = feature_generator.generate_all_table_tasks(
aggregations,
@@ -704,6 +709,21 @@

assert len(imp_tasks["aprefix_aggregation_imputed"]) == 0

# add a new member of the cohort. now we should need to rebuild everything
test_engine.execute("insert into states values (%s, %s)", 999, "2015-01-01")
table_tasks = feature_generator.generate_all_table_tasks(
aggregations,
task_type="aggregation",
)
assert len(table_tasks["aprefix_entity_id"]) == 3
assert len(table_tasks["aprefix_aggregation"]) == 3
feature_generator.process_table_tasks(table_tasks)
imp_tasks = feature_generator.generate_all_table_tasks(
aggregations,
task_type="imputation",
)

assert len(imp_tasks["aprefix_aggregation_imputed"]) == 3

def test_aggregations_materialize_off(test_engine):
aggregate_config = {
70 changes: 70 additions & 0 deletions src/tests/collate_tests/test_spacetime.py
@@ -250,3 +250,73 @@ def test_input_min_date():
st.validate(engine.connect())
with pytest.raises(ValueError):
st.execute(engine.connect())


def test_join_with_cohort_table(db_engine):
# if we specify joining with the cohort table
# only entity_id/date pairs in the cohort table should show up
db_engine.execute("create table events (entity_id int, date date, outcome bool)")
for event in events_data:
db_engine.execute("insert into events values (%s, %s, %s::bool)", event)

db_engine.execute("create table cohort (entity_id int, date date)")

# use the states list from above except only include entities 1 and 2 in the cohort
smaller_cohort = sorted(
product(
set([l[0] for l in events_data if l[0] == 1 or l[0] == 2]),
set([l[1] for l in events_data] + [date(2016, 1, 1)]),
)
)
for state in smaller_cohort:
db_engine.execute("insert into cohort values (%s, %s)", state)

# create our test aggregation with the important 'join_with_cohort_table' flag
agg = Aggregate(
"outcome::int",
["sum", "avg"],
{
"coltype": "aggregate",
"avg": {"type": "mean"},
"sum": {"type": "constant", "value": 3},
"max": {"type": "zero"},
},
)
st = SpacetimeAggregation(
aggregates=[agg],
from_obj="events",
groups=["entity_id"],
intervals=["all"],
dates=["2016-01-01", "2015-01-01"],
state_table="cohort",
state_group="entity_id",
date_column='"date"',
join_with_cohort_table=True,
)

st.execute(db_engine.connect())

r = db_engine.execute("select * from events_entity_id order by entity_id, date")
rows = [x for x in r]

# these rows should be similar to the rows in the basic spacetime test,
# except only the rows for entities 1 and 2 are present
assert len(rows) == 4

assert rows[0]["entity_id"] == 1
assert rows[0]["date"] == date(2015, 1, 1)
assert rows[0]["events_entity_id_all_outcome::int_sum"] == 1
assert rows[0]["events_entity_id_all_outcome::int_avg"] == 0.5
assert rows[1]["entity_id"] == 1
assert rows[1]["date"] == date(2016, 1, 1)
assert rows[1]["events_entity_id_all_outcome::int_sum"] == 2
assert rows[1]["events_entity_id_all_outcome::int_avg"] == 0.5

assert rows[2]["entity_id"] == 2
assert rows[2]["date"] == date(2015, 1, 1)
assert rows[2]["events_entity_id_all_outcome::int_sum"] == 1
assert rows[2]["events_entity_id_all_outcome::int_avg"] == 0.5
assert rows[3]["entity_id"] == 2
assert rows[3]["date"] == date(2016, 1, 1)
assert rows[3]["events_entity_id_all_outcome::int_sum"] == 1
assert rows[3]["events_entity_id_all_outcome::int_avg"] == 0.5
27 changes: 25 additions & 2 deletions src/triage/cli.py
@@ -11,6 +11,7 @@
from sqlalchemy.engine.url import URL

from triage.component.architect.feature_generators import FeatureGenerator
from triage.component.architect.cohort_table_generators import CohortTableGenerator
from triage.component.audition import AuditionRunner
from triage.component.results_schema import upgrade_db, stamp_db, REVISION_MAPPING
from triage.component.timechop import Timechop
@@ -133,10 +134,21 @@ def __init__(self, parser):
def __call__(self, args):
self.root.setup() # Loading configuration (if exists)
db_engine = create_engine(self.root.db_url)
feature_config = yaml.load(args.feature_config_file)
full_config = yaml.load(args.feature_config_file)
feature_config = full_config['feature_aggregations']
cohort_config = full_config.get('cohort_config', None)
if cohort_config:
CohortTableGenerator(
cohort_table_name="features_test.test_cohort",
db_engine=db_engine,
query=cohort_config["query"],
replace=True
).generate_cohort_table(as_of_dates=[args.as_of_date])

FeatureGenerator(db_engine, "features_test").create_features_before_imputation(
feature_aggregation_config=feature_config, feature_dates=[args.as_of_date]
feature_aggregation_config=feature_config,
feature_dates=[args.as_of_date],
state_table="features_test.test_cohort"
)
logging.info(
"Features created for feature_config %s and date %s",
@@ -230,6 +242,16 @@ def __init__(self, parser):
help="Skip saving predictions to the database to save time",
)

parser.add_argument(
"--features-ignore-cohort",
action="store_true",
default=False,
dest="features_ignore_cohort",
help="Will save all features independently of cohort. " +
"This can require more disk space but allow you to reuse " +
"features across different cohorts"
)

parser.set_defaults(validate=True, validate_only=False, materialize_fromobjs=True)

@cachedproperty
@@ -244,6 +266,7 @@ def experiment(self):
"config": config,
"replace": self.args.replace,
"materialize_subquery_fromobjs": self.args.materialize_fromobjs,
"features_ignore_cohort": self.args.features_ignore_cohort,
"matrix_storage_class": self.matrix_storage_map[self.args.matrix_format],
"profile": self.args.profile,
"save_predictions": self.args.save_predictions
56 changes: 38 additions & 18 deletions src/triage/component/architect/feature_generators.py
@@ -23,7 +23,8 @@ def __init__(
features_schema_name,
replace=True,
feature_start_time=None,
materialize_subquery_fromobjs=True
materialize_subquery_fromobjs=True,
features_ignore_cohort=False,
):
"""Generates aggregate features using collate

@@ -35,13 +36,17 @@
should be replaced
feature_start_time (string/datetime, optional) point in time before which
should not be included in features
features_ignore_cohort (boolean, optional) Whether or not features should be built
independently of the cohort. Takes longer but means that features can be reused
for different cohorts.
"""
self.db_engine = db_engine
self.features_schema_name = features_schema_name
self.categorical_cache = {}
self.replace = replace
self.feature_start_time = feature_start_time
self.materialize_subquery_fromobjs = materialize_subquery_fromobjs
self.features_ignore_cohort = features_ignore_cohort
self.entity_id_column = "entity_id"
self.from_objs = {}

@@ -315,6 +320,7 @@ def _aggregation(self, aggregation_config, feature_dates, state_table):
input_min_date=self.feature_start_time,
schema=self.features_schema_name,
prefix=aggregation_config["prefix"],
join_with_cohort_table=not self.features_ignore_cohort
)

def aggregations(self, feature_aggregation_config, feature_dates, state_table):
@@ -388,12 +394,12 @@ def generate_all_table_tasks(self, aggregations, task_type):
return table_tasks

def create_features_before_imputation(
self, feature_aggregation_config, feature_dates
self, feature_aggregation_config, feature_dates, state_table=None
):
"""Create features before imputation for a set of dates"""
all_tasks = self.generate_all_table_tasks(
self.aggregations(
feature_aggregation_config, feature_dates, state_table=None
feature_aggregation_config, feature_dates, state_table=state_table
),
task_type="aggregation",
)
@@ -560,6 +566,31 @@ def index_column_lookup(self, aggregations, imputed=True):
for aggregation in aggregations
)

def _needs_features(self, aggregation):
imputed_table = self._clean_table_name(
aggregation.get_table_name(imputed=True)
)

if self._table_exists(imputed_table):
check_query = (
f"select 1 from {aggregation.state_table} "
f"left join {self.features_schema_name}.{imputed_table} "
"using (entity_id, as_of_date) "
f"where {self.features_schema_name}.{imputed_table}.entity_id is null limit 1"
)
if self.db_engine.execute(check_query).scalar():
logging.warning(
"Imputed feature table %s did not contain rows from the "
"entire cohort, need to rebuild features", imputed_table)
return True
else:
logging.warning("Imputed feature table %s did not exist, "
"need to build features", imputed_table)
return True
logging.warning("Imputed feature table %s looks good, "
"skipping feature building!", imputed_table)
return False

def _generate_agg_table_tasks_for(self, aggregation):
"""Generates SQL commands for preparing, populating, and finalizing
each feature group table in the given aggregation
@@ -582,13 +613,7 @@ def _generate_agg_table_tasks_for(self, aggregation):
group_table = self._clean_table_name(
aggregation.get_table_name(group=group)
)
imputed_table = self._clean_table_name(
aggregation.get_table_name(imputed=True)
)
if self.replace or (
not self._table_exists(group_table)
and not self._table_exists(imputed_table)
):
if self.replace or self._needs_features(aggregation):
table_tasks[group_table] = {
"prepare": [drops[group], creates[group]],
"inserts": inserts[group],
@@ -599,12 +624,7 @@
logging.info("Skipping feature table creation for %s", group_table)
table_tasks[group_table] = {}
logging.info("Created table tasks for aggregation")
if self.replace or (
not self._table_exists(self._clean_table_name(aggregation.get_table_name()))
and not self._table_exists(
self._clean_table_name(aggregation.get_table_name(imputed=True))
)
):
if self.replace or self._needs_features(aggregation):
table_tasks[self._clean_table_name(aggregation.get_table_name())] = {
"prepare": [aggregation.get_drop(), aggregation.get_create()],
"inserts": [],
@@ -638,8 +658,8 @@ def _generate_imp_table_tasks_for(self, aggregation, drop_preagg=True):
table_tasks = OrderedDict()
imp_tbl_name = self._clean_table_name(aggregation.get_table_name(imputed=True))

if not self.replace and self._table_exists(imp_tbl_name):
logging.info("Skipping imputation table creation for %s", imp_tbl_name)
if not self.replace and not self._needs_features(aggregation):
logging.warning("Skipping imputation table creation for %s", imp_tbl_name)
table_tasks[imp_tbl_name] = {}
return table_tasks

13 changes: 12 additions & 1 deletion src/triage/component/collate/spacetime.py
@@ -22,6 +22,7 @@ def __init__(
date_column=None,
output_date_column=None,
input_min_date=None,
join_with_cohort_table=False,
):
"""
Args:
@@ -61,6 +62,7 @@ def __init__(
self.date_column = date_column if date_column else "date"
self.output_date_column = output_date_column if output_date_column else "date"
self.input_min_date = input_min_date
self.join_with_cohort_table = join_with_cohort_table

def _state_table_sub(self):
"""Helper function to ensure we only include state table records
@@ -141,7 +143,16 @@ def get_selects(self):
)

gb_clause = make_sql_clause(groupby, ex.literal_column)
query = ex.select(columns=columns, from_obj=self.from_obj).group_by(
if self.join_with_cohort_table:
from_obj = ex.text(
f"(select from_obj.* from ("
f"(select * from {self.from_obj}) from_obj join {self.state_table} cohort on ( "
"cohort.entity_id = from_obj.entity_id and "
f"cohort.{self.output_date_column} = '{date}'::date)"
")) cohorted_from_obj")
else:
from_obj = self.from_obj
query = ex.select(columns=columns, from_obj=from_obj).group_by(
gb_clause
)
query = query.where(self.where(date, intervals))
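# For illustration: with from_obj="events", state_table="cohort",
# output_date_column="date", and date="2016-01-01", the text above
# renders roughly as
#   (select from_obj.* from (
#       (select * from events) from_obj join cohort cohort on (
#           cohort.entity_id = from_obj.entity_id and
#           cohort.date = '2016-01-01'::date)
#   )) cohorted_from_obj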