Skip to content

Commit

Permalink
Reduce Memory Usage of Matrix Building [Resolves #372]
Browse files Browse the repository at this point in the history
Matrix building is still very memory-intensive, for no particularly good
reason: we're not using the matrices at that point, just transferring
them from database to disk with an in-Python join to get around column
limits. While we're still using pandas to build the matrices themselves,
this is hard to get around: any type of pandas join will always use
multiple times the memory needed. Bringing the memory usage down to what
is actually needed for the data is better, but even better is to make
the memory usage controllable by never keeping the matrix in memory.

Using Ohio's PipeTextIO makes this technically feasible, but to make it
work out we also need to remove HDF support. HDF support was added
merely for the compression capabilities, and with recent changes to
compress CSVs, this is no longer needed.

- Remove HDFMatrixStore and hdf support from the experiment and CLI
- Modify MatrixStore.save to take in a bytestream instead of assuming it
has a dataframe available to convert
  • Loading branch information
thcrock committed Apr 4, 2019
1 parent 7707b08 commit 0a0817f
Show file tree
Hide file tree
Showing 14 changed files with 361 additions and 543 deletions.
335 changes: 49 additions & 286 deletions src/tests/architect_tests/test_builders.py

Large diffs are not rendered by default.

76 changes: 39 additions & 37 deletions src/tests/catwalk_tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import io
import tempfile
from collections import OrderedDict

Expand All @@ -8,15 +9,15 @@
from moto import mock_s3
import boto3
from numpy.testing import assert_almost_equal
from pandas.testing import assert_frame_equal
from pandas.testing import assert_frame_equal, assert_series_equal
from unittest import mock
from triage.util.pandas import downcast_matrix
import pytest

from triage.component.catwalk.storage import (
MatrixStore,
CSVMatrixStore,
FSStore,
HDFMatrixStore,
S3Store,
ProjectStorage,
ModelStorageEngine,
Expand Down Expand Up @@ -101,16 +102,12 @@ def matrix_stores():
df.to_csv(tmpcsv, compression="gzip")
df.to_hdf(tmphdf, "matrix")
csv = CSVMatrixStore(project_storage, [], "df")
hdf = HDFMatrixStore(project_storage, [], "df")
assert csv.design_matrix.equals(hdf.design_matrix)
# first test with caching
with csv.cache(), hdf.cache():
with csv.cache():
yield csv
yield hdf
# with the caching out of scope the cache will be nuked
# and this last version will not have any cache
yield csv
yield hdf


def test_MatrixStore_empty():
Expand Down Expand Up @@ -175,25 +172,24 @@ def test_MatrixStore_labels_idempotency():


def test_MatrixStore_save():
data = {
"entity_id": [1, 2],
"as_of_date": [pd.Timestamp(2017, 1, 1), pd.Timestamp(2017, 1, 1)],
"feature_one": [0.5, 0.6],
"feature_two": [0.5, 0.6],
"label": [1, 0]
}
df = pd.DataFrame.from_dict(data)
labels = df.pop("label")

for matrix_store in matrix_stores():
matrix_store.metadata = METADATA
data = {
"entity_id": [1, 2],
"as_of_date": [pd.Timestamp(2016, 1, 1), pd.Timestamp(2016, 1, 1)],
"feature_one": [0.5, 0.6],
"feature_two": [0.5, 0.6],
"label": [1, 0]
}
df = pd.DataFrame.from_dict(data)
df.set_index(MatrixStore.indices, inplace=True)
df = downcast_matrix(df)
bytestream = io.BytesIO(df.to_csv(None).encode('utf-8'))

matrix_store.save(bytestream, METADATA)

matrix_store.matrix_label_tuple = df, labels
matrix_store.save()
assert_frame_equal(
matrix_store.design_matrix,
df
)
labels = df.pop("label")
assert_frame_equal(matrix_store.design_matrix, df)
assert_series_equal(matrix_store.labels, labels)


def test_MatrixStore_caching():
Expand Down Expand Up @@ -228,16 +224,22 @@ def test_s3_save():
with mock_s3():
client = boto3.client("s3")
client.create_bucket(Bucket="fake-matrix-bucket", ACL="public-read-write")
for example in matrix_stores():
if not isinstance(example, CSVMatrixStore):
continue
project_storage = ProjectStorage("s3://fake-matrix-bucket")

tosave = CSVMatrixStore(project_storage, [], "test")
tosave.metadata = example.metadata
tosave.matrix_label_tuple = example.matrix_label_tuple
tosave.save()

tocheck = CSVMatrixStore(project_storage, [], "test")
assert tocheck.metadata == example.metadata
assert tocheck.design_matrix.to_dict() == example.design_matrix.to_dict()
project_storage = ProjectStorage("s3://fake-matrix-bucket")
matrix_store = project_storage.matrix_storage_engine().get_store('1234')
data = {
"entity_id": [1, 2],
"as_of_date": [pd.Timestamp(2017, 1, 1), pd.Timestamp(2017, 1, 1)],
"feature_one": [0.5, 0.6],
"feature_two": [0.5, 0.6],
"label": [1, 0]
}
df = pd.DataFrame.from_dict(data)
df.set_index(MatrixStore.indices, inplace=True)
df = downcast_matrix(df)
bytestream = io.BytesIO(df.to_csv(None).encode('utf-8'))

matrix_store.save(bytestream, METADATA)

labels = df.pop("label")
assert_frame_equal(matrix_store.design_matrix, df)
assert_series_equal(matrix_store.labels, labels)
4 changes: 2 additions & 2 deletions src/tests/test_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import sqlalchemy

from tests.utils import sample_config, populate_source_data
from triage.component.catwalk.storage import HDFMatrixStore, CSVMatrixStore
from triage.component.catwalk.storage import CSVMatrixStore

from triage.experiments import (
MultiCoreExperiment,
Expand Down Expand Up @@ -53,7 +53,7 @@ def num_linked_evaluations(db_engine):
)

parametrize_matrix_storage_classes = pytest.mark.parametrize(
("matrix_storage_class",), [(HDFMatrixStore,), (CSVMatrixStore,)]
("matrix_storage_class",), [(CSVMatrixStore,)]
)


Expand Down
15 changes: 14 additions & 1 deletion src/tests/test_utils_pandas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from triage.util.pandas import downcast_matrix
from triage.component.catwalk.storage import MatrixStore
import pandas as pd
from triage.util.pandas import columns_with_nulls, downcast_matrix
from .utils import matrix_creator


Expand All @@ -12,3 +13,15 @@ def test_downcast_matrix():

# make sure the memory usage is lower; otherwise there would be no point in downcasting
assert downcasted_df.memory_usage().sum() < df.memory_usage().sum()


def test_columns_with_nulls():
    """Check columns_with_nulls on a fully populated frame and on one with a gap."""
    # No missing values anywhere: nothing should be reported.
    complete_frame = pd.DataFrame.from_dict({
        "feature_one": [0.5, 0.6, 0.5, 0.6],
        "feature_two": [0.5, 0.6, 0.5, 0.6],
    })
    assert columns_with_nulls(complete_frame) == []

    # A single None in feature_one: only that column should be reported.
    gappy_frame = pd.DataFrame.from_dict({
        "feature_one": [0.5, None, 0.5, 0.6],
        "feature_two": [0.5, 0.6, 0.5, 0.6],
    })
    assert columns_with_nulls(gappy_frame) == ["feature_one"]
11 changes: 5 additions & 6 deletions src/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import io
import random
import tempfile
from contextlib import contextmanager
Expand Down Expand Up @@ -180,12 +181,10 @@ def get_matrix_store(project_storage, matrix=None, metadata=None, write_to_db=Tr
matrix["as_of_date"] = matrix["as_of_date"].apply(pandas.Timestamp)
matrix.set_index(MatrixStore.indices, inplace=True)
matrix_store = project_storage.matrix_storage_engine().get_store(filename_friendly_hash(metadata))
matrix_store.metadata = metadata
new_matrix = matrix.copy()
labels = new_matrix.pop(matrix_store.label_column_name)
matrix_store.matrix_label_tuple = new_matrix, labels
matrix_store.save()
matrix_store.clear_cache()
matrix_store.save(
from_fileobj=io.BytesIO(matrix.to_csv(None).encode('utf-8')),
metadata=metadata
)
if write_to_db:
if (
session.query(Matrix).filter(Matrix.matrix_uuid == matrix_store.uuid).count()
Expand Down
6 changes: 2 additions & 4 deletions src/triage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import argparse
import importlib.util
import logging
import time
import os
import yaml
from datetime import datetime
Expand All @@ -15,7 +16,7 @@
from triage.component.audition import AuditionRunner
from triage.component.results_schema import upgrade_db, stamp_db, REVISION_MAPPING
from triage.component.timechop.plotting import visualize_chops
from triage.component.catwalk.storage import CSVMatrixStore, HDFMatrixStore, Store, ProjectStorage
from triage.component.catwalk.storage import CSVMatrixStore, Store, ProjectStorage
from triage.experiments import (
CONFIG_VERSION,
MultiCoreExperiment,
Expand All @@ -24,8 +25,6 @@
from triage.component.postmodeling.crosstabs import CrosstabsConfigLoader, run_crosstabs
from triage.util.db import create_engine

logging.basicConfig(level=logging.INFO)


def natural_number(value):
natural = int(value)
Expand Down Expand Up @@ -157,7 +156,6 @@ class Experiment(Command):

matrix_storage_map = {
"csv": CSVMatrixStore,
"hdf": HDFMatrixStore,
}
matrix_storage_default = "csv"

Expand Down
Loading

0 comments on commit 0a0817f

Please sign in to comment.