From 2a27434104f5822c65f06622bc2f1cbb7556b0a6 Mon Sep 17 00:00:00 2001 From: Andrey Lebedev Date: Wed, 13 Oct 2021 12:52:40 +0200 Subject: [PATCH] Scheduler supports multiple requirements for task or collection Resolves GH-395 --- ReleaseNotes.md | 1 + dds-commander/src/Scheduler.cpp | 198 +++++++++--------- dds-commander/src/Scheduler.h | 25 ++- dds-commander/tests/CMakeLists.txt | 1 + dds-commander/tests/TestScheduler.cpp | 33 +++ .../tests/topology_scheduler_test_1.xml | 5 + .../tests/topology_scheduler_test_3.xml | 56 +++++ 7 files changed, 206 insertions(+), 113 deletions(-) create mode 100644 dds-commander/tests/topology_scheduler_test_3.xml diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 096ab7f5..cb638eef 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -31,6 +31,7 @@ Added: new std::istream based APIs. Added: new CTopology::getRuntimeTask and CTopology::getRuntimeCollection methods which take either ID or runtime path as input. Added: task ID to STopoRuntimeTask and collection ID to STopoRuntimeCollection. Added: On topology upodate/stop a task done event is now only sent when all processes of the given task are actually exited or killed. (GH-360) +Added: scheduler supports multiple requirements for task or collection. (GH-395) ### dds-session Modified: improved default SID storage and handling. (GH-318) diff --git a/dds-commander/src/Scheduler.cpp b/dds-commander/src/Scheduler.cpp index c93b137c..6f1d08c8 100644 --- a/dds-commander/src/Scheduler.cpp +++ b/dds-commander/src/Scheduler.cpp @@ -4,10 +4,12 @@ // #include "Scheduler.h" #include "TimeMeasure.h" +// STD #include #include // BOOST #include "boost/range/adaptor/map.hpp" +#include #include #include @@ -51,7 +53,7 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology, { m_schedule.clear(); - size_t nofChannels = _channels.size(); + size_t nofChannels{ _channels.size() }; // Map pair to vector of channel indeces. // This is needed in order to reduce number of regex matches and speed up scheduling. hostToChannelMap_t hostToChannelMap; @@ -62,8 +64,8 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology, if (ptr == nullptr) continue; - SAgentInfo& info = ptr->getAgentInfo(); - SSlotInfo& slot = info.getSlotByID(v.m_protocolHeaderID); + SAgentInfo& info{ ptr->getAgentInfo() }; + SSlotInfo& slot{ info.getSlotByID(v.m_protocolHeaderID) }; // Only idle DDS agents if (slot.m_state != EAgentState::idle) continue; @@ -72,13 +74,10 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology, hostToChannelMap[make_tuple(info.m_id, hostInfo.m_host, hostInfo.m_workerId)].push_back(iChannel); } - // TODO: before scheduling the collections we have to sort them by a number of tasks in the collection. - // TODO: for the moment we are not able to schedule collection without requirements. - // Collect all tasks that belong to collections set tasksInCollections; CollectionMap_t collectionMap; - STopoRuntimeCollection::FilterIteratorPair_t collections = _topology.getRuntimeCollectionIterator(); + auto collections{ _topology.getRuntimeCollectionIterator() }; for (auto it = collections.first; it != collections.second; it++) { // Only collections that were added has to be scheduled @@ -90,26 +89,38 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology, collectionMap[it->second.m_idToRuntimeTaskMap.size()].push_back(it->first); } + // Maximum number of requirements per task or collection + size_t maxRequirements{ 0 }; + auto tasks{ _topology.getRuntimeTaskIterator() }; + for (auto it = tasks.first; it != tasks.second; it++) + { + maxRequirements = max(maxRequirements, it->second.m_task->getNofRequirements()); + } + for (auto it = collections.first; it != collections.second; it++) + { + maxRequirements = max(maxRequirements, it->second.m_collection->getNofRequirements()); + } + // Counters of number of instances of tasks/collections on a host. hostCounterMap_t taskHostCounter; hostCounterMap_t collectionHostCounter; set scheduledTasks; - scheduleCollections( - _topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, true, collectionHostCounter); - scheduleTasks( - _topology, _channels, hostToChannelMap, scheduledTasks, tasksInCollections, true, _addedTasks, taskHostCounter); - scheduleCollections( - _topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, false, collectionHostCounter); - scheduleTasks(_topology, - _channels, - hostToChannelMap, - scheduledTasks, - tasksInCollections, - false, - _addedTasks, - taskHostCounter); + // First schedule collections and tasks with more requirements + for (int i = maxRequirements; i >= 0; i--) + { + scheduleCollections( + _topology, _channels, hostToChannelMap, scheduledTasks, collectionMap, i, collectionHostCounter); + scheduleTasks(_topology, + _channels, + hostToChannelMap, + scheduledTasks, + tasksInCollections, + i, + _addedTasks, + taskHostCounter); + } size_t totalNofTasks = (_addedTasks == nullptr) ? _topology.getMainGroup()->getTotalNofTasks() : _addedTasks->size(); @@ -130,14 +141,14 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology, hostToChannelMap_t& _hostToChannelMap, set& _scheduledTasks, const set& _tasksInCollections, - bool useRequirement, + size_t _numRequirements, const CTopoCore::IdSet_t* _addedTasks, hostCounterMap_t& _hostCounterMap) { - STopoRuntimeTask::FilterIteratorPair_t tasks = _topology.getRuntimeTaskIterator(); + STopoRuntimeTask::FilterIteratorPair_t tasks{ _topology.getRuntimeTaskIterator() }; for (auto it = tasks.first; it != tasks.second; it++) { - uint64_t id = it->first; + uint64_t id{ it->first }; // Check if tasks is in the added tasks if (_addedTasks != nullptr && _addedTasks->find(it->first) == _addedTasks->end()) @@ -151,22 +162,9 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology, if (_scheduledTasks.find(id) != _scheduledTasks.end()) continue; - CTopoTask::Ptr_t task = it->second.m_task; - - // First path only for tasks with requirements; - // Second path for tasks without requirements. + CTopoTask::Ptr_t task{ it->second.m_task }; - // SSH scheduler doesn't support multiple requirements - if (task->getNofRequirements() > 1) - { - stringstream ss; - ss << "Unable to schedule task <" << id << "> with path " << task->getPath() - << ": SSH scheduler doesn't support multiple requirements."; - throw runtime_error(ss.str()); - } - - CTopoRequirement::Ptr_t requirement{ (task->getNofRequirements() == 1) ? task->getRequirements()[0] : nullptr }; - if ((useRequirement && requirement == nullptr) || (!useRequirement && requirement != nullptr)) + if (task->getNofRequirements() != _numRequirements) continue; bool taskAssigned{ false }; @@ -175,8 +173,8 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology, { const string hostName{ std::get<1>(v.first) }; const string wnName{ std::get<2>(v.first) }; - const bool requirementOk{ checkRequirement( - requirement, useRequirement, hostName, wnName, task->getName(), _hostCounterMap) }; + const bool requirementOk{ checkRequirements( + task->getRequirements(), hostName, wnName, task->getName(), _hostCounterMap) }; if (requirementOk) { if (!v.second.empty()) @@ -205,10 +203,18 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology, { LOG(debug) << toString(); stringstream ss; - string requirementStr = (useRequirement) - ? ("Requirement " + requirement->getName() + " couldn't be satisfied.") - : "Not enough worker nodes."; - ss << "Unable to schedule task <" << id << "> with path " << task->getPath() << ".\n" << requirementStr; + string requirementStr{ "Not enough worker nodes." }; + if (_numRequirements > 0) + { + vector strs; + transform(task->getRequirements().begin(), + task->getRequirements().end(), + back_inserter(strs), + [](const CTopoRequirement::Ptr_t _requirement) -> string + { return _requirement->toString(); }); + requirementStr = "Cannot satisfy requirements (" + boost::algorithm::join(strs, ", ") + ")."; + } + ss << "Unable to schedule task <" << id << "> with path " << task->getPath() << ": " << requirementStr; throw runtime_error(ss.str()); } } @@ -219,7 +225,7 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology, hostToChannelMap_t& _hostToChannelMap, set& _scheduledTasks, const CollectionMap_t& _collectionMap, - bool useRequirement, + size_t _numRequirements, hostCounterMap_t& _hostCounterMap) { for (const auto& it_col : _collectionMap) @@ -229,22 +235,7 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology, const STopoRuntimeCollection& collectionInfo = _topology.getRuntimeCollectionById(id); auto collection{ collectionInfo.m_collection }; - // First path only for collections with requirements; - // Second path for collections without requirements. - - // SSH scheduler doesn't support multiple requirements - if (collection->getNofRequirements() > 1) - { - stringstream ss; - ss << "Unable to schedule collection <" << id << "> with path " << collection->getPath() - << ": SSH scheduler doesn't support multiple requirements."; - throw runtime_error(ss.str()); - } - - CTopoRequirement::Ptr_t requirement{ (collection->getNofRequirements() == 1) - ? collection->getRequirements()[0] - : nullptr }; - if ((useRequirement && requirement == nullptr) || (!useRequirement && requirement != nullptr)) + if (collection->getNofRequirements() != _numRequirements) continue; bool collectionAssigned{ false }; @@ -253,8 +244,8 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology, { const string hostName{ std::get<1>(v.first) }; const string wnName{ std::get<2>(v.first) }; - const bool requirementOk{ checkRequirement( - requirement, useRequirement, hostName, wnName, collection->getName(), _hostCounterMap) }; + const bool requirementOk{ checkRequirements( + collection->getRequirements(), hostName, wnName, collection->getName(), _hostCounterMap) }; if ((v.second.size() >= collectionInfo.m_collection->getNofTasks()) && requirementOk) { const STopoRuntimeCollection& collectionInfo{ _topology.getRuntimeCollectionById(id) }; @@ -296,57 +287,64 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology, } } -bool CScheduler::checkRequirement(CTopoRequirement::Ptr_t _requirement, - bool _useRequirement, - const string& _hostName, - const string& _wnName, - const string& _elementName, - hostCounterMap_t& _hostCounterMap) const +bool CScheduler::checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements, + const string& _hostName, + const string& _wnName, + const string& _elementName, + hostCounterMap_t& _hostCounterMap) const { - if (!_useRequirement) + if (_requirements.empty()) return true; using EType = CTopoRequirement::EType; - const auto type{ _requirement->getRequirementType() }; - if (type == EType::WnName && _wnName.empty()) + bool result{ false }; + for (const auto& requirement : _requirements) { - LOG(warning) << "Requirement of type WnName is not supported for this RMS plug-in. Requirement: " - << _requirement->toString(); - return true; - } + const auto type{ requirement->getRequirementType() }; - if (type == EType::WnName || type == EType::HostName) - { - return CScheduler::hostPatternMatches(_requirement->getValue(), - (type == EType::HostName) ? _hostName : _wnName); - } - else if (type == EType::MaxInstancesPerHost) - { - try + if (type == EType::WnName && _wnName.empty()) + { + LOG(warning) << "Requirement of type WnName is not supported for this RMS plug-in. Requirement: " + << requirement->toString(); + result = true; + } + else if (type == EType::WnName || type == EType::HostName) + { + result = CScheduler::hostPatternMatches(requirement->getValue(), + (type == EType::HostName) ? _hostName : _wnName); + } + else if (type == EType::MaxInstancesPerHost) { - size_t value = boost::lexical_cast(_requirement->getValue()); - const auto key{ make_pair(_hostName, _elementName) }; - auto it = _hostCounterMap.find(key); - if (it != _hostCounterMap.end()) + try { - return it->second < value; + size_t value = boost::lexical_cast(requirement->getValue()); + const auto key{ make_pair(_hostName, _elementName) }; + auto it = _hostCounterMap.find(key); + if (it != _hostCounterMap.end()) + { + result = it->second < value; + } + else + { + result = true; + } } - else + catch (boost::bad_lexical_cast&) { - return true; + stringstream ss; + ss << "Unable to satisfy the requirement " << requirement->getName() << ". Value " + << requirement->getValue() << " must be a positive number."; + throw runtime_error(ss.str()); } } - catch (boost::bad_lexical_cast&) - { - stringstream ss; - ss << "Unable to satisfy the requirement " << _requirement->getName() << ". Value " - << _requirement->getValue() << " must be a positive number."; - throw runtime_error(ss.str()); - } + + // All requirements must be satisfied at the same time + if (!result) + break; } - return false; + return result; } const CScheduler::ScheduleVector_t& CScheduler::getSchedule() const diff --git a/dds-commander/src/Scheduler.h b/dds-commander/src/Scheduler.h index 28a387bc..ea165170 100644 --- a/dds-commander/src/Scheduler.h +++ b/dds-commander/src/Scheduler.h @@ -32,15 +32,15 @@ namespace dds dds::protocol_api::SWeakChannelInfo m_weakChannelInfo; }; - typedef std::vector ScheduleVector_t; - typedef std::map, std::greater> CollectionMap_t; - typedef std::vector> weakChannelInfoVector_t; + using ScheduleVector_t = std::vector; + using CollectionMap_t = std::map, std::greater>; + using weakChannelInfoVector_t = std::vector>; private: // Map tuple to vector of channel indeces. - typedef std::map, std::vector> hostToChannelMap_t; + using hostToChannelMap_t = std::map, std::vector>; // Map pair to counter. - typedef std::map, size_t> hostCounterMap_t; + using hostCounterMap_t = std::map, size_t>; public: CScheduler(); @@ -70,7 +70,7 @@ namespace dds hostToChannelMap_t& _hostToChannelMap, std::set& _scheduledTasks, const CollectionMap_t& _collectionMap, - bool useRequirement, + size_t _numRequirements, hostCounterMap_t& _hostCounterMap); void scheduleTasks(const topology_api::CTopoCore& _topology, @@ -78,16 +78,15 @@ namespace dds hostToChannelMap_t& _hostToChannelMap, std::set& _scheduledTasks, const std::set& _tasksInCollections, - bool useRequirement, + size_t _numRequirements, const topology_api::CTopoCore::IdSet_t* _addedTasks, hostCounterMap_t& _hostCounterMap); - bool checkRequirement(topology_api::CTopoRequirement::Ptr_t _requirement, - bool _useRequirement, - const std::string& _hostName, - const std::string& _wnName, - const std::string& _elementName, - hostCounterMap_t& _hostCounterMap) const; + bool checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements, + const std::string& _hostName, + const std::string& _wnName, + const std::string& _elementName, + hostCounterMap_t& _hostCounterMap) const; private: ScheduleVector_t m_schedule; diff --git a/dds-commander/tests/CMakeLists.txt b/dds-commander/tests/CMakeLists.txt index c0f62d1f..a5b4eb8e 100644 --- a/dds-commander/tests/CMakeLists.txt +++ b/dds-commander/tests/CMakeLists.txt @@ -41,6 +41,7 @@ if(BUILD_TESTS) install(FILES topology_scheduler_test_1.xml topology_scheduler_test_2.xml + topology_scheduler_test_3.xml DESTINATION "${PROJECT_INSTALL_TESTS}" ) endif() diff --git a/dds-commander/tests/TestScheduler.cpp b/dds-commander/tests/TestScheduler.cpp index f9b6fc4f..75a4deed 100644 --- a/dds-commander/tests/TestScheduler.cpp +++ b/dds-commander/tests/TestScheduler.cpp @@ -185,6 +185,39 @@ BOOST_AUTO_TEST_CASE(test_dds_scheduler_2) cout << scheduler.toString(); } +BOOST_AUTO_TEST_CASE(test_dds_scheduler_3) +{ + boost::asio::io_context io_context; + + CTopoCore topology; + topology.init("topology_scheduler_test_3.xml"); + + CConnectionManager::channelInfo_t::container_t agents; + + // Calibration node + make_agent(io_context, agents, "host_calib_0", "", 123456, 10); + + // Reco nodes + const size_t numAgents{ 10 }; + const size_t numSlotsPerAgent{ 30 }; + for (size_t ia = 0; ia < numAgents; ++ia) + { + const string host{ "host_reco_" + to_string(ia) }; + const uint64_t agentID{ ia + 1 }; + make_agent(io_context, agents, host, "", agentID, numSlotsPerAgent); + } + + using weak_t = CConnectionManager::weakChannelInfo_t; + weak_t::container_t weakAgents; + std::transform( + agents.begin(), agents.end(), std::back_inserter(weakAgents), [](const auto& _v) -> auto { + return weak_t(_v.m_channel, _v.m_protocolHeaderID, _v.m_isSlot); + }); + + CScheduler scheduler; + BOOST_CHECK_NO_THROW(scheduler.makeSchedule(topology, weakAgents)); +} + BOOST_AUTO_TEST_CASE(test_dds_scheduler_host_pattern_matches) { BOOST_CHECK(CScheduler::hostPatternMatches(".+.gsi.de", "dds.gsi.de") == true); diff --git a/dds-commander/tests/topology_scheduler_test_1.xml b/dds-commander/tests/topology_scheduler_test_1.xml index 8e0a3893..032fbfbd 100644 --- a/dds-commander/tests/topology_scheduler_test_1.xml +++ b/dds-commander/tests/topology_scheduler_test_1.xml @@ -5,11 +5,14 @@ + + test_task.exe requirement1 + requirementi @@ -17,6 +20,7 @@ test_task.exe requirement2 + requirementi @@ -24,6 +28,7 @@ test_task.exe requirement3 + requirementi diff --git a/dds-commander/tests/topology_scheduler_test_3.xml b/dds-commander/tests/topology_scheduler_test_3.xml new file mode 100644 index 00000000..979540c8 --- /dev/null +++ b/dds-commander/tests/topology_scheduler_test_3.xml @@ -0,0 +1,56 @@ + + + + + + + + test_task.exe + + + + test_task.exe + + + + test_task.exe + + + + test_task.exe + + + + test_task.exe + + + + + CalibHostRequirement + + + CalibTask1 + CalibTask2 + + + + + + RecoHostRequirement + RecoInstanceRequirement + + + RecoTask1 + RecoTask2 + RecoTask3 + + + +
+ CalibCollection + + RecoCollection + +
+ +