From 1fdca499462c6293a4a65e7dfdeaa16acc1f55b5 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Thu, 19 Dec 2024 12:23:06 +0200 Subject: [PATCH 01/17] Extracted setting core affinity to a helper function. --- Pcap++/src/PfRingDevice.cpp | 64 +++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 8a945a9c20..a7a1c7d32b 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -15,6 +15,26 @@ namespace pcpp { + namespace + { + void setThreadCoreAffinity(std::thread const& thread, int coreId) + { + if (thread.get_id() == std::thread::id{}) + { + throw std::invalid_argument("Can't set core affinity for a non-joinable thread"); + } + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(coreId, &cpuset); + int err = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), &cpuset); + if (err != 0) + { + throw std::runtime_error("Error while binding thread to core " + std::to_string(coreId) + + ": errno=" + std::to_string(err)); + } + } + } // namespace PfRingDevice::PfRingDevice(const char* deviceName) : m_MacAddress(MacAddress::Zero) { @@ -457,7 +477,9 @@ namespace pcpp int rxChannel = 0; for (int coreId = 0; coreId < MAX_NUM_OF_CORES; coreId++) { - if (!m_CoreConfiguration[coreId].IsInUse) + auto& coreConfig = m_CoreConfiguration[coreId]; + + if (!coreConfig.IsInUse) continue; m_ReentrantMode = true; @@ -466,19 +488,18 @@ namespace pcpp m_OnPacketsArriveUserCookie = onPacketsArriveUserCookie; // create a new thread - m_CoreConfiguration[coreId].Channel = m_PfRingDescriptors[rxChannel++]; - m_CoreConfiguration[coreId].RxThread = + coreConfig.Channel = m_PfRingDescriptors[rxChannel++]; + coreConfig.RxThread = std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); // set affinity to cores - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(coreId, &cpuset); - int err = pthread_setaffinity_np(m_CoreConfiguration[coreId].RxThread.native_handle(), sizeof(cpu_set_t), - &cpuset); - if (err != 0) + try { - PCPP_LOG_ERROR("Error while binding thread to core " << coreId << ": errno=" << err); + setThreadCoreAffinity(coreConfig.RxThread, coreId); + } + catch (const std::exception& e) + { + PCPP_LOG_ERROR(e.what()); startThread = 1; clearCoreConfiguration(); return false; @@ -521,19 +542,20 @@ namespace pcpp std::condition_variable cond; int startThread = 0; - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(0, &cpuset); - m_CoreConfiguration[0].IsInUse = true; - m_CoreConfiguration[0].Channel = m_PfRingDescriptors[0]; - m_CoreConfiguration[0].RxThread = - std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); - m_CoreConfiguration[0].IsAffinitySet = false; - int err = pthread_setaffinity_np(m_CoreConfiguration[0].RxThread.native_handle(), sizeof(cpu_set_t), &cpuset); - if (err != 0) + auto& coreConfig = m_CoreConfiguration[0]; + coreConfig.IsInUse = true; + coreConfig.Channel = m_PfRingDescriptors[0]; + coreConfig.RxThread = std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); + coreConfig.IsAffinitySet = false; + + try + { + setThreadCoreAffinity(coreConfig.RxThread, 0); + } + catch (const std::exception& e) { + PCPP_LOG_ERROR(e.what()); startThread = 1; - PCPP_LOG_ERROR("Error while binding thread to core 0: errno=" << err); clearCoreConfiguration(); return false; } From 644ef3a1d9bcee9f758854a396775e6efe5f6b01 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 03:01:48 +0200 Subject: [PATCH 02/17] Refactored PfRing capture thread function. - Completely rewrote the capture thread function to be a free function. - Added StopToken and StopTokenSource to encapsulate stop requests. - Refactored startCaptureSingleThread to use startCaptureMultiThread. --- Pcap++/CMakeLists.txt | 4 +- Pcap++/header/PfRingDevice.h | 15 +- Pcap++/header/StopToken.h | 65 ++++++++ Pcap++/src/PfRingDevice.cpp | 300 ++++++++++++++++++++--------------- Pcap++/src/StopToken.cpp | 46 ++++++ 5 files changed, 290 insertions(+), 140 deletions(-) create mode 100644 Pcap++/header/StopToken.h create mode 100644 Pcap++/src/StopToken.cpp diff --git a/Pcap++/CMakeLists.txt b/Pcap++/CMakeLists.txt index ab3ef642bd..9c53f0aa60 100644 --- a/Pcap++/CMakeLists.txt +++ b/Pcap++/CMakeLists.txt @@ -20,6 +20,7 @@ add_library( $<$:src/PfRingDeviceList.cpp> $<$:src/XdpDevice.cpp> src/RawSocketDevice.cpp + src/StopToken.cpp $<$:src/WinPcapLiveDevice.cpp> # Force light pcapng to be link fully static $) @@ -32,7 +33,8 @@ set(public_headers header/PcapFilter.h header/PcapLiveDevice.h header/PcapLiveDeviceList.h - header/RawSocketDevice.h) + header/RawSocketDevice.h + header/StopToken.h) if(PCAPPP_USE_DPDK) list( diff --git a/Pcap++/header/PfRingDevice.h b/Pcap++/header/PfRingDevice.h index 304f40c3c8..2ef0c2b73f 100644 --- a/Pcap++/header/PfRingDevice.h +++ b/Pcap++/header/PfRingDevice.h @@ -6,8 +6,9 @@ #include "MacAddress.h" #include "SystemUtils.h" #include "Packet.h" +#include #include -#include +#include "StopToken.h" /// @file @@ -53,10 +54,11 @@ namespace pcpp int m_InterfaceIndex; MacAddress m_MacAddress; int m_DeviceMTU; - CoreConfiguration m_CoreConfiguration[MAX_NUM_OF_CORES]; - bool m_StopThread; - OnPfRingPacketsArriveCallback m_OnPacketsArriveCallback; - void* m_OnPacketsArriveUserCookie; + + std::array m_CoreConfiguration; + // An empty stop token source is used to indicate that no capture is running + internal::StopTokenSource m_StopTokenSource{internal::NoStopStateTag{}}; + bool m_ReentrantMode; bool m_HwClockEnabled; bool m_IsFilterCurrentlySet; @@ -64,7 +66,6 @@ namespace pcpp PfRingDevice(const char* deviceName); bool initCoreConfigurationByCoreMask(CoreMask coreMask); - void captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex, const int* startState); int openSingleRxChannel(const char* deviceName, pfring** ring); @@ -246,7 +247,7 @@ namespace pcpp * Gets the core used in the current thread context * @return The system core used in the current thread context */ - SystemCore getCurrentCoreId() const; + static SystemCore getCurrentCoreId(); /** * Get the statistics of a specific thread/core (=RX channel) diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h new file mode 100644 index 0000000000..87dd7dd917 --- /dev/null +++ b/Pcap++/header/StopToken.h @@ -0,0 +1,65 @@ +#pragma once + +#include + +namespace pcpp +{ + namespace internal + { + class StopToken; + struct NoStopStateTag + { + }; + + class StopTokenSource + { + friend class StopToken; + public: + /// Creates a new StopTokenSource. + StopTokenSource(); + /// Creates a new StopTokenSource without a shared state. + StopTokenSource(NoStopStateTag) noexcept : m_SharedState(nullptr) + {} + + /// Returns a StopToken that is associated with this source. + StopToken getToken() const noexcept + { + return StopToken(m_SharedState); + } + + /// Requests stop. + bool requestStop() noexcept; + + /// Returns true if stop has been requested. + bool stopRequested() const noexcept; + /// Returns true if stop can be requested. + bool stopPossible() const noexcept; + + private: + struct SharedState; + + std::shared_ptr m_SharedState; + }; + + class StopToken + { + friend class StopTokenSource; + + public: + /// Create a StopToken that never requests stop. + StopToken() noexcept = default; + + /// Returns true if stop has been requested. + bool stopRequested() const noexcept; + /// Returns true if stop can be requested. + bool stopPossible() const noexcept; + private: + /// Creates a StopToken that is associated with the given shared state. + StopToken::StopToken(std::shared_ptr sharedState) noexcept + : m_SharedState(std::move(sharedState)) + {} + + std::shared_ptr m_SharedState; + }; + } // namespace internal +} // namespace pcpp \ No newline at end of file diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index a7a1c7d32b..b885e4620b 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -10,6 +10,9 @@ #include #include #include +#include +#include +#include #define DEFAULT_PF_RING_SNAPLEN 1600 @@ -42,9 +45,6 @@ namespace pcpp m_DeviceOpened = false; m_DeviceName = std::string(deviceName); m_InterfaceIndex = -1; - m_StopThread = true; - m_OnPacketsArriveCallback = nullptr; - m_OnPacketsArriveUserCookie = nullptr; m_ReentrantMode = false; m_HwClockEnabled = false; m_DeviceMTU = 0; @@ -75,6 +75,8 @@ namespace pcpp { PCPP_LOG_DEBUG("Succeeded opening device [" << m_DeviceName << "]"); m_NumOfOpenedRxChannels = 1; + // Set reentrant mode to false as the channel is opened without the PF_RING_REENTRANT flag. + m_ReentrantMode = false; m_DeviceOpened = true; return true; } @@ -208,6 +210,8 @@ namespace pcpp return false; } + // Set reentrant mode to false as the channels are opened without the PF_RING_REENTRANT flag. + m_ReentrantMode = false; m_DeviceOpened = true; return true; @@ -330,6 +334,8 @@ namespace pcpp m_NumOfOpenedRxChannels = ringsOpen; + // Set reentrant mode to true as the channels are opened with the PF_RING_REENTRANT flag. + m_ReentrantMode = true; m_DeviceOpened = true; return true; } @@ -351,7 +357,7 @@ namespace pcpp } } - SystemCore PfRingDevice::getCurrentCoreId() const + SystemCore PfRingDevice::getCurrentCoreId() { return SystemCores::IdToSystemCore[sched_getcpu()]; } @@ -448,10 +454,102 @@ namespace pcpp return true; } + namespace + { + struct StartupBlock + { + std::mutex startMutex; + std::condition_variable startCond; + bool startupReady = false; + }; + + struct PfRingCaptureThreadData + { + std::shared_ptr startupBlock; /// The startup block to wait on + + pfring* ringChannel; /// The PF_RING channel to capture on (non-owning) + bool zeroCopySupport; /// True if zero copy is supported + + OnPfRingPacketsArriveCallback onPacketsArrive; /// Callback to be called when packets arrive + void* onPacketsArriveUserCookie = nullptr; /// User cookie to be passed to the callback + PfRingDevice* device = nullptr; /// The device this thread is capturing on (non-owning) + }; + + void pfRingCaptureThreadMain(PfRingCaptureThreadData threadData, internal::StopToken ct) + { + if (threadData.startupBlock == nullptr) + { + PCPP_LOG_ERROR("Capture thread started without a startup block"); + return; + } + + { + // Wait for the start signal + std::lock_guard lock(threadData.startupBlock->startMutex); + threadData.startupBlock->startCond.wait(lock, [&] { return threadData.startupBlock->startupReady; }); + } + + // Startup is complete, clear the startup block + threadData.startupBlock = nullptr; + + // Check if the thread should stop. + // If the initialization of other threads failed, this thread should stop. + if (ct.isCancellationRequested()) + { + return; + } + + // Core affinity should be set by now, so the core ID should be able to be cached. + const int coreId = PfRingDevice::getCurrentCoreId().Id; + + PCPP_LOG_DEBUG("Starting capture thread " << coreId); + + uint8_t bufferPtr = nullptr; + uint32_t bufferLen = 0; + std::vector recvBuffer; + + // If zero copy is not supported, allocate a buffer for the packet + if (!threadData.zeroCopySupport) + { + recvBuffer.resize(PCPP_MAX_PACKET_SIZE); + bufferPtr = recvBuffer.data(); + bufferLen = recvBuffer.size(); + } + + while (!ct.isCancellationRequested()) + { + struct pfring_pkthdr pktHdr; + int recvRes = pfring_recv(ring, &bufferPtr, bufferLen, &pktHdr, 0); + if (recvRes > 0) + { + // if caplen < len it means we don't have the whole packet. Treat this case as packet drop + // TODO: add this packet to dropped packet stats + // if (pktHdr.caplen != pktHdr.len) + // { + // PCPP_LOG_ERROR("Packet dropped due to len != caplen"); + // continue; + // } + + RawPacket rawPacket(bufferPtr, pktHdr.caplen, pktHdr.ts, false); + threadData.onPacketsArrive(&rawPacket, 1, coreId, threadData.device, + threadData.onPacketsArriveUserCookie); + } + else if (recvRes < 0) + { + // cppcheck-suppress shiftNegative + PCPP_LOG_ERROR("pfring_recv returned an error: [Err=" << recvRes << "]"); + } + } + + PCPP_LOG_DEBUG("Exiting capture thread " << coreId); + } + } // namespace + bool PfRingDevice::startCaptureMultiThread(OnPfRingPacketsArriveCallback onPacketsArrive, void* onPacketsArriveUserCookie, CoreMask coreMask) { - if (!m_StopThread) + // Uses the stop token to determine if the device is already capturing + if (!m_StopTokenSource.stopPossible()) { PCPP_LOG_ERROR("Device already capturing. Cannot start 2 capture sessions at the same time"); return false; @@ -460,37 +558,43 @@ namespace pcpp if (!initCoreConfigurationByCoreMask(coreMask)) return false; - if (m_NumOfOpenedRxChannels != getCoresInUseCount()) + const int requestedInUseCores = getCoresInUseCount(); + if (m_NumOfOpenedRxChannels != requestedInUseCores) { PCPP_LOG_ERROR("Cannot use a different number of channels and cores. Opened " - << m_NumOfOpenedRxChannels << " channels but set " << getCoresInUseCount() + << m_NumOfOpenedRxChannels << " channels but set " << requestedInUseCores << " cores in core mask"); clearCoreConfiguration(); return false; } - std::mutex mutex; - std::condition_variable cond; - int startThread = 0; + PCPP_LOG_DEBUG("Trying to start capturing on " << requestedInUseCores << " threads for device [" << m_DeviceName << "]"); + + // Create a new stop token source for this capture session. + m_StopTokenSource = internal::StopTokenSource(); + // Create a startup block for all threads + std::shared_ptr startupBlock = std::make_shared(); - m_StopThread = false; int rxChannel = 0; - for (int coreId = 0; coreId < MAX_NUM_OF_CORES; coreId++) + for (int coreId = 0; coreId < m_CoreConfiguration.size(); coreId++) { auto& coreConfig = m_CoreConfiguration[coreId]; if (!coreConfig.IsInUse) continue; - m_ReentrantMode = true; - - m_OnPacketsArriveCallback = onPacketsArrive; - m_OnPacketsArriveUserCookie = onPacketsArriveUserCookie; + pfring* ringChannel = m_PfRingDescriptors[rxChannel++]; + PfRingCaptureThreadData threadData; + threadData.startupBlock = startupBlock; + threadData.ringChannel = ringChannel; + threadData.zeroCopySupport = !m_ReentrantMode; // Zero copy is not supported in reentrant mode + threadData.onPacketsArrive = onPacketsArrive; + threadData.onPacketsArriveUserCookie = onPacketsArriveUserCookie; + threadData.device = this; - // create a new thread - coreConfig.Channel = m_PfRingDescriptors[rxChannel++]; - coreConfig.RxThread = - std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); + // Create a new thread + coreConfig.Channel = ringChannel; + coreConfig.RxThread = std::thread(&pfRingCaptureThreadMain, threadData, m_StopTokenSource.getToken()); // set affinity to cores try @@ -500,14 +604,37 @@ namespace pcpp catch (const std::exception& e) { PCPP_LOG_ERROR(e.what()); - startThread = 1; + + // Request stop and set the startup block to ready to prevent other threads from starting + m_StopTokenSource.requestStop(); + { + std::lock_guard lock(startupBlock->startMutex); + startupBlock->startupReady = true; + } + startupBlock->startCond.notify_all(); + + // Wait for all threads to stop + for (int coreId2 = coreId; coreId2 >= 0; coreId2--) + { + if (!m_CoreConfiguration[coreId2].IsInUse) + continue; + m_CoreConfiguration[coreId2].RxThread.join(); + PCPP_LOG_DEBUG("Thread on core [" << coreId2 << "] stopped"); + } + + // Clear the core configuration and stop token source + m_StopTokenSource = internal::StopTokenSource(internal::NoStopStateTag{}); clearCoreConfiguration(); return false; } } - startThread = 2; - cond.notify_all(); + // Set the startup block to ready to start all threads + { + std::lock_guard lock(startupBlock->startMutex); + startupBlock->startupReady = true; + } + startupBlock->startCond.notify_all(); return true; } @@ -515,7 +642,8 @@ namespace pcpp bool PfRingDevice::startCaptureSingleThread(OnPfRingPacketsArriveCallback onPacketsArrive, void* onPacketsArriveUserCookie) { - if (!m_StopThread) + // Uses the stop token to determine if the device is already capturing + if (!m_StopTokenSource.stopPossible()) { PCPP_LOG_ERROR("Device already capturing. Cannot start 2 capture sessions at the same time"); return false; @@ -528,49 +656,19 @@ namespace pcpp } PCPP_LOG_DEBUG("Trying to start capturing on a single thread for device [" << m_DeviceName << "]"); - - clearCoreConfiguration(); - - m_OnPacketsArriveCallback = onPacketsArrive; - m_OnPacketsArriveUserCookie = onPacketsArriveUserCookie; - - m_StopThread = false; - - m_ReentrantMode = false; - - std::mutex mutex; - std::condition_variable cond; - int startThread = 0; - - auto& coreConfig = m_CoreConfiguration[0]; - coreConfig.IsInUse = true; - coreConfig.Channel = m_PfRingDescriptors[0]; - coreConfig.RxThread = std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); - coreConfig.IsAffinitySet = false; - - try - { - setThreadCoreAffinity(coreConfig.RxThread, 0); - } - catch (const std::exception& e) - { - PCPP_LOG_ERROR(e.what()); - startThread = 1; - clearCoreConfiguration(); - return false; - } - startThread = 2; - cond.notify_all(); - - PCPP_LOG_DEBUG("Capturing started for device [" << m_DeviceName << "]"); - return true; + // Starts capture on a single thread by using a Core 0 mask. + // Multi-threaded capture spawns a thread for each core, so this is equivalent to starting capture on a single + // thread. + return startCaptureMultiThread(onPacketsArrive, onPacketsArriveUserCookie, + createCoreMaskFromCoreVector({ SystemCores::Core0 })); } void PfRingDevice::stopCapture() { PCPP_LOG_DEBUG("Trying to stop capturing on device [" << m_DeviceName << "]"); - m_StopThread = true; - for (int coreId = 0; coreId < MAX_NUM_OF_CORES; coreId++) + m_StopTokenSource.requestStop(); + + for (int coreId = 0; coreId < m_CoreConfiguration.size(); coreId++) { if (!m_CoreConfiguration[coreId].IsInUse) continue; @@ -578,73 +676,11 @@ namespace pcpp PCPP_LOG_DEBUG("Thread on core [" << coreId << "] stopped"); } - PCPP_LOG_DEBUG("All capturing threads stopped"); - } - - void PfRingDevice::captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex, - const int* startState) - { - while (*startState == 0) - { - std::unique_lock lock(*startMutex); - startCond->wait_for(lock, std::chrono::milliseconds(100)); - } - if (*startState == 1) - { - return; - } - - int coreId = this->getCurrentCoreId().Id; - pfring* ring = nullptr; - - PCPP_LOG_DEBUG("Starting capture thread " << coreId); - - ring = this->m_CoreConfiguration[coreId].Channel; - - if (ring == nullptr) - { - PCPP_LOG_ERROR("Couldn't find ring for core " << coreId << ". Exiting capture thread"); - return; - } - - while (!this->m_StopThread) - { - // if buffer is nullptr PF_RING avoids copy of the data - uint8_t* buffer = nullptr; - uint32_t bufferLen = 0; - - // in multi-threaded mode flag PF_RING_REENTRANT is set, and this flag doesn't work with zero copy - // so I need to allocate a buffer and set buffer to point to it - if (this->m_ReentrantMode) - { - uint8_t tempBuffer[PCPP_MAX_PACKET_SIZE]; - buffer = tempBuffer; - bufferLen = PCPP_MAX_PACKET_SIZE; - } - - struct pfring_pkthdr pktHdr; - int recvRes = pfring_recv(ring, &buffer, bufferLen, &pktHdr, 0); - if (recvRes > 0) - { - // if caplen < len it means we don't have the whole packet. Treat this case as packet drop - // TODO: add this packet to dropped packet stats - // if (pktHdr.caplen != pktHdr.len) - // { - // PCPP_LOG_ERROR("Packet dropped due to len != caplen"); - // continue; - // } - - RawPacket rawPacket(buffer, pktHdr.caplen, pktHdr.ts, false); - this->m_OnPacketsArriveCallback(&rawPacket, 1, coreId, this, this->m_OnPacketsArriveUserCookie); - } - else if (recvRes < 0) - { - // cppcheck-suppress shiftNegative - PCPP_LOG_ERROR("pfring_recv returned an error: [Err=" << recvRes << "]"); - } - } + // Clear the core configuration and stop token source + m_StopTokenSource = internal::StopTokenSource(internal::NoStopStateTag{}); + clearCoreConfiguration(); - PCPP_LOG_DEBUG("Exiting capture thread " << coreId); + PCPP_LOG_DEBUG("All capturing threads stopped"); } void PfRingDevice::getThreadStatistics(SystemCore core, PfRingStats& stats) const @@ -698,15 +734,15 @@ namespace pcpp void PfRingDevice::clearCoreConfiguration() { - for (int i = 0; i < MAX_NUM_OF_CORES; i++) - m_CoreConfiguration[i].clear(); + for (auto& config : m_CoreConfiguration) + config.clear(); } int PfRingDevice::getCoresInUseCount() const { int res = 0; - for (int i = 0; i < MAX_NUM_OF_CORES; i++) - if (m_CoreConfiguration[i].IsInUse) + for (auto& config : m_CoreConfiguration) + if (config.IsInUse) res++; return res; diff --git a/Pcap++/src/StopToken.cpp b/Pcap++/src/StopToken.cpp new file mode 100644 index 0000000000..c226e3d2cf --- /dev/null +++ b/Pcap++/src/StopToken.cpp @@ -0,0 +1,46 @@ +#include "StopToken.h" + +#include +#include + +namespace pcpp +{ + namespace internal + { + struct StopTokenSource::SharedState + { + std::atomic IsCancellationRequested{ false }; + }; + + StopTokenSource::StopTokenSource() : m_SharedState(std::make_shared()) + {} + + bool StopTokenSource::requestStop() noexcept + { + if (m_SharedState != nullptr) + return false; + + // Try to set the flag to true. If it was already true, return false + // This is done to prevent multiple threads from setting the flag to true + bool expected = false; + return m_SharedState->IsCancellationRequested.compare_exchange_strong(expected, true, std::memory_order_relaxed); + } + bool StopTokenSource::stopRequested() const noexcept + { + return m_SharedState != nullptr && m_SharedState->IsCancellationRequested.load(std::memory_order_relaxed); + } + bool StopTokenSource::stopPossible() const noexcept + { + return m_SharedState != nullptr; + } + + bool StopToken::stopRequested() const noexcept + { + return m_SharedState != nullptr && m_SharedState->IsCancellationRequested.load(std::memory_order_relaxed); + } + bool StopToken::stopPossible() const noexcept + { + return m_SharedState != nullptr; + } + } // namespace internal +} \ No newline at end of file From 5a57b74d7035b2a7313ff8e6d36cfa6ce488f10c Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 15:02:06 +0200 Subject: [PATCH 03/17] Fixes. --- Pcap++/src/PfRingDevice.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index b885e4620b..4eba058558 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -483,9 +483,15 @@ namespace pcpp return; } + if (!ct.stopPossible()) + { + PCPP_LOG_ERROR("Capture thread started without a stop token"); + return; + } + { // Wait for the start signal - std::lock_guard lock(threadData.startupBlock->startMutex); + std::unique_lock lock(threadData.startupBlock->startMutex); threadData.startupBlock->startCond.wait(lock, [&] { return threadData.startupBlock->startupReady; }); } @@ -494,7 +500,7 @@ namespace pcpp // Check if the thread should stop. // If the initialization of other threads failed, this thread should stop. - if (ct.isCancellationRequested()) + if (ct.stopRequested()) { return; } @@ -504,7 +510,7 @@ namespace pcpp PCPP_LOG_DEBUG("Starting capture thread " << coreId); - uint8_t bufferPtr = nullptr; + uint8_t* bufferPtr = nullptr; uint32_t bufferLen = 0; std::vector recvBuffer; @@ -516,10 +522,10 @@ namespace pcpp bufferLen = recvBuffer.size(); } - while (!ct.isCancellationRequested()) + while (!ct.stopRequested()) { struct pfring_pkthdr pktHdr; - int recvRes = pfring_recv(ring, &bufferPtr, bufferLen, &pktHdr, 0); + int recvRes = pfring_recv(threadData.ringChannel, &bufferPtr, bufferLen, &pktHdr, 0); if (recvRes > 0) { // if caplen < len it means we don't have the whole packet. Treat this case as packet drop From c9713fc521336befc1b8b619e4d6f25eba0e1976 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 15:13:35 +0200 Subject: [PATCH 04/17] Lint --- Pcap++/header/PfRingDevice.h | 2 +- Pcap++/header/StopToken.h | 4 +++- Pcap++/src/PfRingDevice.cpp | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Pcap++/header/PfRingDevice.h b/Pcap++/header/PfRingDevice.h index 2ef0c2b73f..18227f0a26 100644 --- a/Pcap++/header/PfRingDevice.h +++ b/Pcap++/header/PfRingDevice.h @@ -57,7 +57,7 @@ namespace pcpp std::array m_CoreConfiguration; // An empty stop token source is used to indicate that no capture is running - internal::StopTokenSource m_StopTokenSource{internal::NoStopStateTag{}}; + internal::StopTokenSource m_StopTokenSource{ internal::NoStopStateTag{} }; bool m_ReentrantMode; bool m_HwClockEnabled; diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h index 87dd7dd917..fc19853518 100644 --- a/Pcap++/header/StopToken.h +++ b/Pcap++/header/StopToken.h @@ -14,6 +14,7 @@ namespace pcpp class StopTokenSource { friend class StopToken; + public: /// Creates a new StopTokenSource. StopTokenSource(); @@ -53,6 +54,7 @@ namespace pcpp bool stopRequested() const noexcept; /// Returns true if stop can be requested. bool stopPossible() const noexcept; + private: /// Creates a StopToken that is associated with the given shared state. StopToken::StopToken(std::shared_ptr sharedState) noexcept @@ -62,4 +64,4 @@ namespace pcpp std::shared_ptr m_SharedState; }; } // namespace internal -} // namespace pcpp \ No newline at end of file +} // namespace pcpp diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 4eba058558..1718073097 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -574,7 +574,8 @@ namespace pcpp return false; } - PCPP_LOG_DEBUG("Trying to start capturing on " << requestedInUseCores << " threads for device [" << m_DeviceName << "]"); + PCPP_LOG_DEBUG("Trying to start capturing on " << requestedInUseCores << " threads for device [" << m_DeviceName + << "]"); // Create a new stop token source for this capture session. m_StopTokenSource = internal::StopTokenSource(); From 792d689252f3ede0015051092e1d09db0ba4bccf Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 15:48:34 +0200 Subject: [PATCH 05/17] Fixes StopToken incomplete error. --- Pcap++/header/StopToken.h | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h index fc19853518..c69a80327c 100644 --- a/Pcap++/header/StopToken.h +++ b/Pcap++/header/StopToken.h @@ -23,10 +23,7 @@ namespace pcpp {} /// Returns a StopToken that is associated with this source. - StopToken getToken() const noexcept - { - return StopToken(m_SharedState); - } + StopToken getToken() const noexcept; /// Requests stop. bool requestStop() noexcept; @@ -47,7 +44,7 @@ namespace pcpp friend class StopTokenSource; public: - /// Create a StopToken that never requests stop. + /// Create a StopToken that is not associated with any shared state. StopToken() noexcept = default; /// Returns true if stop has been requested. @@ -63,5 +60,10 @@ namespace pcpp std::shared_ptr m_SharedState; }; + + inline StopToken StopTokenSource::getToken() const noexcept + { + return StopToken(m_SharedState); + } } // namespace internal } // namespace pcpp From 0349963dd59d2e319b23ab526f98346e80e5722f Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 21:56:56 +0200 Subject: [PATCH 06/17] Fixed stop token constructors. --- Pcap++/header/StopToken.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h index c69a80327c..c64b33d2b5 100644 --- a/Pcap++/header/StopToken.h +++ b/Pcap++/header/StopToken.h @@ -18,6 +18,7 @@ namespace pcpp public: /// Creates a new StopTokenSource. StopTokenSource(); + // cppcheck-suppress noExplicitConstructor /// Creates a new StopTokenSource without a shared state. StopTokenSource(NoStopStateTag) noexcept : m_SharedState(nullptr) {} @@ -54,7 +55,7 @@ namespace pcpp private: /// Creates a StopToken that is associated with the given shared state. - StopToken::StopToken(std::shared_ptr sharedState) noexcept + explicit StopToken(std::shared_ptr sharedState) noexcept : m_SharedState(std::move(sharedState)) {} From 4b3d39e9f3dae303f65dec4364d1a2ee769aeebf Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 22:05:24 +0200 Subject: [PATCH 07/17] Removed const-ness of thread reference when setting thread affinity. --- Pcap++/src/PfRingDevice.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 1718073097..cbd2b4ac94 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -20,7 +20,7 @@ namespace pcpp { namespace { - void setThreadCoreAffinity(std::thread const& thread, int coreId) + void setThreadCoreAffinity(std::thread& thread, int coreId) { if (thread.get_id() == std::thread::id{}) { From 7ff6cba2a9adca3221379c24ddf76b3da2a9e1d7 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 22:10:13 +0200 Subject: [PATCH 08/17] Added documentation for StopToken and StopTokenSource. --- Pcap++/header/StopToken.h | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h index c64b33d2b5..adf4859ae5 100644 --- a/Pcap++/header/StopToken.h +++ b/Pcap++/header/StopToken.h @@ -7,10 +7,14 @@ namespace pcpp namespace internal { class StopToken; + + /// Tag type used to construct a StopTokenSource without a shared state. struct NoStopStateTag { }; + /// @class StopTokenSource + /// @brief A source that can be used to request a stop operation. class StopTokenSource { friend class StopToken; @@ -24,14 +28,20 @@ namespace pcpp {} /// Returns a StopToken that is associated with this source. + /// @return A StopToken associated with this StopTokenSource. StopToken getToken() const noexcept; - /// Requests stop. + /// Requests a stop operation. This will notify all associated StopTokens + /// that a stop has been requested. + /// @return True if the stop request was successful, false otherwise. bool requestStop() noexcept; - /// Returns true if stop has been requested. + /// Checks if a stop has been requested for this StopTokenSource. + /// @return True if a stop has been requested, false otherwise. bool stopRequested() const noexcept; - /// Returns true if stop can be requested. + + /// Checks if a stop can be requested for this StopTokenSource. + /// @return True if a stop can be requested, false otherwise. bool stopPossible() const noexcept; private: @@ -40,25 +50,36 @@ namespace pcpp std::shared_ptr m_SharedState; }; + /// @class StopToken + /// @brief A token that can be used to check if a stop has been requested. + /// + /// The StopToken class is used to check if a stop has been requested by a StopTokenSource. + /// It holds a shared state with the StopTokenSource to determine if a stop has been requested. class StopToken { friend class StopTokenSource; public: - /// Create a StopToken that is not associated with any shared state. + /// @brief Default constructor for StopToken. + /// Constructs a StopToken with no associated shared state. StopToken() noexcept = default; - /// Returns true if stop has been requested. + /// @brief Checks if a stop has been requested. + /// @return True if a stop has been requested, false otherwise. bool stopRequested() const noexcept; - /// Returns true if stop can be requested. + + /// @brief Checks if a stop can be requested. + /// @return True if a stop can be requested, false otherwise. bool stopPossible() const noexcept; private: - /// Creates a StopToken that is associated with the given shared state. + /// @brief Constructs a StopToken with the given shared state. + /// @param sharedState The shared state associated with this StopToken. explicit StopToken(std::shared_ptr sharedState) noexcept : m_SharedState(std::move(sharedState)) {} + /// @brief The shared state associated with this StopToken. std::shared_ptr m_SharedState; }; From 9c994c1f9499e8d464d382446db11410b7d48268 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 22:12:15 +0200 Subject: [PATCH 09/17] Lint and cppcheck fixes. --- Pcap++/src/PfRingDevice.cpp | 2 +- Pcap++/src/StopToken.cpp | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index cbd2b4ac94..f50b78910b 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -748,7 +748,7 @@ namespace pcpp int PfRingDevice::getCoresInUseCount() const { int res = 0; - for (auto& config : m_CoreConfiguration) + for (auto const& config : m_CoreConfiguration) if (config.IsInUse) res++; diff --git a/Pcap++/src/StopToken.cpp b/Pcap++/src/StopToken.cpp index c226e3d2cf..8ef1cc02a4 100644 --- a/Pcap++/src/StopToken.cpp +++ b/Pcap++/src/StopToken.cpp @@ -23,7 +23,8 @@ namespace pcpp // Try to set the flag to true. If it was already true, return false // This is done to prevent multiple threads from setting the flag to true bool expected = false; - return m_SharedState->IsCancellationRequested.compare_exchange_strong(expected, true, std::memory_order_relaxed); + return m_SharedState->IsCancellationRequested.compare_exchange_strong(expected, true, + std::memory_order_relaxed); } bool StopTokenSource::stopRequested() const noexcept { @@ -43,4 +44,4 @@ namespace pcpp return m_SharedState != nullptr; } } // namespace internal -} \ No newline at end of file +} // namespace pcpp From 223dc80cd5174d9955958e5ecb6c6c9e72e343a3 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 21 Dec 2024 22:22:27 +0200 Subject: [PATCH 10/17] Replaced for loop with algorithm. --- Pcap++/src/PfRingDevice.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index f50b78910b..716f84cab4 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #define DEFAULT_PF_RING_SNAPLEN 1600 @@ -747,12 +748,8 @@ namespace pcpp int PfRingDevice::getCoresInUseCount() const { - int res = 0; - for (auto const& config : m_CoreConfiguration) - if (config.IsInUse) - res++; - - return res; + return std::count_if(m_CoreConfiguration.begin(), m_CoreConfiguration.end(), + [](const CoreConfiguration& config) { return config.IsInUse; }); } void PfRingDevice::setPfRingDeviceAttributes() From 057cda25ff21ed64bb41ed0da1d3627545fbd97d Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sun, 22 Dec 2024 23:01:47 +0200 Subject: [PATCH 11/17] Reverted core config size to MAX_NUM_OF_CORES. --- Pcap++/src/PfRingDevice.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 716f84cab4..6581897cfe 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -584,7 +584,7 @@ namespace pcpp std::shared_ptr startupBlock = std::make_shared(); int rxChannel = 0; - for (int coreId = 0; coreId < m_CoreConfiguration.size(); coreId++) + for (int coreId = 0; coreId < MAX_NUM_OF_CORES; coreId++) { auto& coreConfig = m_CoreConfiguration[coreId]; @@ -676,7 +676,7 @@ namespace pcpp PCPP_LOG_DEBUG("Trying to stop capturing on device [" << m_DeviceName << "]"); m_StopTokenSource.requestStop(); - for (int coreId = 0; coreId < m_CoreConfiguration.size(); coreId++) + for (int coreId = 0; coreId < MAX_NUM_OF_CORES; coreId++) { if (!m_CoreConfiguration[coreId].IsInUse) continue; From ba9d32e2b52c74465f2745fa8cfc0b176d57de44 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Tue, 24 Dec 2024 16:48:21 +0200 Subject: [PATCH 12/17] Fixed stop token source guards being inverted. --- Pcap++/src/PfRingDevice.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 6581897cfe..673152c2dc 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -556,7 +556,8 @@ namespace pcpp void* onPacketsArriveUserCookie, CoreMask coreMask) { // Uses the stop token to determine if the device is already capturing - if (!m_StopTokenSource.stopPossible()) + // If a stop token has a internal shared state, then another capture has already started. + if (m_StopTokenSource.stopPossible()) { PCPP_LOG_ERROR("Device already capturing. Cannot start 2 capture sessions at the same time"); return false; @@ -651,7 +652,8 @@ namespace pcpp void* onPacketsArriveUserCookie) { // Uses the stop token to determine if the device is already capturing - if (!m_StopTokenSource.stopPossible()) + // If a stop token has a internal shared state, then another capture has already started. + if (m_StopTokenSource.stopPossible()) { PCPP_LOG_ERROR("Device already capturing. Cannot start 2 capture sessions at the same time"); return false; From 97d9863dc2259602775ab3db4c228a16c769f444 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Tue, 31 Dec 2024 14:31:01 +0200 Subject: [PATCH 13/17] Added stop token tests. --- Pcap++/header/StopToken.h | 3 +- Pcap++/src/StopToken.cpp | 2 +- Tests/Pcap++Test/CMakeLists.txt | 1 + Tests/Pcap++Test/TestDefinition.h | 3 ++ Tests/Pcap++Test/Tests/StopTokenTests.cpp | 52 +++++++++++++++++++++++ Tests/Pcap++Test/main.cpp | 2 + 6 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 Tests/Pcap++Test/Tests/StopTokenTests.cpp diff --git a/Pcap++/header/StopToken.h b/Pcap++/header/StopToken.h index adf4859ae5..14f153d121 100644 --- a/Pcap++/header/StopToken.h +++ b/Pcap++/header/StopToken.h @@ -22,9 +22,8 @@ namespace pcpp public: /// Creates a new StopTokenSource. StopTokenSource(); - // cppcheck-suppress noExplicitConstructor /// Creates a new StopTokenSource without a shared state. - StopTokenSource(NoStopStateTag) noexcept : m_SharedState(nullptr) + explicit StopTokenSource(NoStopStateTag) noexcept : m_SharedState(nullptr) {} /// Returns a StopToken that is associated with this source. diff --git a/Pcap++/src/StopToken.cpp b/Pcap++/src/StopToken.cpp index 8ef1cc02a4..8ab068dca6 100644 --- a/Pcap++/src/StopToken.cpp +++ b/Pcap++/src/StopToken.cpp @@ -17,7 +17,7 @@ namespace pcpp bool StopTokenSource::requestStop() noexcept { - if (m_SharedState != nullptr) + if (m_SharedState == nullptr) return false; // Try to set the flag to true. If it was already true, return false diff --git a/Tests/Pcap++Test/CMakeLists.txt b/Tests/Pcap++Test/CMakeLists.txt index 73c6fdcc06..9a2d4c073c 100644 --- a/Tests/Pcap++Test/CMakeLists.txt +++ b/Tests/Pcap++Test/CMakeLists.txt @@ -13,6 +13,7 @@ add_executable( Tests/PacketParsingTests.cpp Tests/PfRingTests.cpp Tests/RawSocketTests.cpp + Tests/StopTokenTests.cpp Tests/SystemUtilsTests.cpp Tests/TcpReassemblyTests.cpp Tests/XdpTests.cpp) diff --git a/Tests/Pcap++Test/TestDefinition.h b/Tests/Pcap++Test/TestDefinition.h index ccaaec4756..69fed280dd 100644 --- a/Tests/Pcap++Test/TestDefinition.h +++ b/Tests/Pcap++Test/TestDefinition.h @@ -115,6 +115,9 @@ PTF_TEST_CASE(TestKniDeviceSendReceive); // Implemented in RawSocketTests.cpp PTF_TEST_CASE(TestRawSockets); +// Implemented in StopTokenTests.cpp +PTF_TEST_CASE(TestStopToken); + // Implemented in SystemUtilsTests.cpp PTF_TEST_CASE(TestSystemCoreUtils); diff --git a/Tests/Pcap++Test/Tests/StopTokenTests.cpp b/Tests/Pcap++Test/Tests/StopTokenTests.cpp new file mode 100644 index 0000000000..bb9a8f588c --- /dev/null +++ b/Tests/Pcap++Test/Tests/StopTokenTests.cpp @@ -0,0 +1,52 @@ +#include "../TestDefinition.h" + +#include "StopToken.h" + +PTF_TEST_CASE(TestStopToken) +{ + using pcpp::internal::StopToken; + using pcpp::internal::StopTokenSource; + using pcpp::internal::NoStopStateTag; + + { + // A stop token source without a shared state should not be able to request a stop. + StopTokenSource stopTokenSource{ NoStopStateTag{} }; + + PTF_ASSERT_FALSE(stopTokenSource.stopPossible()); + PTF_ASSERT_FALSE(stopTokenSource.stopRequested()); + PTF_ASSERT_FALSE(stopTokenSource.requestStop()); + + // A stop token source without a shared state should generate an empty stop token. + StopToken stopToken = stopTokenSource.getToken(); + + PTF_ASSERT_FALSE(stopToken.stopRequested()); + PTF_ASSERT_FALSE(stopTokenSource.stopPossible()); + } + + { + // A default constructed stop token source should have a shared state and not have a stop requested. + StopTokenSource stopTokenSource; + + PTF_ASSERT_TRUE(stopTokenSource.stopPossible()); + PTF_ASSERT_FALSE(stopTokenSource.stopRequested()); + + // A stop token source with a shared state should generate a stop token that reflects the state of the source. + StopToken stopToken = stopTokenSource.getToken(); + PTF_ASSERT_TRUE(stopToken.stopPossible()); + PTF_ASSERT_FALSE(stopToken.stopRequested()); + + // Request a stop and check if the stop token reflects the change. + PTF_ASSERT_TRUE(stopTokenSource.requestStop()); + PTF_ASSERT_TRUE(stopTokenSource.stopRequested()); + PTF_ASSERT_TRUE(stopToken.stopRequested()); + + // Requesting a stop again should not change the state + PTF_ASSERT_FALSE(stopTokenSource.requestStop()); + PTF_ASSERT_TRUE(stopTokenSource.stopRequested()); + PTF_ASSERT_TRUE(stopToken.stopRequested()); + + // Creating a new stop token should reflect the state of the source. + StopToken stopToken2 = stopTokenSource.getToken(); + PTF_ASSERT_TRUE(stopToken2.stopRequested()); + } +} \ No newline at end of file diff --git a/Tests/Pcap++Test/main.cpp b/Tests/Pcap++Test/main.cpp index 784a9e0d6f..9e0ca575e5 100644 --- a/Tests/Pcap++Test/main.cpp +++ b/Tests/Pcap++Test/main.cpp @@ -301,6 +301,8 @@ int main(int argc, char* argv[]) PTF_RUN_TEST(TestRawSockets, "raw_sockets"); + PTF_RUN_TEST(TestStopToken, "no_network"); + PTF_RUN_TEST(TestSystemCoreUtils, "no_network;system_utils"); PTF_RUN_TEST(TestXdpDeviceReceivePackets, "xdp"); From baeb36ccccc11e069f54844242d29cafedf53fff Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Tue, 31 Dec 2024 16:01:29 +0200 Subject: [PATCH 14/17] Lint --- Tests/Pcap++Test/Tests/StopTokenTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Pcap++Test/Tests/StopTokenTests.cpp b/Tests/Pcap++Test/Tests/StopTokenTests.cpp index bb9a8f588c..82f5a8ca06 100644 --- a/Tests/Pcap++Test/Tests/StopTokenTests.cpp +++ b/Tests/Pcap++Test/Tests/StopTokenTests.cpp @@ -4,9 +4,9 @@ PTF_TEST_CASE(TestStopToken) { + using pcpp::internal::NoStopStateTag; using pcpp::internal::StopToken; using pcpp::internal::StopTokenSource; - using pcpp::internal::NoStopStateTag; { // A stop token source without a shared state should not be able to request a stop. From 61a62b6d8b6ec7f5ebdf67248b5d83002742ce8c Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Mon, 6 Jan 2025 22:41:25 +0200 Subject: [PATCH 15/17] Lint --- Tests/Pcap++Test/Tests/StopTokenTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Pcap++Test/Tests/StopTokenTests.cpp b/Tests/Pcap++Test/Tests/StopTokenTests.cpp index 82f5a8ca06..abfe627c4c 100644 --- a/Tests/Pcap++Test/Tests/StopTokenTests.cpp +++ b/Tests/Pcap++Test/Tests/StopTokenTests.cpp @@ -49,4 +49,4 @@ PTF_TEST_CASE(TestStopToken) StopToken stopToken2 = stopTokenSource.getToken(); PTF_ASSERT_TRUE(stopToken2.stopRequested()); } -} \ No newline at end of file +} From 07d8a70736f00812694c61244be541633d135dd1 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Mon, 6 Jan 2025 22:56:55 +0200 Subject: [PATCH 16/17] Incorporated signal and wait for signal code pieces into StartupBlock methods. - Changed StartupBlock to class and made all fields private. - Added signalStart() and waitForSignal() methods to simplify interactions. - StartupBlock is now single-use object. Once the block has been signaled, it cannot be reset. --- Pcap++/src/PfRingDevice.cpp | 48 ++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 673152c2dc..daaece5dd0 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -457,11 +457,31 @@ namespace pcpp namespace { - struct StartupBlock + /// @brief A block to signal when a thread is ready to start + class StartupBlock { - std::mutex startMutex; - std::condition_variable startCond; - bool startupReady = false; + public: + /// @brief Sets the ready flag to true and signals all waiting threads + void signalStart() + { + { + std::lock_guard lock(m_Mutex); + m_Ready = true; + } + m_CV.notify_all(); + } + + /// @brief Waits for the ready flag to be set. If it is already set, returns immediately. + void waitForSignal() + { + std::unique_lock lock(m_Mutex); + m_CV.wait(lock, [&] { return m_Ready; }); + } + + private: + std::mutex m_Mutex; + std::condition_variable m_CV; + bool m_Ready = false; }; struct PfRingCaptureThreadData @@ -490,11 +510,8 @@ namespace pcpp return; } - { - // Wait for the start signal - std::unique_lock lock(threadData.startupBlock->startMutex); - threadData.startupBlock->startCond.wait(lock, [&] { return threadData.startupBlock->startupReady; }); - } + // Wait for the startup block to be signaled + threadData.startupBlock->waitForSignal(); // Startup is complete, clear the startup block threadData.startupBlock = nullptr; @@ -616,11 +633,8 @@ namespace pcpp // Request stop and set the startup block to ready to prevent other threads from starting m_StopTokenSource.requestStop(); - { - std::lock_guard lock(startupBlock->startMutex); - startupBlock->startupReady = true; - } - startupBlock->startCond.notify_all(); + // Signal the startup block to unblock all threads so they can shutdown. + startupBlock->signalStart(); // Wait for all threads to stop for (int coreId2 = coreId; coreId2 >= 0; coreId2--) @@ -639,11 +653,7 @@ namespace pcpp } // Set the startup block to ready to start all threads - { - std::lock_guard lock(startupBlock->startMutex); - startupBlock->startupReady = true; - } - startupBlock->startCond.notify_all(); + startupBlock->signalStart(); return true; } From 379feac478cae55aaa5640fd6786a7f81497a891 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Mon, 6 Jan 2025 23:06:19 +0200 Subject: [PATCH 17/17] Simplified mutex lock. --- Pcap++/src/PfRingDevice.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index daaece5dd0..062daed17c 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -464,10 +464,8 @@ namespace pcpp /// @brief Sets the ready flag to true and signals all waiting threads void signalStart() { - { - std::lock_guard lock(m_Mutex); - m_Ready = true; - } + std::lock_guard lock(m_Mutex); + m_Ready = true; m_CV.notify_all(); }