Skip to content

Commit

Permalink
Merge branch 'rtpRewrite'
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun committed Mar 6, 2014
2 parents 233f158 + c945501 commit 4819c18
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 238 deletions.
5 changes: 1 addition & 4 deletions erizo/src/erizo/SdesTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,7 @@ void SdesTransport::onNiceData(unsigned int component_id, char* data, int len, N
}

rtcpheader *chead = reinterpret_cast<rtcpheader*> (unprotectBuf_);
if (chead->packettype == RTCP_Sender_PT ||
chead->packettype == RTCP_Receiver_PT ||
chead->packettype == RTCP_PS_Feedback_PT||
chead->packettype == RTCP_RTP_Feedback_PT){
if (chead->isRtcp()){
if(srtp->unprotectRtcp(unprotectBuf_, &length)<0)
return;
} else {
Expand Down
6 changes: 3 additions & 3 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ namespace erizo {

int WebRtcConnection::deliverFeedback(char* buf, int len){
// Check where to send the feedback
rtcpheader *chead = (rtcpheader*) buf;
rtcpheader *chead = reinterpret_cast<rtcpheader*> (buf);
ELOG_DEBUG("received Feedback type %u ssrc %u, sourcessrc %u", chead->packettype, ntohl(chead->ssrc), ntohl(chead->ssrcsource));
if (ntohl(chead->ssrcsource) == this->getAudioSourceSSRC()) {
writeSsrc(buf,len,this->getAudioSinkSSRC());
Expand All @@ -216,10 +216,10 @@ namespace erizo {
}

void WebRtcConnection::writeSsrc(char* buf, int len, unsigned int ssrc) {
rtpheader *head = (rtpheader*) buf;
rtpheader *head = reinterpret_cast<rtpheader*> (buf);
rtcpheader *chead = reinterpret_cast<rtcpheader*> (buf);
//if it is RTCP we check it it is a compound packet
if (chead->packettype == RTCP_Sender_PT || chead->packettype == RTCP_Receiver_PT || chead->packettype == RTCP_PS_Feedback_PT || chead->packettype == RTCP_RTP_Feedback_PT) {
if (chead->isRtcp()) {
processRtcpHeaders(buf,len,ssrc);
} else {
head->ssrc=htonl(ssrc);
Expand Down
35 changes: 24 additions & 11 deletions erizo/src/erizo/media/ExternalOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,19 @@ namespace erizo {
}

int ExternalOutput::deliverAudioData(char* buf, int len) {
rtcpheader *head = reinterpret_cast<rtcpheader*>(buf);
if (head->isRtcp()){
return 0;
}
this->queueData(buf,len,AUDIO_PACKET);
return 0;
}

int ExternalOutput::deliverVideoData(char* buf, int len) {
rtcpheader *head = reinterpret_cast<rtcpheader*>(buf);
if (head->isRtcp()){
return 0;
}
this->queueData(buf,len,VIDEO_PACKET);
return 0;
}
Expand Down Expand Up @@ -319,16 +327,11 @@ namespace erizo {
return;
}
boost::mutex::scoped_lock lock(queueMutex_);

if (packetQueue_.size()>1000){
return;
if (type == VIDEO_PACKET){
videoQueue_.pushPacket(buffer, length);
}else{
audioQueue_.pushPacket(buffer, length);
}
dataPacket p;
memset(p.data, 0,length);
memcpy(p.data, buffer, length);
p.type = type;
p.length = length;
packetQueue_.push(p);
cond_.notify_one();

}
Expand Down Expand Up @@ -365,20 +368,30 @@ namespace erizo {

while (sending_ == true) {
boost::unique_lock<boost::mutex> lock(queueMutex_);
while (packetQueue_.size() == 0) {
while ((!audioQueue_.getSize())&&(!videoQueue_.getSize())) {
cond_.wait(lock);
if (sending_ == false) {
lock.unlock();
return;
}
}
if (audioQueue_.getSize()){
boost::shared_ptr<dataPacket> audioP = audioQueue_.popPacket();
this->writeAudioData(audioP->data, audioP->length);
}
if (videoQueue_.getSize()) {
boost::shared_ptr<dataPacket> videoP = videoQueue_.popPacket();
this->writeVideoData(videoP->data, videoP->length);

}
/*
if (packetQueue_.front().type == VIDEO_PACKET) {
this->writeVideoData(packetQueue_.front().data, packetQueue_.front().length);
} else {
this->writeAudioData(packetQueue_.front().data, packetQueue_.front().length);
}
packetQueue_.pop();
*/

lock.unlock();
}
}
Expand Down
5 changes: 3 additions & 2 deletions erizo/src/erizo/media/ExternalOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

#include <string>
#include <map>
#include <queue>
#include "../MediaDefinitions.h"
#include "rtp/RtpPacketQueue.h"
#include "codecs/VideoCodec.h"
#include "codecs/AudioCodec.h"
#include "MediaProcessor.h"
Expand Down Expand Up @@ -32,15 +32,16 @@ namespace erizo{

private:
OutputProcessor* op_;
RtpPacketQueue audioQueue_, videoQueue_;
unsigned char* decodedBuffer_;
char* sendVideoBuffer_;


std::string url;
volatile bool sending_;
boost::mutex queueMutex_;
boost::thread thread_;
boost::condition_variable cond_;
std::queue<dataPacket> packetQueue_;
AVStream *video_st, *audio_st;

AudioEncoder* audioCoder_;
Expand Down
2 changes: 2 additions & 0 deletions erizo/src/erizo/media/rtp/RtpHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#ifndef RTPHEADER_H_
#define RTPHEADER_H_

#include <arpa/inet.h>

class RTPHeader {
public:
// constants
Expand Down
114 changes: 114 additions & 0 deletions erizo/src/erizo/media/rtp/RtpPacketQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include <cstring>

#include "RtpPacketQueue.h"
#include "../../MediaDefinitions.h"
#include "RtpHeader.h"


namespace erizo{

DEFINE_LOGGER(RtpPacketQueue, "RtpPacketQueue");

RtpPacketQueue::RtpPacketQueue()
: lastNseq_(0), lastTs_(0)
{
}

RtpPacketQueue::~RtpPacketQueue(void)
{
cleanQueue();
}

void RtpPacketQueue::pushPacket(const char *data, int length)
{

const RTPHeader *header = reinterpret_cast<const RTPHeader*>(data);
uint16_t nseq = header->getSeqNumber();
uint32_t ts = header->getTimestamp();

long long int ltsdiff = (long long int)ts - (long long int)lastTs_;
int tsdiff = (int)ltsdiff;
int nseqdiff = nseq - lastNseq_;
/*
// nseq sequence cicle test
if ( abs(nseqdiff) > ( USHRT_MAX - MAX_DIFF ) )
{
NOTIFY("Vuelta del NSeq ns=%d last=%d\n", nseq, lastNseq_);
if (nseqdiff > 0)
nseqdiff-= (USHRT_MAX + 1);
else
nseqdiff+= (USHRT_MAX + 1);
}
*/

if (abs(tsdiff) > MAX_DIFF_TS || abs(nseqdiff) > MAX_DIFF )
{
// new flow, process and clean queue
ELOG_DEBUG("Max diff reached, new Flow? nsqediff %d , tsdiff %d", nseqdiff, tsdiff);
ELOG_DEBUG("PT %d", header->getPayloadType());
lastNseq_ = nseq;
lastTs_ = ts;
cleanQueue();
}
else if (nseqdiff > 1)
{
// Jump in nseq, enqueue
ELOG_DEBUG("Jump in nseq");
enqueuePacket(data, length, nseq);
}
else if (nseqdiff == 1)
{
// next packet, process
lastNseq_ = nseq;
lastTs_ = ts;
enqueuePacket(data, length, nseq);
}
else if (nseqdiff < 0)
{
ELOG_DEBUG("Old Packet Received");
// old packet, discard?
// stats?
}
else if (nseqdiff == 0)
{
ELOG_DEBUG("Duplicate Packet received");
//duplicate packet, process (for stats)?
}
}

void RtpPacketQueue::enqueuePacket(const char *data, int length, uint16_t nseq)
{
if (queue_.size() > MAX_SIZE) { // if queue is growing too much, we start again
cleanQueue();
}

boost::shared_ptr<dataPacket> packet(new dataPacket());
memcpy(packet->data, data, length);
packet->length = length;
queue_.insert(std::map< int, boost::shared_ptr<dataPacket>>::value_type(nseq,packet));

}

void RtpPacketQueue::cleanQueue()
{
queue_.clear();
}

boost::shared_ptr<dataPacket> RtpPacketQueue::popPacket()
{
boost::shared_ptr<dataPacket> packet = queue_.begin()->second;
if (packet.get() == NULL){
return packet;
}
const RTPHeader *header = reinterpret_cast<const RTPHeader*>(packet->data);
lastNseq_ = queue_.begin()->first;
lastTs_ = header->getTimestamp();
queue_.erase(queue_.begin());
return packet;
}

int RtpPacketQueue::getSize(){
uint16_t size = queue_.size();
return size;
}
} /* namespace erizo */
40 changes: 40 additions & 0 deletions erizo/src/erizo/media/rtp/RtpPacketQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#ifndef __RTPPACKETQUEUE_H__
#define __RTPPACKETQUEUE_H__

#include <map>
#include <boost/shared_ptr.hpp>

#include "logger.h"

namespace erizo{
//forward declaration
class dataPacket;

class RtpPacketQueue
{
DECLARE_LOGGER();

public:
RtpPacketQueue();
virtual ~RtpPacketQueue(void);

void pushPacket(const char *data, int length);
boost::shared_ptr<dataPacket> popPacket();
int getSize();

private:
static const int MAX_DIFF = 50;
static const int MAX_DIFF_TS = 50000;
static const unsigned int MAX_SIZE = 10;
std::map< int, boost::shared_ptr<dataPacket>> queue_;
uint16_t lastNseq_;
uint32_t lastTs_;

void enqueuePacket(const char *data, int length, uint16_t nseq);
void cleanQueue(void);

};
} /* namespace erizo */

#endif /* RTPPACKETQUEUE*/

Loading

0 comments on commit 4819c18

Please sign in to comment.