Monday, December 23, 2024

Understanding Distributed Training in PyTorch: Exploring Distributed Data Parallelism and Mixed-Precision Techniques


Accelerating Model Training with PyTorch’s DistributedDataParallel

In the realm of deep learning, training models on large datasets can be a time-consuming process. However, with the advent of multi-GPU training techniques, we can significantly reduce training time. In this tutorial, we will explore how to utilize nn.parallel.DistributedDataParallel in PyTorch to train our models across multiple GPUs. We will walk through a minimal example of training an image classifier on the CIFAR10 dataset and demonstrate how to speed up the training process effectively.

Getting Started: Imports and Data Preparation

Before diving into the code, let’s start with the necessary imports. We will be using PyTorch and torchvision for our model and dataset:

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time

For our experiments, we will use the CIFAR10 dataset with a batch size of 256. The CIFAR10 dataset consists of 60,000 32×32 color images in 10 classes, with 6,000 images per class.

Creating the Data Loader

We will create a function to load the CIFAR10 dataset with appropriate transformations:

def create_data_loader_cifar10():
    # Augmentation (random crop with padding, horizontal flip) is applied only to
    # the training set; the test set is just converted to tensors and normalized.
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    batch_size = 256
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                              shuffle=True, num_workers=10, pin_memory=True)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             shuffle=False, num_workers=10)
    return trainloader, testloader

Training on a Single GPU

Before we scale up to multiple GPUs, let’s first train our model on a single Nvidia A100 GPU for one epoch. The training function is defined as follows:

def train(net, trainloader):
    print("Start training...")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    epochs = 1
    num_of_batches = len(trainloader)

    for epoch in range(epochs):
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            images, labels = inputs.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'[Epoch {epoch + 1}/{epochs}] loss: {running_loss / num_of_batches:.3f}')
    print('Finished Training')

The main script will tie everything together:

if __name__ == '__main__':
    start = time.time()
    PATH = './cifar_net.pth'
    trainloader, testloader = create_data_loader_cifar10()
    net = torchvision.models.resnet50(False).cuda()
    start_train = time.time()
    train(net, trainloader)
    end_train = time.time()
    torch.save(net.state_dict(), PATH)
    test(net, PATH, testloader)
    end = time.time()
    seconds = (end - start)
    seconds_train = (end_train - start_train)
    print(f"Total elapsed time: {seconds:.2f} seconds, Train 1 epoch {seconds_train:.2f} seconds")

After running the training, we can observe the accuracy and time taken for training. For instance, the accuracy of the network on the 10,000 test images might be around 27%, with a total elapsed time of approximately 69 seconds.

Transitioning to Multi-GPU Training

Now that we have established a baseline with single GPU training, it’s time to optimize our training using multiple GPUs. The first step is to understand the limitations of torch.nn.DataParallel and why torch.nn.parallel.DistributedDataParallel is preferred.

Understanding DataParallel vs. DistributedDataParallel

DataParallel is a single-process, multi-thread approach that only works on a single machine. It replicates the model on each GPU and scatters the input data across them. However, this can lead to inefficiencies, especially with communication overhead.

In contrast, DistributedDataParallel (DDP) is a multi-process approach that scales better across multiple GPUs and nodes. Each GPU runs in its own process, which allows for more efficient gradient communication and reduces the bottleneck seen in DataParallel.
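
For reference, the two APIs differ mainly in setup cost. An illustrative sketch (variable names are ours; local_rank is introduced in Step 2 below):

# DataParallel: single process, one line to wrap, but inputs are scattered and
# gradients are gathered back onto a single primary GPU every step.
net_dp = nn.DataParallel(net)

# DistributedDataParallel: one process per GPU; requires an initialized process
# group (see Step 1 below), after which gradients are all-reduced across processes.
net_ddp = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])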

Setting Up DistributedDataParallel

To convert our training script to use DDP, we need to follow a few steps:

Step 1: Initialize the Distributed Learning Processes

We will create a function to initialize the distributed environment:

import os
import torch.distributed as dist

def init_distributed():
    dist_url = "env://"
    rank = int(os.environ["RANK"])
    world_size = int(os.environ['WORLD_SIZE'])
    local_rank = int(os.environ['LOCAL_RANK'])
    dist.init_process_group(
        backend="nccl",
        init_method=dist_url,
        world_size=world_size,
        rank=rank
    )
    torch.cuda.set_device(local_rank)
    dist.barrier()

This function sets up the distributed environment based on the environment variables provided when launching the script.
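
Because every process runs the same script, it is common to restrict logging and checkpointing to rank 0. A small helper along these lines (the name is ours, not part of PyTorch) can be reused throughout the script:

def is_main_process():
    # Rank 0 is conventionally the "main" process; before init_process_group has
    # run (e.g. single-GPU debugging), treat the lone process as main.
    return (not dist.is_initialized()) or dist.get_rank() == 0

The final torch.save call and any progress prints can then be wrapped in if is_main_process(): so that only one checkpoint is written and the log is not duplicated across processes.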

Step 2: Wrap the Model Using DDP

Next, we need to wrap our model with DistributedDataParallel:

net = torchvision.models.resnet50(False).cuda()
local_rank = int(os.environ['LOCAL_RANK'])
net = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])
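
One detail the snippet above glosses over: after wrapping, the underlying model is accessible as net.module, and since every process executes the same script, checkpoints are usually written only from the main process, for example with the is_main_process helper sketched earlier:

# Save the unwrapped weights from rank 0 only, so the checkpoint can later be
# loaded into a plain (non-DDP) model without the "module." key prefix.
if is_main_process():
    torch.save(net.module.state_dict(), PATH)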

Step 3: Use a DistributedSampler in Your DataLoader

To ensure that each GPU gets a unique subset of the data, we will use a DistributedSampler:

from torch.utils.data.distributed import DistributedSampler

def create_data_loader_cifar10():
    # (same transforms and batch size as before)

    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform_train)
    train_sampler = DistributedSampler(dataset=trainset, shuffle=True)
    # Note: do not pass shuffle=True to the DataLoader here; shuffling is now
    # handled by the sampler.
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                              sampler=train_sampler, num_workers=10, pin_memory=True)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform_test)
    test_sampler = DistributedSampler(dataset=testset, shuffle=False)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             sampler=test_sampler, num_workers=10)
    return trainloader, testloader
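
Since each process now works on its own shard of the data, it can be reassuring to verify the split. A purely illustrative check, using the dist handle imported earlier:

# With a DistributedSampler each process sees roughly len(trainset) / world_size
# samples, so with 4 GPUs and a batch size of 256 the number of training batches
# per process drops from 196 to 49.
trainloader, testloader = create_data_loader_cifar10()
print(f"rank {dist.get_rank()}: {len(trainloader)} batches per epoch")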

Training with DDP

In the training loop, we need to set the epoch for the sampler to ensure proper shuffling:

for epoch in range(epochs):
    trainloader.sampler.set_epoch(epoch)
    # (rest of the training loop)
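
Putting these pieces together, the full DDP training function could look roughly like the sketch below (the name train_ddp is ours; the loss, optimizer, and inner loop are unchanged from the single-GPU version, and .cuda() sends tensors to the GPU selected earlier with torch.cuda.set_device):

def train_ddp(net, trainloader):
    print("Start training...")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    epochs = 1
    num_of_batches = len(trainloader)

    for epoch in range(epochs):
        # Reshuffle the DistributedSampler so each epoch uses a different split.
        trainloader.sampler.set_epoch(epoch)
        running_loss = 0.0
        for inputs, labels in trainloader:
            images, labels = inputs.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()   # DDP all-reduces the gradients during backward
            optimizer.step()
            running_loss += loss.item()
        # Printed by every process; guard with is_main_process() for a single log line.
        print(f'[Epoch {epoch + 1}/{epochs}] loss: {running_loss / num_of_batches:.3f}')
    print('Finished Training')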

Launching the Distributed Training

To run the script, we launch it with torchrun, which spawns one process per GPU and sets the RANK, WORLD_SIZE, and LOCAL_RANK environment variables that init_distributed reads:

$ torchrun --nproc_per_node=4 main_script.py

(On older PyTorch versions the equivalent is $ python -m torch.distributed.launch --use_env --nproc_per_node=4 main_script.py; the launch module is deprecated in favor of torchrun.)

This command launches the script with 4 processes on a single machine, each driving one GPU.

Mixed Precision Training

To further enhance performance, we can use mixed precision training via PyTorch’s automatic mixed precision (AMP). AMP runs selected operations in FP16 while keeping numerically sensitive ones in FP32, which speeds up training and reduces memory use while maintaining model accuracy.

Here’s how to implement mixed precision in the training loop:

from torch.cuda.amp import GradScaler, autocast

fp16_scaler = GradScaler()

for epoch in range(epochs):
    trainloader.sampler.set_epoch(epoch)
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        images, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()
        with autocast():
            outputs = net(images)
            loss = criterion(outputs, labels)
        fp16_scaler.scale(loss).backward()
        fp16_scaler.step(optimizer)
        fp16_scaler.update()
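
Both autocast and GradScaler accept an enabled flag, so a single switch can toggle AMP on and off when comparing timings. A possible sketch, reusing the objects defined above:

use_amp = True  # set to False to fall back to full FP32

fp16_scaler = GradScaler(enabled=use_amp)

for epoch in range(epochs):
    trainloader.sampler.set_epoch(epoch)
    for inputs, labels in trainloader:
        images, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()
        with autocast(enabled=use_amp):
            outputs = net(images)
            loss = criterion(outputs, labels)
        # With enabled=False the scaler becomes a no-op and this reduces to the
        # ordinary loss.backward(); optimizer.step() sequence.
        fp16_scaler.scale(loss).backward()
        fp16_scaler.step(optimizer)
        fp16_scaler.update()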

Results and Conclusion

In an ideal scenario, using N workers would yield an N-fold speedup. In practice, a roughly 2X speedup with 4 GPUs is a more realistic expectation for DDP on a workload this small. Mixed precision training can provide substantial additional speed-ups on modern GPU architectures such as the Nvidia A100, although with a model and per-GPU batch size as small as this one the scaling overhead can cancel out the gain, as the numbers below show.

Here are some results from our experiments:

Configuration                                         Training time for 1 epoch (seconds)
Single GPU (baseline)                                 13.2
DataParallel (4 GPUs)                                 19.1
DistributedDataParallel (2 GPUs)                       9.8
DistributedDataParallel (4 GPUs)                       6.1
DistributedDataParallel (4 GPUs) + Mixed Precision     6.5
It’s important to note that DDP uses an effective batch size of 4 * 256 = 1024, which can affect validation accuracy.
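
A common way to compensate for the larger effective batch is the linear scaling rule: scale the learning rate with the number of processes. This is a widely used heuristic rather than something the timings above depend on; a minimal sketch:

# Linear scaling rule: the learning rate grows with the effective batch size.
base_lr = 0.001
world_size = dist.get_world_size()
optimizer = optim.SGD(net.parameters(), lr=base_lr * world_size, momentum=0.9)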

The code for this tutorial is available on GitHub. If you’re looking to deepen your understanding of PyTorch, consider checking out the recommended books on deep learning with PyTorch.

By following the steps outlined in this tutorial, you should now have a solid foundation for training your models using PyTorch’s DistributedDataParallel. Happy training!
