Skip to content

Commit

Permalink
feat(relay): add Pkarr RelayBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Feb 13, 2025
1 parent 0dbdb9b commit ee81f7e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 21 deletions.
2 changes: 1 addition & 1 deletion relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Nuh <[email protected]>"]
edition = "2021"
description = "Pkarr relay (https://pkarr.org/relays)"
license = "MIT"
repository = "https://git.pkarr.org"
repository = "https://github.com/pubky/pkarr"
keywords = ["pkarr", "relay", "mainline", "dht"]
categories = ["network-programming"]

Expand Down
3 changes: 2 additions & 1 deletion relay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

pub const DEFAULT_CACHE_SIZE: usize = 1_000_000;
pub const CACHE_DIR: &str = "pkarr-cache";

use crate::rate_limiting::RateLimiterConfig;

Expand Down Expand Up @@ -101,7 +102,7 @@ impl Config {
}

if let Some(path) = config_toml.cache_path {
config.cache_path = Some(PathBuf::from(path).join("pkarr-cache"));
config.cache_path = Some(PathBuf::from(path).join(CACHE_DIR));
}

config.cache_size = config_toml.cache_size.unwrap_or(DEFAULT_CACHE_SIZE);
Expand Down
109 changes: 96 additions & 13 deletions relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod rate_limiting;

use std::{
net::{SocketAddr, TcpListener},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
Expand All @@ -27,7 +28,79 @@ use tracing::info;
use pkarr::{extra::lmdb_cache::LmdbCache, Client, Timestamp};
use url::Url;

pub use config::Config;
use config::{Config, CACHE_DIR};

pub use rate_limiting::RateLimiterConfig;

/// A builder for Pkarr [Relay]
pub struct RelayBuilder(Config);

impl RelayBuilder {
/// Set the port for the HTTP endpoint.
pub fn http_port(&mut self, port: u16) -> &mut Self {
self.0.http_port = port;

self
}

/// Set the storage directory.
///
/// This Relay's cache will be stored in a subdirectory (`pkarr-cache`) inside
/// that storage directory
///
/// Defaults to the path to the user's data directory
pub fn storage(&mut self, storage: PathBuf) -> &mut Self {
self.0.cache_path = Some(storage.join(CACHE_DIR));

self
}

/// See [pkarr::ClientBuilder::cache_size]
///
/// Defaults to `1_000_000`
pub fn cache_size(&mut self, size: usize) -> &mut Self {
self.0.cache_size = size;

self
}

/// Disable the rate limiter.
///
/// Useful when running in a local test network.
pub fn disable_rate_limiter(&mut self) -> &mut Self {
self.0.rate_limiter = None;

self
}

/// Set the [RateLimiterConfig].
///
/// Defaults to [RateLimiterConfig::default].
pub fn rate_limiter_config(&mut self, config: RateLimiterConfig) -> &mut Self {
self.0.rate_limiter = Some(config);

self
}

/// Allows mutating the internal [pkarr::ClientBuilder] with a callback function.
pub fn pkarr<F>(&mut self, f: F) -> &mut Self
where
F: FnOnce(&mut pkarr::ClientBuilder) -> &mut pkarr::ClientBuilder,
{
f(&mut self.0.pkarr);

self
}

/// Run a Pkarr relay with the provided configuration.
///
/// # Safety
/// This method is marked as unsafe because it uses `LmdbCache`, which can lead to
/// undefined behavior if the lock file is corrupted or improperly handled.
pub async unsafe fn run(self) -> anyhow::Result<Relay> {
unsafe { Relay::run(self.0) }.await
}
}

/// A running instance of a Pkarr relay server.
///
Expand All @@ -50,7 +123,7 @@ impl Relay {
///
/// # Returns
/// A `Result` containing the `Relay` instance or an error.
pub async unsafe fn run(config: Config) -> anyhow::Result<Self> {
async unsafe fn run(config: Config) -> anyhow::Result<Self> {
let mut config = config;

tracing::debug!(?config, "Pkarr server config");
Expand All @@ -63,7 +136,7 @@ impl Relay {
"operating environment provides no directory for application data"
)
})?;
path.join("pkarr-relay")
path.join(CACHE_DIR)
}
};

Expand Down Expand Up @@ -114,12 +187,20 @@ impl Relay {
})
}

/// Convenient wrapper around [`Self::run`].
/// Create a builder for running a [Relay]
pub fn builder() -> RelayBuilder {
RelayBuilder(Default::default())
}

/// Run a [Relay] with a configuration file path.
///
/// # Safety
/// See [`Self::run`].
pub async fn run_unsafe(config: Config) -> anyhow::Result<Self> {
unsafe { Self::run(config).await }
/// Homeserver uses LMDB, opening which is marked [unsafe](https://docs.rs/heed/latest/heed/struct.EnvOpenOptions.html#safety-1),
/// because the possible Undefined Behavior (UB) if the lock file is broken.
pub async unsafe fn run_with_config_file(
config_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
unsafe { Self::run(Config::load(config_path).await?) }.await
}

/// Run an ephemeral Pkarr relay on a random port number for testing purposes.
Expand All @@ -128,12 +209,13 @@ impl Relay {
/// * `testnet` - A reference to a `mainline::Testnet` for bootstrapping the DHT.
///
/// # Safety
/// See [`Self::run`].
/// Homeserver uses LMDB, opening which is marked [unsafe](https://docs.rs/heed/latest/heed/struct.EnvOpenOptions.html#safety-1),
/// because the possible Undefined Behavior (UB) if the lock file is broken.
pub async fn run_test(testnet: &mainline::Testnet) -> anyhow::Result<Self> {
let storage = std::env::temp_dir().join(Timestamp::now().to_string());

let mut config = Config {
cache_path: Some(storage.join("pkarr-relay")),
cache_path: Some(storage.join(CACHE_DIR)),
http_port: 0,
..Default::default()
};
Expand All @@ -151,8 +233,9 @@ impl Relay {
/// Run a Pkarr relay in a Testnet mode (on port 15411).
///
/// # Safety
/// See [`Self::run`].
pub async fn run_testnet() -> anyhow::Result<Self> {
/// Homeserver uses LMDB, opening which is marked [unsafe](https://docs.rs/heed/latest/heed/struct.EnvOpenOptions.html#safety-1),
/// because the possible Undefined Behavior (UB) if the lock file is broken.
pub async unsafe fn run_testnet() -> anyhow::Result<Self> {
let testnet = mainline::Testnet::new(10)?;

// Leaking the testnet to avoid dropping and shutting them down.
Expand All @@ -164,7 +247,7 @@ impl Relay {

let mut config = Config {
http_port: 15411,
cache_path: Some(storage.join("pkarr-relay")),
cache_path: Some(storage.join(CACHE_DIR)),
rate_limiter: None,
..Default::default()
};
Expand All @@ -175,7 +258,7 @@ impl Relay {
.bootstrap(&testnet.bootstrap)
.dht(|builder| builder.server_mode());

unsafe { Self::run(config).await }
Self::run(config).await
}

/// Returns the HTTP socket address of the relay.
Expand Down
6 changes: 3 additions & 3 deletions relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;
use std::path::PathBuf;
use tracing::info;

use pkarr_relay::{Config, Relay};
use pkarr_relay::Relay;

#[derive(Parser, Debug)]
struct Cli {
Expand Down Expand Up @@ -35,9 +35,9 @@ async fn main() -> Result<()> {
if args.testnet {
Relay::run_testnet().await?
} else if let Some(config_path) = args.config {
Relay::run(Config::load(config_path).await?).await?
Relay::run_with_config_file(config_path).await?
} else {
Relay::run(Default::default()).await?
Relay::builder().run().await?
}
};

Expand Down
20 changes: 17 additions & 3 deletions relay/src/rate_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,24 @@ use tower_governor::{
pub use tower_governor::GovernorLayer;

#[derive(Serialize, Deserialize, Debug)]
/// Configurations for rate limitng.
pub struct RateLimiterConfig {
pub(crate) behind_proxy: bool,
pub(crate) per_second: u64,
pub(crate) burst_size: u32,
/// Enable rate limit based on headers commonly used by reverse proxies.
///
/// Uses headers commonly used by reverse proxies to extract the original IP address,
/// falling back to the connection's peer IP address.
/// <https://docs.rs/tower_governor/latest/tower_governor/key_extractor/struct.SmartIpKeyExtractor.html>
pub behind_proxy: bool,
/// Set the interval after which one element of the quota is replenished in seconds.
///
/// **The interval must not be zero.**
pub per_second: u64,
/// Set quota size that defines how many requests can occur
/// before the governor middleware starts blocking requests from an IP address and
/// clients have to wait until the elements of the quota are replenished.
///
/// **The burst_size must not be zero.**
pub burst_size: u32,
}

impl Default for RateLimiterConfig {
Expand Down

0 comments on commit ee81f7e

Please sign in to comment.