Speeding Up PyTorch by Four Times: Enhancing DALI Utilization and Creating CPU-Based Pipelines

Produced by Big Data Digest

Source: Medium

Compiled by: Zhao Jike

In recent years, deep learning hardware has advanced significantly: Nvidia's latest products, the Tesla V100 and the GeForce RTX series, feature dedicated Tensor Cores designed to accelerate common neural network operations.

Notably, the V100 has enough power to train neural networks at thousands of images per second, which puts single-GPU training of small ImageNet models within reach of just a few hours, a stark contrast to the five days it took to train AlexNet on ImageNet in 2012!

However, these powerful GPUs can easily outpace the data preprocessing pipeline. To address this, TensorFlow released a new data loader, tf.data.Dataset, which is written in C++ and uses a graph-based approach to chain multiple preprocessing operations together.
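As a rough illustration of that graph-based style (the glob pattern, image size, and batch size below are placeholders, not values from the original post), a tf.data pipeline chains decoding, batching, and prefetching so preprocessing can overlap with training:

import tensorflow as tf

def decode_and_resize(path):
    # Read, decode, and resize one image entirely as graph ops executed in C++
    image = tf.io.read_file(path)
    image = tf.io.decode_jpeg(image, channels=3)
    return tf.image.resize(image, [224, 224])

dataset = (tf.data.Dataset.list_files("train/*/*.jpg")   # placeholder path
           .map(decode_and_resize, num_parallel_calls=tf.data.experimental.AUTOTUNE)
           .batch(256)
           .prefetch(tf.data.experimental.AUTOTUNE))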

PyTorch, on the other hand, uses a data loader written in Python on top of the PIL library, which is convenient and flexible but lacks speed (although the PIL-SIMD library improves the situation somewhat).
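For comparison, a typical torchvision setup looks roughly like the sketch below; the directory name, batch size, and worker count are placeholders, and the normalization constants are the usual ImageNet statistics. Each image is decoded and augmented with PIL inside Python worker processes:

import torch
from torchvision import datasets, transforms

# PIL-based decode and augmentation, executed per image in Python worker processes
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
train_dataset = datasets.ImageFolder("train/", transform=train_transforms)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256,
                                           shuffle=True, num_workers=8,
                                           pin_memory=True)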

Enter NVIDIA’s Data Loading Library (DALI): it is designed to remove the data preprocessing bottleneck so that training and inference can run at full speed. DALI is primarily aimed at preprocessing on the GPU, but most operations also have fast CPU implementations. This article focuses on PyTorch, but DALI also supports TensorFlow, MXNet, and TensorRT. The TensorRT support is particularly useful, because it lets the training and inference steps use the exact same preprocessing code. Keep in mind that different frameworks (such as TensorFlow and PyTorch) often have slight differences between their data loaders, which can affect accuracy.

This article walks through techniques, demonstrated by a blogger on Medium, for improving DALI usage and building a fully CPU-based pipeline. These techniques keep long-term memory usage stable and allow batch sizes roughly 50% larger than the CPU and GPU pipelines that ship with DALI.

DALI Long-Term Memory Usage
The first issue is that RAM usage increases over the course of training, eventually causing OOM errors (even on a VM with 78 GB of RAM); this bug has not yet been fixed.
The only workaround is to re-import DALI and rebuild the training and validation pipelines at every epoch:
del self.train_loader, self.val_loader, self.train_pipe, self.val_pipe
import torch.cuda
import gc
import importlib
import dali
# Free cached GPU memory and collect garbage before rebuilding
torch.cuda.synchronize()
torch.cuda.empty_cache()
gc.collect()
# Reload the DALI module, then rebuild the training and validation pipelines
importlib.reload(dali)
from dali import HybridTrainPipe, HybridValPipe, DaliIteratorCPU, DaliIteratorGPU
# <rebuild DALI pipeline>
Note that even with this workaround, DALI still needs a significant amount of RAM to get the best results. Considering today's RAM prices, this is not a major issue. As shown in the table below, the maximum batch size with DALI may be up to 50% lower than with TorchVision:
[Table: maximum batch sizes, DALI vs. TorchVision]
The next sections cover ways to reduce GPU memory usage.
Building a Fully CPU-Based Pipeline
Let's first look at the example CPU pipeline. A CPU-based pipeline is very useful when peak throughput is not the primary concern. The example CPU training pipeline performs only decoding and resizing on the CPU, while the CropMirrorNormalize operation runs on the GPU. Since merely transferring the output to the GPU uses a significant amount of GPU memory, we modified the example CPU pipeline to run entirely on the CPU:
class HybridTrainPipe(Pipeline):
   def __init__(self, batch_size, num_threads, device_id, data_dir, crop,
                mean, std, local_rank=0, world_size=1, dali_cpu=False, shuffle=True, fp16=False,
                min_crop_size=0.08):
       # As we're recreating the Pipeline at every epoch, the seed must be -1 (random seed)
       super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed=-1)
       # Enabling read_ahead slowed down processing ~40%
       self.input = ops.FileReader(file_root=data_dir, shard_id=local_rank, num_shards=world_size,
                                   random_shuffle=shuffle)
       # Let user decide which pipeline works best with the chosen model
       if dali_cpu:
           decode_device = "cpu"
           self.dali_device = "cpu"
           self.flip = ops.Flip(device=self.dali_device)
       else:
           decode_device = "mixed"
           self.dali_device = "gpu"
           output_dtype = types.FLOAT
           if self.dali_device == "gpu" and fp16:
               output_dtype = types.FLOAT16
           self.cmn = ops.CropMirrorNormalize(device="gpu",
                                              output_dtype=output_dtype,
                                              output_layout=types.NCHW,
                                              crop=(crop, crop),
                                              image_type=types.RGB,
                                              mean=mean,
                                              std=std,)
       # To be able to handle all images from full-sized ImageNet, this padding sets the size of the internal nvJPEG buffers without additional reallocations
       device_memory_padding = 211025920 if decode_device == 'mixed' else 0
       host_memory_padding = 140544512 if decode_device == 'mixed' else 0
       self.decode = ops.ImageDecoderRandomCrop(device=decode_device, output_type=types.RGB,
                                                device_memory_padding=device_memory_padding,
                                                host_memory_padding=host_memory_padding,
                                                random_aspect_ratio=[0.8, 1.25],
                                                random_area=[min_crop_size, 1.0],
                                                num_attempts=100)
       # Resize as desired.  To match torchvision data loader, use triangular interpolation.
       self.res = ops.Resize(device=self.dali_device, resize_x=crop, resize_y=crop,
                             interp_type=types.INTERP_TRIANGULAR)
       self.coin = ops.CoinFlip(probability=0.5)
       print('DALI "{0}" variant'.format(self.dali_device))
   def define_graph(self):
       rng = self.coin()
       self.jpegs, self.labels = self.input(name="Reader")
       # Combined decode & random crop
       images = self.decode(self.jpegs)
       # Resize as desired
       images = self.res(images)
       if self.dali_device == "gpu":
           output = self.cmn(images, mirror=rng)
       else:
           # CPU backend uses torch to apply mean & std
           output = self.flip(images, horizontal=rng)
       self.labels = self.labels.gpu()
       return [output, self.labels]
The DALI pipeline now outputs an 8-bit tensor on the CPU. We need PyTorch to do the CPU->GPU transfer, the conversion to floating point, and the normalization. The last two operations run on the GPU, where they are fast and reduce the CPU->GPU memory bandwidth requirement. I tried pinning the tensor before transferring it to the GPU but could not get any performance improvement from doing so, so the transfer is instead paired with a background preprocessing thread:
import queue
import threading
from threading import Event

import torch


def _preproc_worker(dali_iterator, cuda_stream, fp16, mean, std, output_queue, proc_next_input, done_event, pin_memory):
   """
   Worker function to parse DALI output & apply final preprocessing steps
   """
   while not done_event.is_set():
       # Wait until main thread signals to proc_next_input -- normally
       proc_next_input.wait()
       proc_next_input.clear()
       if done_event.is_set():
           print('Shutting down preproc thread')
           break
       try:
           data = next(dali_iterator)
           # Decode the data output
           input_orig = data[0]['data']
           target = data[0]['label'].squeeze().long()  # DALI should already output target
           # Copy to GPU and apply final processing in separate CUDA stream
           with torch.cuda.stream(cuda_stream):
               input = input_orig
               if pin_memory:
                   input = input.pin_memory()
                   del input_orig  # Save memory
               input = input.cuda(non_blocking=True)
               input = input.permute(0, 3, 1, 2)
               # Input tensor is kept as 8-bit integer for transfer to GPU, to save bandwidth
               if fp16:
                   input = input.half()
               else:
                   input = input.float()
               input = input.sub_(mean).div_(std)
           # Put the result
           output_queue.put((input, target))
       except StopIteration:
           print('Resetting DALI loader')
           dali_iterator.reset()
           output_queue.put(None)

class DaliIteratorCPU(DaliIterator):
   """
   Wrapper class to decode the DALI iterator output & provide iterator that functions in the same way as TorchVision.
   Note that the permutation to channels-first, the conversion from 8-bit integer to float, and the normalization are all performed on the GPU.
   """
   def __init__(self, fp16=False, mean=(0., 0., 0.), std=(1., 1., 1.), pin_memory=True, **kwargs):
       super().__init__(**kwargs)
       print('Using DALI CPU iterator')
       self.stream = torch.cuda.Stream()
       self.fp16 = fp16
       self.mean = torch.tensor(mean).cuda().view(1, 3, 1, 1)
       self.std = torch.tensor(std).cuda().view(1, 3, 1, 1)
       self.pin_memory = pin_memory
       if self.fp16:
           self.mean = self.mean.half()
           self.std = self.std.half()
       self.proc_next_input = Event()
       self.done_event = Event()
       self.output_queue = queue.Queue(maxsize=5)
       self.preproc_thread = threading.Thread(
           target=_preproc_worker,
           kwargs={'dali_iterator': self._dali_iterator, 'cuda_stream': self.stream, 'fp16': self.fp16, 'mean': self.mean, 'std': self.std, 'proc_next_input': self.proc_next_input, 'done_event': self.done_event, 'output_queue': self.output_queue, 'pin_memory': self.pin_memory})
       self.preproc_thread.daemon = True
       self.preproc_thread.start()
       self.proc_next_input.set()
   def __next__(self):
       torch.cuda.current_stream().wait_stream(self.stream)
       data = self.output_queue.get()
       self.proc_next_input.set()
       if data is None:
           raise StopIteration
       return data
   def __del__(self):
       self.done_event.set()
       self.proc_next_input.set()
       torch.cuda.current_stream().wait_stream(self.stream)
       self.preproc_thread.join()
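Putting the CPU pipeline together, the sketch below builds the training pipe and wraps it in the iterator above. The constructor keywords for the DaliIterator base class (pipelines, size), the batch size, the data directory, and the 0-255-scaled ImageNet mean/std values are assumptions for illustration, not the repository's exact API:

imagenet_mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
imagenet_std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

# Build the CPU variant of the training pipeline
pipe = HybridTrainPipe(batch_size=256, num_threads=8, device_id=0,
                       data_dir='train/', crop=224,
                       mean=imagenet_mean, std=imagenet_std, dali_cpu=True)
pipe.build()

# Wrap it in the CPU iterator; keyword names for the base class are placeholders
train_loader = DaliIteratorCPU(pipelines=pipe, size=pipe.epoch_size("Reader"),
                               fp16=False, mean=imagenet_mean, std=imagenet_std)

for input, target in train_loader:
    # input arrives on the GPU as a normalized, channels-first float tensor
    pass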
GPU-Based Pipeline
In testing, the CPU pipeline above is about twice as fast as the TorchVision data loader at a similar maximum batch size. The CPU pipeline works well with large models like ResNet50; however, with smaller models like AlexNet or ResNet18, it cannot keep up and the GPU pipeline performs better. The problem with the GPU pipeline is that its maximum batch size is reduced by nearly 50%, which limits throughput.
A significant way to reduce GPU memory usage is to keep the validation pipeline off the GPU until it is actually needed. This is easy to do, since we already re-import DALI and recreate the data loaders at every epoch.
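A minimal, stripped-down sketch of this idea is below; the wrapper structure, HybridValPipe's constructor arguments, and the DaliIteratorGPU keyword are placeholders rather than the repository's exact API:

class Dataset:
    def __init__(self, data_dir, val_batch_size):
        self.data_dir = data_dir
        self.val_batch_size = val_batch_size
        self.val_pipe = None
        self.val_loader = None

    def prep_for_val(self):
        # Build the validation pipeline on the GPU only when validation is
        # about to run, so it holds no GPU memory during training
        self.val_pipe = HybridValPipe(batch_size=self.val_batch_size,
                                      num_threads=4, device_id=0,
                                      data_dir=self.data_dir)
        self.val_pipe.build()
        self.val_loader = DaliIteratorGPU(pipelines=self.val_pipe)

    def reset(self):
        # Drop the pipelines at the end of the epoch; the re-import/rebuild
        # snippet shown earlier then recreates them
        self.val_loader = None
        self.val_pipe = None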
More Tips
During validation, a batch size that evenly divides the validation set size works best, avoiding an incomplete batch at the end of the validation set (see the short sketch after these tips).
As with the TensorFlow and PyTorch data loaders, the TorchVision and DALI pipelines do not produce identical outputs: you will see slight differences in validation accuracy. I found this to be due to the different JPEG decoders. On the other hand, DALI supports TensorRT, which allows the exact same preprocessing to be used for training and inference.
For peak throughput, try setting the number of data loader workers to the number of virtual CPU cores (two virtual cores per physical core).
If you want absolute best performance and do not require outputs similar to TorchVision, try turning off DALI’s triangular interpolation.
Don't forget disk I/O. Make sure you have enough RAM to cache the dataset and/or a very fast SSD; DALI can read at up to 400 MB/s!
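The two sizing tips above can be checked with a few lines of Python; the 50,000-image figure is the standard ImageNet validation set size, and everything else here is generic:

import os

# One data loader worker per virtual CPU core
workers = os.cpu_count()

# Validation batch sizes between 100 and 1000 that divide the 50,000-image
# ImageNet validation set evenly, so no incomplete batch is left at the end
val_set_size = 50000
candidates = [b for b in range(100, 1001) if val_set_size % b == 0]
print(workers, candidates)  # e.g. 12 [100, 125, 200, 250, 400, 500, 625, 1000]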
Integration
To make these modifications easy to integrate, I created a data loader class that includes all the changes described here, covering both the DALI and TorchVision backends. Usage is simple. Instantiate the data loader:
dataset = Dataset(data_dir,
                  batch_size,
                  val_batch_size,
                  workers,
                  use_dali,
                  dali_cpu,
                  fp16)
Then get the training and validation data loaders:
train_loader = dataset.get_train_loader()
val_loader = dataset.get_val_loader()
At the end of each training epoch, reset the data loader:
dataset.reset()
Optionally, the validation pipeline can be recreated on the GPU just before validation:
dataset.prep_for_val()
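Putting this together, a training loop using the wrapper might look like the sketch below; the model, loss, optimizer, and epoch count are placeholders, and only the loader calls come from this article:

import torch
import torchvision

model = torchvision.models.resnet18().cuda()
criterion = torch.nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

for epoch in range(90):
    model.train()
    for input, target in dataset.get_train_loader():
        output = model(input)
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Move the validation pipeline onto the GPU only now
    dataset.prep_for_val()
    model.eval()
    with torch.no_grad():
        for input, target in dataset.get_val_loader():
            output = model(input)

    # Tear down and rebuild the DALI pipelines to keep RAM usage stable
    dataset.reset()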
Benchmark
Here are the maximum batch sizes using ResNet18:
[Table: maximum ResNet18 batch sizes for each pipeline]
Thus, by applying these modifications, the maximum batch size usable by DALI in both CPU and GPU modes has increased by approximately 50%!
Here are some throughput graphs using ShuffleNet V2 0.5 and a batch size of 512:
[Figure: throughput, ShuffleNet V2 0.5, batch size 512]
Here are some results of training various networks with the DALI GPU pipeline, compared against TorchVision:
[Figure: training throughput for various networks]
All tests were run on a Google Cloud V100 instance with 12 vCPUs (6 physical cores) and 78 GB of RAM, using Apex FP16 training. To reproduce these results, use the following parameters:
--fp16 --batch-size 512 --workers 10 --arch "shufflenet_v2_x0_5 or resnet18" --prof --use-dali
So DALI lets a single Tesla V100 reach a processing speed of nearly 4000 images/second! That is just over half of what an Nvidia DGX-1 (with eight V100 GPUs) can achieve, even though we used small models. For me, being able to train on ImageNet on a single GPU in a few hours is a productivity leap.
The code from this article is available at:
https://github.com/yaysummeriscoming/DALI_pytorch_demo
Original article:
https://towardsdatascience.com/nvidia-dali-speeding-up-pytorch-876c80182440
