Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read parquet dataset as tf.data.Dataset #3956

Merged
merged 7 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions pyzoo/test/zoo/orca/data/test_read_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import tempfile
import shutil

import pytest
from unittest import TestCase
import os
from zoo.orca.data.image.parquet_dataset import ParquetDataset, read_parquet
from zoo.orca.data.image.utils import DType, FeatureType, SchemaField
import tensorflow as tf

resource_path = os.path.join(os.path.split(__file__)[0], "../../resources")


class TestReadParquet(TestCase):
def test_read_parquet_images_tf_dataset(self):
temp_dir = tempfile.mkdtemp()

def generator():
dataset_path = os.path.join(resource_path, "cat_dog")
for root, dirs, files in os.walk(os.path.join(dataset_path, "cats")):
for name in files:
image_path = os.path.join(root, name)
yield {"image": image_path, "label": 1, "id": image_path}

for root, dirs, files in os.walk(os.path.join(dataset_path, "dogs")):
for name in files:
image_path = os.path.join(root, name)
yield {"image": image_path, "label": 0, "id": image_path}

schema = {
"image": SchemaField(feature_type=FeatureType.IMAGE, dtype=DType.FLOAT32, shape=()),
"label": SchemaField(feature_type=FeatureType.SCALAR, dtype=DType.FLOAT32, shape=()),
"id": SchemaField(feature_type=FeatureType.SCALAR, dtype=DType.STRING, shape=())
}

try:
ParquetDataset.write("file://" + temp_dir, generator(), schema)
path = "file://" + temp_dir
output_types = {"id": tf.string, "image": tf.string, "label": tf.float32}
dataset = read_parquet("tf_dataset", input_path=path,
output_types=output_types)
for dt in dataset:
print(dt.keys())

finally:
shutil.rmtree(temp_dir)


if __name__ == "__main__":
pytest.main([__file__])
52 changes: 51 additions & 1 deletion pyzoo/zoo/orca/data/image/parquet_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
from zoo.orca.data import SparkXShards
from zoo.orca.data.file import open_text, write_text
from zoo.orca.data.image.utils import chunks, dict_to_row, row_to_dict, encode_schema, \
decode_schema, SchemaField, FeatureType, DType, ndarray_dtype_to_dtype
decode_schema, SchemaField, FeatureType, DType, ndarray_dtype_to_dtype, \
decode_feature_type_ndarray
from zoo.orca.data.image.voc_dataset import VOCDatasets
from bigdl.util.common import get_node_and_core_number
import os
import numpy as np
import random
import pyarrow.parquet as pq
import pyarrow as pa
import io


Expand Down Expand Up @@ -265,3 +268,50 @@ def write_parquet(format, output_path, *args, **kwargs):
func, required_args = format_to_function[format]
_check_arguments(format, kwargs, required_args)
func(output_path=output_path, *args, **kwargs)

def pa_fs(path):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this function to utils

if path.startswith("hdfs"): # hdfs://url:port/file_path
fs = pa.hdfs.connect()
path = path[len("hdfs://"):]
return path, fs
elif path.startswith("s3"):
raise ValueError("aws s3 is not supported for now")
else: # Local path
if path.startswith("file://"):
path = path[len("file://"):]
return path, pa.LocalFileSystem()


def read_as_tfdataset(path, output_types, output_shapes=None, *args, **kwargs):
"""
return a orca.data.tf.data.Dataset
:param path:
:return:
"""
path, _ = pa_fs(path)
import tensorflow as tf

def generator():
for root, dirs, files in os.walk(path):
for name in dirs:
if name.startswith("chunk="):
chunk_path = os.path.join(path, name)
pq_table = pq.read_table(chunk_path)
df = decode_feature_type_ndarray(path, pq_table.to_pandas())
for record in df.to_dict("records"):
yield record

dataset = tf.data.Dataset.from_generator(generator, output_types=output_types,
output_shapes=output_shapes)
return dataset


def read_parquet(format, input_path, *args, **kwargs):
supported_format = {"tf_dataset"}
if format not in supported_format:
raise ValueError(format + " is not supported, should be 'tf_dataset'.")

format_to_function = {"tf_dataset": (read_as_tfdataset, ["output_types"])}
func, required_args = format_to_function[format]
_check_arguments(format, kwargs, required_args)
func(path=input_path, *args, **kwargs)
12 changes: 12 additions & 0 deletions pyzoo/zoo/orca/data/image/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#

import copy
import os
from collections import namedtuple
from io import BytesIO
import numpy as np
from zoo.orca.data.file import open_text
from itertools import chain, islice

from enum import Enum
Expand Down Expand Up @@ -146,6 +148,16 @@ def dict_to_row(schema, row_dict):
return pyspark.Row(**row)


def decode_feature_type_ndarray(path, df):
schema_path = os.path.join(path, "_orca_metadata")
j_str = open_text(schema_path)[0]
schema = decode_schema(j_str)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you should decode the schema only once and pass the decoced schema to this function.

for n, field in schema.items():
if field.feature_type == FeatureType.NDARRAY:
df[n] = df[n].map(lambda k: decode_ndarray(k))
return df


def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator:
Expand Down