add shard size to dataframe_to_xshards (intel-analytics#3491)
* add shard size to dataframe_to_xshards

* add ut and change default shard_size as None

* add shard size in orca context and fix style

* add ut in pytorch estimator and tf estimator

* move shard_size to internal use

* fix
shanyu-sys committed Mar 1, 2021
1 parent e1ee507 commit f712d54
Showing 5 changed files with 135 additions and 36 deletions.
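In effect, the commit threads an optional shard size through the Spark DataFrame-to-XShards conversion: `None` (the default) preserves the old one-shard-per-partition behavior, while a positive value caps each shard at that many rows. The knob is deliberately internal (`OrcaContext._shard_size`, no public setter). A minimal sketch of the intended use, assuming the zoo.orca API as of this commit; the context setup is illustrative:

```python
from zoo.orca import init_orca_context, OrcaContext

sc = init_orca_context(cluster_mode="local", cores=4)

# Internal knob added by this commit: None (the default) keeps the old
# one-shard-per-partition behavior; a positive int caps rows per shard.
OrcaContext._shard_size = 30

# From here on, any Estimator fit/evaluate/predict call that is handed a
# Spark DataFrame goes through _dataframe_to_xshards, which reads this knob
# and splits every partition into shards of at most 30 rows.
```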
83 changes: 47 additions & 36 deletions python/orca/src/bigdl/orca/learn/utils.py
```diff
@@ -177,59 +177,69 @@ def combine(pair):
     return result_df
 
 
-def arrays2dict(iter, feature_cols, label_cols):
-
-    feature_lists = [[] for col in feature_cols]
-    if label_cols is not None:
-        label_lists = [[] for col in label_cols]
-    else:
-        label_lists = None
+def arrays2dict(iter, feature_cols, label_cols, shard_size=None):
+    def init_result_lists():
+        feature_lists = [[] for col in feature_cols]
+        if label_cols is not None:
+            label_lists = [[] for col in label_cols]
+        else:
+            label_lists = None
+        return feature_lists, label_lists
 
-    for row in iter:
-        # feature
-        if not isinstance(row[0], list):
-            features = [row[0]]
-        else:
-            features = row[0]
+    def add_row(data, results):
+        if not isinstance(data, list):
+            arrays = [data]
+        else:
+            arrays = data
 
-        for i, arr in enumerate(features):
-            feature_lists[i].append(arr)
+        for i, arr in enumerate(arrays):
+            results[i].append(arr)
 
-        # label
-        if label_cols is not None:
-            if not isinstance(row[1], list):
-                labels = [row[1]]
-            else:
-                labels = row[1]
+    def merge_rows(results):
+        result_arrs = [np.stack(l) for l in results]
+        if len(result_arrs) == 1:
+            result_arrs = result_arrs[0]
+        else:
+            result_arrs = tuple(result_arrs)
+        return result_arrs
 
-            for i, arr in enumerate(labels):
-                label_lists[i].append(arr)
+    def generate_output(feature_lists, label_lists):
+        feature_arrs = merge_rows(feature_lists)
+        if label_cols is not None:
+            label_arrs = merge_rows(label_lists)
+            return {"x": feature_arrs, "y": label_arrs}
+        else:
+            return {"x": feature_arrs}
 
-    feature_arrs = [np.stack(l) for l in feature_lists]
-    if len(feature_arrs) == 1:
-        feature_arrs = feature_arrs[0]
-    else:
-        feature_arrs = tuple(feature_arrs)
-    if label_lists is not None:
-        label_arrs = [np.stack(l) for l in label_lists]
-        if len(label_arrs) == 1:
-            label_arrs = label_arrs[0]
-        else:
-            label_arrs = tuple(label_arrs)
-        return [{"x": feature_arrs, "y": label_arrs}]
+    feature_lists, label_lists = init_result_lists()
+    counter = 0
 
-    return [{"x": feature_arrs}]
+    for row in iter:
+        counter += 1
+        add_row(row[0], feature_lists)
+        if label_cols is not None:
+            add_row(row[1], label_lists)
+
+        if shard_size and counter % shard_size == 0:
+            yield generate_output(feature_lists, label_lists)
+            feature_lists, label_lists = init_result_lists()
+
+    if feature_lists[0]:
+        yield generate_output(feature_lists, label_lists)
 
 
 def _dataframe_to_xshards(data, feature_cols, label_cols=None):
+    from zoo.orca import OrcaContext
     schema = data.schema
+    shard_size = OrcaContext._shard_size
     numpy_rdd = data.rdd.map(lambda row: convert_row_to_numpy(row,
                                                               schema,
                                                               feature_cols,
                                                               label_cols))
     shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
                                                               feature_cols,
-                                                              label_cols))
+                                                              label_cols,
+                                                              shard_size))
     return SparkXShards(shard_rdd)
@@ -259,7 +269,8 @@ def maybe_dataframe_to_xshards(data, validation_data, feature_cols, label_cols,
     if isinstance(data, DataFrame):
         data, validation_data = dataframe_to_xshards(data, validation_data,
                                                      feature_cols=feature_cols,
-                                                     label_cols=label_cols, mode=mode)
+                                                     label_cols=label_cols,
+                                                     mode=mode)
     return data, validation_data
```
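To see the new chunking behavior in isolation: `arrays2dict` is now a generator, so 100 `(feature, label)` rows with `shard_size=30` should yield four dicts holding 30, 30, 30, and 10 rows. A small self-contained check, mirroring `test_array2dict` further down and assuming `arrays2dict` is importable as in those tests:

```python
import numpy as np
from zoo.orca.learn.utils import arrays2dict

# 100 rows of (1x50 feature array, 1-element label array)
rows = [(np.random.randn(1, 50).astype(np.float32),
         np.array([1.0], dtype=np.float32)) for _ in range(100)]

shards = list(arrays2dict(iter(rows), feature_cols=["feature"],
                          label_cols=["label"], shard_size=30))

# A partition of 100 rows splits into ceil(100 / 30) = 4 shards: 30, 30, 30, 10.
print([d["x"].shape[0] for d in shards])   # [30, 30, 30, 10]

# With shard_size=None the generator yields once per partition, as before.
assert len(list(arrays2dict(iter(rows), ["feature"], ["label"]))) == 1
```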
1 change: 1 addition & 0 deletions python/orca/test/bigdl/orca/data/test_partition.py
```diff
@@ -135,5 +135,6 @@ def test_partition_nested_with_num_shards_specification(self):
         assert errorInfo.type == ValueError
         assert "number of shards" in str(errorInfo.value)
 
+
 if __name__ == "__main__":
     pytest.main([__file__])
```
17 changes: 17 additions & 0 deletions
```diff
@@ -187,6 +187,23 @@ def test_dataframe_train_eval(self):
                            feature_cols=["feature"],
                            label_cols=["label"])
 
+    def test_dataframe_shard_size_train_eval(self):
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 30
+        sc = init_nncontext()
+        rdd = sc.range(0, 100)
+        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                [int(np.random.randint(0, 2, size=()))])
+                     ).toDF(["feature", "label"])
+
+        estimator = get_estimator(workers_per_node=2)
+        estimator.fit(df, batch_size=4, epochs=2,
+                      feature_cols=["feature"],
+                      label_cols=["label"])
+        estimator.evaluate(df, batch_size=4,
+                           feature_cols=["feature"],
+                           label_cols=["label"])
+
     def test_dataframe_predict(self):
 
         sc = init_nncontext()
```
27 changes: 27 additions & 0 deletions python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
```diff
@@ -347,6 +347,33 @@ def test_dataframe(self):
                          label_cols=["label"])
         trainer.predict(df, feature_cols=["feature"]).collect()
 
+    def test_dataframe_shard_size(self):
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 3
+        sc = init_nncontext()
+        rdd = sc.range(0, 10)
+        from pyspark.sql import SparkSession
+        spark = SparkSession(sc)
+        from pyspark.ml.linalg import DenseVector
+        df = rdd.map(lambda x: (DenseVector(np.random.randn(1,).astype(np.float)),
+                                int(np.random.randint(0, 1, size=())))).toDF(["feature", "label"])
+
+        config = {
+            "lr": 0.8
+        }
+        trainer = Estimator.from_keras(
+            model_creator=model_creator,
+            verbose=True,
+            config=config,
+            workers_per_node=2)
+
+        trainer.fit(df, epochs=1, batch_size=4, steps_per_epoch=25,
+                    feature_cols=["feature"],
+                    label_cols=["label"])
+        trainer.evaluate(df, batch_size=4, num_steps=25, feature_cols=["feature"],
+                         label_cols=["label"])
+        trainer.predict(df, feature_cols=["feature"]).collect()
+
     def test_dataframe_predict(self):
         sc = init_nncontext()
         rdd = sc.parallelize(range(20))
```
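Both new estimator tests lean on module-level helpers that sit outside this diff (`get_estimator` in the PyTorch test above, `model_creator` here). For orientation only, a hypothetical `model_creator` of the shape `Estimator.from_keras` expects: a function that takes the config dict and returns a compiled `tf.keras` model sized to the one-feature test data:

```python
import tensorflow as tf

def model_creator(config):
    # Hypothetical stand-in for the helper defined elsewhere in the test
    # module; the real one may differ in architecture and loss.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(1, input_shape=(1,))
    ])
    model.compile(optimizer=tf.keras.optimizers.SGD(config["lr"]),
                  loss="mse",
                  metrics=["mse"])
    return model
```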
43 changes: 43 additions & 0 deletions python/orca/test/bigdl/orca/learn/test_utils.py
```diff
@@ -183,6 +183,49 @@ def test_convert_predict_xshards_to_dataframe_multi_output(self):
         expr = "sum(cast(feature <> flatten(prediction) as int)) as error"
         assert result_df.selectExpr(expr).first()["error"] == 0
 
+    def test_array2dict(self):
+        from zoo.orca.learn.utils import arrays2dict
+        record_num = 100
+        shard_size = 30
+        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
+                for i in range(record_num)]
+        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"],
+                             shard_size=shard_size)
+        for i, d in enumerate(result):
+            if (record_num % shard_size == 0) or (i != record_num // shard_size):
+                assert d['x'].shape[0] == shard_size
+                assert d['y'].shape[0] == shard_size
+            else:
+                assert d['x'].shape[0] == record_num % shard_size
+                assert d['y'].shape[0] == record_num % shard_size
+
+    def test_array2dict_shard_size_none(self):
+        from zoo.orca.learn.utils import arrays2dict
+        record_num = 100
+        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
+                for i in range(record_num)]
+        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"], shard_size=None)
+        for i, d in enumerate(result):
+            assert d['x'].shape[0] == record_num
+            assert d['y'].shape[0] == record_num
+
+    def test_dataframe_to_xshards(self):
+        rdd = self.sc.range(0, 100)
+        df = rdd.map(lambda x: ([float(x)] * 50,
+                                [int(np.random.randint(0, 2, size=()))])
+                     ).toDF(["feature", "label"])
+        num_partitions = df.rdd.getNumPartitions()
+        # test shard_size = None
+        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
+        num_shards = shards.rdd.count()
+        assert num_shards == num_partitions
+
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 1
+        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
+        num_shards = shards.rdd.count()
+        assert num_shards == df.rdd.count()
+
 
 if __name__ == "__main__":
     pytest.main([__file__])
```
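The final assertion holds because each DataFrame partition is split independently: a partition of n rows yields ceil(n / shard_size) shards, so shard_size=1 produces one shard per row while shard_size=None keeps one shard per partition. A quick sanity computation, pure Python and assuming evenly sized partitions:

```python
import math

def expected_num_shards(rows_per_partition, num_partitions, shard_size=None):
    # shard_size=None reproduces the old behavior: one shard per partition.
    if shard_size is None:
        return num_partitions
    return num_partitions * math.ceil(rows_per_partition / shard_size)

# 100 rows spread over 4 equal partitions:
assert expected_num_shards(25, 4) == 4                   # shard_size=None
assert expected_num_shards(25, 4, shard_size=1) == 100   # one shard per row
assert expected_num_shards(25, 4, shard_size=30) == 4    # 25 < 30: unchanged
```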
