diff --git a/python/orca/src/bigdl/orca/learn/utils.py b/python/orca/src/bigdl/orca/learn/utils.py
index a9a7df15400..6f4c03a2faa 100644
--- a/python/orca/src/bigdl/orca/learn/utils.py
+++ b/python/orca/src/bigdl/orca/learn/utils.py
@@ -177,59 +177,69 @@ def combine(pair):
     return result_df
 
 
-def arrays2dict(iter, feature_cols, label_cols):
-
-    feature_lists = [[] for col in feature_cols]
-    if label_cols is not None:
-        label_lists = [[] for col in label_cols]
-    else:
-        label_lists = None
+def arrays2dict(iter, feature_cols, label_cols, shard_size=None):
+    def init_result_lists():
+        feature_lists = [[] for col in feature_cols]
+        if label_cols is not None:
+            label_lists = [[] for col in label_cols]
+        else:
+            label_lists = None
+        return feature_lists, label_lists
 
-    for row in iter:
-        # feature
-        if not isinstance(row[0], list):
-            features = [row[0]]
+    def add_row(data, results):
+        if not isinstance(data, list):
+            arrays = [data]
         else:
-            features = row[0]
+            arrays = data
+
+        for i, arr in enumerate(arrays):
+            results[i].append(arr)
 
-        for i, arr in enumerate(features):
-            feature_lists[i].append(arr)
+    def merge_rows(results):
+        result_arrs = [np.stack(l) for l in results]
+        if len(result_arrs) == 1:
+            result_arrs = result_arrs[0]
+        else:
+            result_arrs = tuple(result_arrs)
+        return result_arrs
 
-        # label
+    def generate_output(feature_lists, label_lists):
+        feature_arrs = merge_rows(feature_lists)
         if label_cols is not None:
-            if not isinstance(row[1], list):
-                labels = [row[1]]
-            else:
-                labels = row[1]
+            label_arrs = merge_rows(label_lists)
+            return {"x": feature_arrs, "y": label_arrs}
+        else:
+            return {"x": feature_arrs}
 
-            for i, arr in enumerate(labels):
-                label_lists[i].append(arr)
+    feature_lists, label_lists = init_result_lists()
+    counter = 0
 
-    feature_arrs = [np.stack(l) for l in feature_lists]
-    if len(feature_arrs) == 1:
-        feature_arrs = feature_arrs[0]
-    else:
-        feature_arrs = tuple(feature_arrs)
-    if label_lists is not None:
-        label_arrs = [np.stack(l) for l in label_lists]
-        if len(label_arrs) == 1:
-            label_arrs = label_arrs[0]
-        else:
-            label_arrs = tuple(label_arrs)
-        return [{"x": feature_arrs, "y": label_arrs}]
+    for row in iter:
+        counter += 1
+        add_row(row[0], feature_lists)
+        if label_cols is not None:
+            add_row(row[1], label_lists)
+
+        if shard_size and counter % shard_size == 0:
+            yield generate_output(feature_lists, label_lists)
+            feature_lists, label_lists = init_result_lists()
 
-    return [{"x": feature_arrs}]
+    if feature_lists[0]:
+        yield generate_output(feature_lists, label_lists)
 
 
 def _dataframe_to_xshards(data, feature_cols, label_cols=None):
+    from zoo.orca import OrcaContext
     schema = data.schema
+    shard_size = OrcaContext._shard_size
     numpy_rdd = data.rdd.map(lambda row: convert_row_to_numpy(row,
                                                               schema,
                                                               feature_cols,
                                                               label_cols))
     shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
                                                               feature_cols,
-                                                              label_cols))
+                                                              label_cols,
+                                                              shard_size))
     return SparkXShards(shard_rdd)
 
 
@@ -259,7 +269,8 @@ def maybe_dataframe_to_xshards(data, validation_data, feature_cols, label_cols,
     if isinstance(data, DataFrame):
         data, validation_data = dataframe_to_xshards(data, validation_data,
                                                      feature_cols=feature_cols,
-                                                     label_cols=label_cols, mode=mode)
+                                                     label_cols=label_cols,
+                                                     mode=mode)
 
     return data, validation_data
 
diff --git a/python/orca/test/bigdl/orca/data/test_partition.py b/python/orca/test/bigdl/orca/data/test_partition.py
index 264e3cd925a..1d2a060b89a 100644
--- a/python/orca/test/bigdl/orca/data/test_partition.py
+++ b/python/orca/test/bigdl/orca/data/test_partition.py
@@ -135,5 +135,6 @@ def test_partition_nested_with_num_shards_specification(self):
         assert errorInfo.type == ValueError
         assert "number of shards" in str(errorInfo.value)
 
+
 if __name__ == "__main__":
     pytest.main([__file__])
diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pytorch_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pytorch_backend.py
index c180a1d3b0d..f6995f352d0 100644
--- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pytorch_backend.py
+++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pytorch_backend.py
@@ -187,6 +187,23 @@ def test_dataframe_train_eval(self):
                            feature_cols=["feature"],
                            label_cols=["label"])
 
+    def test_dataframe_shard_size_train_eval(self):
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 30
+        sc = init_nncontext()
+        rdd = sc.range(0, 100)
+        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                [int(np.random.randint(0, 2, size=()))])
+                     ).toDF(["feature", "label"])
+
+        estimator = get_estimator(workers_per_node=2)
+        estimator.fit(df, batch_size=4, epochs=2,
+                      feature_cols=["feature"],
+                      label_cols=["label"])
+        estimator.evaluate(df, batch_size=4,
+                           feature_cols=["feature"],
+                           label_cols=["label"])
+
     def test_dataframe_predict(self):
         sc = init_nncontext()
 
diff --git a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
index 230e05d1fef..e177552e0c7 100644
--- a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
+++ b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
@@ -347,6 +347,33 @@ def test_dataframe(self):
                     label_cols=["label"])
         trainer.predict(df, feature_cols=["feature"]).collect()
 
+    def test_dataframe_shard_size(self):
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 3
+        sc = init_nncontext()
+        rdd = sc.range(0, 10)
+        from pyspark.sql import SparkSession
+        spark = SparkSession(sc)
+        from pyspark.ml.linalg import DenseVector
+        df = rdd.map(lambda x: (DenseVector(np.random.randn(1,).astype(np.float)),
+                                int(np.random.randint(0, 1, size=())))).toDF(["feature", "label"])
+
+        config = {
+            "lr": 0.8
+        }
+        trainer = Estimator.from_keras(
+            model_creator=model_creator,
+            verbose=True,
+            config=config,
+            workers_per_node=2)
+
+        trainer.fit(df, epochs=1, batch_size=4, steps_per_epoch=25,
+                    feature_cols=["feature"],
+                    label_cols=["label"])
+        trainer.evaluate(df, batch_size=4, num_steps=25, feature_cols=["feature"],
+                         label_cols=["label"])
+        trainer.predict(df, feature_cols=["feature"]).collect()
+
     def test_dataframe_predict(self):
         sc = init_nncontext()
         rdd = sc.parallelize(range(20))
diff --git a/python/orca/test/bigdl/orca/learn/test_utils.py b/python/orca/test/bigdl/orca/learn/test_utils.py
index 30a832debd8..92ed50d0a8e 100644
--- a/python/orca/test/bigdl/orca/learn/test_utils.py
+++ b/python/orca/test/bigdl/orca/learn/test_utils.py
@@ -183,6 +183,49 @@ def test_convert_predict_xshards_to_dataframe_multi_output(self):
         expr = "sum(cast(feature <> flatten(prediction) as int)) as error"
         assert result_df.selectExpr(expr).first()["error"] == 0
 
+    def test_array2dict(self):
+        from zoo.orca.learn.utils import arrays2dict
+        record_num = 100
+        shard_size = 30
+        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
+                for i in range(record_num)]
+        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"],
+                             shard_size=shard_size)
+        for i, d in enumerate(result):
+            if (record_num % shard_size == 0) or (i != record_num // shard_size):
+                assert d['x'].shape[0] == shard_size
+                assert d['y'].shape[0] == shard_size
+            else:
+                assert d['x'].shape[0] == record_num % shard_size
+                assert d['y'].shape[0] == record_num % shard_size
+
+    def test_array2dict_shard_size_none(self):
+        from zoo.orca.learn.utils import arrays2dict
+        record_num = 100
+        data = [(np.float32(np.random.randn(1, 50)), np.float32([np.random.randint(0, 2,)]))
+                for i in range(record_num)]
+        result = arrays2dict(data, feature_cols=["feature"], label_cols=["label"], shard_size=None)
+        for i, d in enumerate(result):
+            assert d['x'].shape[0] == record_num
+            assert d['y'].shape[0] == record_num
+
+    def test_dataframe_to_xshards(self):
+        rdd = self.sc.range(0, 100)
+        df = rdd.map(lambda x: ([float(x)] * 50,
+                                [int(np.random.randint(0, 2, size=()))])
+                     ).toDF(["feature", "label"])
+
+        num_partitions = df.rdd.getNumPartitions()
+        # test shard_size = None
+        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
+        num_shards = shards.rdd.count()
+        assert num_shards == num_partitions
+
+        from zoo.orca import OrcaContext
+        OrcaContext._shard_size = 1
+        shards = _dataframe_to_xshards(df, feature_cols=["feature"], label_cols=["label"])
+        num_shards = shards.rdd.count()
+        assert num_shards == df.rdd.count()
 
 if __name__ == "__main__":
     pytest.main([__file__])
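
For context, a minimal sketch of the shard_size behaviour that arrays2dict gains in this patch, mirroring the test_array2dict case above; the column names, array shapes, and record count are illustrative only, and the import path follows the zoo.orca layout used by the tests.

    import numpy as np
    from zoo.orca.learn.utils import arrays2dict

    # 100 (feature, label) rows, shaped like the output of convert_row_to_numpy
    data = [(np.random.randn(1, 50).astype(np.float32), np.float32([i % 2]))
            for i in range(100)]

    # With shard_size=30 the generator yields shards of 30, 30, 30 and 10 rows;
    # with shard_size=None the whole iterator is emitted as a single shard.
    for shard in arrays2dict(iter(data), feature_cols=["feature"], label_cols=["label"],
                             shard_size=30):
        print(shard["x"].shape, shard["y"].shape)  # (30, 1, 50) (30, 1) ... (10, 1, 50) (10, 1)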