Skip to content

Commit

Permalink
Scheduler supports multiple requirements for task or collection
Browse files Browse the repository at this point in the history
Resolves GH-395
  • Loading branch information
AndreyLebedev authored and AnarManafov committed Oct 13, 2021
1 parent b36525d commit 2a27434
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 113 deletions.
1 change: 1 addition & 0 deletions ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Added: new std::istream based APIs.
Added: new CTopology::getRuntimeTask and CTopology::getRuntimeCollection methods which take either ID or runtime path as input.
Added: task ID to STopoRuntimeTask and collection ID to STopoRuntimeCollection.
Added: On topology upodate/stop a task done event is now only sent when all processes of the given task are actually exited or killed. (GH-360)
Added: scheduler supports multiple requirements for task or collection. (GH-395)

### dds-session
Modified: improved default SID storage and handling. (GH-318)
Expand Down
198 changes: 98 additions & 100 deletions dds-commander/src/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
//
#include "Scheduler.h"
#include "TimeMeasure.h"
// STD
#include <iomanip>
#include <set>
// BOOST
#include "boost/range/adaptor/map.hpp"
#include <boost/algorithm/string/join.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/regex.hpp>

Expand Down Expand Up @@ -51,7 +53,7 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology,
{
m_schedule.clear();

size_t nofChannels = _channels.size();
size_t nofChannels{ _channels.size() };
// Map pair<host name, worker id> to vector of channel indeces.
// This is needed in order to reduce number of regex matches and speed up scheduling.
hostToChannelMap_t hostToChannelMap;
Expand All @@ -62,8 +64,8 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology,
if (ptr == nullptr)
continue;

SAgentInfo& info = ptr->getAgentInfo();
SSlotInfo& slot = info.getSlotByID(v.m_protocolHeaderID);
SAgentInfo& info{ ptr->getAgentInfo() };
SSlotInfo& slot{ info.getSlotByID(v.m_protocolHeaderID) };
// Only idle DDS agents
if (slot.m_state != EAgentState::idle)
continue;
Expand All @@ -72,13 +74,10 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology,
hostToChannelMap[make_tuple(info.m_id, hostInfo.m_host, hostInfo.m_workerId)].push_back(iChannel);
}

// TODO: before scheduling the collections we have to sort them by a number of tasks in the collection.
// TODO: for the moment we are not able to schedule collection without requirements.

// Collect all tasks that belong to collections
set<uint64_t> tasksInCollections;
CollectionMap_t collectionMap;
STopoRuntimeCollection::FilterIteratorPair_t collections = _topology.getRuntimeCollectionIterator();
auto collections{ _topology.getRuntimeCollectionIterator() };
for (auto it = collections.first; it != collections.second; it++)
{
// Only collections that were added has to be scheduled
Expand All @@ -90,26 +89,38 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology,
collectionMap[it->second.m_idToRuntimeTaskMap.size()].push_back(it->first);
}

// Maximum number of requirements per task or collection
size_t maxRequirements{ 0 };
auto tasks{ _topology.getRuntimeTaskIterator() };
for (auto it = tasks.first; it != tasks.second; it++)
{
maxRequirements = max(maxRequirements, it->second.m_task->getNofRequirements());
}
for (auto it = collections.first; it != collections.second; it++)
{
maxRequirements = max(maxRequirements, it->second.m_collection->getNofRequirements());
}

// Counters of number of instances of tasks/collections on a host.
hostCounterMap_t taskHostCounter;
hostCounterMap_t collectionHostCounter;

set<uint64_t> scheduledTasks;

scheduleCollections(
_topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, true, collectionHostCounter);
scheduleTasks(
_topology, _channels, hostToChannelMap, scheduledTasks, tasksInCollections, true, _addedTasks, taskHostCounter);
scheduleCollections(
_topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, false, collectionHostCounter);
scheduleTasks(_topology,
_channels,
hostToChannelMap,
scheduledTasks,
tasksInCollections,
false,
_addedTasks,
taskHostCounter);
// First schedule collections and tasks with more requirements
for (int i = maxRequirements; i >= 0; i--)
{
scheduleCollections(
_topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, i, collectionHostCounter);
scheduleTasks(_topology,
_channels,
hostToChannelMap,
scheduledTasks,
tasksInCollections,
i,
_addedTasks,
taskHostCounter);
}

size_t totalNofTasks =
(_addedTasks == nullptr) ? _topology.getMainGroup()->getTotalNofTasks() : _addedTasks->size();
Expand All @@ -130,14 +141,14 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology,
hostToChannelMap_t& _hostToChannelMap,
set<uint64_t>& _scheduledTasks,
const set<uint64_t>& _tasksInCollections,
bool useRequirement,
size_t _numRequirements,
const CTopoCore::IdSet_t* _addedTasks,
hostCounterMap_t& _hostCounterMap)
{
STopoRuntimeTask::FilterIteratorPair_t tasks = _topology.getRuntimeTaskIterator();
STopoRuntimeTask::FilterIteratorPair_t tasks{ _topology.getRuntimeTaskIterator() };
for (auto it = tasks.first; it != tasks.second; it++)
{
uint64_t id = it->first;
uint64_t id{ it->first };

// Check if tasks is in the added tasks
if (_addedTasks != nullptr && _addedTasks->find(it->first) == _addedTasks->end())
Expand All @@ -151,22 +162,9 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology,
if (_scheduledTasks.find(id) != _scheduledTasks.end())
continue;

CTopoTask::Ptr_t task = it->second.m_task;

// First path only for tasks with requirements;
// Second path for tasks without requirements.
CTopoTask::Ptr_t task{ it->second.m_task };

// SSH scheduler doesn't support multiple requirements
if (task->getNofRequirements() > 1)
{
stringstream ss;
ss << "Unable to schedule task <" << id << "> with path " << task->getPath()
<< ": SSH scheduler doesn't support multiple requirements.";
throw runtime_error(ss.str());
}

CTopoRequirement::Ptr_t requirement{ (task->getNofRequirements() == 1) ? task->getRequirements()[0] : nullptr };
if ((useRequirement && requirement == nullptr) || (!useRequirement && requirement != nullptr))
if (task->getNofRequirements() != _numRequirements)
continue;

bool taskAssigned{ false };
Expand All @@ -175,8 +173,8 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology,
{
const string hostName{ std::get<1>(v.first) };
const string wnName{ std::get<2>(v.first) };
const bool requirementOk{ checkRequirement(
requirement, useRequirement, hostName, wnName, task->getName(), _hostCounterMap) };
const bool requirementOk{ checkRequirements(
task->getRequirements(), hostName, wnName, task->getName(), _hostCounterMap) };
if (requirementOk)
{
if (!v.second.empty())
Expand Down Expand Up @@ -205,10 +203,18 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology,
{
LOG(debug) << toString();
stringstream ss;
string requirementStr = (useRequirement)
? ("Requirement " + requirement->getName() + " couldn't be satisfied.")
: "Not enough worker nodes.";
ss << "Unable to schedule task <" << id << "> with path " << task->getPath() << ".\n" << requirementStr;
string requirementStr{ "Not enough worker nodes." };
if (_numRequirements > 0)
{
vector<string> strs;
transform(task->getRequirements().begin(),
task->getRequirements().end(),
back_inserter(strs),
[](const CTopoRequirement::Ptr_t _requirement) -> string
{ return _requirement->toString(); });
requirementStr = "Cannot satisfy requirements (" + boost::algorithm::join(strs, ", ") + ").";
}
ss << "Unable to schedule task <" << id << "> with path " << task->getPath() << ": " << requirementStr;
throw runtime_error(ss.str());
}
}
Expand All @@ -219,7 +225,7 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
hostToChannelMap_t& _hostToChannelMap,
set<uint64_t>& _scheduledTasks,
const CollectionMap_t& _collectionMap,
bool useRequirement,
size_t _numRequirements,
hostCounterMap_t& _hostCounterMap)
{
for (const auto& it_col : _collectionMap)
Expand All @@ -229,22 +235,7 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
const STopoRuntimeCollection& collectionInfo = _topology.getRuntimeCollectionById(id);
auto collection{ collectionInfo.m_collection };

// First path only for collections with requirements;
// Second path for collections without requirements.

// SSH scheduler doesn't support multiple requirements
if (collection->getNofRequirements() > 1)
{
stringstream ss;
ss << "Unable to schedule collection <" << id << "> with path " << collection->getPath()
<< ": SSH scheduler doesn't support multiple requirements.";
throw runtime_error(ss.str());
}

CTopoRequirement::Ptr_t requirement{ (collection->getNofRequirements() == 1)
? collection->getRequirements()[0]
: nullptr };
if ((useRequirement && requirement == nullptr) || (!useRequirement && requirement != nullptr))
if (collection->getNofRequirements() != _numRequirements)
continue;

bool collectionAssigned{ false };
Expand All @@ -253,8 +244,8 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
{
const string hostName{ std::get<1>(v.first) };
const string wnName{ std::get<2>(v.first) };
const bool requirementOk{ checkRequirement(
requirement, useRequirement, hostName, wnName, collection->getName(), _hostCounterMap) };
const bool requirementOk{ checkRequirements(
collection->getRequirements(), hostName, wnName, collection->getName(), _hostCounterMap) };
if ((v.second.size() >= collectionInfo.m_collection->getNofTasks()) && requirementOk)
{
const STopoRuntimeCollection& collectionInfo{ _topology.getRuntimeCollectionById(id) };
Expand Down Expand Up @@ -296,57 +287,64 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
}
}

bool CScheduler::checkRequirement(CTopoRequirement::Ptr_t _requirement,
bool _useRequirement,
const string& _hostName,
const string& _wnName,
const string& _elementName,
hostCounterMap_t& _hostCounterMap) const
bool CScheduler::checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements,
const string& _hostName,
const string& _wnName,
const string& _elementName,
hostCounterMap_t& _hostCounterMap) const
{
if (!_useRequirement)
if (_requirements.empty())
return true;

using EType = CTopoRequirement::EType;
const auto type{ _requirement->getRequirementType() };

if (type == EType::WnName && _wnName.empty())
bool result{ false };
for (const auto& requirement : _requirements)
{
LOG(warning) << "Requirement of type WnName is not supported for this RMS plug-in. Requirement: "
<< _requirement->toString();
return true;
}
const auto type{ requirement->getRequirementType() };

if (type == EType::WnName || type == EType::HostName)
{
return CScheduler::hostPatternMatches(_requirement->getValue(),
(type == EType::HostName) ? _hostName : _wnName);
}
else if (type == EType::MaxInstancesPerHost)
{
try
if (type == EType::WnName && _wnName.empty())
{
LOG(warning) << "Requirement of type WnName is not supported for this RMS plug-in. Requirement: "
<< requirement->toString();
result = true;
}
else if (type == EType::WnName || type == EType::HostName)
{
result = CScheduler::hostPatternMatches(requirement->getValue(),
(type == EType::HostName) ? _hostName : _wnName);
}
else if (type == EType::MaxInstancesPerHost)
{
size_t value = boost::lexical_cast<size_t>(_requirement->getValue());
const auto key{ make_pair(_hostName, _elementName) };
auto it = _hostCounterMap.find(key);
if (it != _hostCounterMap.end())
try
{
return it->second < value;
size_t value = boost::lexical_cast<size_t>(requirement->getValue());
const auto key{ make_pair(_hostName, _elementName) };
auto it = _hostCounterMap.find(key);
if (it != _hostCounterMap.end())
{
result = it->second < value;
}
else
{
result = true;
}
}
else
catch (boost::bad_lexical_cast&)
{
return true;
stringstream ss;
ss << "Unable to satisfy the requirement " << requirement->getName() << ". Value "
<< requirement->getValue() << " must be a positive number.";
throw runtime_error(ss.str());
}
}
catch (boost::bad_lexical_cast&)
{
stringstream ss;
ss << "Unable to satisfy the requirement " << _requirement->getName() << ". Value "
<< _requirement->getValue() << " must be a positive number.";
throw runtime_error(ss.str());
}

// All requirements must be satisfied at the same time
if (!result)
break;
}

return false;
return result;
}

const CScheduler::ScheduleVector_t& CScheduler::getSchedule() const
Expand Down
25 changes: 12 additions & 13 deletions dds-commander/src/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ namespace dds
dds::protocol_api::SWeakChannelInfo<CAgentChannel> m_weakChannelInfo;
};

typedef std::vector<SSchedule> ScheduleVector_t;
typedef std::map<size_t, std::vector<uint64_t>, std::greater<size_t>> CollectionMap_t;
typedef std::vector<dds::protocol_api::SWeakChannelInfo<CAgentChannel>> weakChannelInfoVector_t;
using ScheduleVector_t = std::vector<SSchedule>;
using CollectionMap_t = std::map<size_t, std::vector<uint64_t>, std::greater<size_t>>;
using weakChannelInfoVector_t = std::vector<dds::protocol_api::SWeakChannelInfo<CAgentChannel>>;

private:
// Map tuple<agent ID, host name, worker id> to vector of channel indeces.
typedef std::map<std::tuple<uint64_t, std::string, std::string>, std::vector<size_t>> hostToChannelMap_t;
using hostToChannelMap_t = std::map<std::tuple<uint64_t, std::string, std::string>, std::vector<size_t>>;
// Map pair<host name, task/collection name> to counter.
typedef std::map<std::pair<std::string, std::string>, size_t> hostCounterMap_t;
using hostCounterMap_t = std::map<std::pair<std::string, std::string>, size_t>;

public:
CScheduler();
Expand Down Expand Up @@ -70,24 +70,23 @@ namespace dds
hostToChannelMap_t& _hostToChannelMap,
std::set<uint64_t>& _scheduledTasks,
const CollectionMap_t& _collectionMap,
bool useRequirement,
size_t _numRequirements,
hostCounterMap_t& _hostCounterMap);

void scheduleTasks(const topology_api::CTopoCore& _topology,
const weakChannelInfoVector_t& _channels,
hostToChannelMap_t& _hostToChannelMap,
std::set<uint64_t>& _scheduledTasks,
const std::set<uint64_t>& _tasksInCollections,
bool useRequirement,
size_t _numRequirements,
const topology_api::CTopoCore::IdSet_t* _addedTasks,
hostCounterMap_t& _hostCounterMap);

bool checkRequirement(topology_api::CTopoRequirement::Ptr_t _requirement,
bool _useRequirement,
const std::string& _hostName,
const std::string& _wnName,
const std::string& _elementName,
hostCounterMap_t& _hostCounterMap) const;
bool checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements,
const std::string& _hostName,
const std::string& _wnName,
const std::string& _elementName,
hostCounterMap_t& _hostCounterMap) const;

private:
ScheduleVector_t m_schedule;
Expand Down
1 change: 1 addition & 0 deletions dds-commander/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ if(BUILD_TESTS)
install(FILES
topology_scheduler_test_1.xml
topology_scheduler_test_2.xml
topology_scheduler_test_3.xml
DESTINATION "${PROJECT_INSTALL_TESTS}"
)
endif()
Loading

0 comments on commit 2a27434

Please sign in to comment.