# Ray notes - Submit
[toc]
## What does ray.init() do?
If `driver_mode == LOCAL_MODE`:

```python
# in python/ray/worker.py
_global_node = ray.node.LocalNode()
```

Else if `redis_address is None`, start a new cluster (the default behavior):

```python
# Start the Ray processes. We set shutdown_at_exit=False because we
# shutdown the node in the ray.shutdown call that happens in the atexit
# handler. We still spawn a reaper process in case the atexit handler
# isn't called.
_global_node = ray.node.Node(
    head=True,
    shutdown_at_exit=False,
    spawn_reaper=True,
    ray_params=ray_params)
```

Else, connect to an existing cluster:

```python
_global_node = ray.node.Node(
    ray_params,
    head=False,
    shutdown_at_exit=False,
    spawn_reaper=False,
    connect_only=True)
```
Then call `connect()` to connect this node, and run any registered hooks:

```python
connect(
    _global_node,
    mode=driver_mode,
    log_to_driver=log_to_driver,
    worker=global_worker,
    driver_object_store_memory=driver_object_store_memory,
    job_id=job_id,
    internal_config=json.loads(_internal_config)
    if _internal_config else {})
```
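A minimal sketch of the three code paths above (each `ray.init()` call exercises one branch; the cluster address is a placeholder):

```python
import ray

# Branch 1: local mode -- tasks execute serially in the driver process.
ray.init(local_mode=True)
ray.shutdown()

# Branch 2: no redis_address given -- start a fresh local cluster (head node).
ray.init()
ray.shutdown()

# Branch 3: connect to an existing cluster, e.g. one started with
# `ray start --head`. The address below is a placeholder.
ray.init(redis_address="127.0.0.1:6379")
```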
When creating the `Node`, `local_mode` is passed in via `RayParams`:

> local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging.

The class `ray.node.Node` creates a node and may start some new processes.
```python
# python/ray/node.py
def __init__(self,
             ray_params,
             head=False,
             shutdown_at_exit=True,
             spawn_reaper=True,
             connect_only=False):
    """Start a node.

    Args:
        ray_params (ray.params.RayParams): The parameters to use to
            configure the node.
        head (bool): True if this is the head node, which means it will
            start additional processes like the Redis servers, monitor
            processes, and web UI.
        shutdown_at_exit (bool): If true, spawned processes will be cleaned
            up if this process exits normally.
        spawn_reaper (bool): If true, spawns a process that will clean up
            other spawned processes if this process dies unexpectedly.
        connect_only (bool): If true, connect to the node without starting
            new processes.
    """
```
## How to connect a Node (local & remote)?
- Some basic checks, e.g. raise if `ray.init()` was already called.
- Create a redis client if it is not local mode:

  ```python
  worker.redis_client = node.create_redis_client()
  ```

- If it is worker mode, do not specify the job id.
- If it is local mode, generate a random job id.
- If it is driver mode, allocate the next job id from Redis (see the sketch after this list):

  ```python
  # This is the code path of driver mode.
  if job_id is None:
      # TODO(qwang): use `GcsClient::GenerateJobId()` here.
      job_id = JobID.from_int(
          int(worker.redis_client.incr("JobCounter")))
  # When tasks are executed on remote workers in the context of multiple
  # drivers, the current job ID is used to keep track of which job is
  # responsible for the task so that error messages will be propagated to
  # the correct driver.
  worker.worker_id = ray.utils.compute_driver_id_from_job(
      job_id).binary()
  ```

- Create an object for interfacing with the global state:

  ```python
  ray.state.state._initialize_global_state(
      node.redis_address, redis_password=node.redis_password)
  ```

- Initialize `GcsClientOptions` and the `CoreWorker`:

  ```python
  gcs_options = ray._raylet.GcsClientOptions(
      redis_address,
      int(redis_port),
      node.redis_password,
  )
  worker.core_worker = ray._raylet.CoreWorker(
      (mode == SCRIPT_MODE),
      node.plasma_store_socket_name,
      node.raylet_socket_name,
      job_id,
      gcs_options,
      node.get_logs_dir_path(),
      node.node_ip_address,
      node.node_manager_port,
  )
  ```

- Initialize the raylet client:

  ```python
  worker.raylet_client = ray._raylet.RayletClient(worker.core_worker)
  ```

- Start some daemon threads (log streaming, printing messages, the listener, etc.).
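A hedged sketch of the driver-mode job-id scheme above, assuming a reachable Redis (the host/port are placeholders; in `connect()` they come from `node.redis_address`):

```python
import redis

import ray.utils
from ray import JobID

# Placeholder address; connect() uses node.redis_address instead.
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379)

# Each new driver atomically increments "JobCounter", so every driver
# gets a unique, monotonically increasing job id.
job_id = JobID.from_int(int(redis_client.incr("JobCounter")))

# The driver's worker id is derived deterministically from its job id.
worker_id = ray.utils.compute_driver_id_from_job(job_id).binary()
```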
## How does the remote decorator work?

`remote(*args, **kwargs)`
When simply using `@ray.remote` (no arguments), it uses:

```python
return make_decorator(worker=worker)(args[0])
```

If arguments are given, it uses:

```python
return make_decorator(
    num_return_vals=num_return_vals,
    num_cpus=num_cpus,
    num_gpus=num_gpus,
    memory=memory,
    object_store_memory=object_store_memory,
    resources=resources,
    max_calls=max_calls,
    max_reconstructions=max_reconstructions,
    max_retries=max_retries,
    worker=worker)
```
Note that the latter case returns the decorator function produced by `make_decorator` (which Python then applies to the decorated target), while the former returns the result of invoking that function immediately.
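The two forms side by side (a minimal example matching the call paths above):

```python
import ray

ray.init()

@ray.remote                 # bare form: make_decorator(worker=worker)(f)
def f(x):
    return x + 1

@ray.remote(num_cpus=2, num_return_vals=2)  # parameterized form: the decorator
def g(x):                                   # returned by make_decorator is then
    return x, x * 2                         # applied to g
```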
### make_decorator()

When decorating a class, it uses the `worker` argument to create an actor:

```python
return worker.make_actor(function_or_class, num_cpus, num_gpus,
                         memory, object_store_memory, resources,
                         max_reconstructions)
```

When decorating a function, it creates a `RemoteFunction` object:

```python
return ray.remote_function.RemoteFunction(
    function_or_class, num_cpus, num_gpus, memory,
    object_store_memory, resources, num_return_vals, max_calls,
    max_retries)
```
`worker.make_actor()` ==> python/ray/actor.py:

```python
return ActorClass._ray_from_modified_class(
    Class, ActorClassID.from_random(), max_reconstructions, num_cpus,
    num_gpus, memory, object_store_memory, resources)
```

This returns an `ActorClass` object.
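For example, decorating a class goes through `worker.make_actor()` and yields an `ActorClass` (a minimal sketch):

```python
@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

# Counter is now an ActorClass wrapper: instantiate it with
# Counter.remote(), not Counter().
```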
## Calling the remote function

```python
# in RemoteFunction._remote()
...
def invocation(args, kwargs):
    if not args and not kwargs and not self._function_signature:
        list_args = []
    else:
        list_args = ray.signature.flatten_args(
            self._function_signature, args, kwargs)
    if worker.mode == ray.worker.LOCAL_MODE:
        object_ids = worker.local_mode_manager.execute(
            self._function, self._function_descriptor, args, kwargs,
            num_return_vals)
    else:
        object_ids = worker.core_worker.submit_task(
            self._function_descriptor_list, list_args, num_return_vals,
            is_direct_call, resources, max_retries)
    if len(object_ids) == 1:
        return object_ids[0]
    elif len(object_ids) > 1:
        return object_ids

if self._decorator is not None:
    invocation = self._decorator(invocation)

return invocation(args, kwargs)
```
The `self._function_descriptor_list` contains:
- module_name
- class_name
- function_name
- function_source_hash

`is_direct_call`: false by default; it can be forced on via the environment variable `RAY_FORCE_DIRECT`.
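From the caller's side (reusing `f` and `g` from the decorator example), the return-value handling at the end of `invocation()` looks like this:

```python
obj_id = f.remote(1)          # one return value -> a single ObjectID
x_id, y_id = g.remote(3)      # num_return_vals=2 -> a list of ObjectIDs

print(ray.get(obj_id))        # 2
print(ray.get([x_id, y_id]))  # [3, 6]
```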
## Instantiating the remote class (actor)

```python
# in ActorClass._remote()
...
actor_id = worker.core_worker.create_actor(
    function_descriptor.get_function_descriptor_list(),
    creation_args, meta.max_reconstructions, resources,
    actor_placement_resources, is_direct_call, max_concurrency,
    detached, is_asyncio)

actor_handle = ActorHandle(
    actor_id,
    meta.modified_class.__module__,
    meta.class_name,
    meta.actor_method_names,
    meta.method_decorators,
    meta.method_signatures,
    meta.actor_method_num_return_vals,
    actor_method_cpu,
    worker.current_session_and_job,
    original_handle=True)

if name is not None:
    ray.experimental.register_actor(name, actor_handle)

return actor_handle
```

This returns an `ActorHandle` object.
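Using the `Counter` class from the earlier sketch:

```python
# Goes through ActorClass._remote(), which returns an ActorHandle.
counter = Counter.remote()
```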
## Calling a remote actor method

```python
return actor._actor_method_call(
    self._method_name,
    args=args,
    kwargs=kwargs,
    num_return_vals=num_return_vals)
```
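From the user's side, a method call on the handle dispatches through `_actor_method_call()` (continuing the sketch):

```python
result_id = counter.incr.remote()  # submits an actor task, returns an ObjectID
print(ray.get(result_id))          # 1
```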
## How does create_actor() work in CoreWorker?
```cython
def create_actor(self,
                 function_descriptor,
                 args,
                 uint64_t max_reconstructions,
                 resources,
                 placement_resources,
                 c_bool is_direct_call,
                 int32_t max_concurrency,
                 c_bool is_detached,
                 c_bool is_asyncio):
    cdef:
        CRayFunction ray_function
        c_vector[CTaskArg] args_vector
        c_vector[c_string] dynamic_worker_options
        unordered_map[c_string, double] c_resources
        unordered_map[c_string, double] c_placement_resources
        CActorID c_actor_id

    with self.profile_event(b"submit_task"):
        prepare_resources(resources, &c_resources)
        prepare_resources(placement_resources, &c_placement_resources)
        ray_function = CRayFunction(
            LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
        prepare_args(args, &args_vector)

        with nogil:
            check_status(self.core_worker.get().CreateActor(
                ray_function, args_vector,
                CActorCreationOptions(
                    max_reconstructions, is_direct_call, max_concurrency,
                    c_resources, c_placement_resources,
                    dynamic_worker_options, is_detached, is_asyncio),
                &c_actor_id))

    return ActorID(c_actor_id.Binary())
```
It invokes the C++ method `CoreWorker::CreateActor` to create the actor and generate the corresponding actor id.
## How does submit_task() work in CoreWorker?
```cython
from ray.includes.libcoreworker cimport CCoreWorker

class CoreWorker:
    def submit_task(self,
                    function_descriptor,
                    args,
                    int num_return_vals,
                    c_bool is_direct_call,
                    resources,
                    int max_retries):
        cdef:
            unordered_map[c_string, double] c_resources
            CTaskOptions task_options
            CRayFunction ray_function
            c_vector[CTaskArg] args_vector
            c_vector[CObjectID] return_ids

        with self.profile_event(b"submit_task"):
            prepare_resources(resources, &c_resources)  # convert the resources dict
            task_options = CTaskOptions(
                num_return_vals, is_direct_call, c_resources)
            ray_function = CRayFunction(
                LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
            prepare_args(args, &args_vector)  # serialize and push the arguments

            with nogil:
                check_status(self.core_worker.get().SubmitTask(
                    ray_function, args_vector, task_options, &return_ids,
                    max_retries))

        return VectorToObjectIDs(return_ids)
```
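A usage sketch of how a per-call resources dict reaches `prepare_resources()` above; `_remote()` is the internal entry point behind `.remote()`, and the custom resource name is hypothetical:

```python
# f.remote(...) forwards to f._remote(...), whose resources dict ends up in
# CoreWorker.submit_task() above.
obj_id = f._remote(
    args=[1],
    kwargs={},
    resources={"CustomResource": 1})  # hypothetical custom resource name
```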
python/ray/includes/libcoreworker.pxd:

```cython
cdef extern from "ray/core_worker/core_worker.h" nogil:
    cdef cppclass CCoreWorker "ray::CoreWorker":
        CCoreWorker(const CWorkerType worker_type, const CLanguage language,
                    const c_string &store_socket,
                    const c_string &raylet_socket, const CJobID &job_id,
                    const CGcsClientOptions &gcs_options,
                    const c_string &log_dir, const c_string &node_ip_address,
                    int node_manager_port,
                    CRayStatus (
                        CTaskType task_type,
                        const CRayFunction &ray_function,
                        const unordered_map[c_string, double] &resources,
                        const c_vector[shared_ptr[CRayObject]] &args,
                        const c_vector[CObjectID] &arg_reference_ids,
                        const c_vector[CObjectID] &return_ids,
                        c_vector[shared_ptr[CRayObject]] *returns) nogil,
                    CRayStatus() nogil,
                    c_bool ref_counting_enabled)
        void Disconnect()
        CWorkerType &GetWorkerType()
        CLanguage &GetLanguage()
        void StartExecutingTasks()
        CRayStatus SubmitTask(
            const CRayFunction &function, const c_vector[CTaskArg] &args,
            const CTaskOptions &options, c_vector[CObjectID] *return_ids,
            int max_retries)
```
src/ray/core_worker/core_worker.cc:

```cpp
Status CoreWorker::SubmitTask(const RayFunction &function,
                              const std::vector<TaskArg> &args,
                              const TaskOptions &task_options,
                              std::vector<ObjectID> *return_ids, int max_retries) {
  TaskSpecBuilder builder;
  const int next_task_index = worker_context_.GetNextTaskIndex();
  const auto task_id =
      TaskID::ForNormalTask(worker_context_.GetCurrentJobID(),
                            worker_context_.GetCurrentTaskID(), next_task_index);
  const std::unordered_map<std::string, double> required_resources;
  // TODO(ekl) offload task building onto a thread pool for performance
  BuildCommonTaskSpec(
      builder, worker_context_.GetCurrentJobID(), task_id,
      worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
      function, args, task_options.num_returns, task_options.resources,
      required_resources,
      task_options.is_direct_call ? TaskTransportType::DIRECT
                                  : TaskTransportType::RAYLET,
      return_ids);
  TaskSpecification task_spec = builder.Build();
  if (task_options.is_direct_call) {
    task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries);
    return direct_task_submitter_->SubmitTask(task_spec);
  } else {
    return local_raylet_client_->SubmitTask(task_spec);
  }
}
```
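The direct-call branch above can be exercised via the `RAY_FORCE_DIRECT` environment variable mentioned earlier (a sketch; the assumption is that it must be set before Ray is initialized):

```python
import os

# Assumption: setting this before ray.init() makes is_direct_call default
# to True, so SubmitTask takes the direct_task_submitter_ path above.
os.environ["RAY_FORCE_DIRECT"] = "1"

import ray
ray.init()
```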
Some definitions (from the protobuf sources):

```proto
// Language of a task or worker.
enum Language {
  PYTHON = 0;
  JAVA = 1;
  CPP = 2;
}

// Type of a worker.
enum WorkerType {
  WORKER = 0;
  DRIVER = 1;
}

// Type of a task.
enum TaskType {
  // Normal task.
  NORMAL_TASK = 0;
  // Actor creation task.
  ACTOR_CREATION_TASK = 1;
  // Actor task.
  ACTOR_TASK = 2;
}
```
to be continued…
## Other useful references

https://medium.com/distributed-computing-with-ray/how-ray-uses-grpc-and-arrow-to-outperform-grpc-43ec368cb385

> At Anyscale, we’re working on a number of enhancements for Ray 1.0, including:
> - Support for tasks/actors written in C++ and Rust (in addition to Python and Java today).
> - Distributed reference counting for shared memory objects.
> - Accelerated point-to-point GPU transfers.