A First Glance at GPU-Accelerated Spark

Ever since I started to play with clusters, I have thought there was no job a cluster could not complete. If there is one, add another node. However, apart from CUDA on a stand-alone machine, I have rarely touched a GPU-accelerated cluster as a data engineer. Yes, Spark ML can utilize GPUs since Spark 3.0, but most data engineering jobs are about data pipelines and data modeling. Recently I googled “GPU ETL” and up came spark-rapids, which leverages the RAPIDS libraries to accelerate processing on GPUs.

Easy to configure Spark to take advantage of GPUs

I changed almost nothing in my original Spark code (a mix of PySpark, Spark SQL, Python and Delta table access) to get it running on a GPU cluster. That is a great thing! Everybody wants to get twice the result with half the work. My run time dropped to 3.1 minutes (12 cores, 3 NVIDIA T4 GPUs, 84 GB memory) from 5.4 minutes (24 cores, 84 GB memory). I think I could get better results if my inputs were bigger chunks of data, like 500 MB+ Parquet files.

Here is my configuration on Databricks. You can also refer to this official document (however, I could not get my cluster running by following that document).

# cluster runtime version: 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
# node type: NCasT4_v3-series(Azure)

# spark config:
spark.task.resource.gpu.amount 0.1
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-0.5.0.jar:/databricks/spark/python
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.sql.adaptive.enabled false
spark.rapids.sql.concurrentGpuTasks 2

# initial script path

# create initial script on your notebook
sudo wget -O /databricks/jars/rapids-4-spark_2.12-0.5.0.jar
sudo wget -O /databricks/jars/cudf-0.19.2-cuda10-1.jar""", True)

# what some key configurations mean
--conf spark.executor.resource.gpu.amount=1 # one executor per GPU, enforced
--conf spark.task.resource.gpu.amount=0.5 # two tasks running on the same GPU
--conf spark.sql.files.maxPartitionBytes=512m # big batches are more efficient on the GPU, but not so big that they cause out-of-memory issues
--conf spark.sql.shuffle.partitions=30 # reduce the number of shuffle partitions from the default 200 to 30; larger partitions are more efficient on the GPU
--conf spark.rapids.sql.explain=NOT_ON_GPU # log the reason why a Spark operation is not able to run on the GPU
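
To get a feeling for how the GPU amounts interact, here is a minimal sketch in plain Python (no Spark needed). It assumes a simplified rule: the number of tasks one executor runs at the same time is capped by whichever runs out first, CPU cores or task-sized GPU shares. The function names and the example core count are my own illustration, not part of Spark or spark-rapids.

```python
from fractions import Fraction

def max_concurrent_tasks(executor_cores, executor_gpus, task_gpu_amount, task_cpus=1):
    """Simplified estimate of the tasks one executor can run at the same time.

    Assumption: the limit is the smaller of the CPU slots and the GPU shares.
    Real scheduling has more rules than this sketch covers.
    """
    cpu_slots = executor_cores // task_cpus
    # Fraction(str(...)) avoids float rounding surprises with values like 0.1
    gpu_slots = int(Fraction(str(executor_gpus)) / Fraction(str(task_gpu_amount)))
    return min(cpu_slots, gpu_slots)

def to_conf_flags(conf):
    """Render a dict of settings as spark-submit style --conf flags."""
    return [f"--conf {key}={value}" for key, value in conf.items()]

conf = {
    "spark.executor.resource.gpu.amount": 1,
    "spark.task.resource.gpu.amount": 0.5,
    "spark.sql.shuffle.partitions": 30,
}
print("\n".join(to_conf_flags(conf)))

# Hypothetical executor with 4 cores and 1 GPU: the GPU share is the limit here.
print(max_concurrent_tasks(executor_cores=4, executor_gpus=1, task_gpu_amount=0.5))
```

With a task amount of 0.1, as in my cluster config, the GPU would allow ten tasks, so the core count becomes the limit instead.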

The only problem I have met so far is that when the notebook includes a view composed of Delta tables, it pops up the error message “convert struct type to GPU is not supported”. Simply persisting the view to a table solves this problem. Overall, a very cool plugin.

The magic of spark-rapids

There are two major features that let Spark run on GPUs through RAPIDS.

  • Replacement of CPU ops with GPU ops at the physical plan level.
  • Optimized Spark shuffle using RDMA and direct GPU-to-GPU communication.
How the RAPIDS accelerator works in Spark. Left side: CPU ops are replaced with GPU versions based on data type and op type. Right side: optimized Spark shuffle between GPUs.

Since the replacement happens at the physical plan level, we don’t have to change the query. Even if an operation is not supported by RAPIDS, it can still run on the CPU.

A CPU Spark execution plan looks like this: logical plan -> optimized by Catalyst, which optimizes the logical plan in a series of phases -> physical plan -> executed on the cluster. A GPU Spark execution plan replaces operations in the physical plan with their GPU versions and executes them on columnar batches.

If we look at the physical plans on GPU and CPU, we find they match almost one to one, except for some GPU-CPU data exchange ops like GpuColumnarToRow and GpuRowToColumnar.
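
To make the row/columnar exchange concrete, here is a tiny plain-Python sketch of what such a transition does to the data layout. It is only an illustration of the idea; the function names are mine, and the real plugin works on GPU memory, not on Python dicts and lists.

```python
def rows_to_columnar(rows):
    """Turn a list of row dicts into one dict of columns (a 'columnar batch')."""
    if not rows:
        return {}
    columns = {name: [] for name in rows[0]}
    for row in rows:
        for name in columns:
            columns[name].append(row[name])
    return columns

def columnar_to_rows(batch):
    """Inverse conversion: rebuild row dicts from a dict of columns."""
    names = list(batch)
    return [dict(zip(names, values)) for values in zip(*batch.values())]

rows = [{"id": 1, "price": 9.5}, {"id": 2, "price": 3.0}]
batch = rows_to_columnar(rows)
print(batch)                    # {'id': [1, 2], 'price': [9.5, 3.0]}
print(columnar_to_rows(batch))  # the original rows again
```

In the columnar layout all values of one column sit next to each other, which is the shape a GPU kernel wants to process in bulk.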

parts of GPU physical plan

parts of CPU physical plan

Comparing the two physical plans, we can see that lots of operations already have a GPU version. “Scan parquet” is replaced by “GpuScan parquet”; since the GPU needs columnar format, it skips the “ColumnarToRow” step that appears in the CPU version. Then “Project” is replaced by “GpuProject”, followed by “GpuColumnarToRow”, because the next step, “InMemoryRelation”, runs on the CPU. And so on and so forth.
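
The replacement logic described above can be sketched as a toy in plain Python. This is not how spark-rapids is implemented; the set of supported operators is a hypothetical stand-in, and a plan is just a list of operator names in execution order.

```python
# Hypothetical set of operators that have a GPU version in this toy model.
GPU_SUPPORTED = {"Scan parquet", "Project", "Filter", "HashAggregate"}

def rewrite_plan(cpu_plan):
    """Replace supported ops with 'Gpu' versions and insert a transition
    wherever execution moves between the GPU (columnar) and CPU (row) side."""
    gpu_plan = []
    on_gpu = False
    for op in cpu_plan:
        if op in ("ColumnarToRow", "RowToColumnar"):
            continue  # drop CPU transitions; only the needed ones are re-added
        if op in GPU_SUPPORTED:
            if gpu_plan and not on_gpu:
                gpu_plan.append("GpuRowToColumnar")  # CPU rows -> GPU columns
            gpu_plan.append("Gpu" + op)
            on_gpu = True
        else:
            if on_gpu:
                gpu_plan.append("GpuColumnarToRow")  # GPU columns -> CPU rows
            gpu_plan.append(op)  # unsupported op stays on the CPU
            on_gpu = False
    return gpu_plan

cpu_plan = ["Scan parquet", "ColumnarToRow", "Project", "InMemoryRelation"]
print(rewrite_plan(cpu_plan))
```

For the small plan above this yields GpuScan parquet, GpuProject, GpuColumnarToRow and then InMemoryRelation, the same shape as the plan I saw on my cluster.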

Let’s talk about another great feature: Optimized spark shuffle.

Spark needs a shuffle for wide transformations, while narrow transformations are grouped into a single stage. spark-rapids implements a custom Spark shuffle manager for shuffle operations on GPU clusters. It provides:

  • Spillable cache: it keeps the data close to where it is produced, and it prevents out-of-memory issues by moving data in this order: GPU memory -> host memory -> disk.
Once the GPU is out of memory, the shuffle manager puts data into host memory; if host memory is not enough either, it goes on to push data to local disk.
  • Transport (shuffle): handles block transfers between executors, leveraging the UCX libraries (see picture below).
    • GPU0-GPU0: cache in GPU0 , zero copy
    • GPU0-GPU1: NVLink
    • GPU0-GPUX(remote): RDMA
      • Infiniband
      • RoCE
    • Disk-GPU: GPUDirect Storage (GDS)
It is no surprise that RDMA and NVLink are used in the cluster, since UCX is widely used with Open MPI in the HPC area. These features are included in the RAPIDS libraries.
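
The spill order (GPU memory, then host memory, then disk) can be sketched with a toy cache in plain Python. Everything here is an assumption made for illustration: capacities are counted in blocks instead of bytes, the oldest block is the one that spills, and the class and method names are hypothetical, not the shuffle manager's API.

```python
from collections import OrderedDict

class SpillableCache:
    """Toy three-tier cache: blocks spill gpu -> host -> disk when a tier is full."""
    TIERS = ("gpu", "host", "disk")

    def __init__(self, gpu_capacity, host_capacity):
        # disk is treated as unlimited in this sketch
        self.capacity = {"gpu": gpu_capacity, "host": host_capacity, "disk": None}
        self.tiers = {name: OrderedDict() for name in self.TIERS}

    def put(self, block_id, data):
        """New blocks always start in the GPU tier, close to where they are produced."""
        self._insert("gpu", block_id, data)

    def _insert(self, tier, block_id, data):
        store = self.tiers[tier]
        store[block_id] = data
        limit = self.capacity[tier]
        if limit is not None and len(store) > limit:
            # Spill the oldest block of this tier into the next, slower tier.
            old_id, old_data = store.popitem(last=False)
            next_tier = self.TIERS[self.TIERS.index(tier) + 1]
            self._insert(next_tier, old_id, old_data)

    def where(self, block_id):
        """Return the tier currently holding the block, or None."""
        for tier in self.TIERS:
            if block_id in self.tiers[tier]:
                return tier
        return None

cache = SpillableCache(gpu_capacity=2, host_capacity=1)
for i in range(4):
    cache.put(f"block{i}", b"...")
print({f"block{i}": cache.where(f"block{i}") for i in range(4)})
```

After four blocks, the two newest are still on the GPU, one sits in host memory and the oldest has reached disk.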

Some helpful information from official Q&A:

What operators are best suited for the GPU?

  • Group by operations with high cardinality
  • Joins with a high cardinality
  • Sorts with a high cardinality
  • Aggregates with a high cardinality
  • Window operations, especially for large windows
  • Aggregate with lots of distinct operations
  • Complicated processing
  • Writing Parquet/ORC
  • Reading CSV
  • Transcoding (reading an input file and doing minimal processing before writing it out again, possibly in a different format, like CSV to Parquet)

What operators are not good for the GPU?

  • Small amounts of data (hundreds of MB)
  • Cache coherent processing
  • Data movement
    • Slow I/O
    • Going back and forth to the CPU (UDFs)
    • Shuffle
  • Limited GPU Memory

What is RAPIDS

  • a suite of open-source software libraries and APIs for data science and data pipelines on GPUs
  • offers a GPU dataframe that is compatible with Apache Arrow
    • language-independent columnar memory format
    • zero-copy streaming messaging and interprocess communication without serialization overhead
  • integrated with popular frameworks: PyTorch, Chainer, Apache MXNet; Spark, Dask

What is cuDF

  • GPU-accelerated data preparation and feature engineering
  • Python drop-in pandas replacement
  • features
    • CUDA
      • low-level library containing function implementations and a C/C++ API
      • importing/exporting Apache Arrow using the CUDA IPC mechanism
      • CUDA kernels to perform element-wise math operations on GPU dataframe columns
      • CUDA sort, join, groupby and reduction operations on GPU dataframes
    • Python bindings
      • a Python library for GPU dataframes
      • Python interface to CUDA C++ with additional functionality
      • creating Apache Arrow from NumPy arrays, pandas DataFrames and PyArrow Tables
      • JIT compilation of UDFs using Numba





Enable GPU Acceleration in WSL2 to Support AI Frameworks

When Microsoft upgraded WSL to version 2, it introduced a full Linux kernel and full VM management features. Besides the performance benefit from deep integration with Windows, WSL2 allows installing additional powerful apps like Docker and upgrading the Linux kernel whenever a new version is available.

Two months ago, Microsoft and NVIDIA brought GPU acceleration to WSL2. This new feature got me excited, because we no longer have to train our models on a separate Linux machine or set up dual boot.

Figure 1: Stack image showing the layers involved while running AI frameworks in WSL 2 containers. The container provides integration with CUDA-related components. WSL2 communicates with the Windows host through the GPU paravirtualization protocol.

Before I started, I did some research on the basic ideas of virtualization and WSL2 GPU support. It helped me understand how GPU paravirtualization works in WSL2.

Types of virtualization

Figure 2: four major types of virtualization
  • Full virtualization. In full virtualization, there is an almost complete model of the underlying physical system resources, which allows any and all installed software to run without modification. There are two types of full virtualization.
    • Software-assisted full virtualization (binary translation), like VMware Workstation (32-bit), Virtual PC and VirtualBox (32-bit). Issue: low performance.
    • Hardware-assisted full virtualization. It eliminates binary translation and interacts directly with the hardware (Intel VT-x and AMD-V), like KVM, VMware ESX, Hyper-V and Xen. Issue: the virtual context executes privileged instructions directly on the processor.
  • Paravirtualization. Paravirtualization (PV) is an enhancement of virtualization technology in which a guest operating system (guest OS) is modified prior to installation inside a virtual machine (VM), in order to allow all guest OSes within the system to share resources and successfully collaborate, rather than attempt to emulate an entire hardware environment. So the guest is aware that it has been virtualized. Products like Xen, IBM LPAR, Oracle VM for x86.
Xen supports both full virtualization and paravirtualization
  • Hybrid virtualization (hardware virtualized with PV drivers). The virtual machine uses PV for specific hardware drivers (like I/O), and the host uses full virtualization for other features. Products like Oracle VM for x86, Xen.
VMware paravirtual with hardware full virtualization
  • OS-level virtualization, aka containerization. Almost no overhead. Products like Docker, Linux LXC, AIX WPAR.
The difference between VM and container

Except for containerization, all types of virtualization use a hypervisor to communicate with the host. Let's take a look at how a hypervisor works below.

  • Hypervisor
    • Emulation. (software full virtualization)
      • emulates a certain piece of hardware, which is all the guest VM can see
      • costs performance, since it targets the “lowest common denominator”
      • need to translate instruction
      • wide compatibility
    • Paravirtualization
      • only support certain hardware in certain configurations.
      • Direct hardware access is possible
      • Compatibility is limited
    • hardware pass-through(hardware full virtualization)
      • native performance, but need proper drivers for the real physical hardware
      • hardware specific images
      • GPU supported

GPU Virtualization on Windows

How it works on WSL

  • A new kernel driver, “dxgkrnl”, exposes the “/dev/dxg” device to user mode.
  • /dev/dxg mimics the native WDDM D3DKMT kernel service layer on Windows.
  • dxgkrnl communicates with its big brother on Windows over the VM bus, using the WDDM paravirtualization protocol.
Figure 3: there is no partitioning of resources between Linux and Windows, and no limit on Linux applications
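
A quick way to see whether this device is exposed inside a WSL2 distro is to check for the device node. The snippet below is a minimal sketch that only tests whether the path exists; it does not prove the GPU is usable, and the function name is my own.

```python
import os

def has_dxg_device(path="/dev/dxg"):
    """Return True if the paravirtualized GPU device node exists at the given path."""
    return os.path.exists(path)

print("GPU paravirtualization device present:", has_dxg_device())
```

On a machine without GPU support in WSL (or on plain Linux) this simply prints False.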

DxCore & D3D12 on Linux

  • D3D12 on Linux is compiled from the same source code as d3d12.dll on Windows.
  • Except for the Present() function, everything is the same as on Windows.
  • libdxcore (DxCore) is a simplified version of dxgi.
  • GPU manufacturer partners provide the UMD (user mode driver) for Linux.
Figure 4: D3D12 builds upon the /dev/dxg device

DirectML and AI Training

  • DirectML sits on top of the D3D12 API and provides a collection of compute operations.
  • TensorFlow is available with an integrated DirectML backend.
Figure 5: DirectML provides beginners with a basic ML framework

OpenGL, OpenCL & Vulkan

  • The Mesa library is the mapping layer that brings hardware acceleration to OpenCL and OpenGL.
  • Vulkan is not supported right now.
Figure 6: WSL2 only supports OpenGL and OpenCL right now.

Nvidia CUDA

  • A version of CUDA that directly targets the WDDM 2.9 abstraction exposed by /dev/dxg.
  • Enables CUDA-X libraries such as cuDNN, cuBLAS and TensorRT.
  • Available on any glibc-based WSL distro.
Figure 7: NVIDIA Docker tools are available (NVIDIA Container Toolkit), which give us a container-like plugin and usage experience.

GPU container in WSL

  • The libnvidia-container library is able to detect the presence of libdxcore at runtime and uses it to detect all the GPUs exposed to this interface.
  • The driver store is a folder that contains all driver libraries for both Linux and Windows.
Figure 8: NVIDIA Docker provides the NVIDIA Container Toolkit along with lots of good images.

GUI application support is still under development.

How to enable GPU Acceleration in WSL

For the detailed steps, we can refer here. Below I summarize some key points:

  1. Windows version: build 20150 or above (Dev Channel)
  2. Enable WSL 2
  3. Install Ubuntu on WSL
  4. Install Windows Terminal
  5. Upgrade the kernel to 4.19.121 or higher
  7. Install Docker in WSL:
    • curl | sh
    • You can see the vmmem process in Windows Task Manager. It is the process for the virtual machine in WSL2.
  8. Install the NVIDIA Container Toolkit (nvidia-docker2)
Figure 9: Docker in WSL2 with the NVIDIA Container Toolkit

9. Start A TensorFlow Container

# test for docker
docker run --gpus all nbody -gpu -benchmark
# pull the TensorFlow image and run it
docker run -it --gpus all -p 8888:8888 tensorflow/tensorflow:latest-gpu-py3-jupyter

After you pull the TensorFlow image and run it, you can see the following instructions:

Figure 10: replace the host in the URL with localhost and open the URL in your browser; then we can use GPU acceleration in WSL2
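
If something does not work, the kernel version from the steps above is the first thing I would check. Here is a small sketch in plain Python that compares a kernel release string against the 4.19.121 minimum. The example release strings are only illustrations of the format, and the helper names are my own.

```python
import platform
import re

MIN_KERNEL = (4, 19, 121)  # minimum kernel version listed in the steps above

def parse_kernel_version(release):
    """Extract (major, minor, patch) from a release string such as
    '4.19.121-microsoft-standard'. A missing patch number counts as 0."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", release)
    if match is None:
        raise ValueError(f"unrecognized kernel release: {release!r}")
    major, minor, patch = match.groups()
    return (int(major), int(minor), int(patch or 0))

def kernel_is_new_enough(release, minimum=MIN_KERNEL):
    """True if the release is at least the minimum version."""
    return parse_kernel_version(release) >= minimum

# Only meaningful inside the Linux distro, so guard the live check.
if platform.system() == "Linux":
    release = platform.release()
    print(release, "OK" if kernel_is_new_enough(release) else "too old")
```

Run it inside the WSL2 distro, not in Windows itself; the comparison uses tuples, so 4.19.121 is correctly treated as newer than 4.19.104.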

