Skip to content

Commit

Permalink
Downcast matrices [Resolves #372]
Browse files Browse the repository at this point in the history
Introduces triage.util.pandas.downcast_matrix, which downcasts the
values of a dataframe to their minimal values (e.g. float64->float32, or
int64->int32). This is called in two places:

1. Within MatrixBuilder to each smaller
matrix before they are joined together, with the intention of stopping
memory from spiking at the join step.

2. When loading a matrix into memory from the MatrixStore class.
Since it made sense to put this in the superclass as opposed to
forcing each subclass to implement it, it was added to the .matrix
getter. While doing this, it made sense to do the same for the set_index
call as well, allowing some further cleaning up of the MatrixStore
subclasses.
  • Loading branch information
thcrock committed Oct 10, 2018
1 parent f2d96df commit 36dcd4a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 17 deletions.
9 changes: 6 additions & 3 deletions src/tests/catwalk_tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas as pd
from moto import mock_s3
from numpy.testing import assert_almost_equal

from triage.component.catwalk.storage import (
CSVMatrixStore,
Expand Down Expand Up @@ -112,7 +113,7 @@ def test_MatrixStore_resort_columns(self):
[0.4, 0.5],
[0.5, 0.4]
]
self.assertEqual(expected, result)
assert_almost_equal(expected, result)

def test_MatrixStore_already_sorted_columns(self):
for matrix_store in self.matrix_stores():
Expand All @@ -126,7 +127,7 @@ def test_MatrixStore_already_sorted_columns(self):
[0.5, 0.4],
[0.4, 0.5]
]
self.assertEqual(expected, result)
assert_almost_equal(expected, result)

def test_MatrixStore_sorted_columns_subset(self):
with self.assertRaises(ValueError):
Expand Down Expand Up @@ -160,7 +161,9 @@ def test_MatrixStore_save(self):
for matrix_store in self.matrix_stores():
original_dict = matrix_store.matrix.to_dict()
matrix_store.save()
assert matrix_store._load().to_dict() == original_dict
# nuke the cache to force reload
matrix_store.matrix = None
assert matrix_store.matrix.to_dict() == original_dict

def test_as_of_dates_entity_index(self):
data = {
Expand Down
13 changes: 13 additions & 0 deletions src/tests/test_utils_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from triage.util.pandas import downcast_matrix
from .utils import matrix_creator


def test_downcast_matrix():
df = matrix_creator()
downcasted_df = downcast_matrix(df)

# make sure the contents are equivalent
assert((downcasted_df == df).all().all())

# make sure the memory usage is lower because there would be no point of this otherwise
assert downcasted_df.memory_usage().sum() < df.memory_usage().sum()
3 changes: 2 additions & 1 deletion src/triage/component/architect/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from triage.component.results_schema import Matrix
from triage.database_reflection import table_has_data
from triage.util.pandas import downcast_matrix


class BuilderBase(object):
Expand Down Expand Up @@ -463,7 +464,7 @@ def query_to_df(self, query_string, header='HEADER'):
out.seek(0)
df = pandas.read_csv(out, parse_dates=['as_of_date'])
df.set_index(['entity_id', 'as_of_date'], inplace=True)
return df
return downcast_matrix(df)

def merge_feature_csvs(self, dataframes, matrix_uuid):
"""Horizontally merge a list of feature CSVs
Expand Down
22 changes: 9 additions & 13 deletions src/triage/component/catwalk/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from urllib.parse import urlparse
from triage.component.results_schema import TestEvaluation, TrainEvaluation, \
TestPrediction, TrainPrediction
from triage.util.pandas import downcast_matrix

import pandas as pd
import s3fs
Expand Down Expand Up @@ -274,6 +275,11 @@ def matrix(self):
"""The raw matrix. Will load from storage into memory if not already loaded"""
if self.__matrix is None:
self.__matrix = self._load()
# Is the index already in place?
if self.__matrix.index.names != self.metadata['indices']:
self.__matrix.set_index(self.metadata['indices'], inplace=True)

self.__matrix = downcast_matrix(self.__matrix)
return self.__matrix

@matrix.setter
Expand Down Expand Up @@ -437,21 +443,15 @@ def head_of_matrix(self):
return head_of_matrix

def _load(self):
matrix = pd.read_hdf(self.matrix_base_store.path)

# Is the index already in place?
if matrix.index.names != self.metadata['indices']:
matrix.set_index(self.metadata['indices'], inplace=True)

return matrix
return pd.read_hdf(self.matrix_base_store.path)

def save(self):
hdf = pd.HDFStore(self.matrix_base_store.path,
mode='w',
complevel=4,
complib="zlib",
format='table')
hdf.put(self.matrix_uuid, self.matrix.apply(pd.to_numeric), data_columns=True)
hdf.put(self.matrix_uuid, self.matrix, data_columns=True)
hdf.close()

with self.metadata_base_store.open('wb') as fd:
Expand All @@ -478,11 +478,7 @@ def head_of_matrix(self):
def _load(self):
parse_dates_argument = ['as_of_date'] if 'as_of_date' in self.metadata['indices'] else False
with self.matrix_base_store.open('rb') as fd:
matrix = pd.read_csv(fd, parse_dates=parse_dates_argument)

matrix.set_index(self.metadata['indices'], inplace=True)

return matrix
return pd.read_csv(fd, parse_dates=parse_dates_argument)

def save(self):
self.matrix_base_store.write(self.matrix.to_csv(None).encode('utf-8'))
Expand Down
25 changes: 25 additions & 0 deletions src/triage/util/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from functools import partial
import pandas as pd
import logging


def downcast_matrix(df):
"""Downcast the numeric values of a matrix.
This will make the matrix use less memory by turning, for instance,
int64 columns into int32 columns.
First converts floats and then integers.
Operates on the dataframe as passed, without doing anything to the index.
Callers may pass an index-less dataframe if they wish to re-add the index afterwards
and save memory on the index storage.
"""
logging.info("Downcasting matrix. Starting memory usage: %s", df.memory_usage())
new_df = (
df.apply(partial(pd.to_numeric, downcast="float"))
.apply(partial(pd.to_numeric, downcast="integer"))
)

logging.info("Downcasted matrix. Final memory usage: %s", new_df.memory_usage())
return new_df

0 comments on commit 36dcd4a

Please sign in to comment.