From 634c38fd983ed5153f72429248357bc93c7a303f Mon Sep 17 00:00:00 2001 From: SHAcollision <127778313+SHAcollision@users.noreply.github.com> Date: Wed, 19 Feb 2025 06:44:53 -0400 Subject: [PATCH 1/6] fix(relay): remove rate limiting on random port test relay (#125) --- relay/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay/src/lib.rs b/relay/src/lib.rs index 7f0e974..95efbf1 100644 --- a/relay/src/lib.rs +++ b/relay/src/lib.rs @@ -213,6 +213,7 @@ impl Relay { let mut config = Config { cache_path: None, http_port: 0, + rate_limiter: None, ..Default::default() }; From 8e7b44f4587afb2936b3d0bcf3e1eb53bf13bfe5 Mon Sep 17 00:00:00 2001 From: nazeh Date: Wed, 19 Feb 2025 16:25:28 +0300 Subject: [PATCH 2/6] fix(pkarr): only use async_compat when _not_ in another tokio runtime --- pkarr/src/client.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/pkarr/src/client.rs b/pkarr/src/client.rs index f665fbe..ee68621 100644 --- a/pkarr/src/client.rs +++ b/pkarr/src/client.rs @@ -24,6 +24,7 @@ mod tests_web; use futures_lite::{Stream, StreamExt}; use pubky_timestamp::Timestamp; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::{hash::Hash, num::NonZeroUsize}; @@ -225,12 +226,7 @@ impl Client { signed_packet: &SignedPacket, cas: Option, ) -> Result<(), PublishError> { - #[cfg(not(wasm_browser))] - { - async_compat::Compat::new(self.publish_inner(signed_packet, cas)).await - } - #[cfg(wasm_browser)] - self.publish_inner(signed_packet, cas).await + async_compat_if_necessary(self.publish_inner(signed_packet, cas)).await } // === Resolve === @@ -242,12 +238,7 @@ impl Client { /// If you want to get the most recent version of a [SignedPacket], /// you should use [Self::resolve_most_recent]. pub async fn resolve(&self, public_key: &PublicKey) -> Option { - #[cfg(not(wasm_browser))] - { - async_compat::Compat::new(self.resolve_inner(public_key)).await - } - #[cfg(wasm_browser)] - self.resolve_inner(public_key).await + async_compat_if_necessary(self.resolve_inner(public_key)).await } /// Returns the most recent [SignedPacket] found after querying all @@ -662,3 +653,17 @@ impl From for PublishError { } } } + +async fn async_compat_if_necessary(fut: T) -> O +where + T: Future, +{ + #[cfg(not(wasm_browser))] + { + if tokio::runtime::Handle::try_current().is_err() { + return async_compat::Compat::new(fut).await; + } + } + + fut.await +} From 4dc0908369b6bcd5cc7a86538c094af4eaa8314a Mon Sep 17 00:00:00 2001 From: nazeh Date: Wed, 19 Feb 2025 16:31:10 +0300 Subject: [PATCH 3/6] chore: Release --- Cargo.lock | 46 +++++++++++++++++++++++----------------------- pkarr/Cargo.toml | 2 +- relay/Cargo.toml | 2 +- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5541a23..a451544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1824,86 +1824,86 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkarr" version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb8be2e97e822e41c6e254706e732e02ca2c49f4da038042c11d5a5e5961a164" dependencies = [ - "anyhow", "async-compat", - "axum", - "axum-server", "base32", "byteorder", "bytes", "cfg_aliases", - "clap", - "console_log", "document-features", "dyn-clone", "ed25519-dalek", - "futures-buffered", "futures-lite", - "genawaiter", "getrandom 0.2.15", "heed", "log", "lru", "mainline", "page_size", - "pkarr-relay", - "postcard", "pubky-timestamp", - "reqwest", - "rstest", - "rustls", - "rustls-webpki", "self_cell", "serde", "sha1_smol", "simple-dns", "thiserror 2.0.11", "tokio", - "tokio-rustls", "tracing", - "tracing-subscriber", - "url", - "wasm-bindgen", "wasm-bindgen-futures", - "wasm-bindgen-test", ] [[package]] name = "pkarr" -version = "3.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb8be2e97e822e41c6e254706e732e02ca2c49f4da038042c11d5a5e5961a164" +version = "3.3.2" dependencies = [ + "anyhow", "async-compat", + "axum", + "axum-server", "base32", "byteorder", "bytes", "cfg_aliases", + "clap", + "console_log", "document-features", "dyn-clone", "ed25519-dalek", + "futures-buffered", "futures-lite", + "genawaiter", "getrandom 0.2.15", "heed", "log", "lru", "mainline", "page_size", + "pkarr-relay", + "postcard", "pubky-timestamp", + "reqwest", + "rstest", + "rustls", + "rustls-webpki", "self_cell", "serde", "sha1_smol", "simple-dns", "thiserror 2.0.11", "tokio", + "tokio-rustls", "tracing", + "tracing-subscriber", + "url", + "wasm-bindgen", "wasm-bindgen-futures", + "wasm-bindgen-test", ] [[package]] name = "pkarr-relay" -version = "0.5.6" +version = "0.5.7" dependencies = [ "anyhow", "axum", @@ -1915,7 +1915,7 @@ dependencies = [ "http", "httpdate", "mainline", - "pkarr 3.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pkarr 3.3.1", "rustls", "serde", "thiserror 2.0.11", diff --git a/pkarr/Cargo.toml b/pkarr/Cargo.toml index f77dcd3..2d94852 100644 --- a/pkarr/Cargo.toml +++ b/pkarr/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pkarr" -version = "3.3.1" +version = "3.3.2" authors = ["Nuh "] edition = "2021" description = "Public-Key Addressable Resource Records (Pkarr); publish and resolve DNS records over Mainline DHT" diff --git a/relay/Cargo.toml b/relay/Cargo.toml index 5871f27..49867d0 100644 --- a/relay/Cargo.toml +++ b/relay/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pkarr-relay" -version = "0.5.6" +version = "0.5.7" authors = ["Nuh "] edition = "2021" description = "Pkarr relay (https://pkarr.org/relays)" From c8c4059c4085ab53dbd1927b48287e1b36ff2156 Mon Sep 17 00:00:00 2001 From: nazeh Date: Thu, 20 Feb 2025 10:37:58 +0300 Subject: [PATCH 4/6] fix(pkarr): properly cleanup relays inflight requests --- pkarr/src/client.rs | 12 ++++- pkarr/src/client/relays.rs | 102 ++++++++++++++++++++++--------------- pkarr/src/client/tests.rs | 27 ++++++++++ pkarr/src/extra/tls.rs | 2 +- 4 files changed, 99 insertions(+), 44 deletions(-) diff --git a/pkarr/src/client.rs b/pkarr/src/client.rs index ee68621..aee1f0f 100644 --- a/pkarr/src/client.rs +++ b/pkarr/src/client.rs @@ -333,11 +333,19 @@ impl Client { #[cfg(all(dht, relays))] return if dht_future.is_some() && relays_future.is_some() { - futures_lite::future::or( + let result = futures_lite::future::or( dht_future.expect("infallible"), relays_future.expect("infallible"), ) - .await + .await; + + self.0 + .relays + .as_ref() + .expect("infallible") + .cancel_publish(&signed_packet.public_key()); + + result } else if dht_future.is_some() { dht_future.expect("infallible").await } else { diff --git a/pkarr/src/client/relays.rs b/pkarr/src/client/relays.rs index 4077e50..db4e964 100644 --- a/pkarr/src/client/relays.rs +++ b/pkarr/src/client/relays.rs @@ -109,6 +109,11 @@ impl RelaysClient { .expect("infallible") } + /// Cancel an inflight publish request. + pub fn cancel_publish(&self, public_key: &PublicKey) { + self.inflight_publish.cancel_request(public_key); + } + #[cfg(not(wasm_browser))] pub fn resolve( &self, @@ -208,6 +213,15 @@ impl InflightPublishRequests { Ok(()) } + pub fn cancel_request(&self, public_key: &PublicKey) { + let mut inflight = self + .requests + .write() + .expect("InflightPublishRequests write lock"); + + inflight.remove(public_key); + } + pub fn add_result( &mut self, public_key: &PublicKey, @@ -219,28 +233,31 @@ impl InflightPublishRequests { } } + /// Returns true if request is done. fn add_success(&self, public_key: &PublicKey) -> Result { let mut inflight = self .requests .write() .expect("InflightPublishRequests write lock"); - let request = inflight.get_mut(public_key).expect("infallible"); - let majority = (self.relays_count / 2) + 1; + if let Some(request) = inflight.get_mut(public_key) { + let majority = (self.relays_count / 2) + 1; - request.success_count += 1; + request.success_count += 1; - if self.done(request) { - return Ok(true); - } else if request.success_count >= majority { - inflight.remove(public_key); + if self.done(request) || request.success_count >= majority { + inflight.remove(public_key); - return Ok(true); + Ok(true) + } else { + Ok(false) + } + } else { + Ok(true) } - - Ok(false) } + /// Returns true if request is done. fn add_error( &mut self, public_key: &PublicKey, @@ -251,47 +268,50 @@ impl InflightPublishRequests { .write() .expect("InflightPublishRequests write lock"); - let request = inflight.get_mut(public_key).expect("infallible"); - let majority = (self.relays_count / 2) + 1; + if let Some(request) = inflight.get_mut(public_key) { + let majority = (self.relays_count / 2) + 1; - // Add error, and return early error if necessary. - { - let count = request.errors.get(&error).unwrap_or(&0) + 1; - - if count >= majority - && matches!( - error, - PublishError::Concurrency(ConcurrencyError::NotMostRecent) - ) | matches!( - error, - PublishError::Concurrency(ConcurrencyError::CasFailed) - ) + // Add error, and return early error if necessary. { - inflight.remove(public_key); + let count = request.errors.get(&error).unwrap_or(&0) + 1; + + if count >= majority + && matches!( + error, + PublishError::Concurrency(ConcurrencyError::NotMostRecent) + ) | matches!( + error, + PublishError::Concurrency(ConcurrencyError::CasFailed) + ) + { + inflight.remove(public_key); + + return Err(error); + } - return Err(error); + request.errors.insert(error, count); } - request.errors.insert(error, count); - } + if self.done(request) { + let request = inflight.remove(public_key).expect("infallible"); - if self.done(request) { - let request = inflight.remove(public_key).expect("infallible"); + if request.success_count >= majority { + Ok(true) + } else { + let most_common_error = request + .errors + .into_iter() + .max_by_key(|&(_, count)| count) + .map(|(error, _)| error) + .expect("infallible"); - if request.success_count >= majority { - Ok(true) + Err(most_common_error) + } } else { - let most_common_error = request - .errors - .into_iter() - .max_by_key(|&(_, count)| count) - .map(|(error, _)| error) - .expect("infallible"); - - Err(most_common_error) + Ok(false) } } else { - Ok(false) + Ok(true) } } diff --git a/pkarr/src/client/tests.rs b/pkarr/src/client/tests.rs index bbcd611..cd789e1 100644 --- a/pkarr/src/client/tests.rs +++ b/pkarr/src/client/tests.rs @@ -564,3 +564,30 @@ async fn zero_cache_size(#[case] networks: Networks) { let resolved = b.resolve(&keypair.public_key()).await.unwrap(); assert_eq!(resolved.as_bytes(), signed_packet.as_bytes()); } + +#[rstest] +#[case::both_networks(Networks::Both)] +#[cfg_attr(feature = "relays", case::relays(Networks::Relays))] +#[tokio::test] +async fn clear_inflight_requests(#[case] networks: Networks) { + let testnet = mainline::Testnet::new(10).unwrap(); + let relay = Relay::run_test(&testnet).await.unwrap(); + + let client = builder(&relay, &testnet, networks).build().unwrap(); + + let keypair = Keypair::random(); + + let signed_packet_builder = + SignedPacket::builder().txt("foo".try_into().unwrap(), "bar".try_into().unwrap(), 30); + + client + .publish(&signed_packet_builder.clone().sign(&keypair).unwrap(), None) + .await + .unwrap(); + + // If there was a memory leak, we would get a `ConflictRisk` error instead. + client + .publish(&signed_packet_builder.sign(&keypair).unwrap(), None) + .await + .unwrap(); +} diff --git a/pkarr/src/extra/tls.rs b/pkarr/src/extra/tls.rs index e71c5c1..2f0c936 100644 --- a/pkarr/src/extra/tls.rs +++ b/pkarr/src/extra/tls.rs @@ -60,7 +60,7 @@ impl ServerCertVerifier for CertVerifier { // This won't be necessary if Reqwest enabled us to create a rustls configuration // per connection. // - // TODO: update this Reqwest enabled this. + // TODO: update after Reqwest enables this. let stream = self.0.resolve_https_endpoints(&qname); pin!(stream); for endpoint in block_on(stream) { From aaed12b00d66bd9a66c5b3168cb11aa25ffdf71f Mon Sep 17 00:00:00 2001 From: nazeh Date: Thu, 20 Feb 2025 11:10:24 +0300 Subject: [PATCH 5/6] chore: Release --- Cargo.lock | 2 +- pkarr/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a451544..87e6472 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1855,7 +1855,7 @@ dependencies = [ [[package]] name = "pkarr" -version = "3.3.2" +version = "3.3.3" dependencies = [ "anyhow", "async-compat", diff --git a/pkarr/Cargo.toml b/pkarr/Cargo.toml index 2d94852..c7a1368 100644 --- a/pkarr/Cargo.toml +++ b/pkarr/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pkarr" -version = "3.3.2" +version = "3.3.3" authors = ["Nuh "] edition = "2021" description = "Public-Key Addressable Resource Records (Pkarr); publish and resolve DNS records over Mainline DHT" From 7a935aa457b172e8fcc4ba333f059f44dd7e64e1 Mon Sep 17 00:00:00 2001 From: nazeh Date: Thu, 20 Feb 2025 12:16:03 +0300 Subject: [PATCH 6/6] chore: try to pass unit test in linux --- pkarr/src/client/tests.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkarr/src/client/tests.rs b/pkarr/src/client/tests.rs index cd789e1..737147c 100644 --- a/pkarr/src/client/tests.rs +++ b/pkarr/src/client/tests.rs @@ -585,6 +585,8 @@ async fn clear_inflight_requests(#[case] networks: Networks) { .await .unwrap(); + tokio::time::sleep(Duration::from_millis(200)).await; + // If there was a memory leak, we would get a `ConflictRisk` error instead. client .publish(&signed_packet_builder.sign(&keypair).unwrap(), None)