Add optional lowering to Host IR in FusionKernelRuntime (#3835)
This PR implements milestone 2 from Jingyue's document [Initial Integration
of Host IR to
FusionExecutorCache](https://docs.google.com/document/d/1QrRmN27XsVjZu7QrZWJJyRENO50878LC3MvlQY1cRYA/edit?usp=sharing).

It adds an optional code path in FusionKernelRuntime (FKR) that lowers a
fusion to Host IR. When enabled, FKR creates a HostIrEvaluator whose
container holds all of the fusion's expressions as its top-level expression
list. Segments that are compiled to a CUDA kernel are launched through the
LaunchKernel Host IR instruction.
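
At a high level, the new path works as sketched below (a condensed paraphrase of the code in this PR, not the verbatim implementation; `num_groups`, `run_order_id`, `heuristic_params`, `in_clone`/`out_clone`, and `tensor_map` are assumed to come from the existing segmentation and scheduling machinery):

```cpp
// Lower segments into a HostIrContainer, then interpret it with a
// HostIrEvaluator.
auto hic = std::make_unique<hir::HostIrContainer>(num_groups);
FusionGuard::setCurFusion(hic.get());

// A segment compiled to a CUDA kernel becomes a LaunchKernel instruction;
// other segments are cloned into the container as plain expressions.
auto* launch_kernel = IrBuilder::create<hir::LaunchKernel>(
    run_order_id,
    heuristic_params->lparams,
    heuristic_params->cparams,
    std::vector<Val*>{in_clone},
    std::vector<Val*>{out_clone});
hic->pushBackTopLevelExprs(launch_kernel);

// The evaluator "eagerly" dispatches the container's top-level expressions.
hir::HostIrEvaluator hie(std::move(hic));
std::vector<at::Tensor> outputs = hie.runWithPolymorphicValues(tensor_map);
```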
nsarka authored Feb 14, 2025
1 parent 1386954 commit bd2c81d
Showing 11 changed files with 244 additions and 37 deletions.
11 changes: 8 additions & 3 deletions csrc/host_ir/container.cpp
@@ -20,6 +20,9 @@ namespace nvfuser {
 
 namespace hir {
 
+HostIrContainer::HostIrContainer(int64_t num_kernel_executors)
+    : kernel_executors_(num_kernel_executors) {}
+
 HostIrContainer::~HostIrContainer() = default;
 
 Stream* HostIrContainer::getDefaultStream() {
@@ -41,12 +44,14 @@ const std::vector<Expr*>& HostIrContainer::topLevelExprs() const {
 
 void HostIrContainer::pushBackTopLevelExprs(Expr* expr) {
   assertInContainer(expr, "Cannot add expr, ");
-  return top_level_exprs_.push_back(expr);
+  top_level_exprs_.push_back(expr);
 }
 
-void HostIrContainer::pushBackKernelExecutor(
+void HostIrContainer::setKernelExecutor(
+    int64_t index,
     std::unique_ptr<KernelExecutor> ke) {
-  return kernel_executors_.push_back(std::move(ke));
+  NVF_ERROR(kernel_executors_.at(index) == nullptr);
+  kernel_executors_.at(index) = std::move(ke);
 }
 
 KernelExecutor* HostIrContainer::getKernelExecutor(int64_t index) const {
10 changes: 8 additions & 2 deletions csrc/host_ir/container.h
@@ -27,7 +27,9 @@ programs. Later, we it should support non-linear program having a DAG structure.
 
 class HostIrContainer final : public Fusion {
  public:
-  HostIrContainer() = default;
+  // num_kernel_executors is only needed when the container has LaunchKernel
+  // instructions.
+  explicit HostIrContainer(int64_t num_kernel_executors = 0);
   HostIrContainer(const HostIrContainer&) = delete;
   HostIrContainer& operator=(const HostIrContainer&) = delete;
 
@@ -43,7 +45,11 @@ class HostIrContainer final : public Fusion {
 
   void pushBackTopLevelExprs(Expr* expr);
 
-  void pushBackKernelExecutor(std::unique_ptr<KernelExecutor> ke);
+  void setKernelExecutor(int64_t index, std::unique_ptr<KernelExecutor> ke);
+
+  bool hasKernelExecutor(int64_t index) const {
+    return kernel_executors_.at(index) != nullptr;
+  }
 
   KernelExecutor* getKernelExecutor(int64_t index) const;
 
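
Taken together, the container's executor slots are now index-addressed rather than append-only. A hedged usage sketch (the slot count and index are illustrative, not from the PR):

```cpp
// Slots are sized up front, one per segment in run order; slots for
// segments that never compile to a kernel simply stay null.
hir::HostIrContainer hic(/*num_kernel_executors=*/4);

auto ke = std::make_unique<KernelExecutor>();
// ... ke->compile(fusion, args, lparams, cparams, scheduler_type) ...
hic.setKernelExecutor(/*index=*/2, std::move(ke)); // fills slot 2 once

if (hic.hasKernelExecutor(2)) {
  KernelExecutor* raw = hic.getKernelExecutor(2); // non-owning access
}
```

Note that setKernelExecutor asserts the slot is still null, so a segment cannot be registered twice.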
34 changes: 26 additions & 8 deletions csrc/host_ir/executor.cpp
@@ -204,22 +204,36 @@ HostIrEvaluator::HostIrEvaluator(
   expr_evaluator_.bind("numberOfStreams", params_.number_of_streams);
 }
 
+std::vector<at::Tensor> HostIrEvaluator::dispatchAndCollectOutputs() {
+  // Interpret each instruction in an "eager" way by iterate over the Host Ir
+  // Container's top level expression list
+  for (auto expr : container_->topLevelExprs()) {
+    dispatch(expr);
+  }
+
+  // Collect global outputs
+  return getKnownTensorOrUndefined(container_->outputs(), expr_evaluator_);
+}
+
 std::vector<at::Tensor> HostIrEvaluator::runWithInput(
     std::unordered_map<Val*, c10::IValue> val_to_IValue) {
-  // process input values
+  // process input values, converting IValue to PolymorphicValue
   for (const auto& [val, ivalue] : val_to_IValue) {
     expr_evaluator_.bind(
         val, PolymorphicValue_functions::IValueToPolymorphicValue(ivalue));
   }
 
-  // Interpret each instruction in an "eager" way by iterate over the Host Ir
-  // Container's top level expression list
-  for (auto expr : container_->topLevelExprs()) {
-    dispatch(expr);
-  }
+  return dispatchAndCollectOutputs();
+}
 
-  // Collect global outputs
-  return getKnownTensorOrUndefined(container_->outputs(), expr_evaluator_);
+std::vector<at::Tensor> HostIrEvaluator::runWithPolymorphicValues(
+    std::unordered_map<Val*, const PolymorphicValue*> val_to_PValue) {
+  // process input values
+  for (const auto& [val, pvalue] : val_to_PValue) {
+    expr_evaluator_.bind(val, *pvalue);
+  }
+
+  return dispatchAndCollectOutputs();
 }
 
 std::string HostIrEvaluator::canRun() const {
@@ -315,7 +329,11 @@ void HostIrEvaluator::handle(LaunchKernel* launch_kernel) {
 
   // run the compiled kernel
   std::vector<at::Tensor> outputs =
-      container_->getKernelExecutor(launch_kernel->getIndex())->run(args);
+      container_->getKernelExecutor(launch_kernel->getIndex())
+          ->run(
+              args,
+              launch_kernel->launch_params(),
+              launch_kernel->compile_params());
 
   // Store the outputs in the context
   for (auto output_idx : c10::irange(outputs.size())) {
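
For reference, a hedged sketch of how a caller drives the new entry point (it mirrors the call site added to FusionKernelRuntime::runWithInputs further below; `hie` and `args` are assumed in scope):

```cpp
// Bind each container input to the caller's PolymorphicValue by position,
// then dispatch the top-level expressions and collect the output tensors.
std::unordered_map<Val*, const PolymorphicValue*> tensor_map;
for (const auto i : c10::irange(args.size())) {
  tensor_map.emplace(hie->inputs()[i], args[i]);
}
std::vector<at::Tensor> outputs = hie->runWithPolymorphicValues(tensor_map);
```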
5 changes: 5 additions & 0 deletions csrc/host_ir/executor.h
@@ -85,8 +85,11 @@ class HostIrEvaluator final : public OptOutDispatch {
       std::unique_ptr<HostIrContainer> container,
       Communicator* communicator = nullptr,
       HostIrEvaluatorParams = HostIrEvaluatorParams());
+
   std::vector<at::Tensor> runWithInput(
       std::unordered_map<Val*, c10::IValue> val_to_IValue);
+  std::vector<at::Tensor> runWithPolymorphicValues(
+      std::unordered_map<Val*, const PolymorphicValue*> val_to_PValue);
 
   const std::vector<Val*>& inputs() {
     return container_->inputs();
@@ -133,6 +136,8 @@
 
   c10::cuda::CUDAStream getCUDAStream(Stream* stream);
 
+  std::vector<at::Tensor> dispatchAndCollectOutputs();
+
   std::unique_ptr<HostIrContainer> container_;
   Communicator* communicator_;
   HostIrEvaluatorParams params_;
4 changes: 4 additions & 0 deletions csrc/host_ir/host_ir.cpp
@@ -122,10 +122,14 @@ bool PostOnStream::sameAs(const Statement* other) const {
 LaunchKernel::LaunchKernel(
     IrBuilderPasskey passkey,
     int64_t hic_executor_index,
+    const LaunchParams& launch_constraints,
+    const CompileParams& compile_params,
     const std::vector<Val*>& inputs,
     const std::vector<Val*>& outputs)
     : Expr(passkey, inputs, outputs, {}) {
   addDataAttribute(hic_executor_index);
+  addDataAttribute(launch_constraints);
+  addDataAttribute(compile_params);
 }
 
 NVFUSER_DEFINE_CLONE_AND_CREATE(LaunchKernel)
11 changes: 11 additions & 0 deletions csrc/host_ir/host_ir.h
@@ -11,6 +11,7 @@
 #include <ir/base_nodes.h>
 #include <ir/builder.h>
 #include <multidevice/communication.h>
+#include <scheduler/heuristic.h>
 #include <atomic>
 
 namespace nvfuser {
@@ -123,6 +124,8 @@ class LaunchKernel : public Expr {
       int64_t hic_executor_index, // Index into the HostIrContainer's vector of
                                   // KernelExecutors--i.e., the kernel this IR
                                   // should launch
+      const LaunchParams& launch_constraints,
+      const CompileParams& compile_params,
       const std::vector<Val*>& inputs,
       const std::vector<Val*>& outputs);
 
@@ -142,6 +145,14 @@
   int64_t getIndex() const {
     return attribute<int64_t>(0);
   }
+
+  const auto& launch_params() const {
+    return attribute<LaunchParams>(1);
+  }
+
+  const auto& compile_params() const {
+    return attribute<CompileParams>(2);
+  }
 };
 
 class Stream : public Val {
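
A hedged construction example for the extended node (the tensor arguments `in_tv` and `out_tv` are hypothetical placeholders; in this PR the parameters come from the segment's scheduler heuristics):

```cpp
// The node now records, as data attributes 0..2: the executor slot index,
// the launch parameters, and the compile parameters.
auto* lk = IrBuilder::create<hir::LaunchKernel>(
    /*hic_executor_index=*/0,
    LaunchParams(),
    CompileParams(),
    std::vector<Val*>{in_tv},
    std::vector<Val*>{out_tv});

int64_t idx = lk->getIndex();                   // attribute<int64_t>(0)
const LaunchParams& lp = lk->launch_params();   // attribute<LaunchParams>(1)
const CompileParams& cp = lk->compile_params(); // attribute<CompileParams>(2)
```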
1 change: 1 addition & 0 deletions csrc/options.cpp
@@ -167,6 +167,7 @@ const std::unordered_map<std::string, EnableOption>& getEnableOptions() {
       {"static_fusion_count", EnableOption::StaticFusionCount},
       {"wait_debugger", EnableOption::WaitDebugger},
       {"warn_register_spill", EnableOption::WarnRegisterSpill},
+      {"host_ir_lowering", EnableOption::HostIrLowering},
   };
   return available_options;
 }
1 change: 1 addition & 0 deletions csrc/options.h
@@ -109,6 +109,7 @@ enum class EnableOption {
   WaitDebugger, // Used for debugging multi-GPU. The rank given in the argument
                 // will wait for `gdb attach` at the start.
   WarnRegisterSpill, //! Enable warnings of register spill
+  HostIrLowering, //! Enable FusionKernelRuntime lowering to host IR
   EndOfOption //! Placeholder for counting the number of elements
 };
 
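
With the string and enum registered, the new path can be toggled like any other enable option, e.g. `NVFUSER_ENABLE=host_ir_lowering` in the environment. In tests, a scoped guard should work too (a sketch assuming nvFuser's existing EnableOptionsGuard helper; not part of this PR):

```cpp
// Enable host IR lowering for the lifetime of this scope; the guard
// restores the previous options on destruction.
EnableOptionsGuard enable_guard;
EnableOptionsGuard::getCurOptions().set(EnableOption::HostIrLowering);
NVF_CHECK(isOptionEnabled(EnableOption::HostIrLowering));
```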
130 changes: 110 additions & 20 deletions csrc/runtime/fusion_kernel_runtime.cpp
@@ -154,11 +154,15 @@ void FusionKernelRuntime::evictCache(size_t input_id) {
 }
 
 bool FusionKernelRuntime::isCompiled() const {
-  std::lock_guard<std::mutex> guard(mutex_);
-  return std::all_of(
-      executors_.begin(), executors_.end(), [](const auto& executor) {
-        return ExecutorDispatch::isCompiled(executor.get());
-      });
+  if (isOptionEnabled(EnableOption::HostIrLowering)) {
+    return hie_ != nullptr;
+  } else {
+    std::lock_guard<std::mutex> guard(mutex_);
+    return std::all_of(
+        executors_.begin(), executors_.end(), [](const auto& executor) {
+          return ExecutorDispatch::isCompiled(executor.get());
+        });
+  }
 }
 
 flatbuffers::Offset<serde::FusionKernelRuntime> FusionKernelRuntime::serialize(
@@ -280,6 +284,24 @@ std::vector<at::Tensor> FusionKernelRuntime::runWithInputs(
     KernelArgumentHolder& args) {
   FUSER_PERF_SCOPE("FusionKernelRuntime::runWithInputs");
 
+  if (isOptionEnabled(EnableOption::HostIrLowering)) {
+    if (isDebugDumpEnabled(DebugDumpOption::PerfDebugVerbose)) {
+      debug() << "=================RUNNING HOSTIR EVALUATOR================="
+              << std::endl;
+    }
+
+    std::unordered_map<Val*, const PolymorphicValue*> tensor_map;
+    for (const auto i : c10::irange(args.size())) {
+      tensor_map.emplace(hie_->inputs()[i], args[i]);
+    }
+    auto outputs = hie_->runWithPolymorphicValues(tensor_map);
+    if (isDebugDumpEnabled(DebugDumpOption::PerfDebugVerbose)) {
+      debug() << "============= FINISHED RUNNING HOSTIR EVALUATOR ============"
+              << std::endl;
+    }
+    return outputs;
+  }
+
   if (isDebugDumpEnabled(DebugDumpOption::PerfDebugVerbose)) {
     debug() << "=================RUNNING FUSION SEGMENTS================="
             << std::endl;
@@ -333,6 +355,13 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
     FusionProfiler::startCompile();
   }
 
+  // host ir
+  std::unique_ptr<hir::HostIrContainer> hic;
+  if (isOptionEnabled(EnableOption::HostIrLowering)) {
+    hic = std::make_unique<hir::HostIrContainer>(
+        num_groups); // Some indices will be empty
+  }
+
   std::atomic<bool> detect_exception_in_thread_pool{false};
   std::string thread_pool_error_message;
   std::mutex thread_pool_error_message_mutex;
@@ -362,21 +391,23 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
       FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
       c10::cuda::CUDAGuard dg(args.getDeviceIndex());
       c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
-      compileKernel(group_runtime_inputs, group_to_run);
+      compileKernel(group_runtime_inputs, group_to_run, hic.get());
     } else {
+      hir::HostIrContainer* hic_p = hic.get();
       // launch compileKernel thread here
      getThreadPool()->run([this,
                             args,
                             group_runtime_inputs,
                             group_to_run,
                             &detect_exception_in_thread_pool,
                             &thread_pool_error_message,
-                            &thread_pool_error_message_mutex]() {
+                            &thread_pool_error_message_mutex,
+                            hic_p]() {
         FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
         try {
           c10::cuda::CUDAGuard dg(args.getDeviceIndex());
           c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
-          compileKernel(group_runtime_inputs, group_to_run);
+          compileKernel(group_runtime_inputs, group_to_run, hic_p);
         } catch (const std::exception& e) {
           // Set flag inside lambda so we can throw an exception after thread
           // pool completes its work.
@@ -401,6 +432,39 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
     num_live_args_after_segment_runs_.push_back((int64_t)args.size());
   }
 
+  // add all expressions and compiled kernels to the host ir container
+  if (isOptionEnabled(EnableOption::HostIrLowering)) {
+    IrCloner ir_cloner(hic.get());
+    FusionGuard::setCurFusion(hic.get());
+    for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
+      auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
+      if (hic->hasKernelExecutor(run_order_id)) {
+        auto in_clone = ir_cloner.clone(group_to_run->inputs());
+        auto out_clone = ir_cloner.clone(group_to_run->outputs());
+        auto heuristic_params = schedulers().at(run_order_id).get();
+        auto launch_kernel = IrBuilder::create<hir::LaunchKernel>(
+            run_order_id,
+            heuristic_params->lparams,
+            heuristic_params->cparams,
+            std::vector<Val*>{in_clone},
+            std::vector<Val*>{out_clone});
+        hic->pushBackTopLevelExprs(launch_kernel);
+      } else {
+        // push back segment's exprs into the container as top level expressions
+        for (auto* expr : group_to_run->exprs()) {
+          auto cloned_expr = ir_cloner.clone(expr);
+          hic->pushBackTopLevelExprs(cloned_expr);
+        }
+      }
+    }
+    for (const Val* in : segmented_fusion_->inputs()) {
+      hic->addInput(ir_cloner.clone(in));
+    }
+    for (const Val* out : segmented_fusion_->outputs()) {
+      hic->addOutput(ir_cloner.clone(out));
+    }
+  }
+
   if (num_groups != 1 && !isOptionDisabled(DisableOption::ParallelCompile)) {
     // Wait until all segments finish compiling
     getThreadPool()->waitWorkComplete();
@@ -411,6 +475,12 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
         thread_pool_error_message,
         "\nUse NVFUSER_DISABLE=parallel_compile to simplify error message.");
   }
+
+  if (isOptionEnabled(EnableOption::HostIrLowering)) {
+    hie_ = std::make_unique<hir::HostIrEvaluator>(
+        hir::HostIrEvaluator(std::move(hic)));
+  }
+
   if (isProfilerEnabled()) {
     FusionProfiler::stopCompile();
   }
@@ -661,7 +731,8 @@ std::vector<at::Tensor> FusionKernelRuntime::runKernelWithInput(
 
 void FusionKernelRuntime::compileKernel(
     const KernelArgumentHolder& args,
-    SegmentedGroup* sg) {
+    SegmentedGroup* sg,
+    hir::HostIrContainer* hic) {
   FUSER_PERF_SCOPE("FusionKernelRuntime::compileKernel");
   auto group_id = sg->groupId();
   auto heuristic_params = schedulers().at(group_id).get();
@@ -684,17 +755,36 @@ void FusionKernelRuntime::compileKernel(
       heuristic_params->cparams.index_type.has_value(),
       "Kernel index type is not defined.");
 
-  // Initialize associated executors
-  executors_[group_id] = ExecutorDispatch::makeExecutor(
-      fusion_to_run.get(), fusion_id_, concrete_id_, runtime_id_, group_id);
-
-  ExecutorDispatch::compile(
-      executors_.at(group_id).get(),
-      fusion_to_run.get(),
-      args,
-      heuristic_params->lparams,
-      heuristic_params->cparams,
-      heuristic_params->scheduler_type);
+  if (hic != nullptr) {
+    // if it's a kernel executor, compile the segment and append to hic
+    // otherwise, push the segment's exprs directly to the hic
+    if (!HostIrExecutor::supported(fusion_to_run.get()) &&
+        !ExprEvalExecutor::supported(fusion_to_run.get())) {
+      NVF_ERROR(
+          KernelExecutor::supported(fusion_to_run.get()),
+          "Fusion not supported by any executor type");
+      auto ke = std::make_unique<KernelExecutor>();
+      ke->compile(
+          fusion_to_run.get(),
+          args,
+          heuristic_params->lparams,
+          heuristic_params->cparams,
+          heuristic_params->scheduler_type);
+      hic->setKernelExecutor(group_id, std::move(ke));
+    }
+  } else {
+    // Initialize associated executors
+    executors_[group_id] = ExecutorDispatch::makeExecutor(
+        fusion_to_run.get(), fusion_id_, concrete_id_, runtime_id_, group_id);
+
+    ExecutorDispatch::compile(
+        executors_.at(group_id).get(),
+        fusion_to_run.get(),
+        args,
+        heuristic_params->lparams,
+        heuristic_params->cparams,
+        heuristic_params->scheduler_type);
+  }
 }
 
 std::pair<LaunchParams, CompileParams> FusionKernelRuntime::getKernelConfig(
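
The executor-selection rule in compileKernel above boils down to the following predicate (a paraphrase for clarity; the helper function name is invented, not part of the PR):

```cpp
// A segment needs a compiled KernelExecutor only if neither the host IR
// executor nor plain expression evaluation can handle it; such segments
// later become LaunchKernel instructions, while all others are cloned into
// the container as top-level expressions.
bool needsKernelExecutor(Fusion* fusion_to_run) {
  return !HostIrExecutor::supported(fusion_to_run) &&
      !ExprEvalExecutor::supported(fusion_to_run);
}
```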
2 remaining changed files not shown.
