diff --git a/worker/include/RTC/RTCP/CompoundPacket.hpp b/worker/include/RTC/RTCP/CompoundPacket.hpp index d950b81427..646049a237 100644 --- a/worker/include/RTC/RTCP/CompoundPacket.hpp +++ b/worker/include/RTC/RTCP/CompoundPacket.hpp @@ -2,6 +2,7 @@ #define MS_RTC_RTCP_COMPOUND_PACKET_HPP #include "common.hpp" +#include "Utils.hpp" #include "RTC/RTCP/ReceiverReport.hpp" #include "RTC/RTCP/Sdes.hpp" #include "RTC/RTCP/SenderReport.hpp" @@ -15,7 +16,14 @@ namespace RTC class CompoundPacket { public: - using UniquePtr = std::unique_ptr; + struct CompoundPacketDeleter + { + void operator()(CompoundPacket* packet) const; + }; + + using UniquePtr = std::unique_ptr; + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; static UniquePtr Create(); public: @@ -57,11 +65,11 @@ namespace RTC private: // Use `CompoundPacket::Create()` instead CompoundPacket() = default; - // Use `CompoundPacket::ReturnIntoPool()` instead + // Used by CompoundPacketDeleter ~CompoundPacket() = default; - friend struct std::default_delete; - static void ReturnIntoPool(CompoundPacket* packet); + friend struct CompoundPacketDeleter; + friend AllocatorTraits; private: uint8_t* header{ nullptr }; @@ -77,11 +85,18 @@ namespace RTC namespace std { template<> - struct default_delete + struct allocator_traits { - void operator()(RTC::RTCP::CompoundPacket* ptr) const + template + static void construct( + RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p, Args&&... args) + { + new (p) RTC::RTCP::CompoundPacket(forward(args)...); + } + + static void destroy(RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p) { - RTC::RTCP::CompoundPacket::ReturnIntoPool(ptr); + p->~CompoundPacket(); } }; }; // namespace std diff --git a/worker/include/RTC/RtpPacket.hpp b/worker/include/RTC/RtpPacket.hpp index ebf0e7c8a1..621eed7497 100644 --- a/worker/include/RTC/RtpPacket.hpp +++ b/worker/include/RTC/RtpPacket.hpp @@ -24,6 +24,11 @@ namespace RTC public: using RtpPacketBuffer = std::array; using SharedPtr = std::shared_ptr; + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + // Memory to hold the cloned packet (with extra space for RTX encoding). + using BufferAllocator = Utils::ObjectPoolAllocator; + using BufferAllocatorTraits = std::allocator_traits; /* Struct for RTP header. */ struct Header @@ -135,7 +140,7 @@ namespace RTC static SharedPtr Parse(const uint8_t* data, size_t len); - private: + public: RtpPacket( Header* header, HeaderExtension* headerExtension, @@ -609,7 +614,7 @@ namespace RTC void ShiftPayload(size_t payloadOffset, size_t shift, bool expand = true); private: - friend SharedPtr; + friend AllocatorTraits; void ParseExtensions(); @@ -643,5 +648,4 @@ namespace RTC RtpPacketBuffer* buffer{ nullptr }; }; } // namespace RTC - #endif diff --git a/worker/include/RTC/RtpStreamSend.hpp b/worker/include/RTC/RtpStreamSend.hpp index 647553dfc0..51db3270d5 100644 --- a/worker/include/RTC/RtpStreamSend.hpp +++ b/worker/include/RTC/RtpStreamSend.hpp @@ -20,6 +20,9 @@ namespace RTC public: struct StorageItem { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + void Dump() const; // Original packet. diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index 243077f57a..5f66cd8f3b 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -61,6 +61,9 @@ namespace RTC #else struct OnSendCallbackCtx { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + RTC::TransportCongestionControlClient* tccClient; webrtc::RtpPacketSendInfo packetInfo; }; diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index e782e5ce5e..56dff61ffc 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -351,51 +351,82 @@ namespace Utils } }; + // Simple implementation of object pool only for single objects + // Arrays are allocated as usual template - class ObjectPool + class ObjectPoolAllocator { + std::shared_ptr> pool_data; + public: - ~ObjectPool() + typedef T value_type; + thread_local static Utils::ObjectPoolAllocator Pool; + + ObjectPoolAllocator() { - for (auto ptr : this->pool) - { - std::free(ptr); - } + pool_data = std::shared_ptr>( + new std::vector(), + [](std::vector* pool) + { + for (auto* ptr : *pool) + { + std::free(ptr); + } + delete pool; + }); } - // Get pointer to allocated memory. This can be newly allocated memory or re-use of previously - // returned object. Object is not initialized and shouldn't be considered to be in a valid state. - T* Allocate() + template + ObjectPoolAllocator(const ObjectPoolAllocator& other) + : pool_data(ObjectPoolAllocator::Pool.pool_data) { - if (this->pool.empty()) + } + + ~ObjectPoolAllocator() + { + } + + T* allocate(size_t n) + { + if (n > 1) + { + return static_cast(std::malloc(sizeof(T) * n)); + } + + if (this->pool_data->empty()) { return static_cast(std::malloc(sizeof(T))); } - T* ptr = this->pool.back(); - this->pool.pop_back(); + T* ptr = this->pool_data->back(); + this->pool_data->pop_back(); return ptr; } - // Return allocated memory into internal pool for future use, make sure to run destructor before - // returning memory, ObjectPool will only de-allocate memory on exit. - void Return(T* ptr) + void deallocate(T* ptr, size_t n) { - if (ptr) + if (!ptr) + { + return; + } + + if (n > 1) { -#ifdef MS_MEM_POOL_FREE_ON_RETURN std::free(ptr); + return; + } + +#ifdef MS_MEM_POOL_FREE_ON_RETURN + std::free(ptr); #else - this->pool.push_back(ptr); + this->pool_data->push_back(ptr); #endif - } } - - private: - std::vector pool; }; + template + thread_local Utils::ObjectPoolAllocator Utils::ObjectPoolAllocator::Pool; } // namespace Utils #endif diff --git a/worker/src/RTC/RTCP/CompoundPacket.cpp b/worker/src/RTC/RTCP/CompoundPacket.cpp index c414b3ba38..cef998aa55 100644 --- a/worker/src/RTC/RTCP/CompoundPacket.cpp +++ b/worker/src/RTC/RTCP/CompoundPacket.cpp @@ -8,24 +8,13 @@ namespace RTC { namespace RTCP { - thread_local static Utils::ObjectPool CompoundPacketPool; - /* Instance methods. */ CompoundPacket::UniquePtr CompoundPacket::Create() { - auto* packet = CompoundPacketPool.Allocate(); - - return UniquePtr(new (packet) CompoundPacket()); - } - - void CompoundPacket::ReturnIntoPool(CompoundPacket* packet) - { - if (packet) - { - packet->~CompoundPacket(); - CompoundPacketPool.Return(packet); - } + auto* packet = CompoundPacket::Allocator::Pool.allocate(1); + CompoundPacket::AllocatorTraits::construct(CompoundPacket::Allocator::Pool, packet); + return UniquePtr(packet); } void CompoundPacket::Serialize(uint8_t* data) @@ -163,5 +152,14 @@ namespace RTC this->xrPacket.AddReport(report); } + + void CompoundPacket::CompoundPacketDeleter::operator()(CompoundPacket* packet) const + { + if (packet) + { + CompoundPacket::AllocatorTraits::destroy(CompoundPacket::Allocator::Pool, packet); + CompoundPacket::Allocator::Pool.deallocate(packet, 1); + } + } } // namespace RTCP } // namespace RTC diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index 23b20a2079..394364edcf 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -9,10 +9,6 @@ namespace RTC { - thread_local static Utils::ObjectPool RtpPacketPool; - // Memory to hold the cloned packet (with extra space for RTX encoding). - thread_local static Utils::ObjectPool RtpPacketBufferPool; - /* Class methods. */ RtpPacket::SharedPtr RtpPacket::Parse(const uint8_t* data, size_t len) @@ -121,21 +117,8 @@ namespace RTC payloadLength + size_t{ payloadPadding }, "packet's computed size does not match received size"); - auto* packet = RtpPacketPool.Allocate(); - new (packet) RtpPacket(header, headerExtension, payload, payloadLength, payloadPadding, len); - - SharedPtr shared( - packet, - /*Deleter*/ - [](RtpPacket* packet) - { - // Call destructor manually since memory was pre-allocated upfront. - packet->~RtpPacket(); - // Return packet into object pool for future reuse of memory allocation. - RtpPacketPool.Return(packet); - }); - - return shared; + return std::allocate_shared>( + RtpPacket::Allocator::Pool, header, headerExtension, payload, payloadLength, payloadPadding, len); } /* Instance methods. */ @@ -166,7 +149,7 @@ namespace RTC if (this->buffer) { this->buffer->~array(); - RtpPacketBufferPool.Return(this->buffer); + RtpPacket::BufferAllocator::Pool.deallocate(this->buffer, 1); this->buffer = nullptr; } } @@ -657,8 +640,8 @@ namespace RTC { MS_TRACE(); - auto* buffer = RtpPacketBufferPool.Allocate(); - new (buffer) RtpPacketBuffer(); + auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1); + RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer); auto* ptr = const_cast(buffer->data()); size_t numBytes{ 0 }; @@ -714,35 +697,29 @@ namespace RTC } // Create the new RtpPacket instance and return it. - auto* packet = RtpPacketPool.Allocate(); - new (packet) RtpPacket( - newHeader, newHeaderExtension, newPayload, this->payloadLength, this->payloadPadding, this->size); - - SharedPtr shared( - packet, - /*Deleter*/ - [](RtpPacket* packet) - { - // Call destructor manually since memory was pre-allocated upfront. - packet->~RtpPacket(); - // Return packet into object pool for future reuse of memory allocation. - RtpPacketPool.Return(packet); - }); + SharedPtr shared = std::allocate_shared( + RtpPacket::Allocator::Pool, + newHeader, + newHeaderExtension, + newPayload, + this->payloadLength, + this->payloadPadding, + this->size); // Keep already set extension ids. - packet->midExtensionId = this->midExtensionId; - packet->ridExtensionId = this->ridExtensionId; - packet->rridExtensionId = this->rridExtensionId; - packet->absSendTimeExtensionId = this->absSendTimeExtensionId; - packet->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; - packet->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. - packet->frameMarkingExtensionId = this->frameMarkingExtensionId; - packet->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; - packet->videoOrientationExtensionId = this->videoOrientationExtensionId; + shared->midExtensionId = this->midExtensionId; + shared->ridExtensionId = this->ridExtensionId; + shared->rridExtensionId = this->rridExtensionId; + shared->absSendTimeExtensionId = this->absSendTimeExtensionId; + shared->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; + shared->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. + shared->frameMarkingExtensionId = this->frameMarkingExtensionId; + shared->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; + shared->videoOrientationExtensionId = this->videoOrientationExtensionId; // Clone payload descriptor handler. - packet->payloadDescriptorHandler = this->payloadDescriptorHandler; + shared->payloadDescriptorHandler = this->payloadDescriptorHandler; // Store allocated buffer. - packet->buffer = buffer; + shared->buffer = buffer; return shared; } diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index 87515c6e16..b6945fdd7a 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -10,8 +10,6 @@ namespace RTC { /* Static. */ - thread_local static Utils::ObjectPool StorageItemPool; - // 17: 16 bit mask + the initial sequence number. static constexpr size_t MaxRequestedPackets{ 17 }; thread_local static std::vector RetransmissionContainer( @@ -131,7 +129,7 @@ namespace RTC // Reset (free RTP packet) the old storage item. resetStorageItem(storageItem); // Return into the pool. - StorageItemPool.Return(storageItem); + StorageItem::Allocator::Pool.deallocate(storageItem, 1); this->buffer[0] = nullptr; @@ -154,7 +152,7 @@ namespace RTC // Reset (free RTP packet) the storage item. resetStorageItem(storageItem); // Return into the pool. - StorageItemPool.Return(storageItem); + StorageItem::Allocator::Pool.deallocate(storageItem, 1); } this->buffer.clear(); @@ -462,10 +460,10 @@ namespace RTC else { // Allocate a new storage item. - storageItem = StorageItemPool.Allocate(); + storageItem = StorageItem::Allocator::Pool.allocate(1); // Memory is not initialized in any way, reset it. Create a new StorageItem instance // in this memory. - new (storageItem) StorageItem{}; + StorageItem::AllocatorTraits::construct(StorageItem::Allocator::Pool, storageItem); MS_ASSERT(this->storageItemBuffer.Insert(seq, storageItem), "sequence number must be empty"); } diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index fad8bc804a..eaef57e9a1 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -29,7 +29,6 @@ namespace RTC { static size_t DefaultSctpSendBufferSize{ 262144 }; // 2^18. static size_t MaxSctpSendBufferSize{ 268435456 }; // 2^28. - thread_local static Utils::ObjectPool TransportOnSendCallbackCtxPool; #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) @@ -43,7 +42,7 @@ namespace RTC ctx->senderBwe->RtpPacketSent(ctx->sentInfo); } - TransportOnSendCallbackCtxPool.Return(ctx); + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); } #else void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) @@ -51,7 +50,7 @@ namespace RTC if (sent) ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); - TransportOnSendCallbackCtxPool.Return(ctx); + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); } #endif @@ -2606,7 +2605,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); - auto* ctx = TransportOnSendCallbackCtxPool.Allocate(); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2663,7 +2663,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); - auto* ctx = TransportOnSendCallbackCtxPool.Allocate(); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2987,7 +2988,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); - auto* ctx = TransportOnSendCallbackCtxPool.Allocate(); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; diff --git a/worker/test/src/RTC/TestNackGenerator.cpp b/worker/test/src/RTC/TestNackGenerator.cpp index d4dbccb0b0..af79b89879 100644 --- a/worker/test/src/RTC/TestNackGenerator.cpp +++ b/worker/test/src/RTC/TestNackGenerator.cpp @@ -115,11 +115,11 @@ uint8_t rtpBuffer[] = }; // clang-format on -// [pt:123, seq:21006, timestamp:1533790901] -auto packet = RtpPacket::Parse(rtpBuffer, sizeof(rtpBuffer)); - void validate(std::vector& inputs) { + // [pt:123, seq:21006, timestamp:1533790901] + auto packet = RtpPacket::Parse(rtpBuffer, sizeof(rtpBuffer)); + TestNackGeneratorListener listener; NackGenerator nackGenerator = NackGenerator(&listener);