diff --git a/examples/streamer/main.cpp b/examples/streamer/main.cpp index 75a509b36..e0a788315 100644 --- a/examples/streamer/main.cpp +++ b/examples/streamer/main.cpp @@ -20,6 +20,8 @@ #include "helpers.hpp" #include "ArgParser.hpp" +#include + using namespace rtc; using namespace std; using namespace std::chrono_literals; @@ -208,7 +210,7 @@ shared_ptr addVideo(const shared_ptr pc, const video.addSSRC(ssrc, cname, msid, cname); auto track = pc->addTrack(video); // create RTP configuration - auto rtpConfig = make_shared(ssrc, cname, payloadType, H264RtpPacketizer::defaultClockRate); + auto rtpConfig = make_shared(ssrc, cname, payloadType, H264RtpPacketizer::ClockRate); // create packetizer auto packetizer = make_shared(NalUnit::Separator::Length, rtpConfig); // add RTCP SR handler @@ -351,26 +353,11 @@ shared_ptr createStream(const string h264Samples, const unsigned fps, co for (auto clientTrack: tracks) { auto client = clientTrack.id; auto trackData = clientTrack.trackData; - auto rtpConfig = trackData->sender->rtpConfig; - - // sample time is in us, we need to convert it to seconds - auto elapsedSeconds = double(sampleTime) / (1000 * 1000); - // get elapsed time in clock rate - uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds); - // set new timestamp - rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp; - - // get elapsed time in clock rate from last RTCP sender report - auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->lastReportedTimestamp(); - // check if last report was at least 1 second ago - if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) { - trackData->sender->setNeedsToReport(); - } cout << "Sending " << streamType << " sample with size: " << to_string(sample.size()) << " to " << client << endl; try { // send sample - trackData->track->send(sample); + trackData->track->sendFrame(sample, std::chrono::duration(sampleTime)); } catch (const std::exception &e) { cerr << "Unable to send "<< streamType << " packet: " << e.what() << endl; } diff --git a/include/rtc/av1rtppacketizer.hpp b/include/rtc/av1rtppacketizer.hpp index b56a875f9..c2b2b79c3 100644 --- a/include/rtc/av1rtppacketizer.hpp +++ b/include/rtc/av1rtppacketizer.hpp @@ -20,8 +20,8 @@ namespace rtc { // RTP packetization of AV1 payload class RTC_CPP_EXPORT AV1RtpPacketizer final : public RtpPacketizer { public: - // Default clock rate for AV1 in RTP - inline static const uint32_t defaultClockRate = 90 * 1000; + inline static const uint32_t ClockRate = VideoClockRate; + [[deprecated("Use ClockRate")]] inline static const uint32_t defaultClockRate = ClockRate; // Define how OBUs are seperated in a AV1 Sample enum class Packetization { @@ -33,17 +33,18 @@ class RTC_CPP_EXPORT AV1RtpPacketizer final : public RtpPacketizer { // @note RTP configuration is used in packetization process which may change some configuration // properties such as sequence number. AV1RtpPacketizer(Packetization packetization, shared_ptr rtpConfig, - uint16_t maxFragmentSize = NalUnits::defaultMaximumFragmentSize); - - void outgoing(message_vector &messages, const message_callback &send) override; + size_t maxFragmentSize = DefaultMaxFragmentSize); private: - shared_ptr splitMessage(binary_ptr message); - std::vector> packetizeObu(binary_ptr message, uint16_t maxFragmentSize); + static std::vector extractTemporalUnitObus(const binary &data); + + std::vector fragment(binary data) override; + std::vector fragmentObu(const binary &data); + + const Packetization mPacketization; + const size_t mMaxFragmentSize; - const uint16_t maxFragmentSize; - const Packetization packetization; - std::shared_ptr sequenceHeader; + std::unique_ptr mSequenceHeader; }; // For backward compatibility, do not use diff --git a/include/rtc/common.hpp b/include/rtc/common.hpp index 08f981cb8..371f18f4b 100644 --- a/include/rtc/common.hpp +++ b/include/rtc/common.hpp @@ -67,7 +67,6 @@ using std::variant; using std::weak_ptr; using binary = std::vector; -using binary_ptr = shared_ptr; using message_variant = variant; using std::int16_t; diff --git a/include/rtc/frameinfo.hpp b/include/rtc/frameinfo.hpp index 022c430dd..b468b5d0f 100644 --- a/include/rtc/frameinfo.hpp +++ b/include/rtc/frameinfo.hpp @@ -11,12 +11,20 @@ #include "common.hpp" +#include + namespace rtc { struct RTC_CPP_EXPORT FrameInfo { - FrameInfo(uint8_t payloadType, uint32_t timestamp) : payloadType(payloadType), timestamp(timestamp){}; - uint8_t payloadType; // Indicates codec of the frame - uint32_t timestamp = 0; // RTP Timestamp + FrameInfo(uint32_t timestamp) : timestamp(timestamp) {}; + template> FrameInfo(std::chrono::duration timestamp) : timestampSeconds(timestamp) {}; + + [[deprecated]] FrameInfo(uint8_t payloadType, uint32_t timestamp) : timestamp(timestamp), payloadType(payloadType) {}; + + uint32_t timestamp = 0; + uint8_t payloadType = 0; + + optional> timestampSeconds; }; } // namespace rtc diff --git a/include/rtc/h264rtpdepacketizer.hpp b/include/rtc/h264rtpdepacketizer.hpp index e4fe9064e..045d8f04d 100644 --- a/include/rtc/h264rtpdepacketizer.hpp +++ b/include/rtc/h264rtpdepacketizer.hpp @@ -27,6 +27,8 @@ class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler { public: using Separator = NalUnit::Separator; + inline static const uint32_t ClockRate = 90 * 1000; + H264RtpDepacketizer(Separator separator = Separator::LongStartSequence); virtual ~H264RtpDepacketizer() = default; diff --git a/include/rtc/h264rtppacketizer.hpp b/include/rtc/h264rtppacketizer.hpp index 9aeb1147c..ffc5f2e7b 100644 --- a/include/rtc/h264rtppacketizer.hpp +++ b/include/rtc/h264rtppacketizer.hpp @@ -22,8 +22,8 @@ class RTC_CPP_EXPORT H264RtpPacketizer final : public RtpPacketizer { public: using Separator = NalUnit::Separator; - /// Default clock rate for H264 in RTP - inline static const uint32_t defaultClockRate = 90 * 1000; + inline static const uint32_t ClockRate = VideoClockRate; + [[deprecated("Use ClockRate")]] inline static const uint32_t defaultClockRate = ClockRate; /// Constructs h264 payload packetizer with given RTP configuration. /// @note RTP configuration is used in packetization process which may change some configuration @@ -32,20 +32,19 @@ class RTC_CPP_EXPORT H264RtpPacketizer final : public RtpPacketizer { /// @param rtpConfig RTP configuration /// @param maxFragmentSize maximum size of one NALU fragment H264RtpPacketizer(Separator separator, shared_ptr rtpConfig, - uint16_t maxFragmentSize = NalUnits::defaultMaximumFragmentSize); + size_t maxFragmentSize = DefaultMaxFragmentSize); // For backward compatibility, do not use [[deprecated]] H264RtpPacketizer( shared_ptr rtpConfig, - uint16_t maxFragmentSize = NalUnits::defaultMaximumFragmentSize); - - void outgoing(message_vector &messages, const message_callback &send) override; + size_t maxFragmentSize = DefaultMaxFragmentSize); private: - shared_ptr splitMessage(binary_ptr message); + std::vector fragment(binary data) override; + std::vector splitFrame(const binary &frame); - const uint16_t maxFragmentSize; - const Separator separator; + const Separator mSeparator; + const size_t mMaxFragmentSize; }; // For backward compatibility, do not use diff --git a/include/rtc/h265nalunit.hpp b/include/rtc/h265nalunit.hpp index b322fc6bd..ad40cf041 100644 --- a/include/rtc/h265nalunit.hpp +++ b/include/rtc/h265nalunit.hpp @@ -15,6 +15,7 @@ #include "nalunit.hpp" #include +#include namespace rtc { @@ -72,8 +73,13 @@ struct RTC_CPP_EXPORT H265NalUnitFragmentHeader { #pragma pack(pop) -/// Nal unit +struct H265NalUnitFragment; + +/// NAL unit struct RTC_CPP_EXPORT H265NalUnit : NalUnit { + static std::vector GenerateFragments(const std::vector &nalus, + size_t maxFragmentSize); + H265NalUnit(const H265NalUnit &unit) = default; H265NalUnit(size_t size, bool includingHeader = true) : NalUnit(size, includingHeader, NalUnit::Type::H265) {} @@ -104,6 +110,8 @@ struct RTC_CPP_EXPORT H265NalUnit : NalUnit { insert(end(), payload.begin(), payload.end()); } + std::vector generateFragments(size_t maxFragmentSize) const; + protected: const H265NalUnitHeader *header() const { assert(size() >= H265_NAL_HEADER_SIZE); @@ -116,9 +124,9 @@ struct RTC_CPP_EXPORT H265NalUnit : NalUnit { } }; -/// Nal unit fragment A +/// NAL unit fragment struct RTC_CPP_EXPORT H265NalUnitFragment : H265NalUnit { - static std::vector> fragmentsFrom(shared_ptr nalu, + [[deprecated]] static std::vector> fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize); enum class FragmentType { Start, Middle, End }; @@ -171,7 +179,7 @@ struct RTC_CPP_EXPORT H265NalUnitFragment : H265NalUnit { } }; -class RTC_CPP_EXPORT H265NalUnits : public std::vector> { +class [[deprecated]] RTC_CPP_EXPORT H265NalUnits : public std::vector> { public: static const uint16_t defaultMaximumFragmentSize = uint16_t(RTC_DEFAULT_MTU - 12 - 8 - 40); // SRTP/UDP/IPv6 diff --git a/include/rtc/h265rtppacketizer.hpp b/include/rtc/h265rtppacketizer.hpp index b629c6aa7..e788bac0e 100644 --- a/include/rtc/h265rtppacketizer.hpp +++ b/include/rtc/h265rtppacketizer.hpp @@ -21,8 +21,8 @@ class RTC_CPP_EXPORT H265RtpPacketizer final : public RtpPacketizer { public: using Separator = NalUnit::Separator; - // Default clock rate for H265 in RTP - inline static const uint32_t defaultClockRate = 90 * 1000; + inline static const uint32_t ClockRate = VideoClockRate; + [[deprecated("Use ClockRate")]] inline static const uint32_t defaultClockRate = ClockRate; // Constructs h265 payload packetizer with given RTP configuration. // @note RTP configuration is used in packetization process which may change some configuration @@ -31,19 +31,18 @@ class RTC_CPP_EXPORT H265RtpPacketizer final : public RtpPacketizer { // @param rtpConfig RTP configuration // @param maxFragmentSize maximum size of one NALU fragment H265RtpPacketizer(Separator separator, shared_ptr rtpConfig, - uint16_t maxFragmentSize = H265NalUnits::defaultMaximumFragmentSize); + size_t maxFragmentSize = DefaultMaxFragmentSize); - // for backward compatibility + // For backward compatibility, do not use [[deprecated]] H265RtpPacketizer(shared_ptr rtpConfig, - uint16_t maxFragmentSize = H265NalUnits::defaultMaximumFragmentSize); - - void outgoing(message_vector &messages, const message_callback &send) override; + size_t maxFragmentSize = DefaultMaxFragmentSize); private: - shared_ptr splitMessage(binary_ptr message); + std::vector fragment(binary data) override; + std::vector splitFrame(const binary &frame); - const uint16_t maxFragmentSize; - const NalUnit::Separator separator; + const NalUnit::Separator mSeparator; + const size_t mMaxFragmentSize; }; // For backward compatibility, do not use diff --git a/include/rtc/message.hpp b/include/rtc/message.hpp index ee90b5ca4..52a7dbe7f 100644 --- a/include/rtc/message.hpp +++ b/include/rtc/message.hpp @@ -46,11 +46,26 @@ inline size_t message_size_func(const message_ptr &m) { template message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary, - unsigned int stream = 0, shared_ptr reliability = nullptr, - shared_ptr frameInfo = nullptr) { + unsigned int stream = 0, shared_ptr reliability = nullptr) { auto message = std::make_shared(begin, end, type); message->stream = stream; message->reliability = reliability; + return message; +} + +template +message_ptr make_message(Iterator begin, Iterator end, shared_ptr frameInfo) { + auto message = std::make_shared(begin, end); + message->frameInfo = frameInfo; + return message; +} + +// For backward compatibiity, do not use +template +[[deprecated]] message_ptr make_message(Iterator begin, Iterator end, Message::Type type, + unsigned int stream, shared_ptr frameInfo) { + auto message = std::make_shared(begin, end, type); + message->stream = stream; message->frameInfo = frameInfo; return message; } @@ -61,8 +76,9 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary, unsigned int stream = 0, - shared_ptr reliability = nullptr, - shared_ptr frameInfo = nullptr); + shared_ptr reliability = nullptr); + +RTC_CPP_EXPORT message_ptr make_message(binary &&data, shared_ptr frameInfo); RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig); diff --git a/include/rtc/nalunit.hpp b/include/rtc/nalunit.hpp index 0e7aa0bb6..ca8af8c7d 100644 --- a/include/rtc/nalunit.hpp +++ b/include/rtc/nalunit.hpp @@ -13,6 +13,7 @@ #include "common.hpp" +#include #include namespace rtc { @@ -61,15 +62,31 @@ enum NalUnitStartSequenceMatch { static const size_t H264_NAL_HEADER_SIZE = 1; static const size_t H265_NAL_HEADER_SIZE = 2; -/// Nal unit + +struct NalUnitFragmentA; + +/// NAL unit struct RTC_CPP_EXPORT NalUnit : binary { + static std::vector GenerateFragments(const std::vector &nalus, + size_t maxFragmentSize); + + enum class Separator { + Length = RTC_NAL_SEPARATOR_LENGTH, // first 4 bytes are NAL unit length + LongStartSequence = RTC_NAL_SEPARATOR_LONG_START_SEQUENCE, // 0x00, 0x00, 0x00, 0x01 + ShortStartSequence = RTC_NAL_SEPARATOR_SHORT_START_SEQUENCE, // 0x00, 0x00, 0x01 + StartSequence = RTC_NAL_SEPARATOR_START_SEQUENCE, // LongStartSequence or ShortStartSequence + }; + + static NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, + std::byte _byte, Separator separator); + enum class Type { H264, H265 }; NalUnit(const NalUnit &unit) = default; NalUnit(size_t size, bool includingHeader = true, Type type = Type::H264) - : binary(size + (includingHeader - ? 0 - : (type == Type::H264 ? H264_NAL_HEADER_SIZE : H265_NAL_HEADER_SIZE))) {} + : binary(size + (includingHeader ? 0 + : (type == Type::H264 ? H264_NAL_HEADER_SIZE + : H265_NAL_HEADER_SIZE))) {} NalUnit(binary &&data) : binary(std::move(data)) {} NalUnit(Type type = Type::H264) : binary(type == Type::H264 ? H264_NAL_HEADER_SIZE : H265_NAL_HEADER_SIZE) {} @@ -94,56 +111,7 @@ struct RTC_CPP_EXPORT NalUnit : binary { insert(end(), payload.begin(), payload.end()); } - /// NAL unit separator - enum class Separator { - Length = RTC_NAL_SEPARATOR_LENGTH, // first 4 bytes are NAL unit length - LongStartSequence = RTC_NAL_SEPARATOR_LONG_START_SEQUENCE, // 0x00, 0x00, 0x00, 0x01 - ShortStartSequence = RTC_NAL_SEPARATOR_SHORT_START_SEQUENCE, // 0x00, 0x00, 0x01 - StartSequence = RTC_NAL_SEPARATOR_START_SEQUENCE, // LongStartSequence or ShortStartSequence - }; - - static NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, - std::byte _byte, Separator separator) { - assert(separator != Separator::Length); - auto byte = (uint8_t)_byte; - auto detectShort = - separator == Separator::ShortStartSequence || separator == Separator::StartSequence; - auto detectLong = - separator == Separator::LongStartSequence || separator == Separator::StartSequence; - switch (match) { - case NUSM_noMatch: - if (byte == 0x00) { - return NUSM_firstZero; - } - break; - case NUSM_firstZero: - if (byte == 0x00) { - return NUSM_secondZero; - } - break; - case NUSM_secondZero: - if (byte == 0x00 && detectLong) { - return NUSM_thirdZero; - } else if (byte == 0x00 && detectShort) { - return NUSM_secondZero; - } else if (byte == 0x01 && detectShort) { - return NUSM_shortMatch; - } - break; - case NUSM_thirdZero: - if (byte == 0x00 && detectLong) { - return NUSM_thirdZero; - } else if (byte == 0x01 && detectLong) { - return NUSM_longMatch; - } - break; - case NUSM_shortMatch: - return NUSM_shortMatch; - case NUSM_longMatch: - return NUSM_longMatch; - } - return NUSM_noMatch; - } + std::vector generateFragments(size_t maxFragmentSize) const; protected: const NalUnitHeader *header() const { @@ -159,8 +127,9 @@ struct RTC_CPP_EXPORT NalUnit : binary { /// Nal unit fragment A struct RTC_CPP_EXPORT NalUnitFragmentA : NalUnit { - static std::vector> fragmentsFrom(shared_ptr nalu, - uint16_t maxFragmentSize); + // For backward compatibility, do not use + [[deprecated]] static std::vector> + fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize); enum class FragmentType { Start, Middle, End }; @@ -212,7 +181,8 @@ struct RTC_CPP_EXPORT NalUnitFragmentA : NalUnit { } }; -class RTC_CPP_EXPORT NalUnits : public std::vector> { +// For backward compatibility, do not use +class [[deprecated]] RTC_CPP_EXPORT NalUnits : public std::vector> { public: static const uint16_t defaultMaximumFragmentSize = uint16_t(RTC_DEFAULT_MTU - 12 - 8 - 40); // SRTP/UDP/IPv6 diff --git a/include/rtc/rtc.h b/include/rtc/rtc.h index 2fc70c90c..c4a4e9f9e 100644 --- a/include/rtc/rtc.h +++ b/include/rtc/rtc.h @@ -430,9 +430,6 @@ RTC_C_EXPORT int rtcSetTrackRtpTimestamp(int id, uint32_t timestamp); // Get timestamp of last RTCP SR, result is written to timestamp RTC_C_EXPORT int rtcGetLastTrackSenderReportTimestamp(int id, uint32_t *timestamp); -// Set NeedsToReport flag in RtcpSrReporter handler identified by given track id -RTC_C_EXPORT int rtcSetNeedsToSendRtcpSr(int id); - // Get all available payload types for given codec and stores them in buffer, does nothing if // buffer is NULL int rtcGetTrackPayloadTypesForCodec(int tr, const char *ccodec, int *buffer, int size); @@ -450,6 +447,9 @@ int rtcGetSsrcsForType(const char *mediaType, const char *sdp, uint32_t *buffer, int rtcSetSsrcForType(const char *mediaType, const char *sdp, char *buffer, const int bufferSize, rtcSsrcForTypeInit *init); +// For backward compatibility, do not use +RTC_C_EXPORT RTC_DEPRECATED int rtcSetNeedsToSendRtcpSr(int id); + #endif // RTC_ENABLE_MEDIA #if RTC_ENABLE_WEBSOCKET diff --git a/include/rtc/rtcpnackresponder.hpp b/include/rtc/rtcpnackresponder.hpp index 71ced1320..e141c79ba 100644 --- a/include/rtc/rtcpnackresponder.hpp +++ b/include/rtc/rtcpnackresponder.hpp @@ -33,8 +33,8 @@ class RTC_CPP_EXPORT RtcpNackResponder final : public MediaHandler { /// Packet storage element struct RTC_CPP_EXPORT Element { - Element(binary_ptr packet, uint16_t sequenceNumber, shared_ptr next = nullptr); - const binary_ptr packet; + Element(message_ptr packet, uint16_t sequenceNumber, shared_ptr next = nullptr); + const message_ptr packet; const uint16_t sequenceNumber; /// Pointer to newer element shared_ptr next = nullptr; @@ -59,11 +59,11 @@ class RTC_CPP_EXPORT RtcpNackResponder final : public MediaHandler { Storage(size_t _maxSize); /// Returns packet with given sequence number - optional get(uint16_t sequenceNumber); + message_ptr get(uint16_t sequenceNumber); /// Stores packet /// @param packet Packet - void store(binary_ptr packet); + void store(message_ptr packet); }; const shared_ptr mStorage; diff --git a/include/rtc/rtcpsrreporter.hpp b/include/rtc/rtcpsrreporter.hpp index 1fb10a467..70f05c240 100644 --- a/include/rtc/rtcpsrreporter.hpp +++ b/include/rtc/rtcpsrreporter.hpp @@ -12,17 +12,20 @@ #if RTC_ENABLE_MEDIA #include "mediahandler.hpp" -#include "rtppacketizationconfig.hpp" #include "rtp.hpp" +#include "rtppacketizationconfig.hpp" + +#include namespace rtc { class RTC_CPP_EXPORT RtcpSrReporter final : public MediaHandler { public: RtcpSrReporter(shared_ptr rtpConfig); + ~RtcpSrReporter(); uint32_t lastReportedTimestamp() const; - void setNeedsToReport(); + [[deprecated]] void setNeedsToReport(); void outgoing(message_vector &messages, const message_callback &send) override; @@ -30,13 +33,13 @@ class RTC_CPP_EXPORT RtcpSrReporter final : public MediaHandler { const shared_ptr rtpConfig; private: - void addToReport(RtpHeader *rtp, uint32_t rtpSize); + void addToReport(RtpHeader *header, size_t size); message_ptr getSenderReport(uint32_t timestamp); uint32_t mPacketCount = 0; uint32_t mPayloadOctets = 0; uint32_t mLastReportedTimestamp = 0; - bool mNeedsToReport = false; + std::chrono::steady_clock::time_point mLastReportTime; }; } // namespace rtc diff --git a/include/rtc/rtpdepacketizer.hpp b/include/rtc/rtpdepacketizer.hpp index feed8479f..5a007528e 100644 --- a/include/rtc/rtpdepacketizer.hpp +++ b/include/rtc/rtpdepacketizer.hpp @@ -18,10 +18,14 @@ namespace rtc { class RTC_CPP_EXPORT RtpDepacketizer : public MediaHandler { public: - RtpDepacketizer() = default; - virtual ~RtpDepacketizer() = default; + RtpDepacketizer(); + RtpDepacketizer(uint32_t clockRate); + virtual ~RtpDepacketizer(); virtual void incoming(message_vector &messages, const message_callback &send) override; + +private: + uint32_t mClockRate; }; using OpusRtpDepacketizer = RtpDepacketizer; diff --git a/include/rtc/rtppacketizer.hpp b/include/rtc/rtppacketizer.hpp index 99839b9d4..b9e006396 100644 --- a/include/rtc/rtppacketizer.hpp +++ b/include/rtc/rtppacketizer.hpp @@ -20,6 +20,12 @@ namespace rtc { /// RTP packetizer class RTC_CPP_EXPORT RtpPacketizer : public MediaHandler { public: + /// Default maximum fragment size (for video packetizers) + inline static const size_t DefaultMaxFragmentSize = RTC_DEFAULT_MAX_FRAGMENT_SIZE; + + /// Clock rate for video in RTP + inline static const uint32_t VideoClockRate = 90 * 1000; + /// Constructs packetizer with given RTP configuration /// @note RTP configuration is used in packetization process which may change some configuration /// properties such as sequence number. @@ -34,11 +40,19 @@ class RTC_CPP_EXPORT RtpPacketizer : public MediaHandler { const shared_ptr rtpConfig; protected: - /// Creates RTP packet for given payload - /// @note This function increase sequence number after packetization. + /// Fragment data into payloads + /// Default implementation returns data as a single payload + /// @param message Input data + virtual std::vector fragment(binary data); + + /// Creates an RTP packet for a payload + /// @note This function increases the sequence number. /// @param payload RTP payload - /// @param setMark Set marker flag in RTP packet if true - virtual message_ptr packetize(shared_ptr payload, bool mark); + /// @param mark Set marker flag in RTP packet if true + virtual message_ptr packetize(const binary &payload, bool mark); + + // For backward compatibility, do not use + [[deprecated]] virtual message_ptr packetize(shared_ptr payload, bool mark); private: static const auto RtpHeaderSize = 12; @@ -60,6 +74,8 @@ class RTC_CPP_EXPORT AudioRtpPacketizer final : public RtpPacketizer { // Audio RTP packetizers using OpusRtpPacketizer = AudioRtpPacketizer<48000>; using AACRtpPacketizer = AudioRtpPacketizer<48000>; +using PCMARtpPacketizer = AudioRtpPacketizer<8000>; +using PCMURtpPacketizer = AudioRtpPacketizer<8000>; // Dummy wrapper for backward compatibility, do not use class RTC_CPP_EXPORT PacketizationHandler final : public MediaHandler { diff --git a/include/rtc/track.hpp b/include/rtc/track.hpp index 554c578ce..883f9a7bc 100644 --- a/include/rtc/track.hpp +++ b/include/rtc/track.hpp @@ -41,7 +41,8 @@ class RTC_CPP_EXPORT Track final : private CheshireCat, public Chan bool isClosed(void) const override; size_t maxMessageSize() const override; - void onFrame(std::function callback); + void sendFrame(binary data, FrameInfo info); + void onFrame(std::function callback); bool requestKeyframe(); bool requestBitrate(unsigned int bitrate); diff --git a/src/av1rtppacketizer.cpp b/src/av1rtppacketizer.cpp index 0664812fe..2204c3377 100644 --- a/src/av1rtppacketizer.cpp +++ b/src/av1rtppacketizer.cpp @@ -40,34 +40,39 @@ const auto oneByteLeb128Size = 1; const uint8_t sevenLsbBitmask = 0b01111111; const uint8_t msbBitmask = 0b10000000; -std::vector extractTemporalUnitObus(binary_ptr message) { - std::vector> obus{}; +AV1RtpPacketizer::AV1RtpPacketizer(Packetization packetization, + shared_ptr rtpConfig, + size_t maxFragmentSize) + : RtpPacketizer(rtpConfig), mPacketization(packetization), mMaxFragmentSize(maxFragmentSize) {} + +std::vector AV1RtpPacketizer::extractTemporalUnitObus(const binary &data) { + std::vector obus; - if (message->size() <= 2 || (message->at(0) != obuTemporalUnitDelimiter.at(0)) || - (message->at(1) != obuTemporalUnitDelimiter.at(1))) { - return obus; + if (data.size() <= 2 || (data.at(0) != obuTemporalUnitDelimiter.at(0)) || + (data.at(1) != obuTemporalUnitDelimiter.at(1))) { + return {}; } - size_t messageIndex = 2; - while (messageIndex < message->size()) { - if ((message->at(messageIndex) & obuHasSizeMask) == byte(0)) { + size_t index = 2; + while (index < data.size()) { + if ((data.at(index) & obuHasSizeMask) == byte(0)) { return obus; } - if ((message->at(messageIndex) & obuHasExtensionMask) != byte(0)) { - messageIndex++; + if ((data.at(index) & obuHasExtensionMask) != byte(0)) { + index++; } // https://aomediacodec.github.io/av1-spec/#leb128 uint32_t obuLength = 0; uint8_t leb128Size = 0; while (leb128Size < 8) { - auto leb128Index = messageIndex + leb128Size + obuHeaderSize; - if (message->size() < leb128Index) { + auto leb128Index = index + leb128Size + obuHeaderSize; + if (data.size() < leb128Index) { break; } - auto leb128_byte = uint8_t(message->at(leb128Index)); + auto leb128_byte = uint8_t(data.at(leb128Index)); obuLength |= ((leb128_byte & sevenLsbBitmask) << (leb128Size * 7)); leb128Size++; @@ -77,16 +82,31 @@ std::vector extractTemporalUnitObus(binary_ptr message) { } } - obus.push_back(std::make_shared(message->begin() + messageIndex, - message->begin() + messageIndex + obuHeaderSize + - leb128Size + obuLength)); + obus.emplace_back(data.begin() + index, + data.begin() + index + obuHeaderSize + leb128Size + obuLength); - messageIndex += obuHeaderSize + leb128Size + obuLength; + index += obuHeaderSize + leb128Size + obuLength; } return obus; } +std::vector AV1RtpPacketizer::fragment(binary data) { + if (mPacketization == AV1RtpPacketizer::Packetization::TemporalUnit) { + std::vector result; + auto obus = extractTemporalUnitObus(data); + for (auto obu : obus) { + auto fragments = fragmentObu(obu); + result.reserve(result.size() + fragments.size()); + for(auto &fragment : fragments) + fragments.push_back(std::move(fragment)); + } + return result; + } else { + return fragmentObu(data); + } +} + /* * 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+ @@ -118,110 +138,71 @@ std::vector extractTemporalUnitObus(binary_ptr message) { * **/ -std::vector AV1RtpPacketizer::packetizeObu(binary_ptr message, - uint16_t maxFragmentSize) { - - std::vector> payloads{}; - size_t messageIndex = 0; +std::vector AV1RtpPacketizer::fragmentObu(const binary &data) { + std::vector payloads; - if (message->size() < 1) { - return payloads; - } + if (data.size() < 1) + return {}; // Cache sequence header and packetize with next OBU - auto frameType = (message->at(0) & obuFrameTypeMask) >> obuFrameTypeBitshift; + auto frameType = (data.at(0) & obuFrameTypeMask) >> obuFrameTypeBitshift; if (frameType == obuFrameTypeSequenceHeader) { - sequenceHeader = std::make_shared(message->begin(), message->end()); - return payloads; + mSequenceHeader = std::make_unique(data.begin(), data.end()); + return {}; } - size_t messageRemaining = message->size(); - while (messageRemaining > 0) { - auto obuCount = 1; - auto metadataSize = payloadHeaderSize; + size_t index = 0; + size_t remaining = data.size(); + while (remaining > 0) { + size_t obuCount = 1; + size_t metadataSize = payloadHeaderSize; - if (sequenceHeader != nullptr) { + if (mSequenceHeader) { obuCount++; - metadataSize += /* 1 byte leb128 */ 1 + int(sequenceHeader->size()); + metadataSize += 1 + int(mSequenceHeader->size()); // 1 byte leb128 } - auto payload = std::make_shared( - std::min(size_t(maxFragmentSize), messageRemaining + metadataSize)); - auto payloadOffset = payloadHeaderSize; + binary payload(std::min(size_t(mMaxFragmentSize), remaining + metadataSize)); + size_t payloadOffset = payloadHeaderSize; - payload->at(0) = byte(obuCount) << wBitshift; + payload.at(0) = byte(obuCount) << wBitshift; // Packetize cached SequenceHeader if (obuCount == 2) { - payload->at(0) ^= nMask; - payload->at(1) = byte(sequenceHeader->size() & sevenLsbBitmask); + payload.at(0) ^= nMask; + payload.at(1) = byte(mSequenceHeader->size() & sevenLsbBitmask); payloadOffset += oneByteLeb128Size; - std::memcpy(payload->data() + payloadOffset, sequenceHeader->data(), - sequenceHeader->size()); - payloadOffset += int(sequenceHeader->size()); + std::memcpy(payload.data() + payloadOffset, mSequenceHeader->data(), + mSequenceHeader->size()); + payloadOffset += int(mSequenceHeader->size()); - sequenceHeader = nullptr; + mSequenceHeader = nullptr; } // Copy as much of OBU as possible into Payload - auto payloadRemaining = payload->size() - payloadOffset; - std::memcpy(payload->data() + payloadOffset, message->data() + messageIndex, + size_t payloadRemaining = payload.size() - payloadOffset; + std::memcpy(payload.data() + payloadOffset, data.data() + index, payloadRemaining); - messageRemaining -= payloadRemaining; - messageIndex += payloadRemaining; + remaining -= payloadRemaining; + index += payloadRemaining; // Does this Fragment contain an OBU that started in a previous payload if (payloads.size() > 0) { - payload->at(0) ^= zMask; + payload.at(0) ^= zMask; } // This OBU will be continued in next Payload - if (messageIndex < message->size()) { - payload->at(0) ^= yMask; + if (index < data.size()) { + payload.at(0) ^= yMask; } - payloads.push_back(payload); + payloads.push_back(std::move(payload)); } return payloads; } -AV1RtpPacketizer::AV1RtpPacketizer(AV1RtpPacketizer::Packetization packetization, - shared_ptr rtpConfig, - uint16_t maxFragmentSize) - : RtpPacketizer(rtpConfig), maxFragmentSize(maxFragmentSize), - packetization(packetization) {} - -void AV1RtpPacketizer::outgoing(message_vector &messages, - [[maybe_unused]] const message_callback &send) { - message_vector result; - for (const auto &message : messages) { - std::vector obus; - if (packetization == AV1RtpPacketizer::Packetization::TemporalUnit) { - obus = extractTemporalUnitObus(message); - } else { - obus.push_back(message); - } - - std::vector fragments; - for (auto obu : obus) { - auto p = packetizeObu(obu, maxFragmentSize); - fragments.insert(fragments.end(), p.begin(), p.end()); - } - - if (fragments.size() == 0) - continue; - - for (size_t i = 0; i < fragments.size() - 1; i++) - result.push_back(packetize(fragments[i], false)); - - result.push_back(packetize(fragments[fragments.size() - 1], true)); - } - - messages.swap(result); -} - } // namespace rtc #endif /* RTC_ENABLE_MEDIA */ diff --git a/src/capi.cpp b/src/capi.cpp index ead5cedf3..4fc6ce144 100644 --- a/src/capi.cpp +++ b/src/capi.cpp @@ -1396,14 +1396,6 @@ int rtcGetLastTrackSenderReportTimestamp(int id, uint32_t *timestamp) { }); } -int rtcSetNeedsToSendRtcpSr(int id) { - return wrap([id] { - auto sender = getRtcpSrReporter(id); - sender->setNeedsToReport(); - return RTC_ERR_SUCCESS; - }); -} - int rtcGetTrackPayloadTypesForCodec(int tr, const char *ccodec, int *buffer, int size) { return wrap([&] { auto track = getTrack(tr); @@ -1480,6 +1472,11 @@ int rtcSetSsrcForType(const char *mediaType, const char *sdp, char *buffer, cons }); } +int rtcSetNeedsToSendRtcpSr(int) { + // Dummy + return RTC_ERR_SUCCESS; +} + #endif // RTC_ENABLE_MEDIA #if RTC_ENABLE_WEBSOCKET diff --git a/src/h264rtpdepacketizer.cpp b/src/h264rtpdepacketizer.cpp index 2ee7fd38e..42c601f91 100644 --- a/src/h264rtpdepacketizer.cpp +++ b/src/h264rtpdepacketizer.cpp @@ -14,6 +14,7 @@ #include "impl/internals.hpp" #include +#include namespace rtc { @@ -45,7 +46,6 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, uint32_t timestamp) { message_vector out = {}; auto accessUnit = binary{}; - auto frameInfo = std::make_shared(payloadType, timestamp); for (auto it = begin; it != end; ++it) { auto pkt = it->get(); @@ -109,8 +109,10 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin, } if (!accessUnit.empty()) { - out.emplace_back( - make_message(std::move(accessUnit), Message::Binary, 0, nullptr, frameInfo)); + auto frameInfo = std::make_shared(timestamp); + frameInfo->timestampSeconds = std::chrono::duration(double(timestamp) / double(ClockRate)); + frameInfo->payloadType = payloadType; + out.emplace_back(make_message(std::move(accessUnit), frameInfo)); } return out; diff --git a/src/h264rtppacketizer.cpp b/src/h264rtppacketizer.cpp index f1f3fd8d8..07bc13102 100644 --- a/src/h264rtppacketizer.cpp +++ b/src/h264rtppacketizer.cpp @@ -23,37 +23,50 @@ namespace rtc { -shared_ptr H264RtpPacketizer::splitMessage(binary_ptr message) { - auto nalus = std::make_shared(); - if (separator == Separator::Length) { +H264RtpPacketizer::H264RtpPacketizer(shared_ptr rtpConfig, + size_t maxFragmentSize) + : RtpPacketizer(std::move(rtpConfig)), mSeparator(Separator::Length), mMaxFragmentSize(maxFragmentSize) {} + +H264RtpPacketizer::H264RtpPacketizer(Separator separator, + shared_ptr rtpConfig, + size_t maxFragmentSize) + : RtpPacketizer(rtpConfig), mSeparator(separator), mMaxFragmentSize(maxFragmentSize) {} + +std::vector H264RtpPacketizer::fragment(binary data) { + return NalUnit::GenerateFragments(splitFrame(data), mMaxFragmentSize); +} + +std::vector H264RtpPacketizer::splitFrame(const binary &frame) { + std::vector nalus; + if (mSeparator == Separator::Length) { size_t index = 0; - while (index < message->size()) { - assert(index + 4 < message->size()); - if (index + 4 >= message->size()) { + while (index < frame.size()) { + assert(index + 4 < frame.size()); + if (index + 4 >= frame.size()) { LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!"; break; } uint32_t length; - std::memcpy(&length, message->data() + index, sizeof(uint32_t)); + std::memcpy(&length, frame.data() + index, sizeof(uint32_t)); length = ntohl(length); auto naluStartIndex = index + 4; auto naluEndIndex = naluStartIndex + length; - assert(naluEndIndex <= message->size()); - if (naluEndIndex > message->size()) { + assert(naluEndIndex <= frame.size()); + if (naluEndIndex > frame.size()) { LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!"; break; } - auto begin = message->begin() + naluStartIndex; - auto end = message->begin() + naluEndIndex; - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.begin() + naluEndIndex; + nalus.emplace_back(begin, end); index = naluEndIndex; } } else { NalUnitStartSequenceMatch match = NUSM_noMatch; size_t index = 0; - while (index < message->size()) { - match = NalUnit::StartSequenceMatchSucc(match, (*message)[index++], separator); + while (index < frame.size()) { + match = NalUnit::StartSequenceMatchSucc(match, frame[index++], mSeparator); if (match == NUSM_longMatch || match == NUSM_shortMatch) { match = NUSM_noMatch; break; @@ -62,53 +75,26 @@ shared_ptr H264RtpPacketizer::splitMessage(binary_ptr message) { size_t naluStartIndex = index; - while (index < message->size()) { - match = NalUnit::StartSequenceMatchSucc(match, (*message)[index], separator); + while (index < frame.size()) { + match = NalUnit::StartSequenceMatchSucc(match, frame[index], mSeparator); if (match == NUSM_longMatch || match == NUSM_shortMatch) { auto sequenceLength = match == NUSM_longMatch ? 4 : 3; size_t naluEndIndex = index - sequenceLength; match = NUSM_noMatch; - auto begin = message->begin() + naluStartIndex; - auto end = message->begin() + naluEndIndex + 1; - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.begin() + naluEndIndex + 1; + nalus.emplace_back(begin, end); naluStartIndex = index + 1; } index++; } - auto begin = message->begin() + naluStartIndex; - auto end = message->end(); - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.end(); + nalus.emplace_back(begin, end); } return nalus; } -H264RtpPacketizer::H264RtpPacketizer(shared_ptr rtpConfig, - uint16_t maxFragmentSize) - : RtpPacketizer(std::move(rtpConfig)), maxFragmentSize(maxFragmentSize), - separator(Separator::Length) {} - -H264RtpPacketizer::H264RtpPacketizer(Separator separator, - shared_ptr rtpConfig, - uint16_t maxFragmentSize) - : RtpPacketizer(rtpConfig), maxFragmentSize(maxFragmentSize), separator(separator) {} - -void H264RtpPacketizer::outgoing(message_vector &messages, [[maybe_unused]] const message_callback &send) { - message_vector result; - for(const auto &message : messages) { - auto nalus = splitMessage(message); - auto fragments = nalus->generateFragments(maxFragmentSize); - if (fragments.size() == 0) - continue; - - for (size_t i = 0; i < fragments.size() - 1; i++) - result.push_back(packetize(fragments[i], false)); - - result.push_back(packetize(fragments[fragments.size() - 1], true)); - } - - messages.swap(result); -} - } // namespace rtc #endif /* RTC_ENABLE_MEDIA */ diff --git a/src/h265nalunit.cpp b/src/h265nalunit.cpp index 5fda10545..ad69220a0 100644 --- a/src/h265nalunit.cpp +++ b/src/h265nalunit.cpp @@ -16,35 +16,39 @@ namespace rtc { -H265NalUnitFragment::H265NalUnitFragment(FragmentType type, bool forbiddenBit, uint8_t nuhLayerId, - uint8_t nuhTempIdPlus1, uint8_t unitType, binary data) - : H265NalUnit(data.size() + H265_NAL_HEADER_SIZE + H265_FU_HEADER_SIZE) { - setForbiddenBit(forbiddenBit); - setNuhLayerId(nuhLayerId); - setNuhTempIdPlus1(nuhTempIdPlus1); - fragmentIndicator()->setUnitType(H265NalUnitFragment::nal_type_fu); - setFragmentType(type); - setUnitType(unitType); - copy(data.begin(), data.end(), begin() + H265_NAL_HEADER_SIZE + H265_FU_HEADER_SIZE); +std::vector H265NalUnit::GenerateFragments(const std::vector &nalus, + size_t maxFragmentSize) { + std::vector result; + for (auto nalu : nalus) { + if (nalu.size() > maxFragmentSize) { + auto fragments = nalu.generateFragments(maxFragmentSize); + result.insert(result.end(), fragments.begin(), fragments.end()); + } else { + // TODO: check + result.push_back(nalu); + } + } + return result; } -std::vector> -H265NalUnitFragment::fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize) { - assert(nalu->size() > maxFragmentSize); - auto fragments_count = ceil(double(nalu->size()) / maxFragmentSize); - maxFragmentSize = uint16_t(int(ceil(nalu->size() / fragments_count))); +std::vector H265NalUnit::generateFragments(size_t maxFragmentSize) const { + // TODO: check + assert(size() > maxFragmentSize); + auto fragments_count = ceil(double(size()) / maxFragmentSize); + maxFragmentSize = uint16_t(int(ceil(size() / fragments_count))); // 3 bytes for FU indicator and FU header maxFragmentSize -= (H265_NAL_HEADER_SIZE + H265_FU_HEADER_SIZE); - auto f = nalu->forbiddenBit(); - uint8_t nuhLayerId = nalu->nuhLayerId() & 0x3F; // 6 bits - uint8_t nuhTempIdPlus1 = nalu->nuhTempIdPlus1() & 0x7; // 3 bits - uint8_t naluType = nalu->unitType() & 0x3F; // 6 bits - auto payload = nalu->payload(); - vector> result{}; + bool forbiddenBit = this->forbiddenBit(); + uint8_t nuhLayerId = this->nuhLayerId() & 0x3F; // 6 bits + uint8_t nuhTempIdPlus1 = this->nuhTempIdPlus1() & 0x7; // 3 bits + uint8_t naluType = this->unitType() & 0x3F; // 6 bits + auto payload = this->payload(); + vector result; uint64_t offset = 0; while (offset < payload.size()) { vector fragmentData; + using FragmentType = H265NalUnitFragment::FragmentType; FragmentType fragmentType; if (offset == 0) { fragmentType = FragmentType::Start; @@ -57,14 +61,36 @@ H265NalUnitFragment::fragmentsFrom(shared_ptr nalu, uint16_t maxFra fragmentType = FragmentType::End; } fragmentData = {payload.begin() + offset, payload.begin() + offset + maxFragmentSize}; - auto fragment = std::make_shared( - fragmentType, f, nuhLayerId, nuhTempIdPlus1, naluType, fragmentData); - result.push_back(fragment); + result.emplace_back(fragmentType, forbiddenBit, nuhLayerId, nuhTempIdPlus1, naluType, + fragmentData); offset += maxFragmentSize; } return result; } +H265NalUnitFragment::H265NalUnitFragment(FragmentType type, bool forbiddenBit, uint8_t nuhLayerId, + uint8_t nuhTempIdPlus1, uint8_t unitType, binary data) + : H265NalUnit(data.size() + H265_NAL_HEADER_SIZE + H265_FU_HEADER_SIZE) { + setForbiddenBit(forbiddenBit); + setNuhLayerId(nuhLayerId); + setNuhTempIdPlus1(nuhTempIdPlus1); + fragmentIndicator()->setUnitType(H265NalUnitFragment::nal_type_fu); + setFragmentType(type); + setUnitType(unitType); + copy(data.begin(), data.end(), begin() + H265_NAL_HEADER_SIZE + H265_FU_HEADER_SIZE); +} + +std::vector> +H265NalUnitFragment::fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize) { + auto fragments = nalu->generateFragments(maxFragmentSize); + std::vector> result; + result.reserve(fragments.size()); + for (auto fragment : fragments) + result.push_back(std::make_shared(std::move(fragment))); + + return result; +} + void H265NalUnitFragment::setFragmentType(FragmentType type) { switch (type) { case FragmentType::Start: @@ -82,16 +108,16 @@ void H265NalUnitFragment::setFragmentType(FragmentType type) { } std::vector> H265NalUnits::generateFragments(uint16_t maxFragmentSize) { - vector> result{}; - for (auto nalu : *this) { - if (nalu->size() > maxFragmentSize) { - std::vector> fragments = - H265NalUnitFragment::fragmentsFrom(nalu, maxFragmentSize); - result.insert(result.end(), fragments.begin(), fragments.end()); - } else { - result.push_back(nalu); - } - } + std::vector nalus; + for (auto nalu : *this) + nalus.push_back(*nalu); + + auto fragments = H265NalUnit::GenerateFragments(nalus, maxFragmentSize); + std::vector> result; + result.reserve(fragments.size()); + for (auto fragment : fragments) + result.push_back(std::make_shared(std::move(fragment))); + return result; } diff --git a/src/h265rtppacketizer.cpp b/src/h265rtppacketizer.cpp index e4c60c79a..4061551cd 100644 --- a/src/h265rtppacketizer.cpp +++ b/src/h265rtppacketizer.cpp @@ -22,37 +22,51 @@ namespace rtc { -shared_ptr H265RtpPacketizer::splitMessage(binary_ptr message) { - auto nalus = std::make_shared(); - if (separator == NalUnit::Separator::Length) { +H265RtpPacketizer::H265RtpPacketizer(shared_ptr rtpConfig, + size_t maxFragmentSize) + : RtpPacketizer(std::move(rtpConfig)), mSeparator(NalUnit::Separator::Length), + mMaxFragmentSize(maxFragmentSize) {} + +H265RtpPacketizer::H265RtpPacketizer(NalUnit::Separator separator, + shared_ptr rtpConfig, + size_t maxFragmentSize) + : RtpPacketizer(std::move(rtpConfig)), mSeparator(separator), mMaxFragmentSize(maxFragmentSize) {} + +std::vector H265RtpPacketizer::fragment(binary data) { + return H265NalUnit::GenerateFragments(splitFrame(data), mMaxFragmentSize); +} + +std::vector H265RtpPacketizer::splitFrame(const binary &frame) { + std::vector nalus; + if (mSeparator == NalUnit::Separator::Length) { size_t index = 0; - while (index < message->size()) { - assert(index + 4 < message->size()); - if (index + 4 >= message->size()) { + while (index < frame.size()) { + assert(index + 4 < frame.size()); + if (index + 4 >= frame.size()) { LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!"; break; } uint32_t length; - std::memcpy(&length, message->data() + index, sizeof(uint32_t)); + std::memcpy(&length, frame.data() + index, sizeof(uint32_t)); length = ntohl(length); auto naluStartIndex = index + 4; auto naluEndIndex = naluStartIndex + length; - assert(naluEndIndex <= message->size()); - if (naluEndIndex > message->size()) { + assert(naluEndIndex <= frame.size()); + if (naluEndIndex > frame.size()) { LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!"; break; } - auto begin = message->begin() + naluStartIndex; - auto end = message->begin() + naluEndIndex; - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.begin() + naluEndIndex; + nalus.emplace_back(begin, end); index = naluEndIndex; } } else { NalUnitStartSequenceMatch match = NUSM_noMatch; size_t index = 0; - while (index < message->size()) { - match = NalUnit::StartSequenceMatchSucc(match, (*message)[index++], separator); + while (index < frame.size()) { + match = NalUnit::StartSequenceMatchSucc(match, frame[index++], mSeparator); if (match == NUSM_longMatch || match == NUSM_shortMatch) { match = NUSM_noMatch; break; @@ -61,54 +75,26 @@ shared_ptr H265RtpPacketizer::splitMessage(binary_ptr message) { size_t naluStartIndex = index; - while (index < message->size()) { - match = NalUnit::StartSequenceMatchSucc(match, (*message)[index], separator); + while (index < frame.size()) { + match = NalUnit::StartSequenceMatchSucc(match, frame[index], mSeparator); if (match == NUSM_longMatch || match == NUSM_shortMatch) { auto sequenceLength = match == NUSM_longMatch ? 4 : 3; size_t naluEndIndex = index - sequenceLength; match = NUSM_noMatch; - auto begin = message->begin() + naluStartIndex; - auto end = message->begin() + naluEndIndex + 1; - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.begin() + naluEndIndex + 1; + nalus.emplace_back(begin, end); naluStartIndex = index + 1; } index++; } - auto begin = message->begin() + naluStartIndex; - auto end = message->end(); - nalus->push_back(std::make_shared(begin, end)); + auto begin = frame.begin() + naluStartIndex; + auto end = frame.end(); + nalus.emplace_back(begin, end); } return nalus; } -H265RtpPacketizer::H265RtpPacketizer(shared_ptr rtpConfig, - uint16_t maxFragmentSize) - : RtpPacketizer(std::move(rtpConfig)), maxFragmentSize(maxFragmentSize), - separator(NalUnit::Separator::Length) {} - -H265RtpPacketizer::H265RtpPacketizer(NalUnit::Separator separator, - shared_ptr rtpConfig, - uint16_t maxFragmentSize) - : RtpPacketizer(std::move(rtpConfig)), maxFragmentSize(maxFragmentSize), - separator(separator) {} - -void H265RtpPacketizer::outgoing(message_vector &messages, [[maybe_unused]] const message_callback &send) { - message_vector result; - for (const auto &message : messages) { - auto nalus = splitMessage(message); - auto fragments = nalus->generateFragments(maxFragmentSize); - if (fragments.size() == 0) - continue; - - for (size_t i = 0; i < fragments.size() - 1; i++) - result.push_back(packetize(fragments[i], false)); - - result.push_back(packetize(fragments[fragments.size() - 1], true)); - } - - messages.swap(result); -} - } // namespace rtc #endif /* RTC_ENABLE_MEDIA */ diff --git a/src/impl/track.cpp b/src/impl/track.cpp index 6ff61ad4b..12b3c22d8 100644 --- a/src/impl/track.cpp +++ b/src/impl/track.cpp @@ -240,11 +240,6 @@ shared_ptr Track::getMediaHandler() { return mMediaHandler; } -void Track::onFrame(std::function callback) { - frameCallback = callback; - flushPendingMessages(); -} - void Track::flushPendingMessages() { if (!mOpenTriggered) return; @@ -256,9 +251,9 @@ void Track::flushPendingMessages() { auto message = next.value(); try { - if (message->frameInfo != nullptr && frameCallback) { + if (message->frameInfo && frameCallback) { frameCallback(std::move(*message), std::move(*message->frameInfo)); - } else if (message->frameInfo == nullptr && messageCallback) { + } else if (!message->frameInfo && messageCallback) { messageCallback(trackMessageToVariant(message)); } } catch (const std::exception &e) { diff --git a/src/impl/track.hpp b/src/impl/track.hpp index ddcf52def..8f52cc0ac 100644 --- a/src/impl/track.hpp +++ b/src/impl/track.hpp @@ -41,7 +41,7 @@ class Track final : public std::enable_shared_from_this, public Channel { void flushPendingMessages() override; message_variant trackMessageToVariant(message_ptr message); - void onFrame(std::function callback); + void sendFrame(binary data, const FrameInfo &frame); bool isOpen() const; bool isClosed() const; @@ -61,6 +61,8 @@ class Track final : public std::enable_shared_from_this, public Channel { bool transportSend(message_ptr message); + synchronized_callback frameCallback; + private: const weak_ptr mPeerConnection; #if RTC_ENABLE_MEDIA @@ -76,7 +78,6 @@ class Track final : public std::enable_shared_from_this, public Channel { Queue mRecvQueue; - synchronized_callback frameCallback; }; } // namespace rtc::impl diff --git a/src/message.cpp b/src/message.cpp index 2bbf1183e..0ce3256b3 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -18,11 +18,14 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream, return message; } -message_ptr make_message(binary &&data, Message::Type type, unsigned int stream, - shared_ptr reliability, shared_ptr frameInfo) { +message_ptr make_message(binary &&data, Message::Type type, unsigned int stream, shared_ptr reliability) { auto message = std::make_shared(std::move(data), type); message->stream = stream; message->reliability = reliability; + return message; +} +message_ptr make_message(binary &&data, shared_ptr frameInfo) { + auto message = std::make_shared(std::move(data)); message->frameInfo = frameInfo; return message; } diff --git a/src/nalunit.cpp b/src/nalunit.cpp index 64741c1b4..a4bbb6662 100644 --- a/src/nalunit.cpp +++ b/src/nalunit.cpp @@ -16,33 +16,38 @@ namespace rtc { -NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri, - uint8_t unitType, binary data) - : NalUnit(data.size() + 2) { - setForbiddenBit(forbiddenBit); - setNRI(nri); - fragmentIndicator()->setUnitType(NalUnitFragmentA::nal_type_fu_A); - setFragmentType(type); - setUnitType(unitType); - copy(data.begin(), data.end(), begin() + 2); +std::vector NalUnit::GenerateFragments(const std::vector &nalus, + size_t maxFragmentSize) { + std::vector result; + for (const auto &nalu : nalus) { + if (nalu.size() > maxFragmentSize) { + auto fragments = nalu.generateFragments(maxFragmentSize); + result.insert(result.end(), fragments.begin(), fragments.end()); + } else { + // TODO: check this + result.push_back(nalu); + } + } + return result; } -std::vector> -NalUnitFragmentA::fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize) { - assert(nalu->size() > maxFragmentSize); - auto fragments_count = ceil(double(nalu->size()) / maxFragmentSize); - maxFragmentSize = uint16_t(int(ceil(nalu->size() / fragments_count))); +std::vector NalUnit::generateFragments(size_t maxFragmentSize) const { + assert(size() > maxFragmentSize); + // TODO: check this + auto fragments_count = ceil(double(size()) / maxFragmentSize); + maxFragmentSize = uint16_t(int(ceil(size() / fragments_count))); // 2 bytes for FU indicator and FU header maxFragmentSize -= 2; - auto f = nalu->forbiddenBit(); - uint8_t nri = nalu->nri() & 0x03; - uint8_t naluType = nalu->unitType() & 0x1F; - auto payload = nalu->payload(); - vector> result{}; - uint64_t offset = 0; + auto f = forbiddenBit(); + uint8_t nri = this->nri() & 0x03; + uint8_t unitType = this->unitType() & 0x1F; + auto payload = this->payload(); + size_t offset = 0; + std::vector result; while (offset < payload.size()) { vector fragmentData; + using FragmentType = NalUnitFragmentA::FragmentType; FragmentType fragmentType; if (offset == 0) { fragmentType = FragmentType::Start; @@ -55,14 +60,78 @@ NalUnitFragmentA::fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSi fragmentType = FragmentType::End; } fragmentData = {payload.begin() + offset, payload.begin() + offset + maxFragmentSize}; - auto fragment = - std::make_shared(fragmentType, f, nri, naluType, fragmentData); - result.push_back(fragment); + result.emplace_back(fragmentType, f, nri, unitType, fragmentData); offset += maxFragmentSize; } return result; } +NalUnitStartSequenceMatch NalUnit::StartSequenceMatchSucc(NalUnitStartSequenceMatch match, + std::byte _byte, Separator separator) { + assert(separator != Separator::Length); + auto byte = (uint8_t)_byte; + auto detectShort = + separator == Separator::ShortStartSequence || separator == Separator::StartSequence; + auto detectLong = + separator == Separator::LongStartSequence || separator == Separator::StartSequence; + switch (match) { + case NUSM_noMatch: + if (byte == 0x00) { + return NUSM_firstZero; + } + break; + case NUSM_firstZero: + if (byte == 0x00) { + return NUSM_secondZero; + } + break; + case NUSM_secondZero: + if (byte == 0x00 && detectLong) { + return NUSM_thirdZero; + } else if (byte == 0x00 && detectShort) { + return NUSM_secondZero; + } else if (byte == 0x01 && detectShort) { + return NUSM_shortMatch; + } + break; + case NUSM_thirdZero: + if (byte == 0x00 && detectLong) { + return NUSM_thirdZero; + } else if (byte == 0x01 && detectLong) { + return NUSM_longMatch; + } + break; + case NUSM_shortMatch: + return NUSM_shortMatch; + case NUSM_longMatch: + return NUSM_longMatch; + } + return NUSM_noMatch; +} + +NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri, + uint8_t unitType, binary data) + : NalUnit(data.size() + 2) { + setForbiddenBit(forbiddenBit); + setNRI(nri); + fragmentIndicator()->setUnitType(NalUnitFragmentA::nal_type_fu_A); + setFragmentType(type); + setUnitType(unitType); + copy(data.begin(), data.end(), begin() + 2); +} + +// For backward compatibility, do not use +std::vector> +NalUnitFragmentA::fragmentsFrom(shared_ptr nalu, uint16_t maxFragmentSize) { + auto fragments = nalu->generateFragments(maxFragmentSize); + std::vector> result; + result.reserve(fragments.size()); + for (auto fragment : fragments) + result.push_back(std::make_shared(std::move(fragment))); + + return result; +} + void NalUnitFragmentA::setFragmentType(FragmentType type) { fragmentHeader()->setReservedBit6(false); switch (type) { @@ -80,17 +149,18 @@ void NalUnitFragmentA::setFragmentType(FragmentType type) { } } +// For backward compatibility, do not use std::vector> NalUnits::generateFragments(uint16_t maxFragmentSize) { - vector> result{}; - for (auto nalu : *this) { - if (nalu->size() > maxFragmentSize) { - std::vector> fragments = - NalUnitFragmentA::fragmentsFrom(nalu, maxFragmentSize); - result.insert(result.end(), fragments.begin(), fragments.end()); - } else { - result.push_back(nalu); - } - } + std::vector nalus; + for (auto nalu : *this) + nalus.push_back(*nalu); + + auto fragments = NalUnit::GenerateFragments(nalus, maxFragmentSize); + std::vector> result; + result.reserve(fragments.size()); + for (auto fragment : fragments) + result.push_back(std::make_shared(std::move(fragment))); + return result; } diff --git a/src/rtcpnackresponder.cpp b/src/rtcpnackresponder.cpp index 88d1face2..4d810ff12 100644 --- a/src/rtcpnackresponder.cpp +++ b/src/rtcpnackresponder.cpp @@ -46,10 +46,9 @@ void RtcpNackResponder::incoming(message_vector &messages, const message_callbac newMissingSeqenceNumbers.end()); } - for (auto sequenceNumber : missingSequenceNumbers) { - if (auto optPacket = mStorage->get(sequenceNumber)) - send(make_message(*optPacket.value())); - } + for (auto sequenceNumber : missingSequenceNumbers) + if (auto packet = mStorage->get(sequenceNumber)) + send(packet); } } } @@ -61,7 +60,7 @@ void RtcpNackResponder::outgoing(message_vector &messages, mStorage->store(message); } -RtcpNackResponder::Storage::Element::Element(binary_ptr packet, uint16_t sequenceNumber, +RtcpNackResponder::Storage::Element::Element(message_ptr packet, uint16_t sequenceNumber, shared_ptr next) : packet(packet), sequenceNumber(sequenceNumber), next(next) {} @@ -72,14 +71,14 @@ RtcpNackResponder::Storage::Storage(size_t _maxSize) : maxSize(_maxSize) { storage.reserve(maxSize); } -optional RtcpNackResponder::Storage::get(uint16_t sequenceNumber) { +message_ptr RtcpNackResponder::Storage::get(uint16_t sequenceNumber) { std::lock_guard lock(mutex); auto position = storage.find(sequenceNumber); - return position != storage.end() ? std::make_optional(storage.at(sequenceNumber)->packet) - : nullopt; + return position != storage.end() ? storage.at(sequenceNumber)->packet + : nullptr; } -void RtcpNackResponder::Storage::store(binary_ptr packet) { +void RtcpNackResponder::Storage::store(message_ptr packet) { if (!packet || packet->size() < sizeof(RtpHeader)) return; diff --git a/src/rtcpsrreporter.cpp b/src/rtcpsrreporter.cpp index a9d79b5a0..158fa9650 100644 --- a/src/rtcpsrreporter.cpp +++ b/src/rtcpsrreporter.cpp @@ -14,6 +14,8 @@ #include #include +using namespace std::chrono_literals; + namespace { // TODO: move to utils @@ -28,16 +30,21 @@ uint64_t ntp_time() { namespace rtc { -RtcpSrReporter::RtcpSrReporter(shared_ptr rtpConfig) - : rtpConfig(rtpConfig) { - mLastReportedTimestamp = rtpConfig->timestamp; -} +RtcpSrReporter::RtcpSrReporter(shared_ptr rtpConfig) : rtpConfig(rtpConfig) {} -void RtcpSrReporter::setNeedsToReport() { mNeedsToReport = true; } +RtcpSrReporter::~RtcpSrReporter() {} + +void RtcpSrReporter::setNeedsToReport() { + // Dummy +} uint32_t RtcpSrReporter::lastReportedTimestamp() const { return mLastReportedTimestamp; } void RtcpSrReporter::outgoing(message_vector &messages, const message_callback &send) { + if (messages.empty()) + return; + + uint32_t timestamp = 0; for (const auto &message : messages) { if (message->type == Message::Control) continue; @@ -45,21 +52,27 @@ void RtcpSrReporter::outgoing(message_vector &messages, const message_callback & if (message->size() < sizeof(RtpHeader)) continue; - auto rtp = reinterpret_cast(message->data()); - addToReport(rtp, uint32_t(message->size())); + auto header = reinterpret_cast(message->data()); + if(header->ssrc() != rtpConfig->ssrc) + continue; + + timestamp = header->timestamp(); + + addToReport(header, message->size()); } - if (std::exchange(mNeedsToReport, false)) { - auto timestamp = rtpConfig->timestamp; - auto sr = getSenderReport(timestamp); - send(sr); + auto now = std::chrono::steady_clock::now(); + if (now >= mLastReportTime + 1s) { + send(getSenderReport(timestamp)); + mLastReportedTimestamp = timestamp; + mLastReportTime = now; } } -void RtcpSrReporter::addToReport(RtpHeader *rtp, uint32_t rtpSize) { +void RtcpSrReporter::addToReport(RtpHeader *header, size_t size) { mPacketCount += 1; - assert(!rtp->padding()); - mPayloadOctets += rtpSize - uint32_t(rtp->getSize()); + assert(!header->padding()); + mPayloadOctets += uint32_t(size - header->getSize()); } message_ptr RtcpSrReporter::getSenderReport(uint32_t timestamp) { @@ -81,7 +94,6 @@ message_ptr RtcpSrReporter::getSenderReport(uint32_t timestamp) { item->setText(rtpConfig->cname); sdes->preparePacket(1); - mLastReportedTimestamp = timestamp; return msg; } diff --git a/src/rtpdepacketizer.cpp b/src/rtpdepacketizer.cpp index 59d907444..69b60515c 100644 --- a/src/rtpdepacketizer.cpp +++ b/src/rtpdepacketizer.cpp @@ -18,6 +18,12 @@ namespace rtc { +RtpDepacketizer::RtpDepacketizer() : mClockRate(0) {} + +RtpDepacketizer::RtpDepacketizer(uint32_t clockRate) : mClockRate(clockRate) {} + +RtpDepacketizer::~RtpDepacketizer() {} + void RtpDepacketizer::incoming([[maybe_unused]] message_vector &messages, [[maybe_unused]] const message_callback &send) { message_vector result; @@ -34,9 +40,13 @@ void RtpDepacketizer::incoming([[maybe_unused]] message_vector &messages, auto pkt = reinterpret_cast(message->data()); auto headerSize = sizeof(rtc::RtpHeader) + pkt->csrcCount() + pkt->getExtensionHeaderSize(); - result.push_back(make_message(message->begin() + headerSize, message->end(), - Message::Binary, 0, nullptr, - std::make_shared(pkt->payloadType(), pkt->timestamp()))); + + auto frameInfo = std::make_shared(pkt->timestamp()); + if (mClockRate > 0) + frameInfo->timestampSeconds = + std::chrono::duration(double(pkt->timestamp()) / double(mClockRate)); + frameInfo->payloadType = pkt->payloadType(); + result.push_back(make_message(message->begin() + headerSize, message->end(), frameInfo)); } messages.swap(result); diff --git a/src/rtppacketizer.cpp b/src/rtppacketizer.cpp index 9cb7332ae..fb6b9c019 100644 --- a/src/rtppacketizer.cpp +++ b/src/rtppacketizer.cpp @@ -19,7 +19,12 @@ RtpPacketizer::RtpPacketizer(shared_ptr rtpConfig) : rtp RtpPacketizer::~RtpPacketizer() {} -message_ptr RtpPacketizer::packetize(shared_ptr payload, bool mark) { +std::vector RtpPacketizer::fragment(binary data) { + // Default implementation + return {std::move(data)}; +} + +message_ptr RtpPacketizer::packetize(const binary &payload, bool mark) { size_t rtpExtHeaderSize = 0; const bool setVideoRotation = (rtpConfig->videoOrientationId != 0) && @@ -47,7 +52,7 @@ message_ptr RtpPacketizer::packetize(shared_ptr payload, bool mark) { rtpExtHeaderSize = (rtpExtHeaderSize + 3) & ~3; - auto message = make_message(RtpHeaderSize + rtpExtHeaderSize + payload->size()); + auto message = make_message(RtpHeaderSize + rtpExtHeaderSize + payload.size()); auto *rtp = (RtpHeader *)message->data(); rtp->setPayloadType(rtpConfig->payloadType); rtp->setSeqNumber(rtpConfig->sequenceNumber++); // increase sequence number @@ -96,11 +101,8 @@ message_ptr RtpPacketizer::packetize(shared_ptr payload, bool mark) { uint16_t max = rtpConfig->playoutDelayMax & 0xFFF; // 12 bits for min + 12 bits for max - byte data[] = { - byte((min >> 4) & 0xFF), - byte(((min & 0xF) << 4) | ((max >> 8) & 0xF)), - byte(max & 0xFF) - }; + byte data[] = {byte((min >> 4) & 0xFF), byte(((min & 0xF) << 4) | ((max >> 8) & 0xF)), + byte(max & 0xFF)}; extHeader->writeOneByteHeader(offset, rtpConfig->playoutDelayId, data, 3); offset += 4; @@ -109,19 +111,44 @@ message_ptr RtpPacketizer::packetize(shared_ptr payload, bool mark) { rtp->preparePacket(); - std::memcpy(message->data() + RtpHeaderSize + rtpExtHeaderSize, payload->data(), - payload->size()); + std::memcpy(message->data() + RtpHeaderSize + rtpExtHeaderSize, payload.data(), payload.size()); return message; } +message_ptr RtpPacketizer::packetize(shared_ptr payload, bool mark) { + return packetize(*payload, mark); +} + void RtpPacketizer::media([[maybe_unused]] const Description::Media &desc) {} -void RtpPacketizer::outgoing([[maybe_unused]] message_vector &messages, +void RtpPacketizer::outgoing(message_vector &messages, [[maybe_unused]] const message_callback &send) { - // Default implementation - for (auto &message : messages) - message = packetize(message, false); + message_vector result; + for (const auto &message : messages) { + if (const auto &frameInfo = message->frameInfo) { + if (frameInfo->payloadType && frameInfo->payloadType != rtpConfig->payloadType) + continue; + + if (frameInfo->timestampSeconds) + rtpConfig->timestamp = + rtpConfig->startTimestamp + + rtpConfig->secondsToTimestamp( + std::chrono::duration(*frameInfo->timestampSeconds).count()); + else + rtpConfig->timestamp = frameInfo->timestamp; + } + + auto payloads = fragment(std::move(*message)); + if (payloads.size() > 0) { + for (size_t i = 0; i < payloads.size() - 1; i++) + result.push_back(packetize(payloads[i], false)); + + result.push_back(packetize(payloads[payloads.size() - 1], true)); + } + } + + messages.swap(result); } } // namespace rtc diff --git a/src/track.cpp b/src/track.cpp index fccc6dfe9..6e1899ac4 100644 --- a/src/track.cpp +++ b/src/track.cpp @@ -40,6 +40,15 @@ bool Track::isClosed(void) const { return impl()->isClosed(); } size_t Track::maxMessageSize() const { return impl()->maxMessageSize(); } +void Track::sendFrame(binary data, FrameInfo info) { + impl()->outgoing(make_message(std::move(data), std::make_shared(std::move(info)))); +} + +void Track::onFrame(std::function callback) { + impl()->frameCallback = callback; + impl()->flushPendingMessages(); +} + void Track::setMediaHandler(shared_ptr handler) { impl()->setMediaHandler(std::move(handler)); } @@ -70,8 +79,4 @@ bool Track::requestBitrate(unsigned int bitrate) { shared_ptr Track::getMediaHandler() { return impl()->getMediaHandler(); } -void Track::onFrame(std::function callback) { - impl()->onFrame(callback); -} - } // namespace rtc