add shard size to dataframe_to_xshards #3491

Merged
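In brief: this PR threads a new shard_size parameter through the Spark DataFrame to SparkXShards conversion (_dataframe_to_xshards / arrays2dict), controlled by a new OrcaContext._shard_size setting. A minimal usage sketch based on the diff below (the estimator and df referenced in the comments are placeholders, as in the tests that follow, not part of this diff):

from zoo.orca import OrcaContext

# Cap each shard of the converted SparkXShards at 30 DataFrame rows
# (applied within each partition); the default, None, keeps the old
# behavior of one shard per partition.
OrcaContext._shard_size = 30

# Subsequent DataFrame-based calls such as
#   estimator.fit(df, batch_size=4, epochs=2,
#                 feature_cols=["feature"], label_cols=["label"])
# on a PyTorchRayEstimator or TensorFlow2Estimator pick the setting
# up internally when converting the DataFrame to SparkXShards.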
1 change: 1 addition & 0 deletions pyzoo/test/zoo/orca/data/test_partition.py
@@ -135,5 +135,6 @@ def test_partition_nested_with_num_shards_specification(self):
        assert errorInfo.type == ValueError
        assert "number of shards" in str(errorInfo.value)


if __name__ == "__main__":
    pytest.main([__file__])
@@ -186,6 +186,23 @@ def test_dataframe_train_eval(self):
feature_cols=["feature"],
label_cols=["label"])

    def test_dataframe_shard_size_train_eval(self):
        from zoo.orca import OrcaContext
        OrcaContext._shard_size = 30
        sc = init_nncontext()
        rdd = sc.range(0, 100)
        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
                                [int(np.random.randint(0, 2, size=()))])
                     ).toDF(["feature", "label"])

        estimator = get_estimator(workers_per_node=2)
        estimator.fit(df, batch_size=4, epochs=2,
                      feature_cols=["feature"],
                      label_cols=["label"])
        estimator.evaluate(df, batch_size=4,
                           feature_cols=["feature"],
                           label_cols=["label"])

    def test_dataframe_predict(self):

        sc = init_nncontext()
27 changes: 27 additions & 0 deletions pyzoo/test/zoo/orca/learn/ray/tf/test_tf_ray_estimator.py
@@ -347,6 +347,33 @@ def test_dataframe(self):
                    label_cols=["label"])
        trainer.predict(df, feature_cols=["feature"]).collect()

    def test_dataframe_shard_size(self):
        from zoo.orca import OrcaContext
        OrcaContext._shard_size = 3
        sc = init_nncontext()
        rdd = sc.range(0, 10)
        from pyspark.sql import SparkSession
        spark = SparkSession(sc)
        from pyspark.ml.linalg import DenseVector
        df = rdd.map(lambda x: (DenseVector(np.random.randn(1,).astype(np.float)),
                                int(np.random.randint(0, 1, size=())))).toDF(["feature", "label"])

        config = {
            "lr": 0.8
        }
        trainer = Estimator.from_keras(
            model_creator=model_creator,
            verbose=True,
            config=config,
            workers_per_node=2)

        trainer.fit(df, epochs=1, batch_size=4, steps_per_epoch=25,
                    feature_cols=["feature"],
                    label_cols=["label"])
        trainer.evaluate(df, batch_size=4, num_steps=25, feature_cols=["feature"],
                         label_cols=["label"])
        trainer.predict(df, feature_cols=["feature"]).collect()

    def test_dataframe_predict(self):
        sc = init_nncontext()
        rdd = sc.parallelize(range(20))
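test_dataframe_shard_size above sets OrcaContext._shard_size = 3 on a 10-row DataFrame. Since sharding is applied within each partition, the resulting shard count depends on the partition layout; a self-contained sketch of that arithmetic, assuming a hypothetical 2-partition split of sc.range(0, 10):

import math

rows_per_partition = [5, 5]  # hypothetical layout for sc.range(0, 10)
shard_size = 3
shards_per_partition = [math.ceil(n / shard_size) for n in rows_per_partition]
assert shards_per_partition == [2, 2]  # each partition yields shards of 3 and 2 rows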
43 changes: 43 additions & 0 deletions pyzoo/test/zoo/orca/learn/test_utils.py
@@ -183,6 +183,49 @@ def test_convert_predict_xshards_to_dataframe_multi_output(self):
        expr = "sum(cast(feature <> flatten(prediction) as int)) as error"
        assert result_df.selectExpr(expr).first()["error"] == 0

    def test_array2dict(self):
        from zoo.orca.learn.utils import arrays2dict
        record_num = 100
        shard_size = 30
        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
                for i in range(record_num)]
        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"],
                             shard_size=shard_size)
        for i, d in enumerate(result):
            if (record_num % shard_size == 0) or (i != record_num // shard_size):
                assert d['x'].shape[0] == shard_size
                assert d['y'].shape[0] == shard_size
            else:
                assert d['x'].shape[0] == record_num % shard_size
                assert d['y'].shape[0] == record_num % shard_size

    def test_array2dict_shard_size_none(self):
        from zoo.orca.learn.utils import arrays2dict
        record_num = 100
        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
                for i in range(record_num)]
        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"], shard_size=None)
        for i, d in enumerate(result):
            assert d['x'].shape[0] == record_num
            assert d['y'].shape[0] == record_num

    def test_dataframe_to_xshards(self):
        rdd = self.sc.range(0, 100)
        df = rdd.map(lambda x: ([float(x)] * 50,
                                [int(np.random.randint(0, 2, size=()))])
                     ).toDF(["feature", "label"])
        num_partitions = df.rdd.getNumPartitions()
        # test shard_size = None
        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
        num_shards = shards.rdd.count()
        assert num_shards == num_partitions

        from zoo.orca import OrcaContext
        OrcaContext._shard_size = 1
        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
        num_shards = shards.rdd.count()
        assert num_shards == df.rdd.count()


if __name__ == "__main__":
    pytest.main([__file__])
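The assertion in test_array2dict above encodes the expected shard layout: every shard holds shard_size rows except a possibly smaller final shard. A self-contained check of that arithmetic for record_num = 100 and shard_size = 30:

record_num, shard_size = 100, 30
sizes = [min(shard_size, record_num - start)
         for start in range(0, record_num, shard_size)]
assert sizes == [30, 30, 30, 10]  # last shard gets record_num % shard_size rows
assert sum(sizes) == record_num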
19 changes: 19 additions & 0 deletions pyzoo/zoo/orca/common.py
@@ -24,6 +24,7 @@ class OrcaContextMeta(type):
    __eager_mode = True
    _serialize_data_creator = False
    _train_data_store = "DRAM"
    __shard_size = None

    @property
    def log_output(cls):
@@ -101,6 +102,24 @@ def train_data_store(cls, value):
"train_data_store must be either DRAM or PMEM or DIRECT or DISK_n"
cls._train_data_store = value

    @property
    def _shard_size(cls):
        """
        The number of Rows in a Spark DataFrame to transform into one shard of a
        SparkXShards. Spark DataFrame input is converted to a SparkXShards internally
        in fit/predict/evaluate of PyTorchRayEstimator and TensorFlow2Estimator. This
        parameter may affect the performance of transferring a SparkXShards to a
        RayXShards.
        Defaults to None, in which case all the Rows in one partition are transformed
        into one shard.
        """
        return cls.__shard_size

    @_shard_size.setter
    def _shard_size(cls, value):
        if value is not None:
            assert isinstance(value, int) and value > 0, \
                "shard size should be either None or a positive integer."
        cls.__shard_size = value


class OrcaContext(metaclass=OrcaContextMeta):
    @staticmethod
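A short sketch of the setter contract added above (illustrative, run against this PR's zoo package):

from zoo.orca import OrcaContext

OrcaContext._shard_size = 100   # accepted: a positive integer
OrcaContext._shard_size = None  # accepted: revert to one shard per partition

try:
    OrcaContext._shard_size = -1  # rejected by the setter's assert
except AssertionError as e:
    print(e)  # "shard size should be either None or a positive integer."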
83 changes: 47 additions & 36 deletions pyzoo/zoo/orca/learn/utils.py
@@ -177,59 +177,69 @@ def combine(pair):
return result_df


-def arrays2dict(iter, feature_cols, label_cols):
-
-    feature_lists = [[] for col in feature_cols]
-    if label_cols is not None:
-        label_lists = [[] for col in label_cols]
-    else:
-        label_lists = None
+def arrays2dict(iter, feature_cols, label_cols, shard_size=None):
+    def init_result_lists():
+        feature_lists = [[] for col in feature_cols]
+        if label_cols is not None:
+            label_lists = [[] for col in label_cols]
+        else:
+            label_lists = None
+        return feature_lists, label_lists

-    for row in iter:
-        # feature
-        if not isinstance(row[0], list):
-            features = [row[0]]
+    def add_row(data, results):
+        if not isinstance(data, list):
+            arrays = [data]
         else:
-            features = row[0]
+            arrays = data
+
+        for i, arr in enumerate(arrays):
+            results[i].append(arr)

-        for i, arr in enumerate(features):
-            feature_lists[i].append(arr)
+    def merge_rows(results):
+        result_arrs = [np.stack(l) for l in results]
+        if len(result_arrs) == 1:
+            result_arrs = result_arrs[0]
+        else:
+            result_arrs = tuple(result_arrs)
+        return result_arrs

-        # label
+    def generate_output(feature_lists, label_lists):
+        feature_arrs = merge_rows(feature_lists)
         if label_cols is not None:
-            if not isinstance(row[1], list):
-                labels = [row[1]]
-            else:
-                labels = row[1]
+            label_arrs = merge_rows(label_lists)
+            return {"x": feature_arrs, "y": label_arrs}
+        else:
+            return {"x": feature_arrs}

-            for i, arr in enumerate(labels):
-                label_lists[i].append(arr)
+    feature_lists, label_lists = init_result_lists()
+    counter = 0

-    feature_arrs = [np.stack(l) for l in feature_lists]
-    if len(feature_arrs) == 1:
-        feature_arrs = feature_arrs[0]
-    else:
-        feature_arrs = tuple(feature_arrs)
-    if label_lists is not None:
-        label_arrs = [np.stack(l) for l in label_lists]
-        if len(label_arrs) == 1:
-            label_arrs = label_arrs[0]
-        else:
-            label_arrs = tuple(label_arrs)
-        return [{"x": feature_arrs, "y": label_arrs}]
+    for row in iter:
+        counter += 1
+        add_row(row[0], feature_lists)
+        if label_cols is not None:
+            add_row(row[1], label_lists)
+
+        if shard_size and counter % shard_size == 0:
+            yield generate_output(feature_lists, label_lists)
+            feature_lists, label_lists = init_result_lists()

-    return [{"x": feature_arrs}]
+    if feature_lists[0]:
+        yield generate_output(feature_lists, label_lists)


 def _dataframe_to_xshards(data, feature_cols, label_cols=None):
+    from zoo.orca import OrcaContext
     schema = data.schema
+    shard_size = OrcaContext._shard_size
     numpy_rdd = data.rdd.map(lambda row: convert_row_to_numpy(row,
                                                               schema,
                                                               feature_cols,
                                                               label_cols))
     shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
                                                               feature_cols,
-                                                              label_cols))
+                                                              label_cols,
+                                                              shard_size))
     return SparkXShards(shard_rdd)
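Because arrays2dict is now a generator over a plain Python iterator, its chunking can be exercised without Spark. A minimal local sketch, assuming the zoo package from this PR is importable: 8 (feature, label) rows with shard_size=3 yield shards of 3, 3 and 2 rows.

import numpy as np
from zoo.orca.learn.utils import arrays2dict

rows = [(np.random.randn(5).astype(np.float32), np.float32([1.0]))
        for _ in range(8)]
shards = list(arrays2dict(iter(rows), feature_cols=["feature"],
                          label_cols=["label"], shard_size=3))
assert [d["x"].shape[0] for d in shards] == [3, 3, 2]
assert all(set(d) == {"x", "y"} for d in shards)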


@@ -259,7 +269,8 @@ def maybe_dataframe_to_xshards(data, validation_data, feature_cols, label_cols,
     if isinstance(data, DataFrame):
         data, validation_data = dataframe_to_xshards(data, validation_data,
                                                      feature_cols=feature_cols,
-                                                     label_cols=label_cols, mode=mode)
+                                                     label_cols=label_cols,
+                                                     mode=mode)
     return data, validation_data

