orca init (intel-analytics#2304)
* orca init

* xshard migration

* doc fix

* add license

* indent

* add csv files

* fix path
cyita committed May 9, 2020
1 parent d6806e5 commit 605e292
Showing 9 changed files with 647 additions and 0 deletions.
15 changes: 15 additions & 0 deletions python/orca/src/bigdl/orca/data/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18 changes: 18 additions & 0 deletions python/orca/src/bigdl/orca/data/pandas/__init__.py
@@ -0,0 +1,18 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from zoo.orca.data.pandas.preprocessing import read_csv
from zoo.orca.data.pandas.preprocessing import read_json
224 changes: 224 additions & 0 deletions python/orca/src/bigdl/orca/data/pandas/preprocessing.py
@@ -0,0 +1,224 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import random
import ray
from pyspark.context import SparkContext

from bigdl.util.common import get_node_and_core_number

from zoo.common import get_file_list
from zoo.ray import RayContext
from zoo.orca.data.shard import RayDataShards, RayPartition, SparkDataShards
from zoo.orca.data.utils import *


def read_csv(file_path, context, **kwargs):
    """
    Read CSV files into DataShards.
    :param file_path: a CSV file path, multiple CSV file paths separated by commas,
        or a directory containing CSV files.
        Supported file systems are local file system, HDFS, and S3.
    :param context: SparkContext or RayContext
    :return: DataShards
    """
    if isinstance(context, RayContext):
        return read_file_ray(context, file_path, "csv", **kwargs)
    elif isinstance(context, SparkContext):
        return read_file_spark(context, file_path, "csv", **kwargs)
    else:
        raise Exception("Context type should be RayContext or SparkContext")


def read_json(file_path, context, **kwargs):
    """
    Read JSON files into DataShards.
    :param file_path: a JSON file path, multiple JSON file paths separated by commas,
        or a directory containing JSON files.
        Supported file systems are local file system, HDFS, and S3.
    :param context: SparkContext or RayContext
    :return: DataShards
    """
    if isinstance(context, RayContext):
        return read_file_ray(context, file_path, "json", **kwargs)
    elif isinstance(context, SparkContext):
        return read_file_spark(context, file_path, "json", **kwargs)
    else:
        raise Exception("Context type should be RayContext or SparkContext")
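
# A minimal usage sketch of the readers above (illustrative only; the path is
# hypothetical and `context` is assumed to be an initialized SparkContext or
# RayContext):
#
#   from zoo.orca.data.pandas import read_csv
#   shards = read_csv("hdfs://namenode:9000/data/train.csv", context, sep=",")
#   dfs = shards.collect()  # list of pandas DataFrames, one per shard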


def read_file_ray(context, file_path, file_type, **kwargs):
    file_paths = []
    # extract all file paths
    if isinstance(file_path, list):
        for path in file_path:
            file_paths.extend(extract_one_path(path, file_type, context))
    else:
        file_paths = extract_one_path(file_path, file_type, context)

    num_executors = context.num_ray_nodes
    num_cores = context.ray_node_cpu_cores
    num_partitions = num_executors * num_cores

    # split files to partitions
    random.shuffle(file_paths)
    # remove empty partitions
    file_partition_list = [partition for partition
                           in list(chunk(file_paths, num_partitions)) if partition]
    # create shard actor to read data
    shards = [RayPandasShard.remote() for _ in range(len(file_partition_list))]
    done_ids, undone_ids = \
        ray.wait([shard.read_file_partitions.remote(file_partition_list[i], file_type, **kwargs)
                  for i, shard in enumerate(shards)], num_returns=len(shards))
    assert len(undone_ids) == 0

    # create initial partition
    partitions = [RayPartition([shard]) for shard in shards]
    data_shards = RayDataShards(partitions)
    return data_shards


def read_file_spark(context, file_path, file_type, **kwargs):
    file_url_splits = file_path.split("://")
    prefix = file_url_splits[0]
    node_num, core_num = get_node_and_core_number()

    if prefix == "s3":
        data_paths = list_s3_file(file_url_splits[1], file_type, os.environ)
    else:
        data_paths = get_file_list(file_path)
    rdd = context.parallelize(data_paths, node_num * core_num)

    if prefix == "hdfs":
        def loadFile(iterator):
            import pandas as pd
            import pyarrow as pa
            fs = pa.hdfs.connect()

            for x in iterator:
                with fs.open(x, 'rb') as f:
                    if file_type == "csv":
                        df = pd.read_csv(f, **kwargs)
                    elif file_type == "json":
                        df = pd.read_json(f, **kwargs)
                    else:
                        raise Exception("Unsupported file type")
                    yield df

        pd_rdd = rdd.mapPartitions(loadFile)
    elif prefix == "s3":
        def loadFile(iterator):
            access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
            secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
            import boto3
            import pandas as pd
            s3_client = boto3.Session(
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
            ).client('s3', verify=False)
            for x in iterator:
                path_parts = x.split("://")[1].split('/')
                bucket = path_parts.pop(0)
                key = "/".join(path_parts)
                obj = s3_client.get_object(Bucket=bucket, Key=key)
                if file_type == "json":
                    df = pd.read_json(obj['Body'], **kwargs)
                elif file_type == "csv":
                    df = pd.read_csv(obj['Body'], **kwargs)
                else:
                    raise Exception("Unsupported file type")
                yield df

        pd_rdd = rdd.mapPartitions(loadFile)
    else:
        def loadFile(iterator):
            import pandas as pd
            for x in iterator:
                if file_type == "csv":
                    df = pd.read_csv(x, **kwargs)
                elif file_type == "json":
                    df = pd.read_json(x, **kwargs)
                else:
                    raise Exception("Unsupported file type")
                yield df

        pd_rdd = rdd.mapPartitions(loadFile)

    data_shards = SparkDataShards(pd_rdd)
    return data_shards


@ray.remote
class RayPandasShard(object):
    """
    Actor that reads csv/json files into a pandas DataFrame and manipulates the data.
    """
    def __init__(self, data=None):
        self.data = data

    def read_file_partitions(self, paths, file_type, **kwargs):
        df_list = []
        import pandas as pd
        prefix = paths[0].split("://")[0]
        if prefix == "hdfs":
            import pyarrow as pa
            fs = pa.hdfs.connect()
            for path in paths:
                with fs.open(path, 'rb') as f:
                    if file_type == "json":
                        df = pd.read_json(f, **kwargs)
                    elif file_type == "csv":
                        df = pd.read_csv(f, **kwargs)
                    else:
                        raise Exception("Unsupported file type")
                    df_list.append(df)
        elif prefix == "s3":
            import boto3
            access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
            secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
            s3_client = boto3.Session(
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
            ).client('s3', verify=False)
            for path in paths:
                path_parts = path.split("://")[1].split('/')
                bucket = path_parts.pop(0)
                key = "/".join(path_parts)
                obj = s3_client.get_object(Bucket=bucket, Key=key)
                if file_type == "json":
                    df = pd.read_json(obj['Body'], **kwargs)
                elif file_type == "csv":
                    df = pd.read_csv(obj['Body'], **kwargs)
                else:
                    raise Exception("Unsupported file type")
                df_list.append(df)
        else:
            for path in paths:
                if file_type == "json":
                    df = pd.read_json(path, **kwargs)
                elif file_type == "csv":
                    df = pd.read_csv(path, **kwargs)
                else:
                    raise Exception("Unsupported file type")
                df_list.append(df)
        self.data = pd.concat(df_list)
        return 0

    def apply(self, func, *args):
        self.data = func(self.data, *args)
        return 0

    def get_data(self):
        return self.data
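
The RayPandasShard actor above can also be exercised directly; the following is a minimal sketch, assuming Ray has already been started and the local CSV paths exist (both the paths and the standalone ray.init() call are hypothetical, since in practice the actors are created by read_file_ray under a RayContext):

import ray
from zoo.orca.data.pandas.preprocessing import RayPandasShard

ray.init()

# One actor holds the DataFrame built from one partition of file paths.
shard = RayPandasShard.remote()
ray.get(shard.read_file_partitions.remote(["/tmp/part-0.csv", "/tmp/part-1.csv"], "csv"))
df = ray.get(shard.get_data.remote())  # concatenated pandas DataFrame on the driver
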
116 changes: 116 additions & 0 deletions python/orca/src/bigdl/orca/data/shard.py
@@ -0,0 +1,116 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from zoo.orca.data.utils import *


class DataShards(object):
    """
    A collection of data which can be pre-processed in parallel.
    """

    def apply(self, func, *args):
        """
        Apply a function to each element in the DataShards.
        :param func: pre-processing function
        :param args: arguments for the pre-processing function
        :return: this DataShards
        """
        pass

    def collect(self):
        """
        Return a list that contains all of the elements in this DataShards.
        :return: list of elements
        """
        pass


class RayDataShards(DataShards):
    """
    A collection of data which can be pre-processed in parallel on Ray.
    """
    def __init__(self, partitions):
        self.partitions = partitions
        self.shard_list = flatten([partition.shard_list for partition in partitions])

    def apply(self, func, *args):
        """
        Apply a function to each element in the DataShards.
        :param func: pre-processing function.
            In the function, the element object should be the first argument
        :param args: remaining arguments for the pre-processing function
        :return: this DataShards
        """
        import ray
        done_ids, undone_ids = ray.wait([shard.apply.remote(func, *args)
                                         for shard in self.shard_list],
                                        num_returns=len(self.shard_list))
        assert len(undone_ids) == 0
        return self

    def collect(self):
        """
        Return a list that contains all of the elements in this DataShards.
        :return: list of elements
        """
        import ray
        return ray.get([shard.get_data.remote() for shard in self.shard_list])

    def repartition(self, num_partitions):
        """
        Repartition the DataShards.
        :param num_partitions: number of partitions
        :return: this DataShards
        """
        shards_partitions = list(chunk(self.shard_list, num_partitions))
        self.partitions = [RayPartition(shards) for shards in shards_partitions]
        return self

    def get_partitions(self):
        """
        Return the partition list of the DataShards.
        :return: partition list
        """
        return self.partitions


class RayPartition(object):
    """
    Partition of RayDataShards.
    """

    def __init__(self, shard_list):
        self.shard_list = shard_list

    def get_data(self):
        return [shard.get_data.remote() for shard in self.shard_list]


class SparkDataShards(DataShards):
    def __init__(self, rdd):
        self.rdd = rdd

    def apply(self, func, *args):
        # Apply func to each element; as in RayDataShards, the element is passed
        # as the first argument, followed by any extra args.
        self.rdd = self.rdd.map(lambda data: func(data, *args))
        return self

    def collect(self):
        return self.rdd.collect()

    def repartition(self, num_partitions):
        self.rdd = self.rdd.repartition(num_partitions)
        return self
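
A short sketch of the end-to-end pre-processing flow built on these classes; the input path and the drop_missing function are made up for illustration, and `context` is assumed to be an existing SparkContext or RayContext:

from zoo.orca.data.pandas import read_csv

def drop_missing(df):
    # element-wise pre-processing; each shard's pandas DataFrame is the first argument
    return df.dropna()

shards = read_csv("/path/to/csv_folder", context)  # DataShards
shards = shards.apply(drop_missing)                # runs on every shard in parallel
shards = shards.repartition(4)                     # regroup shards into 4 partitions
dfs = shards.collect()                             # list of pandas DataFrames on the driver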