Support RayOnSpark for k8s and add docs (intel-analytics#2836)
* support ray on k8s

* add to init orca context

* style

* minor

* minor

* ut
hkvision committed Sep 11, 2020
1 parent 2e412e7 commit 961b53c
Showing 1 changed file with 51 additions and 38 deletions.
89 changes: 51 additions & 38 deletions python/orca/src/bigdl/orca/ray/raycontext.py
@@ -189,35 +189,44 @@ def _start_ray_node(self, command, tag):
         return process_info
 
     def _get_ray_exec(self):
-        python_bin_dir = "/".join(self.python_loc.split("/")[:-1])
-        return "{}/python {}/ray".format(python_bin_dir, python_bin_dir)
+        if "envs" in self.python_loc:  # conda environment
+            python_bin_dir = "/".join(self.python_loc.split("/")[:-1])
+            return "{}/python {}/ray".format(python_bin_dir, python_bin_dir)
+        else:  # system environment with ray installed; for example: /usr/local/bin/ray
+            return "ray"

-    def gen_ray_start(self):
+    def gen_ray_start(self, master_ip):
         def _start_ray_services(iter):
             from pyspark import BarrierTaskContext
+            from zoo.util.utils import get_node_ip
             tc = BarrierTaskContext.get()
-            # The address is sorted by partitionId according to the comments
-            # Partition 0 is the Master
-            task_addrs = [taskInfo.address for taskInfo in tc.getTaskInfos()]
-            print(task_addrs)
-            master_ip = task_addrs[0].split(":")[0]
-            print("current address {}".format(task_addrs[tc.partitionId()]))
+            current_ip = get_node_ip()
+            print("current address {}".format(current_ip))
             print("master address {}".format(master_ip))
             redis_address = "{}:{}".format(master_ip, self.redis_port)
             process_info = None
-            if tc.partitionId() == 0:
-                print("partition id is : {}".format(tc.partitionId()))
-                process_info = self._start_ray_node(command=self._gen_master_command(),
-                                                    tag="ray-master")
-                process_info.master_addr = redis_address
+            import tempfile
+            import filelock
+            base_path = tempfile.gettempdir()
+            master_flag_path = os.path.join(base_path, "ray_master_initialized")
+            if current_ip == master_ip:  # Start the ray master.
+                lock_path = os.path.join(base_path, "ray_master_start.lock")
+                # It is possible that multiple executors are on one node. In this case,
+                # the first executor that gets the lock would be the master and it would
+                # create a flag to indicate the master has initialized.
+                # The flag file is removed when ray start processes finish so that this
+                # won't affect other programs.
+                with filelock.FileLock(lock_path):
+                    if not os.path.exists(master_flag_path):
+                        print("partition id is : {}".format(tc.partitionId()))
+                        process_info = self._start_ray_node(command=self._gen_master_command(),
+                                                            tag="ray-master")
+                        process_info.master_addr = redis_address
+                        os.mknod(master_flag_path)

             tc.barrier()
-            if tc.partitionId() != 0:
-                import tempfile
-                import filelock
-
-                base_path = tempfile.gettempdir()
-                lock_path = os.path.join(base_path, "ray_on_spark_start.lock")
+            if not process_info:  # Start raylets.
+                lock_path = os.path.join(base_path, "raylet_start.lock")
                 with filelock.FileLock(lock_path):
                     print("partition id is : {}".format(tc.partitionId()))
                     process_info = self._start_ray_node(
@@ -231,6 +240,8 @@ def _start_ray_services(iter):
                             extra_params=self.extra_params),
                         tag="raylet")
             kill_redundant_log_monitors(redis_address=redis_address)
+            if os.path.exists(master_flag_path):
+                os.remove(master_flag_path)

             yield process_info
         return _start_ray_services
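Note: the lock-plus-flag election introduced above can be hard to follow inside a diff. Below is a minimal standalone sketch of the same pattern, assuming only the filelock package is installed; the function and file names are illustrative, not part of this patch.

    # Sketch of the master-election pattern in gen_ray_start: executors on the
    # same node race for a file lock, and only the first one to create the
    # flag file starts the Ray master.
    import os
    import tempfile

    from filelock import FileLock  # pip install filelock

    def elect_master(flag_name="demo_master_initialized"):
        base_path = tempfile.gettempdir()
        flag_path = os.path.join(base_path, flag_name)
        lock_path = os.path.join(base_path, flag_name + ".lock")
        with FileLock(lock_path):  # serializes callers on this node
            if not os.path.exists(flag_path):
                open(flag_path, "x").close()  # same spirit as os.mknod above
                return True  # this caller should start the master
        return False  # someone else already claimed the master role

    def cleanup(flag_name="demo_master_initialized"):
        # Mirrors removing master_flag_path once ray start finishes, so the
        # flag does not leak into later runs on the same node.
        flag_path = os.path.join(tempfile.gettempdir(), flag_name)
        if os.path.exists(flag_path):
            os.remove(flag_path)

    if __name__ == "__main__":
        print("master" if elect_master() else "worker")
        cleanup()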
@@ -342,7 +353,8 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
                             "you need to manually specify num_ray_nodes and ray_node_cpu_cores "
                             "for RayContext to start ray services")
 
-        self.python_loc = os.environ['PYSPARK_PYTHON']
+        from zoo.util.utils import detect_python_location
+        self.python_loc = os.environ.get("PYSPARK_PYTHON", detect_python_location())
         self.redis_port = random.randint(10000, 65535) if not redis_port else int(redis_port)
         self.ray_service = RayServiceFuncGenerator(
             python_loc=self.python_loc,
@@ -353,11 +365,6 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
             verbose=self.verbose,
             env=self.env,
             extra_params=self.extra_params)
-        self._gather_cluster_ips()
-        from bigdl.util.common import init_executor_gateway
-        print("Start to launch the JVM guarding process")
-        init_executor_gateway(sc)
-        print("JVM guarding process has been successfully launched")
         RayContext._active_ray_context = self
 
     @classmethod
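Note: two changes in this file concern locating the Python interpreter: the "envs" check in _get_ray_exec and the PYSPARK_PYTHON fallback in __init__. A rough sketch of the combined behavior follows; detect_python_location's real implementation lives in zoo.util.utils, and the sys.executable fallback here is an assumption for illustration only.

    # Illustrative stand-ins (suffix _sketch) for the zoo helpers.
    import os
    import sys

    def detect_python_location_sketch():
        # Assumption: with PYSPARK_PYTHON unset, the driver's own interpreter
        # is a sensible default to propagate to executors.
        return sys.executable

    def ray_exec_for(python_loc):
        # Mirrors _get_ray_exec: a conda interpreter lives under an "envs"
        # directory, so use its sibling ray script; otherwise rely on PATH.
        if "envs" in python_loc:
            bin_dir = "/".join(python_loc.split("/")[:-1])
            return "{}/python {}/ray".format(bin_dir, bin_dir)
        return "ray"

    python_loc = os.environ.get("PYSPARK_PYTHON", detect_python_location_sketch())
    print(ray_exec_for("/home/user/miniconda3/envs/zoo/bin/python"))  # conda case
    print(ray_exec_for("/usr/local/bin/python"))  # system case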
@@ -371,18 +378,17 @@ def get(cls, initialize=True):
             raise Exception("No active RayContext. Please create a RayContext and init it first")
 
     def _gather_cluster_ips(self):
-        total_cores = int(self.num_ray_nodes) * int(self.ray_node_cpu_cores)
-
+        """
+        Get the ips of all Spark executors in the cluster. The first ip returned would be the
+        ray master.
+        """
         def info_fn(iter):
             from pyspark import BarrierTaskContext
             tc = BarrierTaskContext.get()
-            task_addrs = [taskInfo.address.split(":")[0] for taskInfo in tc.getTaskInfos()]
-            yield task_addrs
             tc.barrier()
+            from zoo.util.utils import get_node_ip
+            yield get_node_ip()
 
-        ips = self.sc.range(0, total_cores,
-                            numSlices=total_cores).barrier().mapPartitions(info_fn).collect()
-        return ips[0]
+        ips = self.sc.range(0, self.num_ray_nodes,
+                            numSlices=self.num_ray_nodes).barrier().mapPartitions(info_fn).collect()
+        return ips
 
     def stop(self):
         if not self.initialized:
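Note: the rewritten _gather_cluster_ips runs one barrier task per Ray node and has each task yield its own IP. The standalone sketch below exercises the same pattern with plain PySpark (2.4+ for barrier execution); the socket-based lookup is an assumption standing in for zoo.util.utils.get_node_ip.

    # One slice per node so each barrier task reports exactly one IP;
    # ips[0] then plays the role of the Ray master, as in _start_cluster.
    from pyspark.sql import SparkSession

    def gather_ips(sc, num_nodes):
        def info_fn(_):
            import socket
            yield socket.gethostbyname(socket.gethostname())

        return (sc.range(0, num_nodes, numSlices=num_nodes)
                .barrier()
                .mapPartitions(info_fn)
                .collect())

    if __name__ == "__main__":
        spark = SparkSession.builder.master("local[2]").getOrCreate()
        print(gather_ips(spark.sparkContext, num_nodes=2))
        spark.stop()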
@@ -443,6 +449,10 @@ def init(self, driver_cores=0):
                 object_store_memory=self.object_store_memory,
                 resources=self.extra_params)
         else:
+            self.cluster_ips = self._gather_cluster_ips()
+            from bigdl.util.common import init_executor_gateway
+            init_executor_gateway(self.sc)
+            print("JavaGatewayServer has been successfully launched on executors")
             self._start_cluster()
             self._address_info = self._start_driver(num_cores=driver_cores)

@@ -462,8 +472,9 @@ def _start_cluster(self):
         print("Start to launch ray on cluster")
         ray_rdd = self.sc.range(0, self.num_ray_nodes,
                                 numSlices=self.num_ray_nodes)
+        # The first ip would be used to launch ray master.
         process_infos = ray_rdd.barrier().mapPartitions(
-            self.ray_service.gen_ray_start()).collect()
+            self.ray_service.gen_ray_start(self.cluster_ips[0])).collect()
 
         self.ray_processesMonitor = ProcessMonitor(process_infos, self.sc, ray_rdd, self,
                                                    verbose=self.verbose)
@@ -481,8 +492,10 @@ def _start_restricted_worker(self, num_cores, node_ip_address):
             ray_node_cpu_cores=num_cores,
             object_store_memory=self.object_store_memory,
             extra_params=extra_param)
+        modified_env = self.ray_service._prepare_env()
         print("Executing command: {}".format(command))
-        process_info = session_execute(command=command, fail_fast=True)
+        process_info = session_execute(command=command, env=modified_env,
+                                       tag="raylet", fail_fast=True)
         ProcessMonitor.register_shutdown_hook(pgid=process_info.pgid)
 
     def _start_driver(self, num_cores=0):
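Note: the _start_restricted_worker change threads the prepared environment into session_execute so the raylet subprocess sees the same variables as the other ray processes. session_execute is internal to zoo; the sketch below shows the underlying idea with the standard subprocess module (the command and variables are made-up examples, POSIX shell assumed).

    # Launching a command with a modified environment, which is what passing
    # env=modified_env through session_execute accomplishes.
    import os
    import subprocess

    def run_with_env(command, extra_env):
        env = os.environ.copy()
        env.update(extra_env)  # e.g. PATH pointing at a conda bin directory
        return subprocess.Popen(command, shell=True, env=env)

    if __name__ == "__main__":
        proc = run_with_env('echo "$DEMO_VAR"', {"DEMO_VAR": "hello from modified env"})
        proc.wait()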
