Executor 2.0: Stream assignment (#5602)
* Add ExecNode stream assignment algorithms and tests.
NOTE: Follow-up #5620 implements per-operator assignment.
---------
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
mzient committed Sep 4, 2024
1 parent 6efdeef commit 22304f4
Showing 2 changed files with 366 additions and 0 deletions.
170 changes: 170 additions & 0 deletions dali/pipeline/executor/executor2/stream_assignment.h
@@ -0,0 +1,170 @@
// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_
#define DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_

#include <algorithm>
#include <cassert>
#include <functional>
#include <optional>
#include <queue>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
#include "dali/pipeline/graph/graph_util.h"
#include "dali/pipeline/executor/executor2/exec_graph.h"
// TODO(michalz): This is here for review process only. Remove when exec2.h is available
// #include "dali/pipeline/executor/executor2/exec2.h"
#include "dali/pipeline/graph/op_graph2.h"

namespace dali {
namespace exec2 {

// TODO(michalz): This is here for review process only. Remove when exec2.h is available
enum class StreamPolicy : int {
Single, //< There's just one stream, shared by all operators that need one
PerBackend, //< Operators are scheduled on a stream specific to their backend (mixed or GPU)
PerOperator //< Independent operators are executed on separate streams.

// TODO(michalz): implement minimal assignment for PerOperator policy
};


template <StreamPolicy policy>
class StreamAssignment;

inline bool NeedsStream(const ExecNode *node) {
if (node->is_pipeline_output) {
for (auto &pipe_out : node->inputs) {
if (pipe_out->device == StorageDevice::GPU)
return true;
}
} else {
return node->backend != OpType::CPU;
}
return false;
}

inline OpType NodeType(const ExecNode *node) {
if (node->is_pipeline_output) {
OpType type = OpType::CPU;
for (auto &pipe_out : node->inputs) {
if (pipe_out->device == StorageDevice::GPU) {
auto producer_type = pipe_out->producer->backend;
if (producer_type == OpType::GPU) {
return OpType::GPU;
} else if (producer_type == OpType::MIXED) {
type = OpType::MIXED;
}
}
}
return type;
} else {
return node->backend;
}
}

/** A trivial stream policy, with just one stream shared by all non-CPU operators. */
template <>
class StreamAssignment<StreamPolicy::Single> {
public:
explicit StreamAssignment(ExecGraph &graph) {
for (auto &node : graph.Nodes()) {
if (NeedsStream(&node)) {
needs_stream_ = true;
break;
}
}
}

std::optional<int> operator[](const ExecNode *node) const {
if (NeedsStream(node))
return 0;
else
return std::nullopt;
}

int NumStreams() const {
return needs_stream_ ? 1 : 0;
}

private:
bool needs_stream_ = false;
};


/** A simple stream policy where all mixed and GPU operators share their respective streams.
*
* In this policy there are 0..2 streams, depending on the number of mixed and GPU nodes:
* 0 - only CPU nodes
* 1 - there are some mixed or some GPU nodes, but not both
* 2 - there are both mixed and GPU nodes present.
*/
template <>
class StreamAssignment<StreamPolicy::PerBackend> {
public:
explicit StreamAssignment(ExecGraph &graph) {
for (auto &node : graph.Nodes()) {
switch (NodeType(&node)) {
case OpType::GPU:
has_gpu_ = true;
if (has_mixed_)
return; // we already have both, nothing more can happen
break;
case OpType::MIXED:
has_mixed_ = true;
if (has_gpu_)
return; // we already have both, nothing more can happen
break;
default:
break;
}
}
}

/** Returns a stream index for a non-CPU operator.
*
* If the node is a Mixed node, it gets stream index 0.
* If the node is a GPU node, it gets stream index 1 if there are any mixed nodes; otherwise
* the only stream is the GPU stream and the returned index is 0.
*/
std::optional<int> operator[](const ExecNode *node) const {
switch (NodeType(node)) {
case OpType::CPU:
return std::nullopt;
case OpType::GPU:
return has_mixed_ ? 1 : 0;
case OpType::MIXED:
return 0;
default:
assert(false && "Unreachable");
return std::nullopt;
}
}

int NumStreams() const {
return has_gpu_ + has_mixed_;
}

private:
bool has_gpu_ = false;
bool has_mixed_ = false;
};

} // namespace exec2
} // namespace dali

#endif // DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_
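
For orientation, a minimal usage sketch of the API above, assuming a graph that has already been lowered into an ExecGraph (as in the tests below). The function name ExampleAssignStreams and the scheduling comments are illustrative only and are not part of this commit:

// Illustrative sketch only - not part of this commit.
// Assumes the ExecGraph / StreamAssignment API declared in stream_assignment.h.
#include <optional>
#include "dali/pipeline/executor/executor2/stream_assignment.h"

namespace dali {
namespace exec2 {

inline void ExampleAssignStreams(ExecGraph &graph) {
  // Build the assignment for the chosen policy over the lowered graph.
  StreamAssignment<StreamPolicy::PerBackend> assignment(graph);
  // NumStreams() tells the executor how many streams it has to create:
  // 0 (CPU-only), 1 (mixed or GPU, but not both) or 2 (both) for this policy.
  int num_streams = assignment.NumStreams();
  (void)num_streams;
  for (auto &node : graph.Nodes()) {
    // Each node maps to a stream index, or nullopt if it runs on the host only.
    std::optional<int> stream_idx = assignment[&node];
    if (stream_idx.has_value()) {
      // ... schedule the node's GPU work on the stream with this index ...
    } else {
      // ... CPU-only node, no stream needed ...
    }
  }
}

}  // namespace exec2
}  // namespace dali
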
196 changes: 196 additions & 0 deletions dali/pipeline/executor/executor2/stream_assignment_test.cc
@@ -0,0 +1,196 @@
// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
#include "dali/pipeline/executor/executor2/stream_assignment.h"
#include "dali/pipeline/operator/operator.h"
#include "dali/pipeline/operator/operator_factory.h"

namespace dali {

template <typename Backend>
class StreamAssignmentDummyOp : public Operator<Backend> {
public:
using Operator<Backend>::Operator;
USE_OPERATOR_MEMBERS();

void RunImpl(Workspace &ws) override {}
bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
return false;
}
};

DALI_SCHEMA(StreamAssignmentDummyOp)
.NumInput(0, 999)
.NumOutput(0)
.AdditionalOutputsFn([](const OpSpec &spec) {
return spec.NumOutput();
});

DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<CPUBackend>, CPU);
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<MixedBackend>, Mixed);
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<GPUBackend>, GPU);

namespace exec2 {

namespace {

OpSpec SpecDev(const std::string &device) {
return OpSpec("StreamAssignmentDummyOp")
.AddArg("device", device)
.AddArg("num_threads", 1)
.AddArg("max_batch_size", 1);
}

OpSpec SpecGPU() {
return SpecDev("gpu");
}

OpSpec SpecCPU() {
return SpecDev("cpu");
}

OpSpec SpecMixed() {
return SpecDev("mixed");
}

auto MakeNodeMap(const ExecGraph &graph) {
std::map<std::string_view, const ExecNode *, std::less<>> map;
for (auto &n : graph.Nodes())
if (!n.instance_name.empty()) {
map[n.instance_name] = &n;
}
return map;
}

} // namespace

TEST(Exec2Test, StreamAssignment_Single_OnlyCPU) {
graph::OpGraph::Builder b;
b.Add("a",
SpecCPU()
.AddOutput("a->out", "cpu"));
b.AddOutput("a->out_cpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<StreamPolicy::Single> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], std::nullopt);
}

TEST(Exec2Test, StreamAssignment_Single_CPUMixedGPU) {
graph::OpGraph::Builder b;
b.Add("a",
SpecCPU()
.AddOutput("a->b", "cpu"));
b.Add("b",
SpecMixed()
.AddInput("a->b", "cpu")
.AddOutput("b->c", "gpu"));
b.Add("c",
SpecGPU()
.AddInput("b->c", "gpu")
.AddOutput("c->out", "gpu"));
b.AddOutput("c->out_gpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<StreamPolicy::Single> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], std::nullopt);
EXPECT_EQ(assignment[map["b"]], 0);
EXPECT_EQ(assignment[map["c"]], 0);
}


TEST(Exec2Test, StreamAssignment_PerBackend_OnlyCPU) {
graph::OpGraph::Builder b;
b.Add("a",
SpecCPU()
.AddOutput("a->out", "cpu"));
b.AddOutput("a->out_cpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<StreamPolicy::PerBackend> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], std::nullopt);
}


TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixed) {
graph::OpGraph::Builder b;
b.Add("a",
SpecCPU()
.AddOutput("a->b", "cpu")
.AddOutput("a->c", "cpu"));
b.Add("b",
SpecMixed()
.AddInput("a->b", "cpu")
.AddOutput("b->out", "gpu"));
b.Add("c",
SpecMixed()
.AddInput("a->c", "cpu")
.AddOutput("c->out", "gpu"));
b.AddOutput("b->out_gpu");
b.AddOutput("c->out_gpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<StreamPolicy::PerBackend> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], std::nullopt);
EXPECT_EQ(assignment[map["b"]], 0);
EXPECT_EQ(assignment[map["c"]], 0);
}

TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixedGPU) {
graph::OpGraph::Builder b;
b.Add("a",
SpecCPU()
.AddOutput("a->b", "cpu")
.AddOutput("a->c", "cpu"));
b.Add("b",
SpecGPU()
.AddInput("a->b", "cpu")
.AddOutput("b->out", "gpu"));
b.Add("c",
SpecMixed()
.AddInput("a->c", "cpu")
.AddOutput("c->out", "gpu"));
b.AddOutput("b->out_gpu");
b.AddOutput("c->out_gpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<StreamPolicy::PerBackend> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], std::nullopt);
EXPECT_EQ(assignment[map["b"]], 1);
EXPECT_EQ(assignment[map["c"]], 0);
}

} // namespace exec2
} // namespace dali
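
A coverage note: the tests above check per-node stream indices but not NumStreams(). A hypothetical additional test along these lines could pin down the stream counts for both policies; it reuses the SpecCPU/SpecMixed/SpecGPU/MakeNodeMap helpers and would live inside namespace dali::exec2 alongside the tests above. The test name is illustrative only:

// Hypothetical extra test - a sketch only, not part of this commit.
TEST(Exec2Test, StreamAssignment_NumStreams_MixedAndGPU) {
  graph::OpGraph::Builder b;
  b.Add("a", SpecCPU().AddOutput("a->b", "cpu").AddOutput("a->c", "cpu"));
  b.Add("b", SpecMixed().AddInput("a->b", "cpu").AddOutput("b->out", "gpu"));
  b.Add("c", SpecGPU().AddInput("a->c", "cpu").AddOutput("c->out", "gpu"));
  b.AddOutput("b->out_gpu");
  b.AddOutput("c->out_gpu");
  auto g = std::move(b).GetGraph(true);
  ExecGraph eg;
  eg.Lower(g);

  // Single policy: one shared stream as long as any non-CPU node exists.
  StreamAssignment<StreamPolicy::Single> single(eg);
  EXPECT_EQ(single.NumStreams(), 1);

  // PerBackend policy: one stream for mixed nodes, one for GPU nodes.
  StreamAssignment<StreamPolicy::PerBackend> per_backend(eg);
  EXPECT_EQ(per_backend.NumStreams(), 2);
}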
