diff --git a/Cargo.lock b/Cargo.lock index 8e29cdc9e..2b0de2f1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2998,7 +2998,7 @@ dependencies = [ [[package]] name = "server" -version = "0.0.48" +version = "0.0.49" dependencies = [ "aes-gcm", "anyhow", @@ -3043,6 +3043,7 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", + "ulid", "uuid", "vergen", "xxhash-rust", @@ -3634,6 +3635,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "ulid" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93" +dependencies = [ + "rand", +] + [[package]] name = "uncased" version = "0.9.9" diff --git a/bench/src/args/defaults.rs b/bench/src/args/defaults.rs index 9d81f4576..72d650ae0 100644 --- a/bench/src/args/defaults.rs +++ b/bench/src/args/defaults.rs @@ -1,7 +1,7 @@ use nonzero_lit::u32; use std::num::NonZeroU32; -pub const DEFAULT_HTTP_API_URL: &str = "http://127.0.0.1:3000"; +pub const DEFAULT_HTTP_SERVER_ADDRESS: &str = "127.0.0.1:3000"; pub const DEFAULT_HTTP_START_STREAM_ID: NonZeroU32 = u32!(1000000); pub const DEFAULT_TCP_SERVER_ADDRESS: &str = "127.0.0.1:8090"; diff --git a/bench/src/args/transport.rs b/bench/src/args/transport.rs index 8d5890a8a..b61383730 100644 --- a/bench/src/args/transport.rs +++ b/bench/src/args/transport.rs @@ -44,7 +44,7 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand { #[derive(Parser, Debug)] pub struct HttpArgs { /// Address of the HTTP iggy-server - #[arg(long, default_value_t = DEFAULT_HTTP_API_URL.to_owned())] + #[arg(long, default_value_t = DEFAULT_HTTP_SERVER_ADDRESS.to_owned())] pub server_address: String, /// Start stream id diff --git a/integration/tests/streaming/system.rs b/integration/tests/streaming/system.rs index efa4abc9f..eecaee882 100644 --- a/integration/tests/streaming/system.rs +++ b/integration/tests/streaming/system.rs @@ -3,6 +3,7 @@ use iggy::identifier::Identifier; use server::configs::server::PersonalAccessTokenConfig; use server::streaming::session::Session; use server::streaming::systems::system::System; +use std::net::{Ipv4Addr, SocketAddr}; use tokio::fs; #[tokio::test] @@ -39,7 +40,7 @@ async fn should_create_and_persist_stream() { ); let stream_id = 1; let stream_name = "test"; - let session = Session::new(1, 1, "127.0.0.1".to_string()); + let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); system.init().await.unwrap(); system @@ -60,7 +61,7 @@ async fn should_delete_persisted_stream() { ); let stream_id = 1; let stream_name = "test"; - let session = Session::new(1, 1, "127.0.0.1".to_string()); + let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); system.init().await.unwrap(); system .create_stream(&session, stream_id, stream_name) diff --git a/server/Cargo.toml b/server/Cargo.toml index ead379b3e..39b1af7c5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.0.48" +version = "0.0.49" edition = "2021" build = "src/build.rs" @@ -51,6 +51,7 @@ tower-service = "0.3.2" tracing = { version = "0.1.*" } tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.17", features = ["fmt"] } +ulid = "1.1.0" uuid = { version = "1.5.0", features = ["v4", "fast-rng", "zerocopy"] } xxhash-rust = { version = "0.8.*", features = ["xxh32"] } diff --git a/server/src/http/consumer_groups.rs b/server/src/http/consumer_groups.rs index 12148b87d..060b316cd 100644 --- a/server/src/http/consumer_groups.rs +++ b/server/src/http/consumer_groups.rs @@ -1,7 +1,7 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; diff --git a/server/src/http/consumer_offsets.rs b/server/src/http/consumer_offsets.rs index a1441a127..44892c7c8 100644 --- a/server/src/http/consumer_offsets.rs +++ b/server/src/http/consumer_offsets.rs @@ -1,6 +1,6 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; diff --git a/server/src/http/diagnostics.rs b/server/src/http/diagnostics.rs new file mode 100644 index 000000000..d417d75d6 --- /dev/null +++ b/server/src/http/diagnostics.rs @@ -0,0 +1,41 @@ +use crate::http::shared::RequestDetails; +use crate::streaming::utils::random_id; +use axum::{ + extract::ConnectInfo, + http::{Request, StatusCode}, + middleware::Next, + response::Response, +}; +use std::net::SocketAddr; +use tokio::time::Instant; +use tracing::debug; + +pub async fn request_diagnostics( + ConnectInfo(ip_address): ConnectInfo, + mut request: Request, + next: Next, +) -> Result { + let request_id = random_id::get_ulid(); + let path_and_query = request + .uri() + .path_and_query() + .map(|p| p.as_str()) + .unwrap_or("/"); + debug!( + "Processing a request {} {} with ID: {request_id} from client with IP address: {ip_address}...", + request.method(), + path_and_query, + ); + request.extensions_mut().insert(RequestDetails { + request_id, + ip_address, + }); + let now = Instant::now(); + let result = Ok(next.run(request).await); + let elapsed = now.elapsed(); + debug!( + "Processed a request with ID: {request_id} from client with IP address: {ip_address} in {} ms.", + elapsed.as_millis() + ); + result +} diff --git a/server/src/http/http_server.rs b/server/src/http/http_server.rs index 8320d4d81..0986d9964 100644 --- a/server/src/http/http_server.rs +++ b/server/src/http/http_server.rs @@ -1,9 +1,10 @@ use crate::configs::http::{HttpConfig, HttpCorsConfig}; +use crate::http::diagnostics::request_diagnostics; use crate::http::jwt::cleaner::start_expired_tokens_cleaner; use crate::http::jwt::jwt_manager::JwtManager; use crate::http::jwt::middleware::jwt_auth; use crate::http::metrics::metrics; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::http::{ consumer_groups, consumer_offsets, messages, partitions, personal_access_tokens, streams, system, topics, users, @@ -26,46 +27,48 @@ pub async fn start(config: HttpConfig, system: SharedSystem) { }; let app_state = build_app_state(&config, system).await; - let mut app = Router::new().nest( - "/", - system::router(app_state.clone(), &config.metrics) - .nest( - "/personal-access-tokens", - personal_access_tokens::router(app_state.clone()), - ) - .nest("/users", users::router(app_state.clone())) - .nest( - "/streams", - streams::router(app_state.clone()).nest( - "/:stream_id/topics", - topics::router(app_state.clone()) - .nest( - "/:topic_id/consumer-groups", - consumer_groups::router(app_state.clone()), - ) - .nest("/:topic_id/messages", messages::router(app_state.clone())) - .nest( - "/:topic_id/consumer-offsets", - consumer_offsets::router(app_state.clone()), - ) - .nest( - "/:topic_id/partitions", - partitions::router(app_state.clone()), - ), + let mut app = Router::new() + .nest( + "/", + system::router(app_state.clone(), &config.metrics) + .nest( + "/personal-access-tokens", + personal_access_tokens::router(app_state.clone()), + ) + .nest("/users", users::router(app_state.clone())) + .nest( + "/streams", + streams::router(app_state.clone()).nest( + "/:stream_id/topics", + topics::router(app_state.clone()) + .nest( + "/:topic_id/consumer-groups", + consumer_groups::router(app_state.clone()), + ) + .nest("/:topic_id/messages", messages::router(app_state.clone())) + .nest( + "/:topic_id/consumer-offsets", + consumer_offsets::router(app_state.clone()), + ) + .nest( + "/:topic_id/partitions", + partitions::router(app_state.clone()), + ), + ), ), - ), - ); - - if config.metrics.enabled { - app = app.layer(middleware::from_fn_with_state(app_state.clone(), metrics)); - } + ) + .layer(middleware::from_fn_with_state(app_state.clone(), jwt_auth)); if config.cors.enabled { app = app.layer(configure_cors(config.cors)); } + if config.metrics.enabled { + app = app.layer(middleware::from_fn_with_state(app_state.clone(), metrics)); + } + start_expired_tokens_cleaner(app_state.clone()); - app = app.layer(middleware::from_fn_with_state(app_state.clone(), jwt_auth)); + app = app.layer(middleware::from_fn(request_diagnostics)); info!("Started {api_name} on: {:?}", config.address); if !config.tls.enabled { diff --git a/server/src/http/jwt/cleaner.rs b/server/src/http/jwt/cleaner.rs index 1c1ab7638..4b024fd76 100644 --- a/server/src/http/jwt/cleaner.rs +++ b/server/src/http/jwt/cleaner.rs @@ -1,4 +1,4 @@ -use crate::http::state::AppState; +use crate::http::shared::AppState; use iggy::utils::timestamp::TimeStamp; use std::sync::Arc; use std::time::Duration; diff --git a/server/src/http/jwt/json_web_token.rs b/server/src/http/jwt/json_web_token.rs index 8f12b2d35..460198134 100644 --- a/server/src/http/jwt/json_web_token.rs +++ b/server/src/http/jwt/json_web_token.rs @@ -1,12 +1,13 @@ use iggy::models::user_info::UserId; use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; #[derive(Debug, Clone)] pub struct Identity { pub token_id: String, pub token_expiry: u64, pub user_id: UserId, - pub ip_address: String, + pub ip_address: SocketAddr, } #[derive(Debug, Serialize, Deserialize)] diff --git a/server/src/http/jwt/middleware.rs b/server/src/http/jwt/middleware.rs index 5eb46e684..6f8b12921 100644 --- a/server/src/http/jwt/middleware.rs +++ b/server/src/http/jwt/middleware.rs @@ -1,15 +1,12 @@ use crate::http::jwt::json_web_token::Identity; -use crate::http::state::AppState; -use axum::extract::ConnectInfo; +use crate::http::shared::{AppState, RequestDetails}; use axum::{ extract::State, http::{Request, StatusCode}, middleware::Next, response::Response, }; -use std::net::SocketAddr; use std::sync::Arc; -use tracing::debug; const AUTHORIZATION: &str = "authorization"; const BEARER: &str = "Bearer "; @@ -26,16 +23,9 @@ const UNAUTHORIZED_PATHS: &[&str] = &[ pub async fn jwt_auth( State(state): State>, - ConnectInfo(ip_address): ConnectInfo, mut request: Request, next: Next, ) -> Result { - debug!( - "Processing request {} {} from client with IP address: {ip_address}.", - request.method(), - request.uri().path() - ); - if UNAUTHORIZED_PATHS.contains(&request.uri().path()) { return Ok(next.run(request).await); } @@ -65,11 +55,12 @@ pub async fn jwt_auth( return Err(StatusCode::UNAUTHORIZED); } + let request_details = request.extensions().get::().unwrap(); let identity = Identity { token_id: jwt_claims.claims.jti, token_expiry: jwt_claims.claims.exp, user_id: jwt_claims.claims.sub, - ip_address: ip_address.to_string(), + ip_address: request_details.ip_address, }; request.extensions_mut().insert(identity); Ok(next.run(request).await) diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index f8c19dc24..f09aa8984 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -1,6 +1,6 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::session::Session; diff --git a/server/src/http/metrics.rs b/server/src/http/metrics.rs index a5a1c15c9..197544853 100644 --- a/server/src/http/metrics.rs +++ b/server/src/http/metrics.rs @@ -1,4 +1,4 @@ -use crate::http::state::AppState; +use crate::http::shared::AppState; use axum::{ extract::State, http::{Request, StatusCode}, diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 1df788a0f..7916ae4eb 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -1,5 +1,6 @@ pub mod consumer_groups; pub mod consumer_offsets; +pub mod diagnostics; pub mod error; pub mod http_server; pub mod jwt; @@ -8,7 +9,7 @@ pub mod messages; pub mod metrics; pub mod partitions; pub mod personal_access_tokens; -pub mod state; +mod shared; pub mod streams; pub mod system; pub mod topics; diff --git a/server/src/http/partitions.rs b/server/src/http/partitions.rs index 24647bcf2..24978b0af 100644 --- a/server/src/http/partitions.rs +++ b/server/src/http/partitions.rs @@ -1,6 +1,6 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; diff --git a/server/src/http/personal_access_tokens.rs b/server/src/http/personal_access_tokens.rs index 5bfc70102..74e008826 100644 --- a/server/src/http/personal_access_tokens.rs +++ b/server/src/http/personal_access_tokens.rs @@ -1,7 +1,7 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; diff --git a/server/src/http/state.rs b/server/src/http/shared.rs similarity index 55% rename from server/src/http/state.rs rename to server/src/http/shared.rs index db2859675..a7710016e 100644 --- a/server/src/http/state.rs +++ b/server/src/http/shared.rs @@ -1,7 +1,15 @@ use crate::http::jwt::jwt_manager::JwtManager; use crate::streaming::systems::system::SharedSystem; +use std::net::SocketAddr; +use ulid::Ulid; pub struct AppState { pub jwt_manager: JwtManager, pub system: SharedSystem, } + +#[derive(Debug)] +pub struct RequestDetails { + pub request_id: Ulid, + pub ip_address: SocketAddr, +} diff --git a/server/src/http/streams.rs b/server/src/http/streams.rs index 6580af7b1..c221332cf 100644 --- a/server/src/http/streams.rs +++ b/server/src/http/streams.rs @@ -1,7 +1,7 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; diff --git a/server/src/http/system.rs b/server/src/http/system.rs index 6b2759e41..02a35bb4f 100644 --- a/server/src/http/system.rs +++ b/server/src/http/system.rs @@ -2,7 +2,7 @@ use crate::configs::http::HttpMetricsConfig; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::routing::get; diff --git a/server/src/http/topics.rs b/server/src/http/topics.rs index 00deb3f06..096035e61 100644 --- a/server/src/http/topics.rs +++ b/server/src/http/topics.rs @@ -1,7 +1,7 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; diff --git a/server/src/http/users.rs b/server/src/http/users.rs index b654589b4..676051baa 100644 --- a/server/src/http/users.rs +++ b/server/src/http/users.rs @@ -1,7 +1,7 @@ use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; -use crate::http::state::AppState; +use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs index ebd05b14f..b56cbc790 100644 --- a/server/src/quic/listener.rs +++ b/server/src/quic/listener.rs @@ -46,7 +46,7 @@ async fn handle_connection( .await .add_client(&address, Transport::Quic) .await; - let mut session = Session::from_client_id(client_id, address.to_string()); + let mut session = Session::from_client_id(client_id, address); loop { let stream = connection.accept_bi().await; let mut stream = match stream { diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 596731363..5194a52a7 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -338,7 +338,7 @@ impl Partition { .into_iter() .filter_map(|mut message| { if message.id == 0 || !message_ids.contains(&message.id) { - message.id = random_id::get(); + message.id = random_id::get_uuid(); message_ids.insert(message.id); Some(message) } else { diff --git a/server/src/streaming/session.rs b/server/src/streaming/session.rs index 211b581ed..b6cdacd80 100644 --- a/server/src/streaming/session.rs +++ b/server/src/streaming/session.rs @@ -1,16 +1,17 @@ use iggy::models::user_info::UserId; use std::fmt::Display; +use std::net::SocketAddr; // This might be extended with more fields in the future e.g. custom name, permissions etc. #[derive(Debug)] pub struct Session { pub user_id: UserId, pub client_id: u32, - pub ip_address: String, + pub ip_address: SocketAddr, } impl Session { - pub fn new(client_id: u32, user_id: UserId, ip_address: String) -> Self { + pub fn new(client_id: u32, user_id: UserId, ip_address: SocketAddr) -> Self { Self { client_id, user_id, @@ -18,11 +19,11 @@ impl Session { } } - pub fn stateless(user_id: UserId, ip_address: String) -> Self { + pub fn stateless(user_id: UserId, ip_address: SocketAddr) -> Self { Self::new(0, user_id, ip_address) } - pub fn from_client_id(client_id: u32, ip_address: String) -> Self { + pub fn from_client_id(client_id: u32, ip_address: SocketAddr) -> Self { Self::new(client_id, 0, ip_address) } diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index b2f9cc11b..4ed002bb8 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -273,6 +273,7 @@ mod tests { use crate::configs::system::SystemConfig; use crate::streaming::storage::tests::get_test_system_storage; use crate::streaming::users::user::User; + use std::net::{Ipv4Addr, SocketAddr}; #[tokio::test] async fn should_get_stream_by_id_and_name() { @@ -283,7 +284,11 @@ mod tests { let mut system = System::create(config, storage, None, PersonalAccessTokenConfig::default()); let root = User::root(); - let session = Session::new(1, root.id, "127.0.0.1".to_string()); + let session = Session::new( + 1, + root.id, + SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234), + ); system.permissioner.init_permissions_for_user(root); system .create_stream(&session, stream_id, stream_name) diff --git a/server/src/streaming/utils/random_id.rs b/server/src/streaming/utils/random_id.rs index 4a2cac50b..8c050efc9 100644 --- a/server/src/streaming/utils/random_id.rs +++ b/server/src/streaming/utils/random_id.rs @@ -1,5 +1,10 @@ +use ulid::Ulid; use uuid::Uuid; -pub fn get() -> u128 { +pub fn get_uuid() -> u128 { Uuid::new_v4().to_u128_le() } + +pub fn get_ulid() -> Ulid { + Ulid::new() +} diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs index 5787dda65..da38fabf4 100644 --- a/server/src/tcp/connection_handler.rs +++ b/server/src/tcp/connection_handler.rs @@ -13,17 +13,17 @@ use tracing::{debug, error, info}; const INITIAL_BYTES_LENGTH: usize = 4; pub(crate) async fn handle_connection( - address: &SocketAddr, + address: SocketAddr, sender: &mut dyn Sender, system: SharedSystem, ) -> Result<(), ServerError> { let client_id = system .read() .await - .add_client(address, Transport::Tcp) + .add_client(&address, Transport::Tcp) .await; - let mut session = Session::from_client_id(client_id, address.to_string()); + let mut session = Session::from_client_id(client_id, address); let mut initial_buffer = [0u8; INITIAL_BYTES_LENGTH]; loop { let read_length = sender.read(&mut initial_buffer).await?; diff --git a/server/src/tcp/tcp_listener.rs b/server/src/tcp/tcp_listener.rs index b143f5ff2..502c83fad 100644 --- a/server/src/tcp/tcp_listener.rs +++ b/server/src/tcp/tcp_listener.rs @@ -21,7 +21,7 @@ pub fn start(address: &str, system: SharedSystem) { let mut sender = TcpSender { stream }; tokio::spawn(async move { if let Err(error) = - handle_connection(&address, &mut sender, system.clone()).await + handle_connection(address, &mut sender, system.clone()).await { handle_error(error); system.read().await.delete_client(&address).await; diff --git a/server/src/tcp/tcp_tls_listener.rs b/server/src/tcp/tcp_tls_listener.rs index 06aa1e165..0bbd5d937 100644 --- a/server/src/tcp/tcp_tls_listener.rs +++ b/server/src/tcp/tcp_tls_listener.rs @@ -42,7 +42,7 @@ pub(crate) fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) { let mut sender = TcpTlsSender { stream }; tokio::spawn(async move { if let Err(error) = - handle_connection(&address, &mut sender, system.clone()).await + handle_connection(address, &mut sender, system.clone()).await { handle_error(error); system.read().await.delete_client(&address).await;