Оригинальная статья: Ray Compiled Graphs: Optimized AI Workloads with Native GPU Communication
Перевод: January 2025
Введение
По мере того как AI модели продолжают расти в сложности и размере, рабочие нагрузки и приложения вокруг этих моделей создают новые требования к базовой программной инфраструктуре и примитивам. В отличие от традиционных CPU-ориентированных рабочих нагрузок, нагрузки для больших AI моделей, такие как training и inference, являются преимущественно GPU-интенсивными и часто требуют распределенных вычислений и координации между десятками или сотнями ускорителей.
Compiled Graphs обеспечивают минимальные накладные расходы на отправку задач (~50 мкс) по сравнению со стандартными накладными расходами Ray на отправку задач (1-2 мс). Хотя эти стандартные накладные расходы незначительны для длительных рабочих нагрузок, ориентированных на пропускную способность, таких как обработка данных или batch processing, они становятся неприемлемыми для подсекундных рабочих нагрузок, таких как auto-regressive генерация токенов.
Compiled Graphs также поддерживает нативную передачу данных между GPU, автоматически разрешая deadlock и совмещая коммуникацию с вычислениями. Без Compiled Graphs пользователи должны использовать внешние коммуникационные примитивы, такие как NCCL, чтобы обеспечить передачу данных между GPU с низкой задержкой.
Эти улучшения открывают захватывающие новые возможности для программ Ray:
Ray Compiled Graphs может передавать большие тензоры между actors с задержкой на уровне миллисекунд
Используя Compiled Graphs, вы можете выразить сложные алгоритмы pipeline scheduling менее чем в 70 строках кода, достигая при этом производительности на уровне передовых библиотек, таких как Deepspeed, для модели Llama 7B на 4 GPU A100
Multi-modal training workload на основе Compiled Graphs с heterogeneous parallelism и GPU достигает улучшения эффективности token-per-dollar на 43% по сравнению со стандартной FSDP-реализацией PyTorch.
Введение Compiled Graphs в Ray
Compiled Graphs - это новая функция в Ray, которая предлагает классический Ray Core-подобный API с тремя основными преимуществами:
Сниженные системные накладные расходы для повторяющихся графов задач
Нативная поддержка GPU-GPU коммуникации через NVIDIA NCCL
Оптимизированное планирование для предотвращения deadlock и наилучшего использования вычислительных и коммуникационных ресурсов
В отличие от стандартного Ray API, Compiled Graphs выражаются как статический граф вычислений.
Используя статическую природу программы, Ray может заранее выделять ресурсы для задачи и повторно использовать их для будущих вызовов. Кроме того, Ray может правильно выделять память заранее для использования оптимизированных коммуникационных примитивов, таких как NCCL, которые требуют симметричности отправителей и получателей. Как показано ниже, Ray Compiled Graphs может улучшить задержки для простых передач GPU-тензоров до 140 раз и передачи данных CPU до 17 раз.
Давайте рассмотрим упрощенную версию общего шаблона в рабочих нагрузках машинного обучения: scatter-gather.
Для начала установим Compiled Graphs:
pip install -U "ray[adag]"
Scatter-gather - это распространенный шаблон распределенных вычислений в рабочих нагрузках машинного обучения. Он отправляет одинаковые входные данные нескольким workers (Ray Actors) и собирает результаты от всех workers. Например, для tensor parallel inference программа отправляет одни и те же CPU данные всем actors, затем actors перемещают CPU данные на GPU и запускают torch модель, которая загружает только sharded weights. Давайте реализуем эту программу с помощью Compiled Graphs.
Начнем с создания обычных Ray actors, в данном случае трех:
import ray
from time import perf_counter
@ray.remote
class TensorProcessor:
def fwd(self, tensor: str) -> str:
# some_gpu_ops(tensor.to(torch.device()))
return tensor
N = 3
actors = [TensorProcessor.remote() for _ in range(N)]
Сначала посмотрим, как выразить это с помощью обычных Ray программ.
# warmup actors
for _ in range(10):
ray.get([actor.fwd.remote("hello") for actor in actors])
s = perf_counter()
result = ray.get([actor.fwd.remote("hello") for actor in actors])
print("Обычный ray runtime занял", (perf_counter() - s) * 1000, "мс")
print(result)
Затем мы определяем и компилируем compiled graph, который передает один и тот же input placeholder всем actors. Здесь мы используем Ray’s DAG API, который создает промежуточное представление, фиксирующее граф вычислений в статическом виде. Мы используем синтаксис MultiOutputNode для обертывания выходных данных, что необходимо, когда у нас есть более одного выходного узла.
import ray.dag
# Определяем DAG для ленивого выполнения.
with ray.dag.InputNode() as inp:
# Привязываем каждую задачу actor к одному и тому же input placeholder.
outputs = [actor.fwd.bind(inp) for actor in actors]
dag = ray.dag.MultiOutputNode(outputs)
Этот код создает Ray DAG, который выглядит так:
Теперь, чтобы использовать Compiled Graphs, мы используем команду experimental_compile
. Ray будет предварительно выделять все необходимые ресурсы для выполнения графа, что приводит к значительному ускорению выполнения графа по сравнению с динамическим стандартным runtime:
compiled_graph = dag.experimental_compile()
# разогрев actors
for _ in range(10):
ray.get(compiled_graph.execute("hello"))
# Execute the DAG with different arguments:
s = perf_counter()
result = ray.get(compiled_graph.execute("hello"))
print("Compiled Graphs took", (perf_counter() - s) * 1000, "ms")
print(result)
# ["hello", "hello", "hello"]
Это все! Вы можете использовать эту же программу для масштабирования ваших workloads. API также работают при наличии нескольких nodes.
Под капотом ресурсы предварительно выделяются для снижения накладных расходов во время выполнения:
Новый backend Compiled Graphs статически выделяет входные и выходные буферы для каждой actor task при компиляции, вместо их динамического выделения при каждом выполнении DAG. Эти буферы повторно используются во время выполнения, и actors всегда отправляют результаты напрямую процессу, которому они нужны.
Все actors на одном Ray node используют один и тот же физический входной буфер, который синхронизируется backend’ом Ray Compiled Graphs. Это помогает снизить накладные расходы на сериализацию аргументов task, выделение памяти для аргументов и вызов task.
Backend также заранее выделяет цикл выполнения actor. Вместо ожидания RPC для выполнения следующей task, каждый actor ждет в цикле аргументы (передаваемые через выделенные буферы) для следующей задачи
echo
.
Теперь, что если мы хотим организовать конвейерное выполнение между различными actor tasks? Одним из примеров является pipeline-parallel inference, где мы передаем промежуточные выходные данные от одного actor к следующему через shared memory, и передача данных должна быть организована конвейером с compute tasks. Мы можем организовать конвейерное выполнение между различными actors, выполняя один и тот же DAG несколько раз перед получением результата:
# Teardown the previous dag.
compiled_graph.teardown()
with ray.dag.InputNode() as inp:
for actor in actors:
# Pass each actor task output as input to the next actor task.
inp = actor.fwd.bind(inp)
dag = inp
Это создает DAG, который выглядит так:
Который мы можем скомпилировать и выполнить:
compiled_graph = dag.experimental_compile()
# Call dag.execute() several times. The executions will be pipelined across the different actors.
refs = [compiled_graph.execute(f"hello{i}") for i in range(N)]
# Get the results, flushing the pipeline.
for ref in refs:
print(ray.get(ref))
# "hello0"
# "hello1"
# "hello2"
Чтобы продемонстрировать GPU-GPU коммуникацию, мы можем создать actor, который отправляет тензор другому actor. Чтобы запустить этот пример, убедитесь, что у вас в кластере есть как минимум 2 GPU.
import ray
import ray.dag
import torch
assert ray.cluster_resources().get("GPU") >= 2, ("Недостаточное количество GPU в кластере.")
@ray.remote(num_gpus=1)
class GPUSender:
def send(self, shape):
return torch.zeros(shape, device="cuda")
@ray.remote(num_gpus=1)
class GPUReceiver:
def recv(self, tensor: torch.Tensor):
assert tensor.device.type == "cuda"
return tensor.shape
sender = GPUSender.remote()
receiver = GPUReceiver.remote()
Далее мы определяем и компилируем compiled graph, который передает CUDA тензор от одного actor к другому. С помощью TorchTensorType
, Ray будет использовать NCCL под капотом для переноса тензоров через GPU RDMA между 2 GPU.
from ray.experimental.channel.torch_tensor_type import TorchTensorType
with ray.dag.InputNode() as inp:
dag = sender.send.bind(inp)
# You have to specify transport="nccl", otherwise, it uses shared memory to transfer GPU tensors.
dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
dag = receiver.recv.bind(dag)
compiled_graph = dag.experimental_compile()
# Execute the DAG. Ray aDAG will orchestrate any NCCL ops.
assert ray.get(compiled_graph.execute((10, ))) == (10, )
Можете посмотреть developer guide для более подробной информации.
Сравнение
Мы сравнили Ray Core (“Ray Standard Runtime”) с Ray Compiled Graphs на различных паттернах.
Round Trip: В этом паттерне мы создаем actor для повторяющейся отправки данных actor-получателю.
Scatter-Gather: В этом паттерне драйвер отправляет данные всем actors и собирает результаты от каждого actor.
Chain of tasks: В этом паттерне мы создаем pipeline из actors, которые получают данные от предыдущего actor и отправляют их следующему actor.
Сначала мы запустили benchmark с 1 байтом CPU данных на одном и нескольких узлах для указанных выше паттернов вычислений, чтобы измерить системные накладные расходы. Следующий benchmark выполнялся на инстансе m5.16xlarge с 64 CPU.
На графике ниже мы видим, что round trip коммуникация на одном узле может быть быстрее до 17 раз, а на нескольких узлах - до 2.7 раз.
Для более сложных workloads, таких как scatter-gather или chain of tasks, мы видим, что Ray Compiled Graphs может улучшить latency до 20 раз.
Мы также сравнили Ray Core с Ray Compiled Graph для передачи данных между GPU. Мы запустили простой round trip benchmark с CUDA тензором размером 40MB на машине с NVLink (A100) и без NVLink (A10G), где мы передавали тензор 10 раз и измеряли end-to-end latency.
По умолчанию Ray Core не имеет встроенной поддержки zero copy сериализации для torch тензоров и не использует NCCL для коммуникации.
С другой стороны, Ray Compiled Graph использует NCCL под капотом для оптимизации передачи данных между GPU. На машине с 2 устройствами A10G без NVLink, Ray Compiled Graphs может передавать тензоры с latency в 19 раз лучше, чем Ray Core. На A100 с NVLink latency может быть уменьшена почти в 140 раз.
Эти benchmark’и показывают, что compiled graphs являются мощными примитивами, которые открывают возможности для нового класса развивающихся AI приложений.
Примеры использования
Ускоренный model-parallel inference
Мы интегрировали compiled graphs в vLLM, популярный open source проект для LLM inference, чтобы обеспечить tensor parallelism и pipeline parallelism inference с использованием Ray. С pipeline parallelism мы смогли достичь улучшения throughput/latency на 10-15% по сравнению со стандартным NCCL backend.
Современное распределенное обучение
В наших экспериментах с distributed training мы использовали Compiled Graphs для реализации более сложных и эффективных стратегий обучения. Например, мы значительно увеличили training throughput для multimodal contrastive моделей, таких как CLIP, используя heterogeneous devices и методы parallelism. Реализация такой программы намного сложнее с ванильным PyTorch.
В частности, применяя различные методы parallelism к text и vision энкодерам и размещая меньший text энкодер на более экономичных GPU, мы наблюдали увеличение training throughput на 20% и улучшение эффективности token-per-dollar на 43% по сравнению со стандартной FSDP реализацией PyTorch.
Мы также прототипировали различные алгоритмы pipeline parallel scheduling, такие как afab, 1f1b и zero bubble (https://arxiv.org/pdf/2401.10241). Мы смогли выразить сложные алгоритмы pipeline scheduling менее чем в 70 строках кода, достигая при этом производительности на уровне передовых библиотек, таких как Deepspeed, для модели Llama 7B на 4 GPU A100. Мы планируем дальнейшее улучшение производительности pipeline parallel training для workloads большего масштаба.
Мы планируем опубликовать отдельные статьи, посвященные этим use cases. Следите за обновлениями!
Выводы
Попробуйте Ray Compiled Graphs уже сегодня, установив последнюю версию Ray через pip install "ray[adag]"
. Функциональность находится на alpha стадии и активно разрабатывается. В будущем мы планируем рассказать больше о том, как использовать Ray Compiled Graphs для реализации distributed inference и training. Мы с нетерпением ждем новых use cases и приветствуем ваш вклад в развитие проекта!
Чтобы присоединиться:
Подключайтесь к ray.slack.com и присоединяйтесь к каналу #ray-accelerated-dag
Следите за нашей работой на Github