diff --git a/.gitignore b/.gitignore index 04bb132..a530fe3 100644 Binary files a/.gitignore and b/.gitignore differ diff --git a/Cargo.lock b/Cargo.lock index 0da08db..1175e7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1761,7 +1761,7 @@ dependencies = [ "rtsp", "rtsp-types", "scopeguard", - "sdp 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sdp 0.7.0", "sdp-types", "signal", "tokio", @@ -3066,6 +3066,18 @@ dependencies = [ "url", ] +[[package]] +name = "sdp" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" +dependencies = [ + "rand", + "substring", + "thiserror", + "url", +] + [[package]] name = "sdp-types" version = "0.1.6" diff --git a/livetwo/Cargo.toml b/livetwo/Cargo.toml index 17dc3ee..d166fc0 100644 --- a/livetwo/Cargo.toml +++ b/livetwo/Cargo.toml @@ -27,4 +27,4 @@ url = "2.5.2" portpicker = "0.1.1" rtsp-types = "0.1.2" sdp-types = "0.1" -sdp = "0.6.2" +sdp = "0.7.0" diff --git a/livetwo/src/rtspclient.rs b/livetwo/src/rtspclient.rs index 6c820ea..4c005d9 100644 --- a/livetwo/src/rtspclient.rs +++ b/livetwo/src/rtspclient.rs @@ -164,6 +164,42 @@ impl RtspSession { Ok(()) } + async fn send_announce_request(&mut self, sdp: String) -> Result<()> { + let announce_request = Request::builder(Method::Announce, Version::V1_0) + .request_uri( + self.uri + .parse::() + .map_err(|_| anyhow!("Invalid URI"))?, + ) + .header(headers::CSEQ, self.cseq.to_string()) + .header(headers::CONTENT_TYPE, "application/sdp") + .header(headers::USER_AGENT, USER_AGENT) + .build(sdp.into_bytes()); + + self.send_request(&announce_request).await?; + let announce_response = self.read_response().await?; + self.cseq += 1; + + if announce_response.status() == StatusCode::Unauthorized { + if let Some(auth_header) = announce_response.header(&WWW_AUTHENTICATE).cloned() { + let announce_response = self + .handle_unauthorized(Method::Announce, &auth_header) + .await?; + if announce_response.status() != StatusCode::Ok { + return Err(anyhow!("ANNOUNCE request failed after authentication")); + } + } else { + return Err(anyhow!( + "ANNOUNCE request failed with 401 Unauthorized and no WWW-Authenticate header" + )); + } + } else if announce_response.status() != StatusCode::Ok { + return Err(anyhow!("ANNOUNCE request failed")); + } + + Ok(()) + } + async fn send_describe_request(&mut self) -> Result { let describe_request = Request::builder(Method::Describe, Version::V1_0) .request_uri( @@ -197,7 +233,7 @@ impl RtspSession { Ok(sdp_content) } - async fn send_setup_request(&mut self) -> Result { + async fn send_setup_request(&mut self) -> Result<(String, u16)> { let rtp_client_port = self .rtp_client_port .ok_or_else(|| anyhow!("RTP server port not set"))?; @@ -267,11 +303,26 @@ impl RtspSession { .ok_or_else(|| anyhow!("Failed to parse session ID"))? .to_string(); - Ok(session_id) + let transport_header = setup_response + .header(&headers::TRANSPORT) + .ok_or_else(|| anyhow!("Transport header not found"))? + .as_str(); + + let server_port = transport_header + .split(';') + .find_map(|part| part.strip_prefix("server_port=")) + .and_then(|server_port_str| server_port_str.split('-').next()) + .ok_or_else(|| anyhow!("server_port not found in transport header"))? + .parse::() + .map_err(|_| anyhow!("Failed to parse server port"))?; + + info!("Transport header: {}", transport_header); + + Ok((session_id, server_port)) } } -pub async fn setup_rtsp_session(rtsp_url: &str) -> Result<(u16, u16, Codec, Codec)> { +pub async fn setup_rtsp_session(rtsp_url: &str) -> Result { let mut url = Url::parse(rtsp_url)?; let host = url .host() @@ -314,12 +365,116 @@ pub async fn setup_rtsp_session(rtsp_url: &str) -> Result<(u16, u16, Codec, Code return Err(anyhow!("No tracks found in SDP")); } - let rtp_port = pick_unused_port().ok_or_else(|| anyhow!("No available port found"))?; - let rtp_audio_port = pick_unused_port().ok_or_else(|| anyhow!("No available port found"))?; + let mut media_info = rtsp::MediaInfo::default(); + + if let Some(video_track) = video_track { + let (rtp_client, rtcp_client, rtp_server, codec) = + setup_track(&mut rtsp_session, video_track, "0").await?; + media_info.video_rtp_server = rtp_client; + media_info.video_rtcp_client = rtcp_client; + media_info.video_rtp_client = rtp_server; + media_info.video_codec = codec; + } + + if let Some(audio_track) = audio_track { + let (rtp_client, rtcp_client, rtp_server, codec) = + setup_track(&mut rtsp_session, audio_track, "1").await?; + media_info.audio_rtp_server = rtp_client; + media_info.audio_rtcp_client = rtcp_client; + media_info.audio_rtp_client = rtp_server; + media_info.audio_codec = codec; + } + + let play_request = Request::builder(Method::Play, Version::V1_0) + .request_uri( + rtsp_session + .uri + .parse::() + .map_err(|_| anyhow!("Invalid URI"))?, + ) + .header(headers::CSEQ, rtsp_session.cseq.to_string()) + .header(headers::USER_AGENT, USER_AGENT) + .header( + headers::SESSION, + rtsp_session.session_id.as_ref().unwrap().as_str(), + ) + .empty(); + + rtsp_session + .send_request(&play_request.map_body(|_| vec![])) + .await?; + let mut play_response = rtsp_session.read_response().await?; + trace!("play_response: {:?}", play_response); + + if play_response.status() == StatusCode::Unauthorized { + if let Some(auth_header) = play_response.header(&WWW_AUTHENTICATE).cloned() { + play_response = rtsp_session + .handle_unauthorized(Method::Play, &auth_header) + .await?; + } + } + + if play_response.status() != StatusCode::Ok { + return Err(anyhow!("PLAY request failed")); + } + + tokio::spawn(rtsp_session.keep_rtsp_alive()); + + Ok(media_info) +} + +pub async fn setup_rtsp_push_session( + rtsp_url: &str, + sdp_content: String, +) -> Result { + let mut url = Url::parse(rtsp_url)?; + let host = url.host_str().ok_or_else(|| anyhow!("Invalid RTSP URL"))?; + let port = url + .port_or_known_default() + .ok_or_else(|| anyhow!("Invalid RTSP URL"))?; + + let addr = format!("{}:{}", host, port); + info!("Connecting to RTSP server at {}", addr); + let stream = TcpStream::connect(&addr).await?; + + let mut rtsp_session = RtspSession { + stream, + uri: url.as_str().to_string(), + cseq: 1, + auth_params: AuthParams { + username: url.username().to_string(), + password: url.password().unwrap_or("").to_string(), + }, + session_id: None, + rtp_client_port: None, + auth_header: None, + }; + + url.set_username("").unwrap(); + url.set_password(None).unwrap(); - let video_uri = video_track - .and_then(|md| { - md.attributes.iter().find_map(|attr| { + rtsp_session.send_options_request().await?; + rtsp_session + .send_announce_request(sdp_content.clone()) + .await?; + + let sdp: Session = Session::parse(sdp_content.as_bytes()) + .map_err(|e| anyhow!("Failed to parse SDP: {}", e))?; + + let video_track = sdp.medias.iter().find(|md| md.media == "video"); + let audio_track = sdp.medias.iter().find(|md| md.media == "audio"); + debug!("track video: {:?}, audio: {:?}", video_track, audio_track); + + if video_track.is_none() && audio_track.is_none() { + return Err(anyhow!("No tracks found in SDP")); + } + let mut media_info = rtsp::MediaInfo::default(); + + if let Some(video_track) = video_track { + let video_url = video_track + .attributes + .iter() + .find_map(|attr| { if attr.attribute == "control" { let value = attr.value.clone().unwrap_or_default(); if value.starts_with("rtsp://") { @@ -331,12 +486,23 @@ pub async fn setup_rtsp_session(rtsp_url: &str) -> Result<(u16, u16, Codec, Code None } }) - }) - .unwrap_or_else(|| format!("{}/trackID=1", rtsp_session.uri)); + .unwrap_or_else(|| format!("{}/trackID=1", rtsp_session.uri)); + + media_info.video_rtp_client = + Some(pick_unused_port().ok_or_else(|| anyhow!("No available port found"))?); + rtsp_session.rtp_client_port = media_info.video_rtp_client; + rtsp_session.uri = video_url; + + let (session_id, v_server_port) = rtsp_session.send_setup_request().await?; + rtsp_session.session_id = Some(session_id); + media_info.video_rtp_server = Some(v_server_port); + } - let audio_uri = audio_track - .and_then(|md| { - md.attributes.iter().find_map(|attr| { + if let Some(audio_track) = audio_track { + let audio_url = audio_track + .attributes + .iter() + .find_map(|attr| { if attr.attribute == "control" { let value = attr.value.clone().unwrap_or_default(); if value.starts_with("rtsp://") { @@ -348,25 +514,18 @@ pub async fn setup_rtsp_session(rtsp_url: &str) -> Result<(u16, u16, Codec, Code None } }) - }) - .unwrap_or_else(|| format!("{}/trackID=2", rtsp_session.uri)); - - trace!("video uri: {:?}", video_uri); - trace!("audio uri: {:?}", audio_uri); + .unwrap_or_else(|| format!("{}/trackID=2", rtsp_session.uri)); - rtsp_session.uri.clone_from(&video_uri); - rtsp_session.rtp_client_port = Some(rtp_port); + media_info.audio_rtp_client = + Some(pick_unused_port().ok_or_else(|| anyhow!("No available port found"))?); + rtsp_session.rtp_client_port = media_info.audio_rtp_client; + rtsp_session.uri = audio_url; - let session_id = rtsp_session.send_setup_request().await?; - trace!("session id: {:?}", session_id); - - rtsp_session.session_id = Some(session_id); - - rtsp_session.uri.clone_from(&audio_uri); - rtsp_session.rtp_client_port = Some(rtp_audio_port); - rtsp_session.send_setup_request().await?; + let (_session_id, a_server_port) = rtsp_session.send_setup_request().await?; + media_info.audio_rtp_server = Some(a_server_port); + } - let play_request = Request::builder(Method::Play, Version::V1_0) + let record_request = Request::builder(Method::Record, Version::V1_0) .request_uri( rtsp_session .uri @@ -377,68 +536,88 @@ pub async fn setup_rtsp_session(rtsp_url: &str) -> Result<(u16, u16, Codec, Code .header(headers::USER_AGENT, USER_AGENT) .header( headers::SESSION, - rtsp_session.session_id.as_ref().unwrap().as_str(), + rtsp_session + .session_id + .clone() + .ok_or_else(|| anyhow!("Missing session ID"))?, ) .empty(); rtsp_session - .send_request(&play_request.map_body(|_| vec![])) + .send_request(&record_request.map_body(|_| vec![])) .await?; - let mut play_response = rtsp_session.read_response().await?; - trace!("play_response: {:?}", play_response); + let response = rtsp_session.read_response().await?; + rtsp_session.cseq += 1; - if play_response.status() == StatusCode::Unauthorized { - if let Some(auth_header) = play_response.header(&WWW_AUTHENTICATE).cloned() { - play_response = rtsp_session - .handle_unauthorized(Method::Play, &auth_header) + if response.status() == StatusCode::Unauthorized { + if let Some(auth_header) = response.header(&WWW_AUTHENTICATE).cloned() { + let response = rtsp_session + .handle_unauthorized(Method::Record, &auth_header) .await?; + if response.status() != StatusCode::Ok { + return Err(anyhow!("RECORD request failed after authentication")); + } + } else { + return Err(anyhow!( + "RECORD request failed with 401 Unauthorized and no WWW-Authenticate header" + )); } - } - - if play_response.status() != StatusCode::Ok { - return Err(anyhow!("PLAY request failed")); + } else if response.status() != StatusCode::Ok { + return Err(anyhow!( + "RECORD request failed with status: {:?}", + response.status() + )); } tokio::spawn(rtsp_session.keep_rtsp_alive()); - let video_codec = video_track - .and_then(|md| { - md.attributes.iter().find_map(|attr| { - if attr.attribute == "rtpmap" { - let parts: Vec<&str> = attr.value.as_ref()?.split_whitespace().collect(); - if parts.len() > 1 { - Some(parts[1].split('/').next().unwrap_or("").to_string()) - } else { - None - } - } else { - None - } - }) - }) - .unwrap_or_else(|| "unknown".to_string()); - - let audio_codec = audio_track - .and_then(|md| { - md.attributes.iter().find_map(|attr| { - if attr.attribute == "rtpmap" { - let parts: Vec<&str> = attr.value.as_ref()?.split_whitespace().collect(); - if parts.len() > 1 { - Some(parts[1].split('/').next().unwrap_or("").to_string()) - } else { - None - } + Ok(media_info) +} + +async fn setup_track( + rtsp_session: &mut RtspSession, + track: &sdp_types::Media, + track_id: &str, +) -> Result<(Option, Option, Option, Option)> { + let track_url = track + .attributes + .iter() + .find_map(|attr| { + if attr.attribute == "control" { + let value = attr.value.clone().unwrap_or_default(); + if value.starts_with("rtsp://") { + Some(value) } else { - None + Some(format!("{}/{}", rtsp_session.uri, value)) } - }) + } else { + None + } }) - .unwrap_or_else(|| "unknown".to_string()); + .unwrap_or_else(|| format!("{}/trackID={}", rtsp_session.uri, track_id)); + + let rtp_client_port = pick_unused_port().ok_or_else(|| anyhow!("No available port found"))?; + rtsp_session.rtp_client_port = Some(rtp_client_port); + rtsp_session.uri = track_url; - let video_codec = codec_from_str(&video_codec)?; - let audio_codec = codec_from_str(&audio_codec)?; + let (session_id, rtp_server_port) = rtsp_session.send_setup_request().await?; + rtsp_session.session_id = Some(session_id); - Ok((rtp_port, rtp_audio_port, video_codec, audio_codec)) + let codec = track.attributes.iter().find_map(|attr| { + if attr.attribute == "rtpmap" { + let value = attr.value.as_ref()?.split_whitespace().nth(1)?; + codec_from_str(value).ok() + } else { + None + } + }); + + Ok(( + Some(rtp_client_port), + Some(rtp_client_port + 1), + Some(rtp_server_port), + codec, + )) } fn generate_digest_response( diff --git a/livetwo/src/whep.rs b/livetwo/src/whep.rs index 90af382..e5470d3 100644 --- a/livetwo/src/whep.rs +++ b/livetwo/src/whep.rs @@ -39,7 +39,8 @@ use webrtc::{ use libwish::Client; -use crate::{PREFIX_LIB, SCHEME_RTP_SDP, SCHEME_RTSP_SERVER}; +use crate::rtspclient::setup_rtsp_push_session; +use crate::{PREFIX_LIB, SCHEME_RTP_SDP, SCHEME_RTSP_CLIENT, SCHEME_RTSP_SERVER}; pub async fn from( target_url: String, @@ -107,7 +108,7 @@ pub async fn from( if input.scheme() == SCHEME_RTSP_SERVER { let (tx, mut rx) = unbounded_channel::(); let mut handler = rtsp::Handler::new(tx, complete_tx.clone()); - handler.set_sdp(filtered_sdp.into_bytes()); + handler.set_sdp(filtered_sdp.clone().into_bytes()); let host2 = host.to_string(); let tcp_port = input.port().unwrap_or(0); @@ -130,6 +131,9 @@ pub async fn from( }); media_info = rx.recv().await.unwrap(); + } + if input.scheme() == SCHEME_RTSP_CLIENT { + media_info = setup_rtsp_push_session(&target_url, filtered_sdp.clone()).await?; } else { media_info.video_rtp_client = pick_unused_port(); media_info.audio_rtp_client = pick_unused_port(); diff --git a/livetwo/src/whip.rs b/livetwo/src/whip.rs index e2a84c2..2785b9d 100644 --- a/livetwo/src/whip.rs +++ b/livetwo/src/whip.rs @@ -112,18 +112,7 @@ pub async fn into( media_info = rx.recv().await.unwrap(); } else if input.scheme() == SCHEME_RTSP_CLIENT { - let (video_port, audio_port, video_codec, audio_codec) = - setup_rtsp_session(&target_url).await?; - media_info = rtsp::MediaInfo { - video_rtp_client: None, - audio_rtp_client: None, - video_codec: Some(video_codec), - audio_codec: Some(audio_codec), - video_rtp_server: Some(video_port), - audio_rtp_server: Some(audio_port), - video_rtcp_client: Some(video_port + 1), - audio_rtcp_client: Some(audio_port + 1), - }; + media_info = setup_rtsp_session(&target_url).await?; } else { tokio::time::sleep(Duration::from_secs(1)).await; let path = Path::new(&target_url);