Random Notes
  • Introduction
  • Reading list
  • Theory
    • Index
      • Impossibility of Distributed Consensus with One Faulty Process
      • Time, Clocks, and the Ordering of Events in a Distributed System
      • Using Reasoning About Knowledge to analyze Distributed Systems
      • CAP Twelve Years Later: How the “Rules” Have Changed
      • A Note on Distributed Computing
  • Operating System
    • Index
  • Storage
    • Index
      • Tachyon: Reliable, Memory Speed Storage for Cluster Computing Frameworks
      • Exploiting Commutativity For Practical Fast Replication
      • Don’t Settle for Eventual: Scalable Causal Consistency for Wide-Area Storage with COPS
      • Building Consistent Transactions with Inconsistent Replication
      • Managing Update Conflicts in Bayou, a Weakly Connected Replicated Storage System
      • Spanner: Google's Globally-Distributed Database
      • Bigtable: A Distributed Storage System for Structured Data
      • The Google File System
      • Dynamo: Amazon’s Highly Available Key-value Store
      • Chord: A Scalable Peer-to-peer Lookup Service for Internet Applications
      • Replicated Data Consistency Explained Through Baseball
      • Session Guarantees for Weakly Consistent Replicated Data
      • Flat Datacenter Storage
      • Small Cache, Big Effect: Provable Load Balancing forRandomly Partitioned Cluster Services
      • DistCache: provable load balancing for large-scale storage systems with distributed caching
      • Short Summaries
  • Coordination
    • Index
      • Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases
      • Paxos made simple
      • ZooKeeper: Wait-free coordination for Internet-scale systems
      • Just Say NO to Paxos Overhead: Replacing Consensus with Network Ordering
      • Keeping CALM: When Distributed Consistency is Easy
      • In Search of an Understandable Consensus Algorithm
      • A comprehensive study of Convergent and Commutative Replicated Data Types
  • Fault Tolerance
    • Index
      • The Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services
      • Gray Failure: The Achilles’ Heel of Cloud-Scale Systems
      • Capturing and Enhancing In Situ System Observability for Failure Detection
      • Check before You Change: Preventing Correlated Failures in Service Updates
      • Efficient Scalable Thread-Safety-Violation Detection
      • REPT: Reverse Debugging of Failures in Deployed Software
      • Redundancy Does Not Imply Fault Tolerance
      • Fixed It For You:Protocol Repair Using Lineage Graphs
      • The Good, the Bad, and the Differences: Better Network Diagnostics with Differential Provenance
      • Lineage-driven Fault Injection
      • Short Summaries
  • Cloud Computing
    • Index
      • Improving MapReduce Performance in Heterogeneous Environments
      • CLARINET: WAN-Aware Optimization for Analytics Queries
      • MapReduce: Simplified Data Processing on Large Clusters
      • Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks
      • Resource Management
      • Apache Hadoop YARN: Yet Another Resource Negotiator
      • Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center
      • Dominant Resource Fairness: Fair Allocation of Multiple Resource Types
      • Large-scale cluster management at Google with Borg
      • MapReduce Online
      • Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling
      • Reining in the Outliers in Map-Reduce Clusters using Mantri
      • Effective Straggler Mitigation: Attack of the Clones
      • Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
      • Discretized Streams: Fault-Tolerant Streaming Computation at Scale
      • Sparrow: Distributed, Low Latency Scheduling
      • Making Sense of Performance in Data Analytics Framework
      • Monotasks: Architecting for Performance Clarity in Data Analytics Frameworks
      • Drizzle: Fast and Adaptable Stream Processing at Scale
      • Naiad: A Timely Dataflow System
      • The Dataflow Model:A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale
      • Interruptible Tasks:Treating Memory Pressure AsInterrupts for Highly Scalable Data-Parallel Program
      • PACMan: Coordinated Memory Caching for Parallel Jobs
      • Multi-Resource Packing for Cluster Schedulers
      • Other interesting papers
  • Systems for ML
    • Index
      • A Berkeley View of Systems Challenges for AI
      • Tiresias: A GPU Cluster Managerfor Distributed Deep Learning
      • Gandiva: Introspective Cluster Scheduling for Deep Learning
      • Workshop papers
      • Hidden Technical Debt in Machine Learning Systems
      • Inference Systems
      • Parameter Servers and AllReduce
      • Federated Learning at Scale - Part I
      • Federated Learning at Scale - Part II
      • Learning From Non-IID data
      • Ray: A Distributed Framework for Emerging AI Applications
      • PipeDream: Generalized Pipeline Parallelism for DNN Training
      • DeepXplore: Automated Whitebox Testingof Deep Learning Systems
      • Distributed Machine Learning Misc.
  • ML for Systems
    • Index
      • Short Summaries
  • Machine Learning
    • Index
      • Deep Learning with Differential Privacy
      • Accelerating Deep Learning via Importance Sampling
      • A Few Useful Things to Know About Machine Learning
  • Video Analytics
    • Index
      • Scaling Video Analytics on Constrained Edge Nodes
      • Focus: Querying Large Video Datasets with Low Latency and Low Cost
      • NoScope: Optimizing Neural Network Queriesover Video at Scale
      • Live Video Analytics at Scale with Approximation and Delay-Tolerance
      • Chameleon: Scalable Adaptation of Video Analytics
      • End-to-end Learning of Action Detection from Frame Glimpses in Videos
      • Short Summaries
  • Networking
    • Index
      • Salsify: Low-Latency Network Video through Tighter Integration between a Video Codec and a Transport
      • Learning in situ: a randomized experiment in video streaming
      • Short Summaries
  • Serverless
    • Index
      • Serverless Computing: One Step Forward, Two Steps Back
      • Encoding, Fast and Slow: Low-Latency Video Processing Using Thousands of Tiny Threads
      • SAND: Towards High-Performance Serverless Computing
      • Pocket: Elastic Ephemeral Storage for Serverless Analytics
      • Fault-tolerant and Transactional Stateful Serverless Workflows
  • Resource Disaggregation
    • Index
  • Edge Computing
    • Index
  • Security/Privacy
    • Index
      • Differential Privacy
      • Honeycrisp: Large-Scale Differentially Private Aggregation Without a Trusted Core
      • Short Summaries
  • Misc.
    • Index
      • Rate Limiting
      • Load Balancing
      • Consistency Models in Distributed System
      • Managing Complexity
      • System Design
      • Deep Dive into the Spark Scheduler
      • The Actor Model
      • Python Global Interpreter Lock
      • About Research and PhD
Powered by GitBook
On this page
  • TL;DR:
  • Motivation:
  • Programming and Computation Model
  • Architecture
  • Fault tolerance - lineage reconstruction
  • Evaluation:

Was this helpful?

  1. Systems for ML
  2. Index

Ray: A Distributed Framework for Emerging AI Applications

https://www.usenix.org/system/files/osdi18-moritz.pdf

PreviousLearning From Non-IID dataNextPipeDream: Generalized Pipeline Parallelism for DNN Training

Last updated 5 years ago

Was this helpful?

TL;DR:

This paper presents Ray, a general-purpose cluster-computing framework that enables simulation, training, and serving for RL applications.

[] []

Motivation:

The central goal of an Reinforcement learning application is to learn a policy - a mapping from the state of the environment to a choice of action - that yields effective performance over time. Finding effective policies in large-scale applications requires 1) simulation to evaluate policies. 2) distributed training to improve the policy. and 3) serve the policy in interactive control scenarios.

To enable simulation, training, and serving for the emerging RL applications, a system must support:

  • Fine-grained computations: rendering actions in milliseconds when interacting with the real world

  • Heterogeneity: The duration of a computation can range from millisecond to hours. In addition, training often requires heterogeneous hardware(CPU, GPU, or TPU)

  • Flexible computation model: RL application require both stateless and stateful computations.

  • Dynamic execution: The order in which computation finish is not always known in advance and the result of a computation can determine the future computations.

Note that RL is just one of the applications. There are many other usage of the system

Programming and Computation Model

Tasks

A task represents the execution of a remote function on a stateless worker. A future representing the result of the task is returned immediately. This allows the user to express parallelism while capturing data dependencies.

Remote functions operate on immutable objects and are expected to be stateless and side-effect free, which implies idempotence and simplifies fault tolerance through function re-execution on failure.

Actors

An actor represents a stateful computation. A method execution is similar to a task, in that it executes remotely and returns a future, but differs in that it executes on a stateful worker. Actors provide much more efficient fine-grained updates, as these updates are performed on internal rather than external state, which typically requires serialization and deserialization.

Computation Model

Ray employes a dynamic task graph computation model. There are two types of nodes in a computation graph: data objects and remote function invocations. And, there are two types of edges: data edge and control edges.

To capture the state dependency across subsequent method invocations on the same actor, we add a third type of edge: a stateful edge. They capture the implicit data dependency between successive method invocations sharing the internal state of an actor. Stateful edges also enable us to maintain lineage. We can easily reconstruct lost data.

Architecture

Ray's architecture comprises 1) an application layer implementing the API, and 2) a system layer providing high scalability and fault tolerance.

Application Layer:

  • Driver: A process executing the user program

  • Worker: A stateless process that executes tasks (remote functions) invoked by a driver or another worker.

  • Actor: A stateful process that executes, when invoked, only the methods it exposes.

System Layer:

Global Control Store(GCS):

Maintaining low latency requires minimizing overheads in task scheduling, which involves choosing where to execute, and subsequently task dispatch, which involves retrieving remote inputs from other nodes. Therefore, they store the object metadata (object locations and sizes) in the GCS rather than in the scheduler, fully decoupling task dispatch from task scheduling.

Bottom-Up Distributed Scheduler:

Ray needs to dynamically schedule millions of tasks per second, tasks which may take as little as a few milliseconds.[1] To satisfy the above requirements, they design a two-level hierarchical scheduler consisting of a global scheduler and per-node local schedulers.

A local scheduler first tries to schedules tasks locally unless the node is overloaded (i.e., its local task queue exceeds a predefined threshold), or it cannot satisfy a task’s requirements (e.g., lacks a GPU). If a local scheduler decides not to schedule a task locally, it forwards it to the global scheduler. The global scheduler considers each node’s load and task’s constraints to make scheduling decisions.

The global scheduler gets the queue size at each node and the node resource availability via heartbeats, and the location of the task’s inputs and their sizes from GCS. The global scheduler selects the node which provides the lowest estimated waiting time. ( The sum of queuing delay and data transfer time.)

In-memory Distributed Object store:

On each node, we implement the object store via shared memory. This allows zero-copy data sharing between tasks running on the same node. If a task’s inputs are not local, the inputs are replicated to the local object store before execution. Also, a task writes its outputs to the local object store.

For low latency, we keep objects entirely in memory and evict them as needed to disk using an LRU policy.

One of the nice things of this decentralized design where you have seperate schedulers and metadata storage is that you can scale these things independently.

[1] Most cluster computing frameworks, such as Spark, CIEL, and Dryad implement a centralized scheduler, which can provide locality but at latencies in the tens of ms. Distributed schedulers such as work stealing, Sparrow and Canary can achieve high scale, but they either don’t consider data locality(work stealing) or assume tasks belong to independent jobs(Sparrow), or assume the computation graph is known(Canary).

Fault tolerance - lineage reconstruction

Maintaining the lineage of an execution gives us fast recover time. We only need to replay the work lost to continue executing. However, recording the lineage adds significant overhead to normal execution. We need extra work to record lineage of each task and must commit a task's lineage before task start executing. Thus, we have an interesting tradeoff(as identified in Drizzle paper) between fast execution without failures and fast recovery during failures.

Evaluation:

The paper provides some evaluation of Ray on training, serving and simulation. The key takeaway is that the performance of Ray matches the existing frameworks on traditional ML workloads and it either outperforms or support some missing features of other frameworks on RL workloads.

Specifically, it matches the training performance of the state-of-the-art frameworks:

For serving:

However, it worth noting that this might not be a fair comparison, because Ray focuses primarily on the embedded serving of models to simulators running within the same dynamic task graph (e.g., within an RL application on Ray). In contrast, systems like Clipper focus on serving predictions to external clients

For simulation:

Ray models an application as a graph of dependent tasks that evolves during execution. On top of this model, Ray provides both an actor and a task-parallel programming abstraction, which differentiate Ray from preview works including , which only support task-parallellabstraction, and /, which only support actor abstraction.

GCS is a key-value store with pub-sub functionality, use sharding to achieve scale, and per-shard . It maintains the entire control state of the system.

NOTE: The global scheduler was removed after , which is replaced by direct node-manager to node-manager communication.

To get both fast task execution and fast failure recovery, Ray implements . We have lineage stash at each node which looks like in-memory subgraph. Thus, instead of writing the lineage to the Global storage, the local scheduler will write the lineage to its local stash and then forward the task as well as the uncommitted lineage to remote node. Ray will asynchronously commit the uncommitted lineage to avoid huge lineages.

CIEL
Orlean
Akka
chain replication
v0.5
Lineage Stash
Github
Documentation
Images per second reached when distributing the training of a ResNet-101 TensorFlow model
Throughput comparisons for Clipper and Ray for two embedded serving workloads
Ray allows for better utilization when running heterogeneous simulations at scale.