Skip to content

Commit

Permalink
Hook up rtpproxy into rtp.io when no explicit configuration is
Browse files Browse the repository at this point in the history
provided. It is also possible to use internal RTP functionality
by adding it up as "rtp.io:auto" into some of the existing set.
  • Loading branch information
sobomax committed Jun 16, 2024
1 parent 89b2b7b commit 051a095
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 29 deletions.
2 changes: 2 additions & 0 deletions modules/rtpproxy/rtppn_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
Expand Down Expand Up @@ -66,6 +67,7 @@ int connect_rtpp_node(struct rtpp_node *pnode)
struct addrinfo hints, *res;
struct sockaddr_un sau;

assert(pnode->rn_umode != CM_RTPIO);
/*
* This is UDP, TCP, UDP6 or TCP6. Detect host and port; lookup host;
* do connect() in order to specify peer address
Expand Down
98 changes: 71 additions & 27 deletions modules/rtpproxy/rtpproxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@
#include "rtpproxy_vcmd.h"
#include "rtppn_connect.h"
#include "../rtp_relay/rtp_relay.h"
#include "../rtp.io/rtp_io_api.h"

#define NH_TABLE_VERSION 0

#define DEFAULT_RTPP_SET_ID 0
Expand Down Expand Up @@ -368,6 +370,7 @@ static int rtpproxy_autobridge = 0;
static pid_t mypid;
static int myrand = 0;
static unsigned int myseqn = 0;
static int myrank = 0;
static str nortpproxy_str = str_init("a=nortpproxy:yes");
str rtpp_notify_socket = {0, 0};
/*
Expand All @@ -385,8 +388,13 @@ struct rtpp_set_head ** rtpp_set_list =0;
struct rtpp_set ** default_rtpp_set=0;
static int default_rtpp_set_no = DEFAULT_RTPP_SET_ID;

struct rtpp_sock {
int fd;
enum comm_modes rn_umode;
};

/* array with the sockets used by rtpporxy (per process)*/
static int *rtpp_socks = 0;
static struct rtpp_sock *rtpp_socks = NULL;
static unsigned int *rtpp_no = 0;
static unsigned int *list_version;
static unsigned int my_version = 0;
Expand Down Expand Up @@ -641,8 +649,7 @@ static int rtpproxy_set_notify(modparam_t type, void * val)
return 0;
}

static int add_rtpproxy_socks(struct rtpp_set * rtpp_list,
char * rtpproxy){
static int add_rtpproxy_socks(struct rtpp_set * rtpp_list, char *rtpproxy){
/* Make rtp proxies list. */
char *p, *p1, *p2, *p3, *p4, *plim;
struct rtpp_node *pnode;
Expand Down Expand Up @@ -726,6 +733,13 @@ static int add_rtpproxy_socks(struct rtpp_set * rtpp_list,
} else if (strncasecmp(pnode->rn_address, "cunix:", 6) == 0) {
pnode->rn_umode = CM_CUNIX;
pnode->rn_address += 6;
} else if (strncasecmp(pnode->rn_address, "rtp.io:auto", 11) == 0) {
if (pnode->rn_address[11] != '\0') {
LM_ERR("only \"rtp.io:auto\" is supported\n");
return -1;
}
pnode->rn_umode = CM_RTPIO;
pnode->rn_address += 11;
}

if (rtpp_list->rn_first == NULL) {
Expand Down Expand Up @@ -1116,6 +1130,13 @@ static int mod_preinit(void)
return 0;
}

static rtp_io_getchildsock_t
rtp_io_childsock_f(void)
{

return (rtp_io_getchildsock_t)find_export("rtp_io_getchildsock", 0);
}

static int
mod_init(void)
{
Expand Down Expand Up @@ -1166,8 +1187,11 @@ mod_init(void)
if(db_url.s == NULL)
{
if (rtpp_sets == 0) {
LM_ERR("no rtpproxy set specified\n");
return -1;
int rtp_io_found = (rtp_io_childsock_f() == NULL) ? 0 : 1;
if (!rtp_io_found || rtpproxy_add_rtpproxy_set("rtp.io:auto", -1) != 0) {
LM_ERR("no rtpproxy set specified");
return -1;
}
}

/* storing the list of rtp proxy sets in shared memory*/
Expand Down Expand Up @@ -1355,7 +1379,7 @@ mod_init(void)

static int mi_child_init(void)
{
if(child_init(1) < 0)
if (child_init(1) < 0)
{
LM_ERR("Failed to initial rtpp socks\n");
return -1;
Expand Down Expand Up @@ -1465,6 +1489,7 @@ child_init(int rank)

mypid = getpid();
myrand = rand()%10000;
myrank = rank;

return connect_rtpproxies(NULL);
}
Expand All @@ -1480,7 +1505,8 @@ int connect_rtpproxies(struct rtpp_set *filter)
LM_DBG("[Re]connecting sockets (%d > %d)\n", *rtpp_no, rtpp_number);

if (*rtpp_no > rtpp_number) {
rtpp_socks = (int*)pkg_realloc(rtpp_socks, *rtpp_no * sizeof(int) );
size_t asize = *rtpp_no * sizeof(rtpp_socks[0]);
rtpp_socks = (typeof(rtpp_socks))pkg_realloc(rtpp_socks, asize);
if (rtpp_socks==NULL) {
LM_ERR("no more pkg memory\n");
return -1;
Expand All @@ -1495,16 +1521,31 @@ int connect_rtpproxies(struct rtpp_set *filter)
continue;

for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next){
if (pnode->rn_umode == CM_UNIX) {
rtpp_socks[pnode->idx] = -1;
} else {
rtpp_socks[pnode->idx] = connect_rtpp_node(pnode);
LM_INFO("created to %d\n", rtpp_socks[pnode->idx]);
if (rtpp_socks[pnode->idx] == -1) {
switch (pnode->rn_umode) {
case CM_UNIX:
rtpp_socks[pnode->idx].fd = -1;
break;
case CM_RTPIO:
{
rtp_io_getchildsock_t gcs_f;
gcs_f = rtp_io_childsock_f();
if (gcs_f == NULL) {
LM_ERR("rtp.io is not loaded\n");
return -1;
}
rtpp_socks[pnode->idx].fd = gcs_f(myrank);
}
break;
default:
rtpp_socks[pnode->idx].fd = connect_rtpp_node(pnode);
LM_INFO("created to %d\n", rtpp_socks[pnode->idx].fd);
if (rtpp_socks[pnode->idx].fd == -1) {
LM_ERR("connect_rtpp_node() failed\n");
return -1;
}
break;
}
rtpp_socks[pnode->idx].rn_umode = pnode->rn_umode;
pnode->rn_disabled = rtpp_test(pnode, 0, 1);
}

Expand Down Expand Up @@ -1534,13 +1575,15 @@ int update_rtpp_proxies(struct rtpp_set *filter) {

update_rtpp_notify();
for (i = 0; i < rtpp_number; i++) {
if (rtpp_socks[i].rn_umode == CM_RTPIO)
continue;
if (!filter ||
(filter->rtpp_socks_idx <= i
&& i < filter->rtpp_socks_idx + filter->rtpp_node_count)) {
LM_DBG("closing rtpp_socks[%d] | filter_set: %d\n", i,
LM_DBG("closing rtpp_socks[%d].fd | filter_set: %d\n", i,
filter ? filter->id_set : -1);
shutdown(rtpp_socks[i], SHUT_RDWR);
close(rtpp_socks[i]);
shutdown(rtpp_socks[i].fd, SHUT_RDWR);
close(rtpp_socks[i].fd);
}
}

Expand Down Expand Up @@ -2119,9 +2162,10 @@ send_rtpp_command(struct rtpp_node *node, struct rtpproxy_vcmd *vcmd, int vcnt)
max_vcnt = IOV_MAX;
#endif

if (rtpp_socks[node->idx] == -1 && node->rn_umode != CM_UNIX) {
rtpp_socks[node->idx] = connect_rtpp_node(node);
if (rtpp_socks[node->idx] == -1) {
if (rtpp_socks[node->idx].fd == -1 && node->rn_umode != CM_UNIX &&
node->rn_umode != CM_RTPIO) {
rtpp_socks[node->idx].fd = connect_rtpp_node(node);
if (rtpp_socks[node->idx].fd == -1) {
LM_ERR("connect_rtpp_node() failed\n");
return (NULL);
}
Expand Down Expand Up @@ -2202,18 +2246,18 @@ send_rtpp_command(struct rtpp_node *node, struct rtpproxy_vcmd *vcmd, int vcnt)
}
} else {
int rtry = CM_STREAM(node) ? 1 : rtpproxy_retr;
fds[0].fd = rtpp_socks[node->idx];
fds[0].fd = rtpp_socks[node->idx].fd;
fds[0].events = POLLIN | POLLRDHUP;
fds[0].revents = 0;
/* Drain input buffer */
while ((poll(fds, 1, 0) == 1) &&
((fds[0].revents & POLLIN) != 0)) {
if (fds[0].revents & (POLLERR|POLLNVAL|POLLRDHUP)) {
LM_ERR("error on rtpproxy socket %d!\n", rtpp_socks[node->idx]);
LM_ERR("error on rtpproxy socket %d!\n", rtpp_socks[node->idx].fd);
break;
}
fds[0].revents = 0;
if (recv(rtpp_socks[node->idx], buf, sizeof(buf) - 1, 0) < 0 &&
if (recv(rtpp_socks[node->idx].fd, buf, sizeof(buf) - 1, 0) < 0 &&
errno != EINTR) {
LM_ERR("error while draining rtpproxy %d!\n", errno);
break;
Expand All @@ -2229,7 +2273,7 @@ send_rtpp_command(struct rtpp_node *node, struct rtpproxy_vcmd *vcmd, int vcnt)
for (i = 0; i < rtry; i++) {
int buflen = sizeof(buf)-1;
do {
len = writev(rtpp_socks[node->idx], cv, vcnt + 1);
len = writev(rtpp_socks[node->idx].fd, cv, vcnt + 1);
} while (len == -1 && (errno == EINTR || errno == ENOBUFS));
if (len <= 0) {
LM_ERR("can't send (#%d iovec buffers) command to a RTP proxy (%d:%s)\n",
Expand All @@ -2241,7 +2285,7 @@ send_rtpp_command(struct rtpp_node *node, struct rtpproxy_vcmd *vcmd, int vcnt)
int s_errno;

do {
len = recv(rtpp_socks[node->idx], cp, buflen, 0);
len = recv(rtpp_socks[node->idx].fd, cp, buflen, 0);
} while (len == -1 && errno == EINTR);
s_errno = (len < 0) ? errno : 0;
if (len <= 0) {
Expand Down Expand Up @@ -2290,9 +2334,9 @@ send_rtpp_command(struct rtpp_node *node, struct rtpproxy_vcmd *vcmd, int vcnt)
return cp;
badproxy:
LM_ERR("proxy <%s> does not respond, disable it\n", node->rn_url.s);
if (CM_STREAM(node)) {
close(rtpp_socks[node->idx]);
rtpp_socks[node->idx] = -1;
if (CM_STREAM(node) && node->rn_umode != CM_RTPIO) {
close(rtpp_socks[node->idx].fd);
rtpp_socks[node->idx].fd = -1;
}
node->rn_disabled = 1;
node->rn_recheck_ticks = get_ticks() + rtpproxy_disable_tout;
Expand Down
5 changes: 3 additions & 2 deletions modules/rtpproxy/rtpproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct rtpproxy_vcmd;
#define AF_LOCAL AF_UNIX
#endif

enum comm_modes {CM_UNIX = 0, CM_CUNIX, CM_UDP, CM_TCP, CM_UDP6, CM_TCP6};
enum comm_modes {CM_UNIX = 0, CM_CUNIX, CM_RTPIO, CM_UDP, CM_TCP, CM_UDP6, CM_TCP6};

struct rtpp_node {
unsigned int idx; /* overall index */
Expand All @@ -57,7 +57,8 @@ struct rtpp_node {
struct rtpp_node *rn_next;
};

#define CM_STREAM(ndp) ((ndp)->rn_umode == CM_TCP || (ndp)->rn_umode == CM_TCP6 || (ndp)->rn_umode == CM_CUNIX)
#define CM_STREAM(ndp) ((ndp)->rn_umode == CM_TCP || (ndp)->rn_umode == CM_TCP6 || \
(ndp)->rn_umode == CM_CUNIX || (ndp)->rn_umode == CM_RTPIO)

/* Supported version of the RTP proxy command protocol */
#define SUP_CPROTOVER 20040107
Expand Down

0 comments on commit 051a095

Please sign in to comment.