diff --git a/ddsrouter_cmake/cmake/test/test_target.cmake b/ddsrouter_cmake/cmake/test/test_target.cmake index 82162512a..5645b95be 100644 --- a/ddsrouter_cmake/cmake/test/test_target.cmake +++ b/ddsrouter_cmake/cmake/test/test_target.cmake @@ -58,15 +58,24 @@ function(add_test_executable TEST_EXECUTABLE_NAME TEST_SOURCES TEST_NAME TEST_LI get_win32_path_dependencies(${TEST_EXECUTABLE_NAME} TEST_FRIENDLY_PATH) - foreach(test_name ${TEST_LIST}) - add_test(NAME ${TEST_NAME}.${test_name} - COMMAND ${TEST_EXECUTABLE_NAME} - --gtest_filter=${TEST_NAME}.${test_name}:**/${TEST_NAME}.${test_name}/**) - - if(TEST_FRIENDLY_PATH) - set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") - endif(TEST_FRIENDLY_PATH) - endforeach() + if( TEST_LIST ) + # If list of tests is not empty, add each test separatly + foreach(test_name ${TEST_LIST}) + add_test(NAME ${TEST_NAME}.${test_name} + COMMAND ${TEST_EXECUTABLE_NAME} + --gtest_filter=${TEST_NAME}**.${test_name}:**/${TEST_NAME}**.${test_name}/**) + + if(TEST_FRIENDLY_PATH) + set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") + endif(TEST_FRIENDLY_PATH) + endforeach() + else() + # If no tests are provided, create a single test + message(STATUS "Creating general test ${TEST_NAME}.") + add_test(NAME ${TEST_NAME} + COMMAND ${TEST_EXECUTABLE_NAME}) + endif( TEST_LIST ) + target_compile_definitions(${TEST_EXECUTABLE_NAME} PRIVATE FASTDDS_ENFORCE_LOG_INFO diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/_ThreadPool.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/_ThreadPool.hpp new file mode 100644 index 000000000..add408aae --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/_ThreadPool.hpp @@ -0,0 +1,100 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotThreadPool.hpp + * + * This file contains class SlotThreadPool definition. + */ + +#ifndef _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ +#define _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { + +class IThreadPool +{ +public: + void execute(std::unique_ptr&& task){} +}; + +class ITask +{ +public: + void operator() () noexcept; +}; + +class BasicTask : ITask +{ +public: + BasicTask(const std::function* callback_ptr){} + void operator() () noexcept; +}; + +class OwnedTask : ITask +{ +public: + OwnedTask(const std::function& callback_){} + OwnedTask(std::function&& callback_){} + void operator() () noexcept; +}; + +template +class ConnectorOneShotArgs +{ +public: + static void execute(const IThreadPool& tp, const std::function& callback, Args... args){} + static void execute(const IThreadPool& tp, std::function&& callback, Args... args){} +}; +using ConnectorOneShot = ConnectorOneShotArgs<>; + +template +class ConnectorSlotArgs +{ +public: + ConnectorSlotArgs(const IThreadPool& tp, const std::function& callback){} + // ConnectorSlotArgs(const IThreadPool& tp, std::function&& callback){} + void execute(Args...){} +}; +using ConnectorSlot = ConnectorSlotArgs<>; + +int main() +{ + IThreadPool pool; + + ConnectorOneShot::execute(pool, [](){ /* do something */ }); + ConnectorOneShotArgs::execute(pool, [](int x){ /* do something */ }, 3); + ConnectorOneShotArgs::execute(pool, [](std::string s, bool b){ /* do something */ }, std::string("hello"), true); + + ConnectorSlot slot_connector(pool, [](){ /* do something */ }); + slot_connector.execute(); + + ConnectorSlotArgs slot_args(pool, [](int x){}); + slot_args.execute(2); + + ConnectorSlotArgs slot_args_2(pool, std::function([](std::string s, bool b){})); + slot_args_2.execute(std::string("hello"), true); + +} + +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp new file mode 100644 index 000000000..3ab15b825 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp @@ -0,0 +1,48 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +class OneShotConnector +{ +public: + static void execute(IManager* tp, const std::function& callback, Args... args); + static void execute(IManager* tp, std::function&& callback, Args... args); +}; +using SimpleOneShotConnector = OneShotConnector<>; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp new file mode 100644 index 000000000..fa2e5debc --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp @@ -0,0 +1,48 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_TASK_TASK_HPP_ +#define _DDSROUTERTHREAD_TASK_TASK_HPP_ + +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +class SlotConnector +{ +public: + ConnectorSlotArgs(const IThreadPool& tp, const std::function& callback); + ConnectorSlotArgs(const IThreadPool& tp, std::function&& callback); + void execute(Args...); +}; +using SimpleSlotConnector = SlotConnector<>; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_TASK_TASK_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp new file mode 100644 index 000000000..5b6c32ae7 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp @@ -0,0 +1,61 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OneShotConnector.hpp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +void OneShotConnector::execute( + IManager* tp, + const std::function& callback, + Args... args) +{ + tp->execute( + std::make_unique>( + callback, + args... + ) + ); +} + +template +void OneShotConnector::execute( + IManager* tp, + std::function&& callback, + Args... args) +{ + tp->execute( + std::make_unique>( + std::move(callback), + args... + ) + ); +} + +} /* namespace thread */ +} /* namespace event */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp new file mode 100644 index 000000000..e027e8db2 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp @@ -0,0 +1,68 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.hpp + * + * This file contains class AsyncManager definition. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +using TasksCollectionType = + Atomicable< + std::vector< + std::pair< + std::unique_ptr, + std::unique_ptr>>>; + +/** + * TODO + */ +class AsyncManager : public IManager +{ +public: + + AsyncManager() = default; + + ~AsyncManager(); + + virtual void execute(std::unique_ptr&& task) override; + + void clean_threads(); + +protected: + + TasksCollectionType tasks_running_; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp new file mode 100644 index 000000000..a093e86f2 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp @@ -0,0 +1,40 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file IManager.hpp + * + * This file contains class SlotThreadPool definition. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class IManager +{ +public: + virtual ~IManager() {}; + virtual void execute(std::unique_ptr&& task) = 0; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp new file mode 100644 index 000000000..c3035026b --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp @@ -0,0 +1,86 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.hpp + * + * This file contains class StdThreadPool definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +/** + * TODO + */ +class StdThreadPool : public IManager +{ +public: + + StdThreadPool( + unsigned int n_threads, + bool start_running = true); + + virtual ~StdThreadPool(); + + void start(); + + void stop(); + + virtual void execute(std::unique_ptr&& task) override; + +protected: + + void thread_routine_(); + + /** + * @brief Double Queue Wait Handler to store task ids + * + * This double queue implement methods \c produce , to add tasks to the queue, and \c consume to wait until any + * task is available, and return the next task available. + * + * It will retrieve tasks in FIFO order. + * Produce and consume methods are not reciprocally blocking. + */ + event::DBQueueWaitHandler> task_queue_; + + /** + * @brief Threads container + * + * @note \c CustomThread are used instead of \c std::thread so some extra logic could be added to threads + * in future implementation (e.g. performance info). + */ + std::vector threads_; + + const unsigned int n_threads_; + +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp new file mode 100644 index 000000000..bc1310ed6 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp @@ -0,0 +1,49 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SyncManager.hpp + * + * This file contains class SyncManager definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +/** + * TODO + */ +class SyncManager : public IManager +{ +public: + // virtual void execute(const ITask& task) override; + // virtual void execute(ITask&& task) override; + virtual void execute(std::unique_ptr&& task) override; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp new file mode 100644 index 000000000..e2bf8974c --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp @@ -0,0 +1,84 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +namespace helper +{ + template + struct index {}; + + template + struct gen_seq : gen_seq {}; + + template + struct gen_seq<0, Ts...> : index {}; +} + +template +class ArgsOwnedTask : public ITask +{ +public: + + ArgsOwnedTask( + const std::function& callback, + Args... args); + + ArgsOwnedTask( + std::function&& callback, + Args... args); + + virtual ~ArgsOwnedTask() = default; + + template + void call_(helper::index) + { + callback_(std::get(args_)...); + } + + virtual void operator()() noexcept override + { + call_(helper::gen_seq{}); + } + +protected: + + std::function callback_; + + std::tuple args_; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp new file mode 100644 index 000000000..ef11f38e6 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp @@ -0,0 +1,41 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ITask.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ +#define _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class ITask +{ +public: + virtual ~ITask() {}; + virtual void operator()() noexcept = 0; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/Task.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp similarity index 71% rename from ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/Task.hpp rename to ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp index c57c1800e..17e12ad04 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/Task.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp @@ -13,7 +13,7 @@ // limitations under the License. /** - * @file Task.hpp + * @file OwnedTask.hpp * * This file contains class Task definition. */ @@ -23,21 +23,30 @@ #include +#include + namespace eprosima { namespace ddsrouter { namespace utils { +namespace thread { -/** - * This class represents a task that can be executed by a Thread Pool. - * - * @note this first implementation only uses this class as a \c std::function for simplicity. - * In future implementations, this could be a more complex class. - */ -class Task : public std::function +class OwnedTask : public ITask { - using std::function::function; +public: + + OwnedTask(const std::function& callback); + + OwnedTask(std::function&& callback); + + virtual ~OwnedTask() = default; + + virtual void operator()() noexcept override; + + const std::function callback; }; + +} /* namespace thread */ } /* namespace utils */ } /* namespace ddsrouter */ } /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/TaskId.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp similarity index 66% rename from ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/TaskId.hpp rename to ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp index cf00a97e5..4b7e519af 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/task/TaskId.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp @@ -13,36 +13,38 @@ // limitations under the License. /** - * @file TaskId.hpp + * @file ReferenceTask.hpp * * This file contains class Task definition. */ -#ifndef _DDSROUTERTHREAD_TASK_TASKID_HPP_ -#define _DDSROUTERTHREAD_TASK_TASKID_HPP_ - -#include +#pragma once #include +#include + namespace eprosima { namespace ddsrouter { namespace utils { +namespace thread { -//! Type of the task ID. -using TaskId = unsigned int; +class ReferenceTask : public ITask +{ +public: -/** - * @brief Get a new unique task ID. - * - * It uses a random number to generate a new ID. - * - * @return new unique TaskId - */ -DDSROUTER_UTILS_DllAPI TaskId new_unique_task_id(); + ReferenceTask(const std::function* callback_ptr); + virtual ~ReferenceTask() = default; + + virtual void operator()() noexcept override; + + const std::function* callback_ptr; + +}; + + +} /* namespace thread */ } /* namespace utils */ } /* namespace ddsrouter */ } /* namespace eprosima */ - -#endif /* _DDSROUTERTHREAD_TASK_TASKID_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp new file mode 100644 index 000000000..c8c088ffe --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp @@ -0,0 +1,57 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.ipp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +ArgsOwnedTask::ArgsOwnedTask( + const std::function& callback, + Args... args) + : callback_(callback) + , args_(std::forward(args)...) +{ +} + +template +ArgsOwnedTask::ArgsOwnedTask( + std::function&& callback, + Args... args) + : callback_(std::move(callback)) + , args_(std::forward(args)...) +{ +} + +// template +// void ArgsOwnedTask::operator()() noexcept +// { +// call_(); +// } + +} /* namespace thread */ +} /* namespace event */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/thread/CustomThread.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp similarity index 86% rename from ddsrouter_utils/include/ddsrouter_utils/thread_pool/thread/CustomThread.hpp rename to ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp index ec068f0f1..359af09f4 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/thread/CustomThread.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp @@ -18,14 +18,16 @@ * This file contains class CustomThread definition. */ -#ifndef _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ -#define _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ +#pragma once #include +#include + namespace eprosima { namespace ddsrouter { namespace utils { +namespace thread { /** * This class represents a thread that can be executed by a Thread Pool. @@ -35,11 +37,11 @@ namespace utils { */ class CustomThread : public std::thread { +public: using std::thread::thread; }; +} /* namespace thread */ } /* namespace utils */ } /* namespace ddsrouter */ } /* namespace eprosima */ - -#endif /* _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp deleted file mode 100644 index fa15909e4..000000000 --- a/ddsrouter_utils/include/ddsrouter_utils/thread_pool/pool/SlotThreadPool.hpp +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file SlotThreadPool.hpp - * - * This file contains class SlotThreadPool definition. - */ - -#ifndef _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ -#define _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ - -#include -#include -#include - -#include -#include -#include -#include -#include - -namespace eprosima { -namespace ddsrouter { -namespace utils { - -/** - * This class represents a thread pool that can register tasks inside. - * - * This is another implementation of \c ThreadPool but with the difference that if does not contain actual - * task objects, but only task ids. These ids are much more efficient than actual task objects in order to copy - * or store them. Each id identifies one and only one task. By adding an id to the queue, the thread that consumes - * it will execute the task associated, that must be previously registered. - * - * @note Qt notation is used for this implementation, so \c emit means to add a task to the queue and - * \c slot means to register a task. - * - * @note This class does not inherit from \c ThreadPool as methods and internal variables are not shared, - * even when both solve the same problem in similar ways. - */ -class SlotThreadPool -{ -public: - - /** - * @brief Construct a new Slot Thread Pool object - * - * This creates the internal threads in the pool and make them wait for tasks. - * Each thread is executed with function \c thread_routine_ . - * - * @param n_threads number of threads in the pool - */ - DDSROUTER_UTILS_DllAPI SlotThreadPool( - const uint32_t n_threads); - - /** - * @brief Destroy the Thread Pool object - * - * It disables the queue, what makes the threads to stop to finish their tasks and exit. - */ - DDSROUTER_UTILS_DllAPI ~SlotThreadPool(); - - /** - * Enable Slot Thread Pool in case it is not enabled - * Does nothing if it is already enabled - */ - DDSROUTER_UTILS_DllAPI void enable() noexcept; - - /** - * Disable Slot Thread Pool in case it is enabled - * Does nothing if it is already disabled - * - * It stops all the threads running, not allowing them to take new tasks. - * It blocks until every thread has finished executing. - * It does not remove tasks from queue. - * - * @todo this is a first approach, a new design should be taken into account to not block until threads finish - * when disabling the thread pool, but joining them afterwards. - */ - DDSROUTER_UTILS_DllAPI void disable() noexcept; - - /** - * @brief Add a task Id (that represents a registered Task) to be executed by the threads in the pool - * - * This add \c task_id to the queue, and the task identified will be executed by the threads in the pool. - * - * @pre \c task_id must identify a registered task. - * - * @param task_id task Id to be added to the queue so task identified is executed. - */ - DDSROUTER_UTILS_DllAPI void emit( - const TaskId& task_id); - - /** - * @brief Register a new task identified by a task Id. - * - * This method registers a new task that will be executed when its task Id is added to the queue. - * - * @param task_id task Id that identifies the task. - * @param task task to be registered. - */ - DDSROUTER_UTILS_DllAPI void slot( - const TaskId& task_id, - Task&& task); - -protected: - - /** - * @brief This is the function that every thread in the pool executes. - * - * This function enters an infinite loop where it \c consume an element from the queue (this means it will - * wait for an element to be added to the queue in case it is empty, and it will take one if any available). - * Once a task id is available, it will get the task refering this id and execute it - * Afterwards it will return to consume another task id. - * This will be repeated until the queue is disabled, what is communicated by a \c DisabledException . - */ - void thread_routine_(); - - unsigned int number_of_threads_; - - /** - * @brief Double Queue Wait Handler to store task ids - * - * This double queue implement methods \c produce , to add tasks to the queue, and \c consume to wait until any - * task is available, and return the next task available. - * - * It will retrieve tasks in FIFO order. - * Produce and consume methods are not reciprocally blocking. - */ - event::DBQueueWaitHandler task_queue_; - - /** - * @brief Threads container - * - * @note \c CustomThread are used instead of \c std::thread so some extra logic could be added to threads - * in future implementation (e.g. performance info). - */ - std::vector threads_; - - /** - * @brief Map of tasks indexed by their task Id. - * - * This object is protected by the \c slots_mutex_ mutex. - */ - std::map slots_; - - //! Protects access to \c slots_ . - std::mutex slots_mutex_; - - //! Whether the object is currently enabled - std::atomic enabled_; - -}; - -} /* namespace utils */ -} /* namespace ddsrouter */ -} /* namespace eprosima */ - -#endif /* _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp b/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp index 64f8a2abc..6bc718173 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp @@ -74,7 +74,6 @@ class ConsumerWaitHandler : protected CounterWaitHandler void produce( T&& value); - /** * @brief Add a new value to the collection. Use copy constructor. * @@ -123,18 +122,11 @@ class ConsumerWaitHandler : protected CounterWaitHandler virtual void add_value_( T&& value) = 0; - - /** - * @brief Method that adds a new value in the collection. Use copy constructor. - * - * This method must be reimplemented in child classes specialized to the internal collection. - * - * This method is called without any mutex taken and afterwards the internal counter is increased by 1. - * - * @param value new value + /* + * NOTE: + * Function add_value_ called with const reference is not available because of weird behaviour of override methods + * in template classes. */ - virtual void add_value_( - const T& value) = 0; /** * @brief Method that gets next available value from the collection diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp b/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp index a1add9237..15802996b 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp @@ -55,10 +55,6 @@ class DBQueueWaitHandler : public ConsumerWaitHandler void add_value_( T&& value) override; - //! Override of ConsumerWaitHandler method to copy a new value into the queue - void add_value_( - const T& value) override; - /** * @brief Override of \c ConsumerWaitHandler method to remove a value from the queue * diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp index 1ee0bf4a5..3b3e15d3d 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp @@ -53,7 +53,7 @@ template void ConsumerWaitHandler::produce( const T& value) { - add_value_(value); + add_value_(T(value)); this->operator ++(); } diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp index b540d4a99..e6bdaf35a 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp @@ -32,14 +32,6 @@ void DBQueueWaitHandler::add_value_( queue_.Push(std::move(value)); } -template -void DBQueueWaitHandler::add_value_( - const T& value) -{ - logDebug(DDSROUTER_WAIT_DBQUEUE, "Copying element to DBQueue."); - queue_.Push(value); -} - template T DBQueueWaitHandler::get_next_value_() { @@ -59,11 +51,11 @@ T DBQueueWaitHandler::get_next_value_() throw utils::InconsistencyException("Empty DBQueue, impossible to get value."); } - // TODO: Do it without copy - auto value = queue_.Front(); + // TODO: Do it with front and pop without copy + auto value = std::move(queue_.Front()); queue_.Pop(); - return value; + return std::move(value); } } /* namespace event */ diff --git a/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp b/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp new file mode 100644 index 000000000..38800ff1c --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp @@ -0,0 +1,75 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +AsyncManager::~AsyncManager() +{ + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Closing Async Manager."); + clean_threads(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Async Manager closed."); +} + +void AsyncManager::execute(std::unique_ptr&& task) +{ + // Lock mutex + std::unique_lock lock(tasks_running_); + + // Get reference to task + ITask* task_reference = task.get(); + + // Create and Insert task in new index + // Being indexed in map the unique ptr will not be erased + tasks_running_.push_back( + std::make_pair( + std::make_unique( + [task_reference](){ + task_reference->operator()(); + } + ), + std::move(task) + ) + ); + + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "New thread executing task."); +} + +void AsyncManager::clean_threads() +{ + std::unique_lock lock(tasks_running_); + for (auto& task : tasks_running_) + { + task.first->join(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Thread finished, removing task and thread."); + } + tasks_running_.clear(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp b/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp new file mode 100644 index 000000000..bd9621bc9 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp @@ -0,0 +1,136 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +StdThreadPool::StdThreadPool( + unsigned int n_threads, + bool start_running /* = true */) + : task_queue_(0, false) + , threads_() + , n_threads_(n_threads) +{ + if (start_running) + { + start(); + } +} + +StdThreadPool::~StdThreadPool() +{ + stop(); +} + +void StdThreadPool::start() +{ + if (!task_queue_.enabled()) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread pool."); + + task_queue_.enable(); + + // Execute all threads + for (unsigned int i=0; i&& task) +{ + task_queue_.produce(std::move(task)); +} + +void StdThreadPool::thread_routine_() +{ + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread routine: " << std::this_thread::get_id() << "."); + + try + { + while (true) + { + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " free, getting new callback."); + + // Wait till there is a new task available + auto task = task_queue_.consume(); + + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " executing callback."); + + // Executing callback + task->operator()(); + + // NOTE: at this point task would not be further referenced and it will be destroyed. + } + } + catch (const utils::DisabledException& e) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Stopping thread: " << std::this_thread::get_id() << "."); + } +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread_pool/task/TaskId.cpp b/ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp similarity index 78% rename from ddsrouter_utils/src/cpp/thread_pool/task/TaskId.cpp rename to ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp index 86f98e8d4..1f85bf0d5 100644 --- a/ddsrouter_utils/src/cpp/thread_pool/task/TaskId.cpp +++ b/ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp @@ -13,24 +13,23 @@ // limitations under the License. /** - * @file TaskId.cpp + * @file SyncManager.cpp * - * This file contains class TaskId implementation. */ -#include - -#include +#include namespace eprosima { namespace ddsrouter { namespace utils { +namespace thread { -TaskId new_unique_task_id() +void SyncManager::execute(std::unique_ptr&& task) { - return static_cast(std::rand()); + task->operator()(); } +} /* namespace thread */ } /* namespace utils */ } /* namespace ddsrouter */ } /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp b/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp new file mode 100644 index 000000000..382c78242 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.cpp + * + */ + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +OwnedTask::OwnedTask(const std::function& callback) + : callback(callback) +{ + +} + +OwnedTask::OwnedTask(std::function&& callback) + : callback(std::move(callback)) +{ + +} + +void OwnedTask::operator()() noexcept +{ + callback.operator()(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp b/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp new file mode 100644 index 000000000..7aa842668 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReferenceTask.cpp + * + */ + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +ReferenceTask::ReferenceTask(const std::function* callback_ptr) + : callback_ptr(callback_ptr) +{ + if (!callback_ptr) + { + throw InitializationException(STR_ENTRY << "ReferenceTask must be initialized with a valid ptr."); + } +} + +void ReferenceTask::operator()() noexcept +{ + callback_ptr->operator()(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp b/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp deleted file mode 100644 index ff5ea97d0..000000000 --- a/ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file SlotThreadPool.cpp - * - * This file contains class SlotThreadPool implementation. - */ - -#include -#include - -#include - - -namespace eprosima { -namespace ddsrouter { -namespace utils { - -SlotThreadPool::SlotThreadPool( - const uint32_t n_threads) - : number_of_threads_(n_threads) - , enabled_(false) -{ - logDebug(DDSROUTER_THREAD_POOL, "Creating Thread Pool with " << n_threads << " threads."); -} - -SlotThreadPool::~SlotThreadPool() -{ - disable(); - // Disable queue in case it has not been stopped yet. - task_queue_.disable(); - - for (auto& thread : threads_) - { - thread.join(); - } -} - -void SlotThreadPool::enable() noexcept -{ - if (!enabled_.exchange(true)) - { - // Execute threads - for (uint32_t i = 0; i < number_of_threads_; ++i) - { - threads_.emplace_back( - CustomThread( - std::bind(&SlotThreadPool::thread_routine_, this))); - } - } -} - -void SlotThreadPool::disable() noexcept -{ - if (enabled_.exchange(false)) - { - // Disable Task Queue, so threads will stop eventually when their current task is finished - task_queue_.disable(); - - for (auto& thread : threads_) - { - thread.join(); - } - - threads_.clear(); - } -} - -void SlotThreadPool::emit( - const TaskId& task_id) -{ - // Lock to access the slot map - std::lock_guard lock(slots_mutex_); - - auto it = slots_.find(task_id); - - if (it == slots_.end()) - { - throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " not registered."); - } - else - { - task_queue_.produce(it->first); - } -} - -void SlotThreadPool::slot( - const TaskId& task_id, - Task&& task) -{ - // Lock to access the slot map - std::lock_guard lock(slots_mutex_); - - auto it = slots_.find(task_id); - - if (it != slots_.end()) - { - throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " already exists."); - } - else - { - slots_.insert(std::make_pair(task_id, std::move(task))); - } -} - -void SlotThreadPool::thread_routine_() -{ - logDebug(DDSROUTER_THREAD_POOL, "Starting thread routine: " << std::this_thread::get_id() << "."); - - try - { - while (true) - { - logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " free, getting new callback."); - TaskId task_id = task_queue_.consume(); - - // Lock to access the slot map - slots_mutex_.lock(); - - auto it = slots_.find(task_id); - // Check the slot is correct - if (it == slots_.end()) - { - utils::tsnh(STR_ENTRY << "Slot in Queue must be stored in slots register"); - } - - Task& task = it->second; - - slots_mutex_.unlock(); - - logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " executing callback."); - task(); - } - } - catch (const utils::DisabledException& e) - { - logDebug(DDSROUTER_THREAD_POOL, "Stopping thread: " << std::this_thread::get_id() << "."); - } -} - -} /* namespace utils */ -} /* namespace ddsrouter */ -} /* namespace eprosima */ diff --git a/ddsrouter_utils/test/unittest/CMakeLists.txt b/ddsrouter_utils/test/unittest/CMakeLists.txt index b9920d846..b3d669d4b 100644 --- a/ddsrouter_utils/test/unittest/CMakeLists.txt +++ b/ddsrouter_utils/test/unittest/CMakeLists.txt @@ -19,7 +19,7 @@ add_subdirectory(macros) add_subdirectory(math) add_subdirectory(memory) add_subdirectory(return_code) -add_subdirectory(thread_pool) +add_subdirectory(thread) add_subdirectory(time) add_subdirectory(utils) add_subdirectory(wait) diff --git a/ddsrouter_utils/test/unittest/thread/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/CMakeLists.txt new file mode 100644 index 000000000..4a3c0b94a --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Add test subdirectories +add_subdirectory(manager) +add_subdirectory(task) diff --git a/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt new file mode 100644 index 000000000..e4093cbcf --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt @@ -0,0 +1,90 @@ +# Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################### +# IManager Specializations Test +################################### + +set(TEST_NAME + ParametrizedThreadManagerTest) + +set(TEST_SOURCES + manager_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/SyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/AsyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + manager_execute + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) + +################################### +# StdThreadPool Test +################################### + +set(TEST_NAME + StdThreadPoolTest) + +set(TEST_SOURCES + std_thread_pool_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + pool_1_threads_1_tasks + pool_1_threads_M_tasks + pool_N_threads_N_tasks + pool_N_threads_NM_tasks + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/ddsrouter_utils/test/unittest/thread/connector/manager_interface_test.cpp b/ddsrouter_utils/test/unittest/thread/connector/manager_interface_test.cpp new file mode 100644 index 000000000..dd5e5f0fa --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/manager_interface_test.cpp @@ -0,0 +1,152 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::thread::OwnedTask; + +template +utils::thread::IManager* create_manager_interface() +{ + return new Manager(); +} + +template <> +utils::thread::IManager* create_manager_interface() +{ + utils::thread::StdThreadPool* pool = new utils::thread::StdThreadPool(DEFAULT_THREADS, false); + pool->start(); + return pool; +} + +} /* namespace eprosima */ + +using namespace eprosima::ddsrouter; + +template +struct ThreadManagerTest : public ::testing::Test +{}; + +TYPED_TEST_SUITE_P(ThreadManagerTest); + +TYPED_TEST_P(ThreadManagerTest, manager_execute) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Create manager + utils::thread::IManager* manager( + test::create_manager_interface()); + + // Execute lambda increasing in 1 + { + auto lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + test::TaskType task(lambda); + manager->execute(std::make_unique(lambda)); + } + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by copy increasing in 1 + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + manager->execute( + std::make_unique( + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); })); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Erase Manager + delete manager; + + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadManagerTest, + manager_execute +); + +// Set types used in parametrization +typedef ::testing::Types< + utils::thread::SyncManager, + utils::thread::AsyncManager, + utils::thread::StdThreadPool + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadManagerTest, + ThreadManagerTest, + CaseTypes); + + +int main( + int argc, + char** argv) +{ + utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread/connector/std_thread_pool_test.cpp b/ddsrouter_utils/test/unittest/thread/connector/std_thread_pool_test.cpp new file mode 100644 index 000000000..85db94c17 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/std_thread_pool_test.cpp @@ -0,0 +1,170 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace test { + +constexpr const utils::Duration_ms DEFAULT_TIME_TEST = 200u; // T +constexpr const utils::Duration_ms RESIDUAL_TIME_TEST = DEFAULT_TIME_TEST / 2u; // dT + +constexpr const uint32_t N_THREADS_IN_TEST = 10; // N +constexpr const uint32_t N_EXECUTIONS_IN_TEST = 20; // M + +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::thread::OwnedTask; + +/** + * TESTS EXPLANATION + * These tests create a StdThreadPool and execute tasks in it. + * + * Tasks: + * Tasks objects used are OwnedTask and are created in the moment to send it to execute, so they will be destroyed + * automatically when finishing the task. + * + * Task function: + * The function used waits for a time T and increases a WaitHandler value the amount of time given by parameter. + * The WaitHandler is used so the test can wait in main thread to the expected value. + * + * Parameteres: + * Two parameters are used within the tests: + * @param n_threads Number of threads + * @param m_tasks Number of repetitions (#tasks added to pool) + * + * @warning if \c m_tasks is not dividible by \c n_threads the test may not work as expected because of + * non exact division solution. + */ +void test_thread_pool_with_parameters( + unsigned int n_threads, + unsigned int m_tasks) +{ + // Create thread_pool + thread::StdThreadPool thread_pool(n_threads, false); + thread_pool.start(); + + // Create timer to know the task has been executed in the time expected + utils::Timer timer; + + // Counter Wait Handler to wait for the task to be executed and check the final value + event::IntWaitHandler waiter(0); + + // Emit N tasks n times + for (uint32_t i = 1; i <= m_tasks; ++i) + { + thread_pool.execute( + std::make_unique( + [&waiter, i] () { test::test_lambda_increase_waiter(waiter, i); } + ) + ); + } + + // Wait for counter value to be greater than 0 (so 1 task is being executed) + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, m_tasks); + waiter.wait_greater_equal_than(target_value); + + auto time_elapsed = timer.elapsed(); + + // Check that the task has been executed in more than the time expected for lambda and less than expected + // time and residual; and that function has been called exactly once + double lower_time_expected = test::DEFAULT_TIME_TEST * std::floor(m_tasks / n_threads); + double higher_time_expected = test::DEFAULT_TIME_TEST * std::ceil(m_tasks / n_threads) + test::RESIDUAL_TIME_TEST; + + ASSERT_GE(time_elapsed, lower_time_expected); + ASSERT_LE(time_elapsed, higher_time_expected); + ASSERT_EQ(waiter.get_value(), target_value); + + // Thread Pool is destroyed automatically and without errors +} + +} /* namespace test */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +using namespace utils; + +/** + * Emit 1 tasks to a ThreadPool with 1 thread. + * Check that time elapsed is > T + */ +TEST(StdThreadPoolTest, pool_1_threads_1_tasks) +{ + test::test_thread_pool_with_parameters(1, 1); +} + +/** + * Emit M tasks to a ThreadPool with 1 thread. + */ +TEST(StdThreadPoolTest, pool_1_threads_M_tasks) +{ + test::test_thread_pool_with_parameters(1, test::N_EXECUTIONS_IN_TEST); +} + +/** + * Emit N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_N_tasks) +{ + test::test_thread_pool_with_parameters(test::N_THREADS_IN_TEST, test::N_THREADS_IN_TEST); +} + +/** + * Emit M*N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_NM_tasks) +{ + test::test_thread_pool_with_parameters( + test::N_THREADS_IN_TEST, + test::N_THREADS_IN_TEST * test::N_EXECUTIONS_IN_TEST); +} + +int main( + int argc, + char** argv) +{ + // eprosima::ddsxrouter::utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt new file mode 100644 index 000000000..93787d535 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt @@ -0,0 +1,39 @@ + +################################### +# One Shot Connector Test +################################### + +set(TEST_NAME + OneShotConnectorTest) + +set(TEST_SOURCES + one_shot_connector_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + one_shot_test_no_params + one_shot_test_int + one_shot_test_string + one_shot_test_bool_int_string + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/ddsrouter_utils/test/unittest/thread/manager/one_shot_connector_test.cpp b/ddsrouter_utils/test/unittest/thread/manager/one_shot_connector_test.cpp new file mode 100644 index 000000000..bf26d24c9 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/manager/one_shot_connector_test.cpp @@ -0,0 +1,350 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +void test_lambda_increase_waiter_add_string( + event::IntWaitHandler& counter, + utils::Atomicable& bucket, + std::string string_to_add, + int increase = 1, + bool append_string = true) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + // Lock string that will be modified + if (append_string) + { + std::unique_lock> lock(bucket); + bucket.append(string_to_add); + } + + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Manager type to use. + * Using \c StdThreadPool because it is the one that will be used the most. + */ +using ManagerType = utils::thread::StdThreadPool; + +utils::thread::IManager* create_manager() +{ + return new ManagerType(DEFAULT_THREADS, true); +} + +} /* namespace eprosima */ + +/** + * Construct a StdThreadPool and uses OneShotConnector to send executions without parameters + * + * STEPS: + * - Create Manager + * - Call a OneShotConnector by copying an already existing std::function + * - Call OneShotConnector N times with a new created lambda each time + * - Check that the final value is the expected + */ +TEST(OneShotConnectorTest, one_shot_test_no_params) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by copy increasing in 1 + std::function lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + utils::thread::SimpleOneShotConnector::execute( + manager, + lambda); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in n + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::thread::SimpleOneShotConnector::execute( + manager, + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); }); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_int) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = + [&counter, &bucket] + (bool x) + { test::test_lambda_increase_waiter_add_string(counter, bucket, "Hello", 1, x); }; + + utils::thread::OneShotConnector::execute( + manager, + std::move(lambda_move), + true); + + // Wait for lambda to be called required times + counter.wait_equal(1); + + // Erase Manager + delete manager; +} + +// TEST(OneShotConnectorTest, one_shot_test_int) +// { +// // Waiter to check result +// event::IntWaitHandler counter(0); + +// // Manager object +// utils::thread::IManager* manager = test::create_manager(); + +// // Execute lambda by moving increasing in 1 +// std::function lambda_move = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; +// utils::thread::OneShotConnector::execute( +// manager, +// std::move(lambda_move), +// 1); + +// // Wait for lambda to be called required times +// counter.wait_equal(1); +// // Check that lambda has been called only that amount of times +// ASSERT_EQ(counter.get_value(), 1); +// // Reset counter +// counter.set_value(0); + +// // Execute lambda N times by copy increasing in 1 +// std::function lambda = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; +// for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) +// { +// utils::thread::OneShotConnector::execute( +// manager, +// lambda, +// static_cast(i)); +// } + +// // Wait for lambda to be called required times +// uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); +// counter.wait_equal(target_value); +// // Check that lambda has been called only that amount of times +// ASSERT_EQ(counter.get_value(), target_value); + +// // Erase Manager +// delete manager; +// } + +// TEST(OneShotConnectorTest, one_shot_test_string) +// { +// // Waiter to check result +// event::IntWaitHandler counter(0); +// // String to check result +// utils::Atomicable bucket; + +// // Manager object +// utils::thread::IManager* manager = test::create_manager(); + +// // Execute lambda by moving increasing in 1 +// std::function lambda_move = +// [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; +// utils::thread::OneShotConnector::execute( +// manager, +// std::move(lambda_move), +// "Hello"); + +// // Wait for lambda to be called required times +// counter.wait_equal(1); +// // Check that lambda has been called only that amount of times and the string result is the correct +// ASSERT_EQ(counter.get_value(), 1); +// ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done +// // Reset counter +// counter.set_value(0); +// bucket.erase(); + +// // Execute lambda N times by copy increasing in 1 +// std::function lambda = +// [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; +// for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) +// { +// // Call execute with a character adding 'a' + i (-1 to start from 'a') +// utils::thread::OneShotConnector::execute( +// manager, +// lambda, +// std::string(1, static_cast('a' + i - 1))); +// } + +// // Wait for lambda to be called required times +// uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); +// counter.wait_equal(target_value); +// // Check that lambda has been called only that amount of times +// ASSERT_EQ(counter.get_value(), target_value); + +// // Check the result string. It may not be in the order expected as the order of threads is not deterministic +// // Thus, check that every char from 'a' to 'a' + N is in the string +// for (char c = 'a'; c <= 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) +// { +// ASSERT_NE(bucket.find(c), std::string::npos); +// } + +// // Erase Manager +// delete manager; +// } + +// TEST(OneShotConnectorTest, one_shot_test_bool_int_string) +// { +// // Waiter to check result +// event::IntWaitHandler counter(0); +// // String to check result +// utils::Atomicable bucket; + +// // Manager object +// utils::thread::IManager* manager = test::create_manager(); + +// // Execute lambda by moving increasing in 1 +// std::function lambda_move = +// [&counter, &bucket] +// (bool b, int i, std::string s) +// { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + +// utils::thread::OneShotConnector::execute( +// manager, +// std::move(lambda_move), +// true, +// 1, +// "Hello"); + +// // Wait for lambda to be called required times +// counter.wait_equal(1); +// // Check that lambda has been called only that amount of times and the string result is the correct +// ASSERT_EQ(counter.get_value(), 1); +// ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done +// // Reset counter +// counter.set_value(0); +// bucket.erase(); + +// // Execute lambda N times by copy increasing in 1 +// std::function lambda = +// [&counter, &bucket] +// (bool b, int i, std::string s) +// { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + +// for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) +// { +// // Whether it should add the char. Only add odd number chars +// char c = static_cast('a' + i - 1); +// bool append_char = static_cast(static_cast(c) % 2); + +// // Call execute with a character adding 'a' + i (-1 to start from 'a') +// utils::thread::OneShotConnector::execute( +// manager, +// lambda, +// append_char, +// i, +// std::string(1, c)); +// } + +// // Wait for lambda to be called required times +// uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); +// counter.wait_equal(target_value); +// // Check that lambda has been called only that amount of times +// ASSERT_EQ(counter.get_value(), target_value); + +// // Check the result string. It may not be in the order expected as the order of threads is not deterministic +// // Thus, check that every char from 'a' to 'a' + N is in the string +// for (char c = 'a'; c <= 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) +// { +// bool append_char = static_cast(static_cast(c) % 2); +// if (append_char) +// { +// ASSERT_NE(bucket.find(c), std::string::npos); +// } +// else +// { +// ASSERT_EQ(bucket.find(c), std::string::npos); +// } +// } + +// // Erase Manager +// delete manager; +// } + +int main( + int argc, + char** argv) +{ + utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt similarity index 80% rename from ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt rename to ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt index 00f047678..6c8084ec8 100644 --- a/ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt +++ b/ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt @@ -13,29 +13,27 @@ # limitations under the License. ################################### -# Slot Thread Pool execution Test +# ITask Specializations Test ################################### set(TEST_NAME - slot_thread_pool_test) + ParametrizedThreadTaskTest) set(TEST_SOURCES - slot_thread_pool_test.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/thread_pool/pool/SlotThreadPool.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/thread_pool/task/TaskId.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + task_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/ReferenceTask.cpp ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp ) set(TEST_LIST - pool_one_thread_one_slot - pool_one_thread_n_slots - pool_n_threads_one_slot + task_operator ) set(TEST_EXTRA_LIBRARIES diff --git a/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp b/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp new file mode 100644 index 000000000..bce7a98a2 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp @@ -0,0 +1,138 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include + +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + utils::sleep_for(DEFAULT_TIME_TEST); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +template +utils::thread::ITask* create_task_specialization(std::function* callback); + +template <> +utils::thread::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::ReferenceTask(callback); +} + +template <> +utils::thread::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::OwnedTask(*callback); +} + +} /* namespace test */ + +// Empty class to parametrized tests +template +struct ThreadTaskTest : public ::testing::Test +{}; +// Needed gtest macro +TYPED_TEST_SUITE_P(ThreadTaskTest); + +/** + * This tests operator() of every specialization if ITask + * + * Uses a IntWaitHandler to increase a value and at the same time wait for it to be updated to a exact value. + * It is increased from a function executed inside the ITask, and the wait for it to be updated and check the value. + * + */ +TYPED_TEST_P(ThreadTaskTest, task_operator) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Function object to create tasks + std::function task_function( + [&counter](){ test::test_lambda_increase_waiter(counter, 1); }); + + // Create task + utils::thread::ITask* task( + test::create_task_specialization( + &task_function)); + + // Execute lambda 1 time + task->operator()(); + counter.wait_equal(1); + counter.set_value(0); + + // Execute lambda N times + for (unsigned int i = 0; i < test::DEFAULT_TIME_REPETITIONS; ++i) + { + task->operator()(); + } + counter.wait_equal(test::DEFAULT_TIME_REPETITIONS); + + // Erase Task + delete task; +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadTaskTest, + task_operator +); + +// Set types used in parametrization +typedef ::testing::Types< + utils::thread::ReferenceTask, + utils::thread::OwnedTask + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadTaskTest, + ThreadTaskTest, + CaseTypes); + +int main( + int argc, + char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp b/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp deleted file mode 100644 index 788e1944d..000000000 --- a/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include -#include -#include - -#include - -namespace eprosima { -namespace ddsrouter { -namespace utils { -namespace test { - -eprosima::ddsrouter::utils::Duration_ms DEFAULT_TIME_TEST = 200u; -eprosima::ddsrouter::utils::Duration_ms RESIDUAL_TIME_TEST = DEFAULT_TIME_TEST / 2u; - -uint32_t N_THREADS_IN_TEST = 10; -uint32_t N_EXECUTIONS_IN_TEST = 5; - -void test_lambda_increase_waiter( - eprosima::ddsrouter::event::IntWaitHandler& counter, - unsigned int increase = 1) -{ - std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); - - for (unsigned int i = 0; i < increase; ++i) - { - ++counter; - } -} - -} /* namespace test */ -} /* namespace utils */ -} /* namespace ddsrouter */ -} /* namespace eprosima */ - -using namespace eprosima::ddsrouter::utils; - -/** - * Emit N tasks to a ThreadPool with one thread by storing slot. - */ -TEST(slot_thread_pool_test, pool_one_thread_one_slot) -{ - // Create thread_pool - SlotThreadPool thread_pool(1); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - - // Create slot - TaskId task_id(27); - thread_pool.slot( - task_id, - [&waiter] - () - { - test::test_lambda_increase_waiter(waiter); - } - ); - - // Emit task n times - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - thread_pool.emit(task_id); - } - - // Wait for counter value to be greater than 0 (so 1 task is being executed) - waiter.wait_greater_equal_than(test::N_EXECUTIONS_IN_TEST); - - ASSERT_EQ(waiter.get_value(), test::N_EXECUTIONS_IN_TEST); -} - -/** - * Emit N tasks to a ThreadPool with one thread by storing N slots. - */ -TEST(slot_thread_pool_test, pool_one_thread_n_slots) -{ - // Create thread_pool - SlotThreadPool thread_pool(1); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - // Create timer to know the task has been executed in the time expected - eprosima::ddsrouter::utils::Timer timer; - - // Create slot - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - TaskId task_id(i); - thread_pool.slot( - task_id, - [&waiter, &i] - () - { - test::test_lambda_increase_waiter(waiter, i); - } - ); - } - - // Emit every task 1 time - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - thread_pool.emit(TaskId(i)); - } - - // Wait for counter value to be M being M = N*(N+1)/2 that is the increase value that should be achieved - waiter.wait_greater_equal_than((test::N_EXECUTIONS_IN_TEST* (test::N_EXECUTIONS_IN_TEST + 1)) / 2); - - ASSERT_EQ(waiter.get_value(), (test::N_EXECUTIONS_IN_TEST* (test::N_EXECUTIONS_IN_TEST + 1)) / 2); -} - -/** - * Emit N*T tasks to a ThreadPool with T threads by storing 1 slot. - */ -TEST(slot_thread_pool_test, pool_n_threads_one_slot) -{ - // Create thread_pool - SlotThreadPool thread_pool(test::N_THREADS_IN_TEST); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - // Create timer to know the task has been executed in the time expected - eprosima::ddsrouter::utils::Timer timer; - - // Create slot - TaskId task_id(27); - thread_pool.slot( - task_id, - [&waiter] - () - { - test::test_lambda_increase_waiter(waiter); - } - ); - - // Emit task n times - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST; ++i) - { - thread_pool.emit(task_id); - } - - // Wait for counter value to be greater than 0 (so 1 task is being executed) - waiter.wait_greater_equal_than(test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST); - - auto time_elapsed = timer.elapsed(); - - // Check that the task has been executed in more than waiting time and less than waiting time + residual time - // and that function has been called exactly once - ASSERT_GE(time_elapsed, test::DEFAULT_TIME_TEST* test::N_EXECUTIONS_IN_TEST); - ASSERT_LE(time_elapsed, test::DEFAULT_TIME_TEST* test::N_EXECUTIONS_IN_TEST + test::RESIDUAL_TIME_TEST); - ASSERT_EQ(waiter.get_value(), test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST); -} - -int main( - int argc, - char** argv) -{ - // eprosima::ddsxrouter::utils::Log::SetVerbosity(eprosima::ddsrouter::utils::Log::Kind::Info); - - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp b/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp index 9cfda8f17..5cdb715fb 100644 --- a/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp +++ b/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp @@ -89,7 +89,7 @@ TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_move) // This lvalue is moved as rvalue, so after moving it will be empty handler.produce(std::move(lvalue)); // TODO uncomment it once DBQueue supports moving values - // ASSERT_EQ(lvalue.size(), 0); + ASSERT_EQ(lvalue.size(), 0); // Getting first value std::string pop_value = handler.consume(); @@ -109,7 +109,7 @@ TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_copy) std::string lvalue("test_data"); - handler.produce(lvalue); + handler.produce(std::string(lvalue)); // Getting first value std::string pop_value = handler.consume();