Ray Core System Design: A Deep Dive
Introduction
This article summarizes my understanding of the technical details behind Ray Core, the heart of the distributed engine for modern ML workflows. We will cover the high-level design and then dive into task scheduling, object storage, and fault tolerance.
Why?
I wrote this post to make sure I understood how Ray addresses the limitations of traditional distributed computing frameworks. Having worked primarily with MapReduce-style systems, I was curious about Ray’s approach.
The ML Workflow Problem:
- Varying resource requirements (CPU for preprocessing, GPU for training)
- Complex dependencies (evaluation depends on trained models)
- Shared datasets that shouldn’t be copied repeatedly
- Need for fault tolerance
Ray was designed specifically to handle these patterns—or at least attempts to handle them effectively.
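To make this concrete, here is a minimal sketch of how those patterns map onto Ray's API. The function bodies, resource numbers, and names are placeholders of my own; on a machine without a GPU, the train task would simply queue until one becomes available.
import ray

@ray.remote(num_cpus=2)           # CPU-heavy preprocessing
def preprocess(raw):
    return [x * 2 for x in raw]

@ray.remote(num_gpus=1)           # GPU-heavy training
def train(features):
    return sum(features)          # stand-in for a real training step

@ray.remote
def evaluate(model):              # depends on the trained model
    return model > 0

features_ref = preprocess.remote(list(range(10)))
model_ref = train.remote(features_ref)        # dependency expressed via the ref
print(ray.get(evaluate.remote(model_ref)))    # -> True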
What is Ray?
Overview
Ray is a distributed computing framework that provides primitives for building scalable Python applications. It abstracts away the complexity of distributed execution while maintaining high performance. It helps scale applications from one machine to hundreds of nodes. We will focus on the core part of this engine.
Design Philosophy
Ray prioritizes performance over durability, making it well-suited for ML pipelines where this trade-off is acceptable:
- Tasks are often deterministic - Many tasks produce consistent outputs for given inputs, enabling reliable reconstruction (although not all tasks are deterministic)
- Recomputation is acceptable - Tasks can be re-executed without significant cost when failures occur
- Low scheduling overhead matters - Microsecond-level task scheduling enables fine-grained parallelism
- Minimal data copying overhead - Objects are transferred between machines only when needed, avoiding redundant communication and serialization/deserialization overhead
- Dynamic execution patterns - Task graphs that change based on runtime conditions and intermediate results
All these properties lead us to the reasonable idea of avoiding disk work as much as possible and prioritizing in-memory computation. This is exactly what Ray does—it can afford to lose intermediate results and reconstruct them when needed, rather than persisting everything to stable storage.
Core Primitives
- Tasks: Stateless remote functions
- Actors: Stateful remote classes with methods
- Objects: Immutable values stored in distributed object store
- Object References: References (futures) to objects
This list of core primitives is remarkably simple: when we write code, we already work with functions, classes, and data. That familiarity is probably one of the reasons Ray has become so popular.
- Driver: The entry point program that submits tasks and coordinates execution
- Job: Container for all tasks, objects, and actors originating from the same driver
Programming Model
Tasks: Stateless Functions
import ray

@ray.remote
def add(a, b):
    return a + b

# Call remote function and get the result
future = add.remote(2, 3)
result = ray.get(future)  # -> 5
Key Characteristics:
- Stateless: No persistent state between invocations
- Asynchronous: Returns ObjectRef (future) immediately
- Location transparent: Executed on any available worker
- Automatic distribution: Ray handles scheduling and placement
Use Cases:
- Parallel data processing: Independent operations on data chunks without shared state requirements
- ETL pipelines: Stateless transformation steps that can be distributed across workers
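For example, a chunked map over a dataset needs nothing beyond plain tasks. A minimal sketch, assuming ray has been imported and initialized as above; the chunk size and the transform are arbitrary choices of mine:
@ray.remote
def transform(chunk):
    # Stateless, independent work on one chunk
    return [x ** 2 for x in chunk]

data = list(range(1000))
chunks = [data[i:i + 100] for i in range(0, len(data), 100)]

# One task per chunk; Ray schedules them across available workers
results = ray.get([transform.remote(c) for c in chunks])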
Actors: Stateful Services
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def inc(self):
        self.count += 1

    def get(self):
        return self.count

# Create actor instance and call actor methods
counter = Counter.remote()
for _ in range(10):
    counter.inc.remote()
result = ray.get(counter.get.remote())  # -> 10
Key Characteristics:
- Stateful: Maintains persistent state across method calls
- Single instance: Each actor is a unique process with isolated state
- Concurrency: Multiple actors can run simultaneously, but each actor processes methods sequentially
- Fault tolerance: State is lost on actor failure (reconstruction logic must be implemented manually)
Use Cases:
- Parameter servers for distributed ML training (maintaining model weights and gradients)
- Stateful services such as in-memory databases, caches, and environment simulators that maintain state across requests
- Resource managers for coordinating access to shared resources (GPU pools, database connections, file handles)
- Long-running services that need to maintain connections or expensive-to-initialize state (model inference servers, data preprocessors)
Ray’s actor model provides scalability through isolated processes and by eliminating shared state synchronization issues (or at least making best efforts to achieve this).
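As an illustration of the parameter-server use case above, here is a heavily simplified sketch. It is my own toy version, not Ray's reference implementation: the gradients are faked and the update is plain element-wise addition.
@ray.remote
class ParameterServer:
    def __init__(self, dim):
        self.weights = [0.0] * dim

    def apply_gradients(self, grads):
        self.weights = [w + g for w, g in zip(self.weights, grads)]

    def get_weights(self):
        return self.weights

@ray.remote
def worker_step(ps, step):
    weights = ray.get(ps.get_weights.remote())
    grads = [0.01 * step for _ in weights]     # fake gradient
    ray.get(ps.apply_gradients.remote(grads))  # wait so the update lands

ps = ParameterServer.remote(dim=4)
ray.get([worker_step.remote(ps, s) for s in range(5)])
print(ray.get(ps.get_weights.remote()))        # -> roughly [0.1, 0.1, 0.1, 0.1]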
Object References and the Distributed Object Store
import numpy as np

@ray.remote
def process(data):
    return data.sum()

# Put the object in the store once
ref = ray.put(np.zeros((1000, 1000)))
# Pass the reference to tasks; the array is not re-copied per call
for _ in range(10):
    process.remote(ref)
Key Characteristics:
- Immutable: Objects cannot be modified once created, enabling safe sharing across processes
- Retrieved on demand: Objects created by tasks stay in the object store; their values are only materialized in the caller when explicitly retrieved with ray.get
Use Cases:
- Cross-task data flow: Enable complex task dependencies where outputs of some tasks become inputs to others
- Large dataset sharing: Pass references to large NumPy arrays or DataFrames between multiple tasks without copying
- Intermediate results caching: Store computation results that multiple downstream tasks depend on
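The cross-task data flow case boils down to passing one task's ObjectRef straight into another task, without pulling the value back to the driver. A minimal sketch:
@ray.remote
def load():
    return list(range(100))

@ray.remote
def summarize(data):
    # Ray resolves the ObjectRef argument; `data` arrives as the actual list
    return sum(data)

data_ref = load.remote()
total_ref = summarize.remote(data_ref)   # dependency; data never passes through the driver
print(ray.get(total_ref))                # -> 4950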
System Design
To understand how Ray works as a distributed system, we’ll examine how its key components interact. We start with a high-level overview. First, we’ll cover how cluster metadata and coordination are managed across nodes. Next, we’ll look at efficient data storage and sharing between processes. We will also study how the garbage collector works. Then, we’ll discuss how tasks are scheduled and distributed across the cluster.
Ray’s architecture consists of four core layers that work together:
- Global Control Service (GCS) - Centralized metadata management and cluster coordination
- Raylet - Per-node daemon responsible for resource management, object management, and distributed task scheduling
- Object Store - Shared memory storage for zero-copy data sharing within nodes
- Worker Processes - Execution environments for tasks and actors
This layered design separates concerns: GCS handles cluster-wide decisions, Raylets manage local resources and scheduling, the object store provides efficient data sharing, and workers execute the actual computation.
Key Components
Global Control Service
The Global Control Service (GCS) is Ray’s centralized metadata store and control plane that manages cluster-level state and coordinates distributed operations across the Ray cluster.
Primary Functions
- Cluster Metadata Management - Central repository for all cluster state
- Actor Lifecycle Management - Registration, creation, scheduling, monitoring, and restart of actors
- Placement Group Management - Scheduling and lifecycle management of placement groups (a mechanism to control allocation locality)
- Node Management - Tracking node registration, health, and resources
- Job Management - Job registration, configuration, and lifecycle tracking
- Resource Coordination - Cluster-wide resource tracking and allocation decisions
- Worker Management - Worker process registration and failure detection
- Function Registry - Remote function and actor class definitions
The GCS serves as the source of truth for cluster-wide metadata, including shared resources and cluster health. However, while the GCS is the source of truth for metadata, the actual resource utilization and task execution occur at the distributed Raylet level, and Raylets push updates to the GCS to maintain this view.
Typically, the GCS is located on the head node and represents a single point of failure. While recent versions of Ray support an experimental recovery mechanism, for simplicity, we can assume that if the GCS fails, we need to restart the cluster and rerun all jobs.
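From the application side, the metadata held by the GCS is what surfaces through Ray's introspection helpers. A quick way to poke at it; the dictionary keys shown are the ones I have seen from ray.nodes() and may differ across Ray versions:
import ray

ray.init(ignore_reinit_error=True)

# Node table: one entry per registered node, with resources and liveness
for node in ray.nodes():
    print(node["NodeID"], node["Alive"], node["Resources"])

# Aggregated cluster-wide resource view
print(ray.cluster_resources())      # total declared resources
print(ray.available_resources())    # what is currently unclaimed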
Raylet
The Raylet is Ray’s per-node daemon responsible for local resource management, task scheduling, and object management.
Primary Functions:
- Local Resource Management - Track and allocate node-local resources (CPU, GPU, memory, custom resources)
- Task Scheduling - Schedule tasks onto local workers based on resource availability
- Worker Pool Management - Manage lifecycle of worker processes (start, stop, lease, return)
- Object Management - Handle local object storage, pinning, spilling, and eviction
- Lease Management - Queue and dispatch worker lease requests
- Placement Group Management - Reserve and manage placement group bundle resources
- Resource Reporting - Report resource usage to GCS
- Memory Management - Monitor memory usage, trigger GC, kill workers on OOM
- Dependency Resolution - Manage task dependencies and object waits
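The resources a raylet tracks are the same logical resources declared on tasks and actors; custom resources are just named counters advertised at node start. A small sketch: the "a100" label is an arbitrary example of mine, and a task requesting it will simply queue until some node advertises that resource.
# Resources declared at decoration time...
@ray.remote(num_cpus=2, num_gpus=1)
def train_step(batch):
    return len(batch)

@ray.remote
def special_step():
    return "ran on a node that advertises the custom resource"

# ...or overridden per call; the raylet accounts for these when granting leases
train_step.options(num_cpus=4, num_gpus=0).remote(list(range(8)))
special_step.options(resources={"a100": 1}).remote()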
Object Store
The Object Store (or Plasma) is a shared-memory object storage system that stores immutable objects on a single node. Each node runs one Object Store process.
Primary Functions:
- Shared Memory Management - Allocate and manage shared memory for objects
- Object Lifecycle - Create, seal, and delete objects
- Reference Counting - Track object references to determine evictability
- Memory Eviction - LRU-based eviction when memory is full
- Object Transfer - Push/pull objects between nodes
- Spilling/Restore - Spill objects to external storage when low on memory
- Zero-Copy Access - Provide memory-mapped access to objects
Worker Processes
The Worker Process is Ray’s task and actor execution engine. Each worker is a separate process that executes user code.
Primary Functions:
- Task Execution - Execute remote functions assigned by the local Raylet
- Actor Execution - Run actor methods and maintain actor state across invocations
- Object Management - Store, retrieve, and track object references with distributed reference counting
- Child Task Submission - Submit nested tasks and actor calls to the scheduling system
- Dependency Resolution - Wait for object dependencies before beginning execution
- Direct Actor Communication - Handle actor-to-actor method calls without Raylet involvement
- Streaming Support - Process generator tasks and streaming operations
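Child task submission is visible at the API level too: any task can fan out more tasks mid-execution, and the worker running it submits them through its local raylet. A minimal sketch:
@ray.remote
def leaf(x):
    return x * x

@ray.remote
def parent(xs):
    # A running task can itself submit tasks and wait on their results
    refs = [leaf.remote(x) for x in xs]
    return sum(ray.get(refs))

print(ray.get(parent.remote([1, 2, 3])))   # -> 14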
Deep Dive
Ownership Model and Garbage Collector
Ray implements a distributed ownership model for managing object lifecycles and garbage collection. Each object has a designated owner - the worker process that created its ObjectRef, either by calling ray.put or by submitting the task that returns the value. The owner is responsible for:
- Reference Tracking - Maintaining the reference count for the object across all nodes
- Garbage Collection Coordination - Deciding when an object can be safely deleted
- Location Tracking - Knowing which nodes currently store copies of the object
When an object is created, Ray establishes an ownership chain: the submitting worker owns the return values of the tasks it submits (and anything it puts into the store), while any other process holding the ObjectRef becomes a borrower. The owner subscribes to each borrower and is notified when that borrower's local references to the object drop to zero. Once every borrower has reported this and the owner's own references are gone, the owner can trigger garbage collection across all nodes storing the object.
This distributed ownership model enables automatic distributed garbage collection and avoids the overhead of centralized garbage collection that would create a bottleneck.
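In API terms: the process that calls ray.put, or submits the task producing a value, is the owner, and a task that receives the ObjectRef (for example inside a list, which keeps it as a ref rather than resolving it) becomes a borrower while it holds it. A small sketch of that distinction; the comments reflect my reading of the model described above:
@ray.remote
def borrower(refs):
    # This worker now holds a ref it does not own, so it is a borrower
    # and reports that fact back to the owner when the task finishes.
    return ray.get(refs[0]) + 1

big = ray.put(41)                          # this driver process owns `big`
print(ray.get(borrower.remote([big])))     # -> 42

del big                                    # owner's local ref drops; the object becomes
                                           # collectible once no borrowers remain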
Let’s examine how it works.
Reference Counting Mechanism
The Core Problem
Question: Worker A creates obj. Workers B, C, D use it. How does A know when everyone is done?
Solution: Owner tracks the borrowers list. Can only delete when RefCount=0 and the list of borrowers is empty.
The core structure for reference counting has the following form:
struct Reference {
  // Reference Counts
  size_t local_ref_count;            // Python refs in this worker
  size_t submitted_task_ref_count;   // Pending tasks using it
  size_t lineage_ref_count;          // Tasks that may retry

  // Ownership
  rpc::Address owner_address_;       // Who owns this
  NodeID pinned_at_node_id_;         // Where it lives in plasma
  bool owned_by_us_;
  bool is_reconstructable_;

  // Pointers
  std::unique_ptr<BorrowInfo> borrow_info;            // Only if has borrowers
  std::unique_ptr<NestedReferenceCount> nested_refs;  // Only if nested

  // Metadata
  std::string call_site_;                  // "file.py:123"
  int64_t object_size_;                    // Bytes
  absl::flat_hash_set<NodeID> locations;   // Where copies exist
  std::string spilled_url_;                // If spilled to external storage

  // State Flags
  bool publish_ref_removed;
  bool pending_creation_;
  bool spilled;
  // ... other flags
};

struct BorrowInfo {
  absl::flat_hash_set<rpc::Address> borrowers;   // Who's using this object
  absl::flat_hash_map<ObjectID, rpc::Address> stored_in_objects;
};

struct NestedReferenceCount {
  absl::flat_hash_set<ObjectID> contained_in_owned;     // Outer objects we own
  absl::flat_hash_set<ObjectID> contained_in_borrowed;  // Outer objects we borrow
  absl::flat_hash_set<ObjectID> contains;               // Inner objects
};
Protocol Overview
- Owner creates object
- Borrower receives object in task → knows owner address
- Borrower finishes task → sends metadata back to owner
- Owner adds borrower to tracking list, subscribes to their cleanup
- Borrower’s ref drops to 0 → publishes notification
- Owner receives notification → removes from borrowers list
- When all borrowers are gone + local refs are gone → delete
Key Mechanism
Protocol Steps:
- Borrower Task Completion - When task finishes, borrower reports usage:
{
  object_id: obj1,
  owner_address: A,
  has_local_ref: true/false,      // Still using it?
  borrowers: [addresses],         // Others I passed it to
  stored_in_objects: [outer_ids]  // Nested in task returns
}
- Owner Update Logic - Owner merges borrower information:
// Add direct borrower if still using object
if (has_local_ref) borrowers.insert(sender_address);
// Add all transitive borrowers
for (auto& addr : message.borrowers) borrowers.insert(addr);
// Add owners of containing objects
for (auto& [outer_id, owner] : message.stored_in_objects) {
  borrowers.insert(owner);
  nested_refs->contained_in_borrowed.insert(outer_id);
}
- Cleanup Notification - When borrower’s local refs drop to zero:
// Borrower publishes to owner
publish_ref_removed(object_id, owner_address);
- Deletion Decision - Owner checks if safe to delete:
bool can_delete = (local_ref_count + submitted_task_ref_count + borrowers.size()) == 0;
Ray’s garbage collection represents a carefully designed trade-off between performance and complexity:
Accepted Costs:
- Deletion latency: seconds due to async coordination
- Memory overhead: ~100 bytes metadata per object for reference tracking
- Code complexity: ~1800 lines of distributed GC protocol implementation
- Failure scenarios: Potential memory leaks during cascading worker failures
Benefits Gained:
- Non-blocking execution: Tasks complete without waiting for GC coordination
- Horizontal scaling: No centralized bottlenecks in reference management
- Correctness guarantees: Distributed consensus ensures objects aren’t prematurely deleted
- Python semantics: Familiar reference counting behavior for developers
Key Trade-offs (and Why They Made These Choices)
Design Trade-offs:
- Reference Counting vs. Tracing GC
  - Choice: Reference counting with distributed ownership
  - Why: ML workloads rarely create cycles; deterministic deletion is more valuable than pause-free collection
  - Trade-off: Higher per-operation overhead but predictable memory reclamation
- Distributed vs. Centralized GC
  - Choice: Owner-based distributed garbage collection
  - Why: Centralized coordination can’t scale to millions of ML objects across hundreds of nodes
  - Trade-off: Complex protocols but horizontal scalability
- Async vs. Sync Coordination
  - Choice: Asynchronous pub-sub for GC notifications
  - Why: Task execution latency is critical; GC can be eventually consistent
  - Trade-off: Higher cleanup latency but non-blocking task completion
- Eager vs. Lazy Deletion
  - Choice: Eager deletion when reference count reaches zero
  - Why: GPU memory is precious; users expect del x to release memory quickly
  - Trade-off: Higher message volume but predictable memory usage
- Strong Safety vs. Performance
  - Choice: Never delete objects prematurely (strong safety guarantees)
  - Why: GPU task failures are expensive - correctness trumps memory efficiency
  - Trade-off: Objects may live longer than necessary but no use-after-free errors
- Eventual vs. Strong Consistency
  - Choice: Eventual consistency in borrower tracking
  - Why: Low-latency task submission matters more than temporarily stale GC state
  - Trade-off: Potential memory leaks during network partitions but higher throughput
When This Works: The protocol is optimized for Ray’s target workload—ML objects that are large, long-lived, and moderately shared. The coordination overhead becomes negligible compared to object size and computation time.
When This Breaks: The design struggles with workloads involving tiny objects, short-lived data, or massive fanout, where protocol overhead dominates actual computation.
Task Scheduling
The Core Problem
Challenge: Schedule tasks across nodes with:
- Heterogeneous resources (CPU, GPU, memory)
- Task dependencies and resource requirements
- Low scheduling latency
- Failure handling and load balancing
Why naive approaches fail:
- Centralized scheduler: Bottleneck at scale
- Random assignment: Ignores resources and locality
- Static partitioning: Can’t adapt to dynamic workloads
- Fully decentralized: No global coordination
Algorithm
Scheduling is not about perfect placement; it’s about good-enough placement with low latency.
Core Trade-off: Ray optimizes for scheduling latency over placement optimality.
Core Idea: Distributed, bottom-up, pull-based scheduling with stateless raylets.
How a Task Gets Executed
- Worker Request: Worker calls train.remote() → asks its local raylet for a lease (not “run this task”). At this moment, we already get an ID for the return value. This is fundamentally different from futures in many systems, where the future ID might be assigned by the executor.
- Raylet Decision: Raylet either:
  - Grants locally: Schedules on its node, or
  - Spillback: Replies “go ask node X instead”
- Worker Retry: Worker retries on the suggested node(s) until it gets a lease
- Direct Execution: Once granted, the worker talks directly to the assigned worker process (the raylet steps out)
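A toy simulation of that lease/spillback loop, to make the control flow concrete. This is my own sketch, not Ray's code; the class and field names are invented.
from dataclasses import dataclass, field

@dataclass
class Raylet:
    node_id: str
    free_cpus: int
    peers: dict = field(default_factory=dict)    # node_id -> Raylet

    def request_worker_lease(self, num_cpus):
        if self.free_cpus >= num_cpus:
            self.free_cpus -= num_cpus           # grant locally
            return ("granted", self.node_id)
        # Spillback: point the caller at the peer with the most free capacity
        best = max(self.peers.values(), key=lambda r: r.free_cpus)
        return ("spillback", best.node_id)

def submit(task_cpus, local, raylets):
    raylet = local
    while True:                                  # the worker drives the retries
        status, node = raylet.request_worker_lease(task_cpus)
        if status == "granted":
            return node                          # lease in hand: push the task directly
        raylet = raylets[node]                   # go ask the suggested node instead

raylets = {"A": Raylet("A", free_cpus=0), "B": Raylet("B", free_cpus=4)}
for r in raylets.values():
    r.peers = {n: p for n, p in raylets.items() if n != r.node_id}
print(submit(2, raylets["A"], raylets))          # -> "B" via one spillback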
Node Selection Logic
When raylet needs to pick the “best” node:
- Filter: Remove nodes without sufficient resources
- Score: Rank nodes by utilization (lower = better)
- Pack: Prefer under-utilized nodes (<80% → aggressive packing)
- Locality: Strong bias toward local node (avoids worker cold starts)
- Randomize: Take top ~20% best nodes and pick one randomly (avoids thundering herd)
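A toy version of that node-selection policy, with the steps spelled out. This is my own sketch of the heuristics described above; the top-20% cut and the locality bonus are illustrative numbers, not Ray's exact constants.
import random

def pick_node(nodes, demand, local_node_id):
    # 1. Filter: keep only nodes that can actually fit the task
    feasible = [n for n in nodes if n["free"] >= demand]
    if not feasible:
        return None                              # nothing fits; wait or scale up

    # 2. Score by utilization (lower is better), 3. with a bias toward the local node
    def score(n):
        utilization = 1.0 - n["free"] / n["total"]
        return utilization - (0.2 if n["id"] == local_node_id else 0.0)

    ranked = sorted(feasible, key=score)

    # 4. Randomize among the best ~20% to avoid a thundering herd
    top_k = max(1, len(ranked) // 5)
    return random.choice(ranked[:top_k])["id"]

nodes = [
    {"id": "local", "total": 8, "free": 6},
    {"id": "n2", "total": 8, "free": 7},
    {"id": "n3", "total": 8, "free": 1},
]
print(pick_node(nodes, demand=2, local_node_id="local"))   # -> "local" (wins via the locality bonus)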
Resource & Dependency Management
- Parallel Prefetch: Dependencies (Ray objects) fetched in parallel before task runs
- Lazy Wait: Task waits only when objects are actually missing
- Precise Allocation: Specific CPU/GPU IDs and memory reservations
Design Rationale
Why These Decisions?
- Stateless raylets → Simple failure recovery, no task buffering overhead
- Bottom-up (workers pull leases) → No central bottleneck, natural backpressure
- Hybrid scheduling → Balances load distribution vs. data locality
- Randomized top-K selection → Prevents hot spots and thundering herd
- Spillback over forwarding → Keeps raylets simple and stateless
Cluster State Management
- Periodic Resource Sync: Roughly every second, each raylet reports its resource availability and queue depth, and the aggregated view is broadcast across the cluster
- Eventually Consistent: The view can be slightly stale, but spillback self-corrects bad decisions
- Adaptive: The system learns from spillback patterns to improve future placement
Why This Works
- Tasks are long (seconds to hours): Scheduling overhead is amortized
- Resources are coarse (whole GPUs): Simple resource model
- Locality matters but isn’t critical: Hybrid policy is good enough
- Failures are rare: Simple retry is sufficient
- Workloads are elastic: Autoscaler handles capacity
When This Breaks
- Microsecond tasks: Scheduling overhead dominates
- Fine-grained resources: Complex bin-packing is needed
- Strict placement: Hard constraints (anti-affinity, regulatory)
- Real-time deadlines: Best-effort scheduling is insufficient
- Adversarial workloads: Need isolation, quotas, and preemption
Summary
Ray’s scheduling is:
- Distributed: Each raylet makes independent decisions with a cluster-wide view
- Bottom-up: Workers drive, raylets respond
- Hybrid: Balances locality, load, and availability
- Spillback-based: Stateless raylets; the worker handles retries
- Eventually consistent: Periodic sync of cluster state
- Best-effort: Optimizes for the common case, degrades gracefully
Execution
Task Execution Flow
Task Submission
Worker A sends PushTask gRPC directly to Worker B containing:
- Task/actor method code and arguments (ObjectRefs or inlined values)
- Resource mapping (which CPUs/GPUs to use)
- For actors: sequence number for ordering guarantees
Object Transfer (when needed)
- Small objects (<100 KB): Inlined directly in the PushTask message
- Large objects: Multi-step resolution process:
  - Worker B queries the object’s owner: “where is ObjectRef X?”
  - Owner replies with node locations or an external spill URL
  - Worker B pulls the data directly via ObjectManager Push/Pull RPCs
Dependency Resolution
Before executing, Worker B blocks until every ObjectRef argument is locally available in shared memory (Plasma store). This ensures all required data is present before computation begins.
Execution & Reply
- Worker B executes the function
- Return values stored in local shared memory
- PushTaskReply sent back to Worker A with:
  - New ObjectRefs (or inlined values for small results)
  - Error status and execution metadata
Actors provide strong consistency:
- Strict in-order execution using sequence numbers
- Out-of-order tasks are buffered until dependencies arrive
- Bounded flow-control window per (client -> actor) pair prevents overwhelming actors
- Reliable delivery with acknowledgments and automatic retries for lost messages
Normal tasks are fire-and-forget:
- No ordering guarantees - tasks execute as soon as resources are available
- No flow control - backpressure handled by raylet scheduling decisions
- Best-effort delivery - failed tasks trigger lineage reconstruction
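That difference shows up directly in user code: method calls on a single actor from a single caller run in submission order, while plain tasks carry no ordering guarantee relative to each other. A small sketch, assuming ray is already initialized:
@ray.remote
class Log:
    def __init__(self):
        self.entries = []

    def append(self, i):
        self.entries.append(i)

    def dump(self):
        return self.entries

log = Log.remote()
for i in range(5):
    log.append.remote(i)
# Same caller, same actor: sequence numbers keep this in order
print(ray.get(log.dump.remote()))   # -> [0, 1, 2, 3, 4]

@ray.remote
def task(i):
    return i

# Plain tasks: each runs whenever resources allow; no cross-task ordering
refs = [task.remote(i) for i in range(5)]
print(ray.get(refs))                # results are matched to refs, but execution order was arbitrary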
Object Storage
Ray’s object storage system manages distributed object sharing across multiple nodes through a combination of local shared memory stores and network transfer protocols.
Architecture Overview
Per-Node Components:
- Plasma Store: Shared memory object storage for zero-copy access within a node
- Object Manager: Handles inter-node object transfer and location tracking
- Reference Manager: Tracks object ownership and coordinates distributed garbage collection
Global Components:
- Object Directory: Distributed location service tracking where objects are stored
- Ownership Registry: Maps objects to their owner processes for GC coordination
Plasma Store: Local Object Storage
Memory Layout:
- Pre-allocated shared memory pool (typically /dev/shm on Linux)
- A single process manages allocations using dlmalloc
- Workers access objects via memory-mapped files and Unix socket IPC
Object Lifecycle:
- CREATE: Worker requests allocation, receives file descriptor
- WRITE: Worker writes serialized data directly to mapped memory
- SEAL: Object becomes immutable and visible to other workers
- ACCESS: Other workers map same memory for zero-copy reads
- EVICT: LRU eviction when memory pressure occurs
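The seal step is why retrieved objects are immutable. For NumPy arrays this is directly observable, since ray.get hands back a read-only array backed by the shared-memory segment. A small demonstration; the exact error message may vary by version:
import numpy as np
import ray

arr = np.zeros(10_000_000)        # ~80 MB, large enough to live in Plasma
ref = ray.put(arr)

view = ray.get(ref)               # zero-copy: memory-mapped from the object store
print(view.flags.writeable)       # -> False: the sealed object is immutable

try:
    view[0] = 1.0
except ValueError as err:
    print("write rejected:", err)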
Inter-Node Object Transfer
Pull-Based Protocol: When a worker needs an object not stored locally:
- Location Query: Worker asks object’s owner “where is ObjectRef X?”
- Location Response: Owner returns list of nodes storing the object
- Direct Transfer: Worker sends a PullRequest to the source node’s ObjectManager
- Streaming Transfer: Source streams object chunks over gRPC
- Local Storage: Receiving node stores object in its Plasma store
Push Operations: Objects can also be proactively pushed:
- Spilling: When local memory is full, push objects to other nodes
- Prefetching: Raylet pushes objects to nodes likely to need them
- Replication: Critical objects copied to multiple nodes for fault tolerance
Object Location Management
Distributed Directory:
- Each object’s owner maintains authoritative location list
- Locations updated when objects are transferred, spilled, or evicted
- Workers cache location information to avoid repeated queries
Location Tracking Protocol:
Worker A creates object → Owner A knows "object at Node A"
Node B pulls object → Owner A updates "object at Node A, Node B"
Node A evicts object → Owner A updates "object at Node B"
Communication Protocols
ObjectManagerService (gRPC):
- Push: Send an object from one node to another
- Pull: Request an object from a remote node
- GetLocationUpdate: Query current object locations
- SpillObjectsRequest: Coordinate spilling to external storage
Message Flow:
Pull Request: {object_id, destination_node}
Pull Response: {chunks_stream, metadata, error_status}
Memory Management Across Nodes
Spilling Strategy: When local Plasma store is full:
- Local Eviction: Try LRU eviction of unreferenced objects
- Remote Push: Push objects to nodes with available memory
- External Spill: Write objects to external storage (S3, disk)
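Spilling is configurable from the application. At the time of writing, Ray accepts a JSON spilling config via _system_config, roughly as below; this knob has moved between releases, so treat the exact shape as an assumption and check the docs for your version.
import json
import ray

# Spill to a local directory instead of the default location
ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/ray_spill"}}
        )
    }
)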
Memory Pressure Coordination:
- Nodes report memory usage to cluster
- Raylet considers memory availability when scheduling tasks
- Objects automatically pulled back from spill when needed
Performance Optimizations:
- Within a node: Direct memory mapping with no serialization overhead
- Objects are collocated with tasks that will use them
This distributed object storage design balances local efficiency (zero-copy shared memory) with global accessibility (transparent network transfer) while maintaining fault tolerance through lineage-based reconstruction.
Key Takeaways
- Distributed scheduling: Bottom-up task submission with spillback enables low-latency scheduling at scale
- Shared memory object store: Zero-copy data sharing within nodes dramatically reduces data movement overhead
- Distributed Garbage Collector: Owner-based reference counting with asynchronous cleanup prevents memory leaks while avoiding centralized bottlenecks
- Lineage-based fault tolerance: Lightweight recovery mechanism without checkpointing, optimized for compute-intensive tasks
When to Choose Ray Core: Ray Core is best suited for Python-heavy compute workloads in ML, reinforcement learning, and simulation domains that benefit from fine-grained parallelism and stateful actors.