Scaling Applications on Kubernetes with Ray
I’ve been a heavy user of Ray for deep learning applications for the last two years, and in my opinion, Ray Tune is the best library available today for instrumenting your ML code for easy scalability. It lets your application run on your laptop just as well as on a cluster of 200 machines in the cloud, with no code changes needed.
But as good as my experience with Ray for ML was, I didn’t really consider using it to offload job processing for a web application when the need arose. There didn’t seem to be any reason to, since there’s no dearth of dedicated job-processing software: Celery, RQ (Redis Queue), Kubeflow Pipelines (KFP), Airflow, Volcano, and more.
What I actually found, however, was that while solutions like Celery and RQ allow for low-latency job processing, they need a dedicated team to build and maintain an additional layer of infrastructure on top for more complex functionality, such as worker-aware scheduling and GPU support. At the other end of the spectrum are KFP and Volcano, which come packaged with the power of Kubernetes and all that entails: job isolation, dynamic selection of worker types, autoscaling to tens of thousands of jobs with minimal effort, and so on. But with the unpredictable overhead of Kubernetes scheduling, we pretty much have to forget about low-latency processing for our jobs, which is not an option when the goal is to enable near real-time use cases.
Why Ray on Kubernetes?
Ray on Kubernetes (K8s) hit the sweet spot for what I needed, and I suspect a lot of you might find yourselves in the same boat. It gives you the best of both worlds: low-latency job processing with Ray Core, along with the features of Kubernetes and more, delivered through the Ray Autoscaler.
This article will delve into the details of how Ray works on K8s and how it can be set up and managed for production-level environments. We’ll then briefly explore a sample configuration, how to remain connected to a Ray Cluster for long periods of time from a Python application, and then end with a few notes on some additional features. This article assumes that the reader has some knowledge of K8s and Ray.
Let’s start by setting our baseline: the Ray Autoscaler as implemented for AWS EC2. You have a ‘launchpad’, the machine from which your Ray clusters are launched, and you have your Ray clusters, each made up of one head node and zero or more worker nodes. In this case, the Autoscaler runs on the head node of your Ray cluster. This means the launchpad’s only task is to read the cluster configuration you provide, launch the head node, and transfer the SSH private key and the cluster configuration to it, after which its job is done. The Autoscaler process now running on the head node uses the cluster configuration to launch worker nodes as required and connects to them over SSH using that same key.
The head node has to be up at all times in this setup, since its loss eliminates the Autoscaler process, thus orphaning the worker nodes.
Ray, as implemented on Kubernetes, is quite different. Let’s look at the architecture first this time.
The cluster configuration file changes in this setup: it is now a K8s-compatible YAML file that defines a Custom Resource of type RayCluster. Once it’s used to create a RayCluster resource in the K8s cluster, the autoscaler process running within the Ray operator pod detects the new resource and uses the Ray cluster definition it contains to create a head node pod and the worker node pods as specified. The operator also creates other necessary components, like a Kubernetes Service that allows easy connectivity to the Ray cluster from anywhere within the K8s cluster.
This setup has some interesting properties:
- Since the Autoscaler is now decoupled from the head node of the Ray cluster, losing the head no longer means losing the cluster for good. As long as the Ray operator is running, it will simply launch a new head node to replace a lost one. This works best, of course, if fault tolerance is baked into your application code.
- The Ray Autoscaler takes complete charge of managing your Ray cluster; your only input to it is a single YAML file that describes the desired cluster configuration.
- You can have multiple worker types to handle different kinds of workloads.
- The head node and each worker type can be thoroughly customized. Since the Ray cluster’s head and worker nodes run as pods on Kubernetes, any specification that applies to a pod can be configured here. This opens up the world of node selectors, tolerations, affinities, and more, which let us schedule worker pods on specific K8s node groups and fulfill a variety of needs: task-specific worker isolation, accelerated (GPU) machines for priority workloads, and so on.
- Adding or removing a worker type is a non-destructive operation. This is an incredibly useful feature: you can add worker types to, or remove them from, a Ray cluster operating in production, and the Autoscaler will quietly handle the change without interfering with the pods of other worker types that are processing critical jobs.
A Helm chart is available in the Ray GitHub repository, though I don’t use it, since Helm is a bit awkward to use in conjunction with ArgoCD. That said, in my initial usage the Helm chart worked perfectly well, and it will make it easy for you to manage multiple Ray clusters. (On a side note, if you’re not using ArgoCD yet to manage your K8s applications, you should try it out; I think it fundamentally improves the way people work with K8s.)
The alternative to Helm is to define the RayCluster YAML yourself, starting from the official example. If you deploy your applications on K8s, you likely have a set of K8s manifests grouped together in a Git repository that defines your application’s resources. Adding your RayCluster as one more YAML file in this set lets you treat it as just another dependency, so you can effortlessly replicate it along with your application across clusters and clouds. And because the Service resource created along with your Ray cluster is named consistently, you can put its address in your application code, confident that wherever the application runs (in production or in a CI/CD pipeline for testing), the Ray cluster will always be available at the same address in the same namespace as your application.
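Since the Service name follows from the cluster name, the address can be computed rather than configured separately for each environment. Here is a minimal sketch. It assumes the Ray 1.x operator’s convention of naming the head Service <cluster-name>-ray-head and the Ray Client’s default port of 10001, so check both against your own deployment. The environment variable names RAY_CLUSTER_NAME and RAY_NAMESPACE are hypothetical.

```python
import os
from typing import Optional

# Assumptions: the Ray 1.x operator names the head Service "<cluster-name>-ray-head",
# and the Ray Client server listens on its default port, 10001.
RAY_CLIENT_PORT = 10001


def ray_client_address(cluster_name: str, namespace: Optional[str] = None) -> str:
    """Build a Ray Client address for a RayCluster's head Service.

    Without a namespace, the short Service name resolves inside the caller's
    own namespace, which is why the same address works in production and in
    CI as long as the application and the Ray cluster share a namespace.
    """
    host = f"{cluster_name}-ray-head"
    if namespace:
        # Fully qualified in-cluster DNS name, for reaching another namespace.
        host = f"{host}.{namespace}.svc.cluster.local"
    return f"ray://{host}:{RAY_CLIENT_PORT}"


def address_from_env() -> str:
    # Hypothetical environment variables; adapt them to your own configuration.
    return ray_client_address(
        os.environ.get("RAY_CLUSTER_NAME", "example-cluster"),
        os.environ.get("RAY_NAMESPACE") or None,
    )


if __name__ == "__main__":
    print(ray_client_address("example-cluster"))  # ray://example-cluster-ray-head:10001
```

The application then only needs to know the cluster’s name, which lives in the same Git repository as the RayCluster manifest.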
If you take another look at the diagram for Ray on K8s above, you’ll see that a single Ray operator is responsible for managing multiple Ray clusters. This holds across namespaces: if you deploy a cluster-scoped Ray operator in one namespace and create RayCluster resources in 10 namespaces, the single operator pod will be responsible for creating and managing all 10 Ray clusters in 10 different namespaces. This approach doesn’t scale well, not to mention that it creates a single point of failure: if the operator fails, autoscaling fails on all 10 Ray clusters.
Thankfully, there’s also the concept of a namespace-scoped operator. Here, a Ray operator is launched in each namespace where you expect to have RayCluster resources, and each operator creates and manages Ray clusters only for its own namespace. This pattern eliminates the risk of a K8s cluster-wide Ray failure and distributes the load of autoscaling dozens of Ray clusters more evenly across multiple operators.
You can find the K8s manifests here, and the guides on deploying and managing them here and here. I suggest spending some time looking through the Ray documentation: it’s well written and comprehensive, and it will save you a lot of trial-and-error experimentation.
Here’s an example of a RayCluster definition. I’ve discussed some points of note below it.
The Docker images used by the RayCluster’s pods need to have Ray installed, but beyond that, I haven’t had any issues using custom, application-specific Docker images. The Ray team also publishes Docker images for each version, available here.
When a Ray pod starts, Kubernetes runs the commands defined in the args section under the containers section of each definition under podTypes. These commands are only meant to keep the pod up indefinitely unless it’s interrupted. Connecting these new pods to the Ray cluster happens through the workerStartRayCommands, which the Ray operator runs using kubectl once the pod has launched. As an example of what’s possible here, I’ve added a pip install ray==1.8.0 to the start commands. This means you can use any Docker image that has Python, install Ray at runtime, and then use it for your cluster’s pods. You shouldn’t do that in production, of course, since it’s better to bake everything into your Docker image to keep startup time as low as possible. But it does make it easier to validate your setup when you’re starting out, or to test new Ray versions without building a new Docker image.
I’ve also prefixed environment variable definitions to the ray start commands for both the head and the workers, which makes these variables available to all functions that eventually execute on these nodes. Of course, if you’re launching your Ray cluster without Helm, you have complete control over the podConfig. That means you can also add an env section and mount ConfigMaps or Secrets to define the variables and files you need for each podType in your Ray cluster.
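To make these moving parts concrete, here’s a minimal sketch that builds a RayCluster definition as a Python dict and prints it as JSON, which kubectl apply -f accepts just as it does YAML. It assumes the Ray 1.x operator’s cluster.ray.io/v1 schema as laid out in the official example (headPodType, podTypes, podConfig, rayResources, headStartRayCommands, workerStartRayCommands), so check the field names against the example for your Ray version. The 4-CPU/15 GiB workers and the rayResources values mirror the numbers quoted in the next section. The cluster name, the 8Gi head memory, the worker counts and the APP_ENV variable are made up for illustration.

```python
import json

RAY_VERSION = "1.8.0"  # the version pinned in the text; illustrative
IMAGE = f"rayproject/ray:{RAY_VERSION}"  # any Python image works if install_ray=True


def pod_config(generate_name: str, cpu: str, memory: str, image: str = IMAGE) -> dict:
    """Minimal Pod spec for one Ray node type (illustrative values)."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"generateName": generate_name},
        "spec": {
            "restartPolicy": "Never",
            "containers": [{
                "name": "ray-node",
                "image": image,
                # These args only keep the container alive; Ray is started later
                # by the operator running the *StartRayCommands below.
                "command": ["/bin/bash", "-c", "--"],
                "args": ["trap : TERM INT; sleep infinity & wait;"],
                "resources": {
                    "requests": {"cpu": cpu, "memory": memory},
                    "limits": {"cpu": cpu, "memory": memory},
                },
            }],
        },
    }


def start_commands(ray_start: str, install_ray: bool) -> list:
    """Commands the operator runs inside each pod after it launches."""
    cmds = []
    if install_ray:
        # Convenient for trying out images or Ray versions; slow for production.
        cmds.append(f'pip install "ray[default]=={RAY_VERSION}"')
    # APP_ENV is hypothetical; prefixing it to `ray start` makes it visible to
    # the Ray processes (and therefore the tasks) started on this node.
    cmds += ["ray stop", f"ulimit -n 65536; APP_ENV=prod {ray_start}"]
    return cmds


def ray_cluster(name: str, install_ray: bool = False) -> dict:
    return {
        "apiVersion": "cluster.ray.io/v1",
        "kind": "RayCluster",
        "metadata": {"name": name},
        "spec": {
            "maxWorkers": 10,
            "upscalingSpeed": 1.0,
            "idleTimeoutMinutes": 15,
            "headPodType": "head",
            "podTypes": [
                {
                    "name": "head",
                    "podConfig": pod_config(f"{name}-ray-head-", "4", "8Gi"),
                    "rayResources": {"CPU": 0},  # keep application tasks off the head
                },
                {
                    "name": "worker",
                    "minWorkers": 1,
                    "maxWorkers": 10,
                    "podConfig": pod_config(f"{name}-ray-worker-", "4", "15Gi"),
                    "rayResources": {"CPU": 8},  # 8 logical CPUs on a 4-CPU pod
                },
            ],
            "headStartRayCommands": start_commands(
                "ray start --head --port=6379 --no-monitor --dashboard-host 0.0.0.0",
                install_ray,
            ),
            "workerStartRayCommands": start_commands(
                "ray start --address=$RAY_HEAD_IP:6379", install_ray
            ),
        },
    }


if __name__ == "__main__":
    print(json.dumps(ray_cluster("example-cluster"), indent=2))
```

Generating the manifest like this is optional. A hand-written YAML file in the same repository works just as well. The point is only to show where each of the settings discussed above lives.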
Pod Resource Requests/Limits and Ray Resources
This is quite interesting, because we now have two resource-based schedulers involved: Kubernetes and Ray. The K8s scheduler picks up the memory and CPU configuration you specify in each pod spec’s requests and limits sections. Once the pod is launched with those resources, Ray detects the resources assigned to the pod from its spec and auto-fills the resource mapping for that pod. In the above example, you’ll see that the worker pods have 15 GiB and 4 CPUs assigned, which means Ray will use those values when it makes task scheduling decisions.
You can override this using the rayResources section. In the example, this has been set to 0 CPUs for the head pod and 8 CPUs for the worker pods, even though both have 4 CPUs specified in the requests/limits section. For the head pod, the reason is that it’s better to reserve it completely for Ray processes, which prevents your application tasks from being scheduled on it and competing with Ray for resources. For the worker pods, how scheduling works is completely up to you. Since a Ray task requests 1 CPU by default, if you have a resource-intensive task that uses multiple CPU cores, you can set the CPU value in rayResources to 1 so that only one such task can ever run at a time on a pod. Or, if your task is mostly I/O, you can raise the CPU value to whatever gives you the level of task concurrency you want on each worker pod.
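The arithmetic behind these choices is simple. Ray only admits a task onto a node while the task’s CPU request still fits into the node’s free logical CPUs, and in Ray itself that request is set per task with @ray.remote(num_cpus=...). The sketch below is a simplified model of that accounting for the rayResources values above. It ignores memory, GPUs and custom resources, and the function name is hypothetical.

```python
from typing import Dict

# Logical CPUs each pod type advertises to Ray via rayResources, as in the text
# (both pod types actually have 4 CPUs in their requests/limits).
RAY_CPUS: Dict[str, float] = {"head": 0, "worker": 8}


def max_concurrent_tasks(node_cpus: float, task_cpus: float = 1.0) -> int:
    """How many tasks requesting task_cpus can run on a node at the same time.

    Simplified model of Ray's CPU accounting: a task is admitted only while
    its CPU request still fits in the node's free logical CPUs. The default of
    1.0 matches Ray's default per-task CPU request.
    """
    if task_cpus <= 0:
        raise ValueError("a 0-CPU task isn't limited by CPU at all")
    return int(node_cpus // task_cpus)


if __name__ == "__main__":
    for pod_type, cpus in RAY_CPUS.items():
        print(pod_type, max_concurrent_tasks(cpus))  # head 0, worker 8
    print("worker, num_cpus=4:", max_concurrent_tasks(RAY_CPUS["worker"], 4))  # 2
```

The model shows why the head receives no application tasks at all. It also shows that the same lever works from either side: you can lower rayResources on the pod, or raise num_cpus on a heavy task.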
As the Ray documentation points out multiple times, it’s the user’s responsibility to ensure that a task consumes no more than the resources it requests. Be especially wary of this with CPU. With memory, an OOMKilled status at least tells you when you’ve got the assignment wrong. But there’s nothing worse than finding out your latencies are high because Linux has been CPU-throttling your app against limits that are too low.
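One easy way to get throttled is to size a thread or process pool inside a task with os.cpu_count(), which reports the node’s CPUs rather than the pod’s CPU limit. Below is a minimal sketch that derives a safer pool size from the container’s actual limit. It assumes the node uses cgroup v2, where the limit appears in /sys/fs/cgroup/cpu.max as "<quota> <period>" (or "max <period>" when unlimited). cgroup v1 keeps the limit in different files and isn’t handled here, and the helper names are hypothetical.

```python
import math
import os
from typing import Optional

# cgroup v2 location of the container's CPU limit (assumption: a cgroup v2 node).
CPU_MAX_PATH = "/sys/fs/cgroup/cpu.max"


def parse_cpu_max(text: str) -> Optional[float]:
    """Parse cgroup v2 cpu.max ("<quota> <period>"); None means unlimited."""
    quota, period = text.split()
    if quota == "max":
        return None
    return int(quota) / int(period)


def usable_cpus(path: str = CPU_MAX_PATH) -> int:
    """Whole CPUs this container can use without being throttled (at least 1)."""
    host_cpus = os.cpu_count() or 1  # counts the node's CPUs, not the pod's limit
    try:
        with open(path) as f:
            limit = parse_cpu_max(f.read())
    except (OSError, ValueError):
        return host_cpus  # no readable cgroup v2 limit: fall back
    if limit is None:
        return host_cpus
    # Round down so a pool sized with this never exceeds the quota (minimum 1).
    return max(1, min(host_cpus, math.floor(limit)))


if __name__ == "__main__":
    print("os.cpu_count():", os.cpu_count(), "usable under the limit:", usable_cpus())
```

On a 4-CPU worker pod running on a 32-core node, this would size a pool at 4 rather than 32.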
Any scale-up of the Ray cluster happens in iterations, and each iteration caps the number of new nodes that can be launched; any nodes needed beyond that cap are launched in subsequent iterations. The cap is computed dynamically and never falls below 5: when upscalingSpeed * running_nodes exceeds 5, that value is used instead. Each worker type is considered and scaled independently, so running_nodes is the count of pods of that worker type.
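To see what that rule means in practice, here’s a small simulation of one worker type scaling up from zero. It models the rule as described above, not Ray’s actual implementation. It assumes every batch is running before the next iteration, truncates the cap to an integer, and ignores pending launches and the global maxWorkers.

```python
from typing import List


def max_new_nodes(upscaling_speed: float, running_nodes: int) -> int:
    """Per-iteration launch cap for one worker type: never below 5, otherwise
    upscalingSpeed * running_nodes (truncated to an int in this sketch)."""
    return max(5, int(upscaling_speed * running_nodes))


def scale_up_schedule(target: int, upscaling_speed: float = 1.0, running: int = 0) -> List[int]:
    """Pods launched per iteration until `target` pods of one type exist.

    Simplification: every batch is assumed to be running before the next
    iteration, and pending launches and the global maxWorkers are ignored.
    """
    batches = []
    while running < target:
        batch = min(target - running, max_new_nodes(upscaling_speed, running))
        batches.append(batch)
        running += batch
    return batches


if __name__ == "__main__":
    print(scale_up_schedule(20))       # [5, 5, 10]
    print(scale_up_schedule(20, 2.0))  # [5, 10, 5]
```

With upscalingSpeed at 1.0, going from zero to 20 pods takes three iterations (5, 5, then 10). At 2.0, it still takes three (5, 10, then the remaining 5), but the cluster is larger sooner.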
Number of Workers and idleTimeoutMinutes
You can set a global value for maxWorkers, which is a hard upper limit on the number of pods that can exist in a Ray cluster across podTypes. However, each worker type can also have its own maxWorkers. Managing these values carefully lets you strike a balance between cost and immediate worker availability.
idleTimeoutMinutes is the natural complement to this: it determines how long your pods wait idle for tasks before being removed. If this value is too low, you’ll face high worker pod churn, and if it’s too high, you’ll have a Ray cluster that’s idle more often than not. The best way to set it is trial and error for your particular use case, but I do have a rough rule of thumb that usually works for me. Say your pod takes 3 minutes to launch (Kubernetes node launch + Docker image pull + Ray initialization). Setting the idle timeout to 5x that value gives you a 15-minute (5 x 3) window in which tasks can be waited on. Lowering the 5 gives you a cheaper cluster, and raising it gives you a more responsive one. This, of course, assumes a random pattern of tasks. If you can predict the frequency of your tasks, use that to the fullest extent: manually pre-warm your clusters by increasing minWorkers, wind them down over the weekends, and so on. I’ve been making full use of the ability to update the RayCluster definition on the fly. You can even add entirely new worker types, and the operator deals with them without missing a step, which is super useful for keeping precise control over your cluster.
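The rule of thumb and the on-the-fly updates are both easy to script. The sketch below computes the idle timeout and builds JSON Patch (RFC 6902) documents that raise minWorkers for one pod type or append a new worker type. The function names are hypothetical, and the pod type index assumes the worker is the second entry in podTypes, after the head.

```python
import json


def suggested_idle_timeout(launch_minutes: float, multiplier: float = 5) -> float:
    """Rule of thumb: idle timeout = multiplier x pod launch time.

    A lower multiplier gives a cheaper cluster; a higher one, a more responsive one.
    """
    return multiplier * launch_minutes


def set_min_workers_patch(pod_type_index: int, min_workers: int) -> list:
    """JSON Patch setting minWorkers for one entry of spec.podTypes.

    "add" is used rather than "replace" because it also works when the field
    is absent, and it overwrites the value when the field is present.
    """
    return [{
        "op": "add",
        "path": f"/spec/podTypes/{pod_type_index}/minWorkers",
        "value": min_workers,
    }]


def add_pod_type_patch(pod_type: dict) -> list:
    """JSON Patch appending a new worker type to the end of spec.podTypes."""
    return [{"op": "add", "path": "/spec/podTypes/-", "value": pod_type}]


if __name__ == "__main__":
    print(suggested_idle_timeout(3))  # 15 minutes for a 3-minute pod launch
    print(json.dumps(set_min_workers_patch(1, 5)))
```

To apply a patch, something along the lines of kubectl patch raycluster example-cluster --type=json -p '<patch JSON>' should work. Patching minWorkers back down the same way winds the cluster down again, for example on a weekend schedule.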
Node Selectors and Tolerations
Node selectors and tolerations are the mechanisms Kubernetes gives you for controlling which nodes your pods run on. In the sample configuration above, you’ll notice that the head pod has a ray-type-ondemand nodeSelector and a similar config for tolerations. This ensures that the head pod is always scheduled on an on-demand instance. That matters because any interruption to the head causes the loss of the Ray cluster, at least temporarily, until the operator can recreate it. This can be worked around in the application code, but head pod downtime is inconvenient to deal with, so this setup gives you a head that will never be killed off by a Spot instance termination.
Worker pods have a lot more flexibility in this regard, since the operator will simply launch a new pod to replace a worker lost to a Spot termination. But we can still use this feature to group tasks of different types so they never compete with each other for resources, to assign certain tasks to instances with GPUs, and so on.
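Here is a minimal sketch of how a pod type can be pinned to a node group. The text mentions a ray-type-ondemand selector but doesn’t show its exact key and value, so the label key NODE_GROUP_LABEL, the taint key ray-type and the group names below are hypothetical. The sketch also assumes the taint’s value equals the group name. The nodeSelector and toleration field layouts follow the standard Kubernetes Pod spec.

```python
import copy
from typing import Optional

# Hypothetical label key; substitute whatever labels your node groups carry.
NODE_GROUP_LABEL = "example.com/capacity-type"


def pin_to_node_group(pod_config: dict, group: str, taint_key: Optional[str] = None) -> dict:
    """Return a copy of a Pod config that only schedules onto one node group.

    The nodeSelector keeps this pod ON the group's nodes. If the group is also
    tainted (assumed here as taint_key=group:NoSchedule), the matching
    toleration lets this pod in while the taint keeps OTHER pods off.
    """
    pod = copy.deepcopy(pod_config)  # leave the caller's dict untouched
    spec = pod.setdefault("spec", {})
    spec.setdefault("nodeSelector", {})[NODE_GROUP_LABEL] = group
    if taint_key is not None:
        spec.setdefault("tolerations", []).append({
            "key": taint_key,
            "operator": "Equal",
            "value": group,
            "effect": "NoSchedule",
        })
    return pod


if __name__ == "__main__":
    base = {"apiVersion": "v1", "kind": "Pod", "spec": {"containers": []}}
    head = pin_to_node_group(base, "ondemand", taint_key="ray-type")  # never on Spot
    worker = pin_to_node_group(base, "spot")  # cheap and replaceable
    print(head["spec"]["nodeSelector"], head["spec"]["tolerations"])
```

The nodeSelector alone would be enough to keep the head on on-demand nodes. The taint and toleration are useful when you also want to stop unrelated pods from filling up those nodes.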
From my previous experience with Ray Tune, connectivity was never an issue. We launch a Ray cluster on EC2 and run an ML training job that detects and connects to the Ray cluster (ray.init(address="auto")), and the job automatically tears down the cluster when training is done. But when you have an application that runs indefinitely, it’s crucial to ensure that the connection to the Ray cluster is never lost, or, if it is lost, that your application knows how to recover it.
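The connection to the cluster is made in the standard way, with ray.init(). We pass it the address of the cluster, which, when the cluster is deployed on K8s, is exposed through a Service resource. So the call takes the form ray.init("ray://<head-service-name>:10001"), using the Ray Client’s ray:// scheme and its default port.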
Let’s say this is added to a Python script that loops indefinitely awaiting tasks. When the Ray cluster goes down, the client gets disconnected from it. Even when the cluster comes back up, things are not quite right, and you might face odd errors telling you that the client is already present but that your connection has been lost. The easy way around this is to make sure you disconnect cleanly whenever there are any issues and then restart your connection.
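Below is one way to structure that: a job loop that always disconnects before reconnecting after a connection error, and only hands work over while connected. It’s a minimal sketch. The function name, the retry and backoff parameters and their defaults are hypothetical choices. connect, disconnect and handle are injected so the loop can be tested without a cluster. Wired to Ray, they would be something like lambda: ray.init(address), ray.shutdown, and a function that calls ray.get() on a remote task. Which exception types signal a lost connection depends on your Ray version, which is why retry_on is a parameter.

```python
import time
from typing import Callable, Iterable, List, Optional, Tuple, Type


def _quietly(fn: Callable[[], None]) -> None:
    """Call fn, ignoring errors (the old connection may already be gone)."""
    try:
        fn()
    except Exception:
        pass


def process_jobs(
    jobs: Iterable,
    connect: Callable[[], None],
    disconnect: Callable[[], None],
    handle: Callable,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    backoff_seconds: float = 5.0,
    max_attempts: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> List:
    """Handle each job in order, reconnecting cleanly after connection errors.

    After an error in retry_on, the loop always disconnects first (with Ray:
    ray.shutdown()) so no stale client state survives, waits, then connects
    again before retrying the same job. handle() only runs while connected.
    """
    results = []
    connected = False
    try:
        for job in jobs:
            attempts = 0
            while True:
                try:
                    if not connected:
                        connect()
                        connected = True
                    results.append(handle(job))
                    break
                except retry_on:
                    attempts += 1
                    if max_attempts is not None and attempts >= max_attempts:
                        raise
                    _quietly(disconnect)
                    connected = False
                    sleep(backoff_seconds)
    finally:
        if connected:
            _quietly(disconnect)
    return results
```

With max_attempts left as None, the loop keeps retrying until the cluster comes back, which suits a long-running worker. Set max_attempts if you would rather surface the error and route the job elsewhere.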
If you’re writing a script, make sure the remote function isn’t defined in the same module that performs your Ray initialization. I had a not-so-great time puzzling over why my head node kept getting blown up by extreme memory usage. Eventually I figured out that a poorly structured import caused the Ray initialization to also run when the remote function was deserialized on the worker. I still haven’t quite wrapped my head around why this causes a memory leak, since I can’t think of a runaway circular process that would lead to it, but at least now I know not to do it.
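A likely mechanism is that a function defined in an importable module is serialized by reference, as a module name plus a function name. Deserializing it in another process therefore imports that module and runs its top-level code, including any ray.init() sitting there. Ray serializes functions with cloudpickle, which handles importable module-level functions this way. The sketch below demonstrates the same effect with the standard library’s pickle and two fresh Python processes, with a print standing in for ray.init(). The module name tasks_demo is hypothetical, and the sketch doesn’t explain the memory growth itself.

```python
import os
import subprocess
import sys
import tempfile
import textwrap

# Stand-in for a module that defines a remote function AND does work at import
# time. In the real bug that top-level work was ray.init(); here it's a print
# to stderr, so it can't get mixed into the pickled bytes on stdout.
TASKS_SRC = textwrap.dedent("""
    import sys

    print("top-level code of tasks_demo ran", file=sys.stderr)

    def work(x):
        return x * 2
""")

# Process 1 pickles the function; process 2 unpickles it and calls it.
PICKLE_IT = ("import pickle, sys, tasks_demo; "
             "sys.stdout.buffer.write(pickle.dumps(tasks_demo.work))")
UNPICKLE_IT = ("import pickle, sys; "
               "fn = pickle.loads(sys.stdin.buffer.read()); print(fn(21))")


def run_demo():
    """Return (stderr, stdout) of the process that only *unpickles* work()."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "tasks_demo.py"), "w") as f:
            f.write(TASKS_SRC)
        env = dict(os.environ, PYTHONPATH=tmp)  # make tasks_demo importable
        payload = subprocess.run([sys.executable, "-c", PICKLE_IT], env=env,
                                 capture_output=True, check=True).stdout
        # The payload only names "tasks_demo" and "work": it holds no code.
        result = subprocess.run([sys.executable, "-c", UNPICKLE_IT], env=env,
                                input=payload, capture_output=True, check=True)
        return result.stderr.decode().strip(), result.stdout.decode().strip()


if __name__ == "__main__":
    side_effect, output = run_demo()
    print("stderr of unpickling process:", side_effect)  # the import side effect
    print("stdout of unpickling process:", output)       # 42
```

The usual fix is to keep ray.init() out of module top level. For example, call it under an if __name__ == "__main__": guard in the entry-point script, so that importing the module that holds the remote functions has no side effects.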
One of the many advantages of Kubernetes is the ease of achieving internet ingress. To expose your dashboard outside the cluster, you only need to direct an Ingress to port 8265 of the Ray cluster’s Service resource. Here’s a sample:
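Here is a minimal sketch of such an Ingress, as a networking.k8s.io/v1 definition built in Python. It assumes the head Service is named <cluster-name>-ray-head (the Ray 1.x operator’s convention) and serves the dashboard on port 8265. The host, path and Ingress name are hypothetical. Controller-specific settings such as ingressClassName, annotations and TLS are left out. Depending on your ingress controller, serving the dashboard under a subpath may also require a rewrite that strips the prefix, so check the controller’s documentation.

```python
import json


def dashboard_ingress(cluster_name: str, path: str = "/ray-dashboard",
                      host: str = "ray.example.com") -> dict:
    """Ingress routing host+path to the Ray dashboard port of the head Service.

    Assumes the Ray 1.x operator's Service name "<cluster_name>-ray-head".
    No ingressClassName, annotations or TLS: those depend on your controller.
    """
    backend = {
        "service": {
            "name": f"{cluster_name}-ray-head",
            "port": {"number": 8265},  # Ray dashboard port
        }
    }
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "Ingress",
        "metadata": {"name": f"{cluster_name}-dashboard"},
        "spec": {
            "rules": [{
                "host": host,
                "http": {
                    "paths": [{"path": path, "pathType": "Prefix", "backend": backend}],
                },
            }],
        },
    }


if __name__ == "__main__":
    print(json.dumps(dashboard_ingress("example-cluster"), indent=2))
```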
Can I just take a moment to appreciate how nice it is to have zero-configuration support for path-based routing? A lot of highly mature applications don’t allow you to access their UIs from subpaths, so for a young project like Ray to have considered that is a pretty nice thing to experience.
I always expect distributed systems to break, and they never disappoint. There are just too many components involved, and it’s inevitable that something somewhere will fail or that a user will find an innovative way to utterly crash your “battle-tested and resilient” architecture. I think handling Ray on Kubernetes in a measured way will let you mitigate the risk of running a distributed system, especially when you make full use of its features along with those of K8s. But since that risk is ever-present, I would take care to build failovers and fault tolerance into every stage and never depend on a Ray cluster living forever. Since a failover should not share the primary system’s failure modes, I’d suggest Kubeflow Pipelines, which processes jobs at high latency but scales really well. You can integrate it into your application with just a little bit of additional Python code.
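Here is a minimal sketch of that failover pattern: try the low-latency Ray path first and hand the job to the slower backend if it fails. The function name and its parameters are hypothetical. In practice, primary might wrap ray.get(...) with its timeout argument, which raises GetTimeoutError when the timeout expires, and fallback might submit a Kubeflow Pipelines run through the kfp client. Both wirings are assumptions, not something this sketch exercises.

```python
from typing import Any, Callable, Tuple, Type


def submit_with_fallback(
    job: Any,
    primary: Callable[[Any], Any],
    fallback: Callable[[Any], Any],
    fallback_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> Tuple[str, Any]:
    """Run a job on the low-latency path, handing it to the fallback on failure.

    Returns (path_taken, result); counting "fallback" results is a cheap
    health signal for the Ray cluster. Jobs should be idempotent, because a
    job that timed out on the primary may still finish there.
    """
    try:
        return "primary", primary(job)
    except fallback_on:
        return "fallback", fallback(job)


if __name__ == "__main__":
    def ray_is_down(job):
        raise ConnectionError("Ray cluster unavailable")

    print(submit_with_fallback(4, lambda j: j * 10, lambda j: j + 1))  # ('primary', 40)
    print(submit_with_fallback(4, ray_is_down, lambda j: j + 1))       # ('fallback', 5)
```

Narrowing fallback_on to connection and timeout errors keeps genuine bugs in a job from being silently rerouted to the slower system.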
All that said, though, I’m really looking forward to finding out how well Ray on Kubernetes holds up to consistently heavy workloads on a longer timescale. Maybe I’ll have an update in a few months?