[FEA] GDS Integration #1445

Closed · 11 tasks done

rongou opened this issue Jan 5, 2021 · 2 comments

Assignees: rongou

Labels: epic (Issue that encompasses a significant feature or body of work), feature request (New feature or request), P1 (Nice to have for release), performance (A performance related task/issue)

Comments

rongou (Collaborator) commented Jan 5, 2021

Motivation

For many jobs running on spark-rapids, the ratio of computation to data movement is low, making them I/O bound. GPUDirect Storage (GDS) enables Direct Memory Access (DMA) between storage and GPU memory, with higher bandwidth and lower latency, which could reduce file I/O overhead and improve overall job performance.

Goals

  • Integrate GDS and the cuFile API into spark-rapids.
  • Support reading input data and writing output data through GDS.
  • Support storing Spark temporary data using GDS.
  • Benchmark GDS performance with spark-rapids and provide recommendations on when to use GDS.

Non-Goals

  • GDS/cuFile is currently distributed as a user library and a kernel module, and requires installing MOFED and a few third-party libraries. In the future GDS may become part of the CUDA driver and the CUDA toolkit. The exact distribution mechanism is beyond the scope of this document.
  • For distributed filesystems, GDS currently supports Lustre, NFS, and WekaFS, which are not commonly used for storing data for Spark jobs. The initial integration will not focus on distributed filesystem support.
  • In a multi-node environment, GDS can write to and read from a remote storage server through NVMe-oF. This may be an alternative to the current UCX-based shuffle plugin, but probably not a full replacement.

Assumptions

In a deployment environment, we assume the user has correctly set up NVMe/NVMe-oF support for GDS, and installed the cuFile kernel module and the cuFile user library.

Risks

  • Until the final distribution mechanism is sorted out, spark-rapids with GDS may add extra complexity to the deployment process.
  • Depending on the hardware and software stack, and the particular Spark job, GDS may not provide a significant performance boost.

Design

This blog post provides a good overview of GDS. There is some public documentation.

For spark-rapids, integrating with GDS can be roughly considered along two dimensions: the type of data, and the type of storage.

A Spark job may deal with these types of data involving disk storage:

  • Input data. This is the data a Spark job reads from disk for processing, typically in CSV or Parquet format, and usually of large size.
  • Output data. This is the data a Spark job writes out. For an analytical query, the output size may be very small, but for data transformations (e.g. converting from CSV to Parquet) the output volume can be considerable.
  • Shuffle data. For many jobs, between the “map” and the “reduce” phases, the whole intermediate dataset needs to be redistributed across the whole cluster (“shuffled”). The default Spark shuffle manager writes all partitions to disk, while the spark-rapids shuffle plugin tries to keep buffers in the GPU.
  • Spilling. When a dataset cannot fit in memory, a portion may be spilled to disk. In external sorting, data is spilled to disk and read back in during merge sort. For the spark-rapids shuffle plugin, GPU buffers may be spilled to system memory or disk.

Storage can be accessed in several ways:

  • Local storage. These are NVMe drives attached to the local host, possibly in a RAID 0 configuration. Different PCIe switches may provide different affinity for each GPU, but as a first-order approximation we can treat this as a single filesystem.
  • NVMe-oF. A storage server attached to the network (Ethernet, InfiniBand, or Fibre Channel) can be accessed using NVMe over fabrics.
  • Distributed filesystems. These provide a file system abstraction over remote storage. GDS currently supports Lustre, NFS, and WekaFS.

Here we can see how these two dimensions interact:

| Data type    | Local storage                                            | NVMe-oF                                   | Distributed filesystem |
| ------------ | -------------------------------------------------------- | ----------------------------------------- | ---------------------- |
| Input/Output | Not suitable for “production” use, demo/benchmarks only  | Needs storage server                      | Future                 |
| Shuffle      | Single-node only, or with UCX on top?                     | Needs storage server; alternative to UCX? | Future                 |
| Spilling     | Should work in general                                    | May be overkill                           | Future                 |

We should start with local storage since it is the easiest to set up. Possible first steps:

  • Input/output data on local storage. Since input data tends to be the largest, optimizing the I/O here may provide a big performance boost. It should be relatively easy to implement, as only the readers/writers need to be changed to use the cuFile API (see the sketch after this list). The main drawback is that in a real user environment the data is unlikely to fit on a single node, so this is only useful for initial demos and benchmarking.
  • Shuffle data on local storage. Spark’s default sort-based shuffle now has a ShuffleDataIO interface that can be overridden by setting the configuration spark.shuffle.sort.io.plugin.class; a new class can be implemented on top of the cuFile API (see the skeleton after the next paragraph). See blog post, JIRA, and SPIP. The drawback, again, is that this is single-node only.
  • Spilling to local storage. The easiest way to do this is to change the spilling mechanism in the spark-rapids shuffle plugin to spill directly to NVMe using the cuFile API.
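
To make the first and third bullets more concrete, here is a minimal sketch (not an actual implementation) of how a reader or the spill path could move bytes directly between a local NVMe file and GPU memory. CuFileJni is a hypothetical JNI wrapper invented for illustration; the native functions it would forward to (cuFileHandleRegister, cuFileRead, cuFileWrite) are part of the public cuFile C API, but no such binding exists in spark-rapids or cudf today.

```scala
// Sketch only. CuFileJni is a hypothetical JNI wrapper over the cuFile C API
// (cuFileDriverOpen, cuFileHandleRegister, cuFileRead, cuFileWrite); no such
// binding exists in spark-rapids/cudf yet.
import ai.rapids.cudf.DeviceMemoryBuffer

object CuFileJni {
  // Assumed signatures; each would forward to the corresponding native cuFile call.
  def readToDevice(path: String, devAddress: Long, size: Long, fileOffset: Long): Long = ???
  def writeFromDevice(path: String, devAddress: Long, size: Long, fileOffset: Long): Long = ???
}

object GdsIo {
  /** Read `size` bytes from a local NVMe file directly into GPU memory (DMA, no bounce buffer). */
  def readChunk(path: String, size: Long, fileOffset: Long): DeviceMemoryBuffer = {
    val buffer = DeviceMemoryBuffer.allocate(size)
    CuFileJni.readToDevice(path, buffer.getAddress, size, fileOffset)
    buffer
  }

  /** Spill a GPU buffer straight to disk without staging it in host memory. */
  def spill(buffer: DeviceMemoryBuffer, path: String, fileOffset: Long): Unit = {
    CuFileJni.writeFromDevice(path, buffer.getAddress, buffer.getLength, fileOffset)
  }
}
```

A real wrapper would likely also need driver open/close (cuFileDriverOpen/cuFileDriverClose) and buffer registration (cuFileBufRegister) to get the best DMA performance.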

At this point it is hard to predict which approach would provide the biggest benefit, so they probably should be tried in parallel.
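
For the shuffle option, Spark 3.0’s ShuffleDataIO plugin interface mentioned above is the extension point, selected through spark.shuffle.sort.io.plugin.class. A bare-bones skeleton of a cuFile-backed plugin might look like the following; the GdsShuffleDataIO name and package are placeholders, and the driver/executor components (which would route map-output writes through cuFile) are left as stubs.

```scala
// Skeleton only: a hypothetical cuFile-backed implementation of Spark's
// ShuffleDataIO plugin interface (Spark 3.0+).
package com.nvidia.spark.rapids

import org.apache.spark.SparkConf
import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents}

class GdsShuffleDataIO(conf: SparkConf) extends ShuffleDataIO {
  // Driver-side lifecycle hooks; nothing GDS-specific is needed here initially.
  override def driver(): ShuffleDriverComponents = ???

  // Executor-side components whose map-output writers would use the cuFile API
  // to write shuffle blocks directly from GPU memory to local NVMe.
  override def executor(): ShuffleExecutorComponents = ???
}
```

It would be enabled at submit time with --conf spark.shuffle.sort.io.plugin.class=com.nvidia.spark.rapids.GdsShuffleDataIO; Spark’s built-in LocalDiskShuffleDataIO shows what the executor components need to provide.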

Alternatives Considered

We may want to set up a distributed filesystem (e.g. NFS) on a storage server and implement everything on top of that. This would make the implementation production-ready, but setting it up may be complex and performance may suffer, so it is a higher-risk first step.

Tasks

This is the initial list of tasks; it may be modified or expanded later.

rongou added the feature request and ? - Needs Triage labels on Jan 5, 2021
rongou self-assigned this on Jan 5, 2021
sameerz added the P1, epic, and performance labels and removed the ? - Needs Triage label on Jan 5, 2021

rongou (Collaborator, Author) commented Jun 4, 2021

Closing this; there are a few follow-up issues that will be tracked separately: #2492 #2504 #2592 #2593

rongou closed this as completed on Jun 4, 2021

sameerz (Collaborator) commented Jun 13, 2021

@rongou can we create an issue with the epic label for the follow-up issues?
