Skip to content

Commit

Permalink
Fix bug that mpp_task is not moved to cancel thread due to llvm SOO (#…
Browse files Browse the repository at this point in the history
…5637)

ref #5095, close #5638
  • Loading branch information
windtalker authored Aug 17, 2022
1 parent 8404e65 commit 0933d34
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 15 deletions.
26 changes: 16 additions & 10 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,30 @@ void CreatingSetsBlockInputStream::createAll()
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
try
{
for (auto & elem : subqueries_for_sets)
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
for (auto & elem : subqueries_for_sets)
{
if (is_cancelled || is_killed)
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
{
thread_manager->wait();
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return;
if (isCancelledOrThrowIfKilled())
{
thread_manager->wait();
return;
}
thread_manager->schedule(true, "CreatingSets", [this, &item = elem.second] { createOne(item); });
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
thread_manager->schedule(true, "CreatingSets", [this, &item = elem.second] { createOne(item); });
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}
}
catch (...)
{
thread_manager->wait();
throw;
}

thread_manager->wait();

Expand Down
56 changes: 51 additions & 5 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,34 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::
return it->second;
}

/// Returns the file-local logger registered under the name "MPPTaskCancelFunctor".
/// The logger is obtained from Logger::get on the first call and the same
/// instance is handed back on every later call.
static LoggerPtr getLogger()
{
    static LoggerPtr cancel_functor_logger = Logger::get("MPPTaskCancelFunctor");
    return cancel_functor_logger;
}

/// Callable wrapper that carries an MPP task together with a cancel reason and,
/// when invoked, calls `cancel(reason)` on that task.
///
/// The wrapper is intended to be moved into the thread that performs the cancel.
/// Copying is still permitted, but it is unexpected, so the copy constructor
/// emits a warning to make such a copy visible in the logs.
class MPPTaskCancelFunctor
{
public:
    /// Takes over `task_` from the caller (it is moved from) and keeps a copy of `reason_`.
    MPPTaskCancelFunctor(MPPTaskPtr && task_, const String & reason_)
        : task(std::move(task_))
        , reason(reason_)
    {}

    /// Moving is the expected way to hand the functor over; member-wise move.
    MPPTaskCancelFunctor(MPPTaskCancelFunctor && other) = default;

    /// Copies both members and logs a warning, since a copy is not expected to happen.
    MPPTaskCancelFunctor(const MPPTaskCancelFunctor & other)
        : task(other.task)
        , reason(other.reason)
    {
        LOG_FMT_WARNING(getLogger(), "Copy constructor of MPPTaskCancelFunctor called, should not happen");
    }

    /// Cancels the held task with the stored reason.
    /// NOTE(review): `task` is dereferenced unconditionally; presumably a moved-from
    /// functor (whose `task` may be empty) is never invoked — confirm at call sites.
    void operator()() const
    {
        task->cancel(reason);
    }

    /// Task to be cancelled.
    MPPTaskPtr task;
    /// Reason passed to `cancel`.
    String reason;
};

void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
{
MPPQueryTaskSetPtr task_set;
Expand Down Expand Up @@ -97,14 +125,32 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
LOG_WARNING(log, fmt::format("Begin cancel query: {}", query_id));
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id);
std::vector<std::function<void()>> cancel_functors;
// TODO: cancel tasks in order rather than issuing so many threads to cancel tasks
auto thread_manager = newThreadManager();
for (auto it = task_set->task_map.begin(); it != task_set->task_map.end();)
try
{
for (auto it = task_set->task_map.begin(); it != task_set->task_map.end();)
{
fmt_buf.fmtAppend("{} ", it->first.toString());
auto current_task = it->second;
it = task_set->task_map.erase(it);
// Note it is not acceptable to destruct `current_task` inside the loop, because destruct a mpp task before all
// other mpp tasks are cancelled may cause some deadlock issues, so `current_task` has to be moved to cancel thread.
// At first, we use std::move to move `current_task` to lambda like this:
// thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); });
// However, due to SOO in llvm(https://github.com/llvm/llvm-project/issues/32472), there is still a copy of `current_task`
// remaining in the current scope, as a workaround we add a wrap(MPPTaskCancelFunctor) here to make sure `current_task`
// can be moved to cancel thread. Meanwhile, also save the moved wrap in a vector to guarantee that even if cancel functors
// fail to move due to some other issues, it still does not destruct inside the loop
cancel_functors.push_back(MPPTaskCancelFunctor(std::move(current_task), reason));
thread_manager->schedule(false, "CancelMPPTask", std::move(cancel_functors[cancel_functors.size() - 1]));
}
}
catch (...)
{
fmt_buf.fmtAppend("{} ", it->first.toString());
auto current_task = it->second;
it = task_set->task_map.erase(it);
thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); });
thread_manager->wait();
throw;
}
LOG_WARNING(log, fmt_buf.toString());
thread_manager->wait();
Expand Down

0 comments on commit 0933d34

Please sign in to comment.