Skip to content

Commit

Permalink
HTTP API diagnostics, request details, IP address as SocketAddr, midd…
Browse files Browse the repository at this point in the history
…leware ordering fix
  • Loading branch information
spetz committed Nov 7, 2023
1 parent 50579e9 commit c635c85
Show file tree
Hide file tree
Showing 30 changed files with 146 additions and 78 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/src/args/defaults.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use nonzero_lit::u32;
use std::num::NonZeroU32;

pub const DEFAULT_HTTP_API_URL: &str = "http://127.0.0.1:3000";
pub const DEFAULT_HTTP_SERVER_ADDRESS: &str = "127.0.0.1:3000";
pub const DEFAULT_HTTP_START_STREAM_ID: NonZeroU32 = u32!(1000000);

pub const DEFAULT_TCP_SERVER_ADDRESS: &str = "127.0.0.1:8090";
Expand Down
2 changes: 1 addition & 1 deletion bench/src/args/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand {
#[derive(Parser, Debug)]
pub struct HttpArgs {
/// Address of the HTTP iggy-server
#[arg(long, default_value_t = DEFAULT_HTTP_API_URL.to_owned())]
#[arg(long, default_value_t = DEFAULT_HTTP_SERVER_ADDRESS.to_owned())]
pub server_address: String,

/// Start stream id
Expand Down
5 changes: 3 additions & 2 deletions integration/tests/streaming/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iggy::identifier::Identifier;
use server::configs::server::PersonalAccessTokenConfig;
use server::streaming::session::Session;
use server::streaming::systems::system::System;
use std::net::{Ipv4Addr, SocketAddr};
use tokio::fs;

#[tokio::test]
Expand Down Expand Up @@ -39,7 +40,7 @@ async fn should_create_and_persist_stream() {
);
let stream_id = 1;
let stream_name = "test";
let session = Session::new(1, 1, "127.0.0.1".to_string());
let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
system.init().await.unwrap();

system
Expand All @@ -60,7 +61,7 @@ async fn should_delete_persisted_stream() {
);
let stream_id = 1;
let stream_name = "test";
let session = Session::new(1, 1, "127.0.0.1".to_string());
let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
system.init().await.unwrap();
system
.create_stream(&session, stream_id, stream_name)
Expand Down
3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.0.48"
version = "0.0.49"
edition = "2021"
build = "src/build.rs"

Expand Down Expand Up @@ -51,6 +51,7 @@ tower-service = "0.3.2"
tracing = { version = "0.1.*" }
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.17", features = ["fmt"] }
ulid = "1.1.0"
uuid = { version = "1.5.0", features = ["v4", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.*", features = ["xxh32"] }

Expand Down
2 changes: 1 addition & 1 deletion server/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::mapper;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::session::Session;
use axum::extract::{Path, State};
use axum::http::StatusCode;
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/consumer_offsets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::session::Session;
use axum::extract::{Path, Query, State};
Expand Down
41 changes: 41 additions & 0 deletions server/src/http/diagnostics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::http::shared::RequestDetails;
use crate::streaming::utils::random_id;
use axum::{
extract::ConnectInfo,
http::{Request, StatusCode},
middleware::Next,
response::Response,
};
use std::net::SocketAddr;
use tokio::time::Instant;
use tracing::debug;

pub async fn request_diagnostics<T>(
ConnectInfo(ip_address): ConnectInfo<SocketAddr>,
mut request: Request<T>,
next: Next<T>,
) -> Result<Response, StatusCode> {
let request_id = random_id::get_ulid();
let path_and_query = request
.uri()
.path_and_query()
.map(|p| p.as_str())
.unwrap_or("/");
debug!(
"Processing a request {} {} with ID: {request_id} from client with IP address: {ip_address}...",
request.method(),
path_and_query,
);
request.extensions_mut().insert(RequestDetails {
request_id,
ip_address,
});
let now = Instant::now();
let result = Ok(next.run(request).await);
let elapsed = now.elapsed();
debug!(
"Processed a request with ID: {request_id} from client with IP address: {ip_address} in {} ms.",
elapsed.as_millis()
);
result
}
71 changes: 37 additions & 34 deletions server/src/http/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::configs::http::{HttpConfig, HttpCorsConfig};
use crate::http::diagnostics::request_diagnostics;
use crate::http::jwt::cleaner::start_expired_tokens_cleaner;
use crate::http::jwt::jwt_manager::JwtManager;
use crate::http::jwt::middleware::jwt_auth;
use crate::http::metrics::metrics;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::http::{
consumer_groups, consumer_offsets, messages, partitions, personal_access_tokens, streams,
system, topics, users,
Expand All @@ -26,46 +27,48 @@ pub async fn start(config: HttpConfig, system: SharedSystem) {
};

let app_state = build_app_state(&config, system).await;
let mut app = Router::new().nest(
"/",
system::router(app_state.clone(), &config.metrics)
.nest(
"/personal-access-tokens",
personal_access_tokens::router(app_state.clone()),
)
.nest("/users", users::router(app_state.clone()))
.nest(
"/streams",
streams::router(app_state.clone()).nest(
"/:stream_id/topics",
topics::router(app_state.clone())
.nest(
"/:topic_id/consumer-groups",
consumer_groups::router(app_state.clone()),
)
.nest("/:topic_id/messages", messages::router(app_state.clone()))
.nest(
"/:topic_id/consumer-offsets",
consumer_offsets::router(app_state.clone()),
)
.nest(
"/:topic_id/partitions",
partitions::router(app_state.clone()),
),
let mut app = Router::new()
.nest(
"/",
system::router(app_state.clone(), &config.metrics)
.nest(
"/personal-access-tokens",
personal_access_tokens::router(app_state.clone()),
)
.nest("/users", users::router(app_state.clone()))
.nest(
"/streams",
streams::router(app_state.clone()).nest(
"/:stream_id/topics",
topics::router(app_state.clone())
.nest(
"/:topic_id/consumer-groups",
consumer_groups::router(app_state.clone()),
)
.nest("/:topic_id/messages", messages::router(app_state.clone()))
.nest(
"/:topic_id/consumer-offsets",
consumer_offsets::router(app_state.clone()),
)
.nest(
"/:topic_id/partitions",
partitions::router(app_state.clone()),
),
),
),
),
);

if config.metrics.enabled {
app = app.layer(middleware::from_fn_with_state(app_state.clone(), metrics));
}
)
.layer(middleware::from_fn_with_state(app_state.clone(), jwt_auth));

if config.cors.enabled {
app = app.layer(configure_cors(config.cors));
}

if config.metrics.enabled {
app = app.layer(middleware::from_fn_with_state(app_state.clone(), metrics));
}

start_expired_tokens_cleaner(app_state.clone());
app = app.layer(middleware::from_fn_with_state(app_state.clone(), jwt_auth));
app = app.layer(middleware::from_fn(request_diagnostics));
info!("Started {api_name} on: {:?}", config.address);

if !config.tls.enabled {
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/jwt/cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::http::state::AppState;
use crate::http::shared::AppState;
use iggy::utils::timestamp::TimeStamp;
use std::sync::Arc;
use std::time::Duration;
Expand Down
3 changes: 2 additions & 1 deletion server/src/http/jwt/json_web_token.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use iggy::models::user_info::UserId;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

#[derive(Debug, Clone)]
pub struct Identity {
pub token_id: String,
pub token_expiry: u64,
pub user_id: UserId,
pub ip_address: String,
pub ip_address: SocketAddr,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
15 changes: 3 additions & 12 deletions server/src/http/jwt/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use crate::http::jwt::json_web_token::Identity;
use crate::http::state::AppState;
use axum::extract::ConnectInfo;
use crate::http::shared::{AppState, RequestDetails};
use axum::{
extract::State,
http::{Request, StatusCode},
middleware::Next,
response::Response,
};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::debug;

const AUTHORIZATION: &str = "authorization";
const BEARER: &str = "Bearer ";
Expand All @@ -26,16 +23,9 @@ const UNAUTHORIZED_PATHS: &[&str] = &[

pub async fn jwt_auth<T>(
State(state): State<Arc<AppState>>,
ConnectInfo(ip_address): ConnectInfo<SocketAddr>,
mut request: Request<T>,
next: Next<T>,
) -> Result<Response, StatusCode> {
debug!(
"Processing request {} {} from client with IP address: {ip_address}.",
request.method(),
request.uri().path()
);

if UNAUTHORIZED_PATHS.contains(&request.uri().path()) {
return Ok(next.run(request).await);
}
Expand Down Expand Up @@ -65,11 +55,12 @@ pub async fn jwt_auth<T>(
return Err(StatusCode::UNAUTHORIZED);
}

let request_details = request.extensions().get::<RequestDetails>().unwrap();
let identity = Identity {
token_id: jwt_claims.claims.jti,
token_expiry: jwt_claims.claims.exp,
user_id: jwt_claims.claims.sub,
ip_address: ip_address.to_string(),
ip_address: request_details.ip_address,
};
request.extensions_mut().insert(identity);
Ok(next.run(request).await)
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::session::Session;
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::http::state::AppState;
use crate::http::shared::AppState;
use axum::{
extract::State,
http::{Request, StatusCode},
Expand Down
3 changes: 2 additions & 1 deletion server/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod consumer_groups;
pub mod consumer_offsets;
pub mod diagnostics;
pub mod error;
pub mod http_server;
pub mod jwt;
Expand All @@ -8,7 +9,7 @@ pub mod messages;
pub mod metrics;
pub mod partitions;
pub mod personal_access_tokens;
pub mod state;
mod shared;
pub mod streams;
pub mod system;
pub mod topics;
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/partitions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::session::Session;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/personal_access_tokens.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::mapper;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::session::Session;
use axum::extract::{Path, State};
use axum::http::StatusCode;
Expand Down
8 changes: 8 additions & 0 deletions server/src/http/state.rs → server/src/http/shared.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use crate::http::jwt::jwt_manager::JwtManager;
use crate::streaming::systems::system::SharedSystem;
use std::net::SocketAddr;
use ulid::Ulid;

pub struct AppState {
pub jwt_manager: JwtManager,
pub system: SharedSystem,
}

#[derive(Debug)]
pub struct RequestDetails {
pub request_id: Ulid,
pub ip_address: SocketAddr,
}
2 changes: 1 addition & 1 deletion server/src/http/streams.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::mapper;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::session::Session;
use axum::extract::{Path, State};
use axum::http::StatusCode;
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::configs::http::HttpMetricsConfig;
use crate::http::error::CustomError;
use crate::http::jwt::json_web_token::Identity;
use crate::http::mapper;
use crate::http::state::AppState;
use crate::http::shared::AppState;
use crate::streaming::session::Session;
use axum::extract::{Path, State};
use axum::routing::get;
Expand Down
Loading

0 comments on commit c635c85

Please sign in to comment.