diff --git a/Cargo.lock b/Cargo.lock index 4811fe7c3..69e04fecf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1551,6 +1551,7 @@ dependencies = [ "prost", "prost-build", "rand", + "rusty_ulid", "serde", "serde_json", "sha2", diff --git a/bin/config.toml b/bin/config.toml index 0dfced877..5385d2e63 100644 --- a/bin/config.toml +++ b/bin/config.toml @@ -35,6 +35,11 @@ log_target = "stdout" # It supports the same options as log_target # log_access_target = "file:///var/logs/sozu-access.log" +# format of the access logs. Defaults to ascii. +# - ascii +# - binary (defined in [sozu_command_lib::proto::command::BinaryAccessLog]) +# log_access_format = "ascii" + # path to the unix socket file used to send commands to sozu # default value points to "sozu.sock" file in the current directory command_socket = "./sozu.sock" diff --git a/command/Cargo.toml b/command/Cargo.toml index c2a428f52..b5a63d361 100644 --- a/command/Cargo.toml +++ b/command/Cargo.toml @@ -46,6 +46,7 @@ prettytable-rs = { version = "^0.10.0", default-features = false } pool = "^0.1.4" poule = "^0.3.2" thiserror = "^1.0.56" +rusty_ulid = "^2.0.0" x509-parser = "^0.15.1" [features] diff --git a/command/src/access_logs.rs b/command/src/access_logs.rs new file mode 100644 index 000000000..85d3c946d --- /dev/null +++ b/command/src/access_logs.rs @@ -0,0 +1,181 @@ +use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr}; + +use rusty_ulid::Ulid; +use time::Duration; + +use crate::proto::command::{BinaryAccessLog, BinaryEndpoint, RequestUlid, TcpEndpoint, Uint128}; + +/// This uses unsafe to creates a "fake" owner of the underlying data. +/// Beware that for the compiler it is as legitimate as the original owner. +/// So you have to elide one of them (with std::mem::forget or ManuallyDrop) +/// before it is drop to avoid a double free. +/// +/// This trait works on &T and Option<&T> types +trait DuplicateOwnership { + type Target; + /// Don't forget to use std::mem::forget or ManuallyDrop over one of your owners + unsafe fn duplicate(self) -> Self::Target; +} + +impl DuplicateOwnership for &T { + type Target = T; + unsafe fn duplicate(self) -> T { + std::ptr::read(self as *const T) + } +} +impl<'a, T> DuplicateOwnership for Option<&'a T> +where + T: ?Sized, + &'a T: DuplicateOwnership + 'a, +{ + type Target = Option<<&'a T as DuplicateOwnership>::Target>; + unsafe fn duplicate(self) -> Self::Target { + self.map(|t| t.duplicate()) + } +} +impl DuplicateOwnership for &str { + type Target = String; + unsafe fn duplicate(self) -> Self::Target { + String::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len()) + } +} +impl DuplicateOwnership for &[T] { + type Target = Vec; + unsafe fn duplicate(self) -> Self::Target { + Vec::from_raw_parts(self.as_ptr() as *mut _, self.len(), self.len()) + } +} + +pub fn prepare_user_agent(user_agent: &str) -> String { + let mut user_agent = user_agent.replace(' ', "_"); + let mut ua_bytes = std::mem::take(&mut user_agent).into_bytes(); + if let Some(last) = ua_bytes.last_mut() { + if *last == b',' { + *last = b'!' + } + } + unsafe { String::from_utf8_unchecked(ua_bytes) } +} + +pub struct LogDuration(pub Option); + +#[derive(Debug)] +pub struct LogContext<'a> { + pub request_id: Ulid, + pub cluster_id: Option<&'a str>, + pub backend_id: Option<&'a str>, +} + +pub enum EndpointRecord<'a> { + Http { + method: Option<&'a str>, + authority: Option<&'a str>, + path: Option<&'a str>, + status: Option, + reason: Option<&'a str>, + }, + Tcp { + context: Option<&'a str>, + }, +} + +/// used to aggregate tags in a session +#[derive(Debug)] +pub struct CachedTags { + pub tags: BTreeMap, + pub concatenated: String, +} + +impl CachedTags { + pub fn new(tags: BTreeMap) -> Self { + let concatenated = tags + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(", "); + Self { tags, concatenated } + } +} + +/// Intermediate representation of an access log agnostic of the final format. +/// Every field is a reference to avoid capturing ownership (as a logger should). +pub struct RequestRecord<'a> { + pub error: &'a Option<&'a str>, + pub context: &'a LogContext<'a>, + pub session_address: &'a Option, + pub backend_address: &'a Option, + pub protocol: &'a str, + pub endpoint: &'a EndpointRecord<'a>, + pub tags: &'a Option<&'a CachedTags>, + pub client_rtt: &'a Option, + pub server_rtt: &'a Option, + pub user_agent: &'a Option, + pub service_time: &'a Duration, + pub response_time: &'a Duration, + pub bytes_in: &'a usize, + pub bytes_out: &'a usize, +} + +impl RequestRecord<'_> { + /// Converts the RequestRecord in its protobuf representation. + /// Prost needs ownership over all the fields but we don't want to take it from the user + /// or clone them, so we use the unsafe DuplicateOwnership. + pub unsafe fn into_binary_access_log( + self, + time: i128, + tag: &str, + ) -> ManuallyDrop { + let (first, second) = self.context.request_id.into(); + let request_id = RequestUlid { first, second }; + let time: Uint128 = time.into(); + + let endpoint = match self.endpoint { + EndpointRecord::Http { + method, + authority, + path, + status, + reason, + } => crate::proto::command::binary_endpoint::Inner::Http( + crate::proto::command::HttpEndpoint { + method: method.duplicate().duplicate(), + authority: authority.duplicate().duplicate(), + path: path.duplicate().duplicate(), + status: status.map(|s| s as u32), + reason: reason.duplicate().duplicate(), + }, + ), + EndpointRecord::Tcp { context } => { + crate::proto::command::binary_endpoint::Inner::Tcp(TcpEndpoint { + context: context.duplicate().duplicate(), + }) + } + }; + + ManuallyDrop::new(BinaryAccessLog { + backend_address: self.backend_address.map(Into::into), + backend_id: self.context.backend_id.duplicate(), + bytes_in: *self.bytes_in as u64, + bytes_out: *self.bytes_out as u64, + client_rtt: self.client_rtt.map(|t| t.whole_microseconds() as u64), + cluster_id: self.context.cluster_id.duplicate(), + endpoint: BinaryEndpoint { + inner: Some(endpoint), + }, + error: self.error.duplicate().duplicate(), + protocol: self.protocol.duplicate(), + request_id, + response_time: self.response_time.whole_microseconds() as u64, + server_rtt: self.server_rtt.map(|t| t.whole_microseconds() as u64), + service_time: self.service_time.whole_microseconds() as u64, + session_address: self.session_address.map(Into::into), + tags: self + .tags + .map(|tags| tags.tags.duplicate()) + .unwrap_or_default(), + user_agent: self.user_agent.duplicate(), + tag: tag.duplicate(), + time: time.duplicate(), + }) + } +} diff --git a/command/src/command.proto b/command/src/command.proto index e7c92547a..81dc3814d 100644 --- a/command/src/command.proto +++ b/command/src/command.proto @@ -619,7 +619,7 @@ message SocketAddress { message IpAddress { oneof inner { - uint32 v4 = 1; + fixed32 v4 = 1; Uint128 v6 = 2; } } @@ -630,5 +630,71 @@ message Uint128 { required uint64 low = 1; // lower value, last 8 bytes of the ip required uint64 high = 2; +} + +// An access log, meant to be passed to another agent +message BinaryAccessLog { + // error message if any + optional string error = 1; + // LogContext = request_id + cluster_id + backend_id + required RequestUlid request_id = 2; + // id of the cluster (set of frontend, backend, routing rules) + optional string cluster_id = 3; + // id of the backend (the server to which the traffic is redirected) + optional string backend_id = 4; + // ip and port of the client + optional SocketAddress session_address = 5; + // socket address of the backend server + optional SocketAddress backend_address = 6; + // the protocol, with SSL/TLS version, for instance "HTTPS-TLS1.1" + required string protocol = 7; + // TCP or HTTP endpoint (method, path, context...) + required BinaryEndpoint endpoint = 8; + // round trip time for the client (microseconds) + optional uint64 client_rtt = 9; + // round trip time for the backend (microseconds) + optional uint64 server_rtt = 10; + // time for the backend to respond (microseconds) + required uint64 response_time = 12; + // time spent on a session (microseconds) + required uint64 service_time = 13; + // number of bytes received from the client + required uint64 bytes_in = 14; + // number of bytes written to the client + required uint64 bytes_out = 15; + // value of the User-Agent header, if any + optional string user_agent = 16; + // custom tags as key-values, for instance owner_id: MyOrganisation + map tags = 17; + // short description of which process sends the log, for instance: "WRK-02" + required string tag = 18; + // POSIX timestamp, nanoseconds + required Uint128 time = 19; +} + +// this matches the way the rusty_ulid crate represent a ULID +message RequestUlid { + required uint64 first = 1; + required uint64 second = 2; +} + +message BinaryEndpoint { + oneof inner { + HttpEndpoint http = 1; + TcpEndpoint tcp = 2; + } +} + +message HttpEndpoint { + optional string method = 1; + optional string authority = 2; + optional string path = 3; + // warning: this should be a u16 but protobuf only has uint32. + // Make sure the value never exceeds u16 bounds. + optional uint32 status = 4; + optional string reason = 5; +} -} \ No newline at end of file +message TcpEndpoint { + optional string context = 1; +} diff --git a/command/src/config.rs b/command/src/config.rs index 594634021..2dcad8e8e 100644 --- a/command/src/config.rs +++ b/command/src/config.rs @@ -61,6 +61,7 @@ use toml; use crate::{ certificate::split_certificate_chain, + logging::AccessLogFormat, proto::command::{ request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster, HttpListenerConfig, HttpsListenerConfig, ListenerType, LoadBalancingAlgorithms, @@ -1088,6 +1089,7 @@ pub struct FileConfig { pub log_target: Option, #[serde(default)] pub log_access_target: Option, + pub log_access_format: Option, pub worker_count: Option, pub worker_automatic_restart: Option, pub metrics: Option, @@ -1199,6 +1201,7 @@ impl ConfigBuilder { front_timeout: file_config.front_timeout.unwrap_or(DEFAULT_FRONT_TIMEOUT), handle_process_affinity: file_config.handle_process_affinity.unwrap_or(false), log_access_target: file_config.log_access_target.clone(), + log_access_format: file_config.log_access_format.clone(), log_level: file_config .log_level .clone() @@ -1445,6 +1448,7 @@ pub struct Config { pub log_target: String, #[serde(default)] pub log_access_target: Option, + pub log_access_format: Option, pub worker_count: u16, pub worker_automatic_restart: bool, pub metrics: Option, diff --git a/command/src/lib.rs b/command/src/lib.rs index d7422ec4e..1f7aebda6 100644 --- a/command/src/lib.rs +++ b/command/src/lib.rs @@ -6,6 +6,7 @@ extern crate serde; #[macro_use] /// custom made logging macros pub mod logging; +pub mod access_logs; /// Custom buffer used for parsing within the Sōzu codebase. pub mod buffer; /// TLS certificates diff --git a/command/src/logging.rs b/command/src/logging.rs index 0d6c3d5aa..c384f4c4c 100644 --- a/command/src/logging.rs +++ b/command/src/logging.rs @@ -2,9 +2,9 @@ use std::{ cell::RefCell, cmp::{self, Ord}, env, - fmt::{format, Arguments}, + fmt::Arguments, fs::{File, OpenOptions}, - io::{stdout, Stdout, Write}, + io::{stdout, Error as IoError, ErrorKind as IoErrorKind, Stdout, Write}, net::{SocketAddr, TcpStream, ToSocketAddrs, UdpSocket}, path::Path, str::FromStr, @@ -12,13 +12,15 @@ use std::{ use libc; use mio::net::UnixDatagram; +use prost::{encoding::encoded_len_varint, Message}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use crate::config::Config; +pub use crate::access_logs::*; + +use crate::{bind_format_args, config::Config}; thread_local! { pub static LOGGER: RefCell = RefCell::new(Logger::new()); - pub static TAG: String = LOGGER.with(|logger| {logger.borrow().tag.clone()}); } // TODO: check if this error is critical: @@ -26,10 +28,25 @@ thread_local! { // The CompatLogger may need a variable that tells wether it has been initiated already pub static COMPAT_LOGGER: CompatLogger = CompatLogger; -pub struct Logger { +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(deny_unknown_fields, rename_all = "lowercase")] +pub enum AccessLogFormat { + Ascii, + Binary, +} + +pub struct InnerLogger { pub directives: Vec, pub backend: LoggerBackend, + /// target of the access logs pub access_backend: Option, + /// how to format the access logs + pub access_log_format: AccessLogFormat, + pub buffer: Vec, +} + +pub struct Logger { + pub inner: InnerLogger, /// is displayed in each log, for instance "MAIN" or worker_id pub tag: String, /// the pid of the current process (main or worker) @@ -37,16 +54,32 @@ pub struct Logger { pub initialized: bool, } +impl std::ops::Deref for Logger { + type Target = InnerLogger; + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl std::ops::DerefMut for Logger { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + impl Default for Logger { fn default() -> Self { Self { - directives: vec![LogDirective { - name: None, - level: LogLevelFilter::Error, - }], - backend: LoggerBackend::Stdout(stdout()), - access_backend: None, - tag: "SOZU".to_string(), + inner: InnerLogger { + directives: vec![LogDirective { + name: None, + level: LogLevelFilter::Error, + }], + backend: LoggerBackend::Stdout(stdout()), + access_backend: None, + access_log_format: AccessLogFormat::Ascii, + buffer: Vec::with_capacity(4096), + }, + tag: "UNINITIALIZED".to_string(), pid: 0, initialized: false, } @@ -63,14 +96,16 @@ impl Logger { spec: &str, backend: LoggerBackend, access_backend: Option, + access_log_format: Option, ) { let directives = parse_logging_spec(spec); - LOGGER.with(|l| { - let logger = &mut (*l.borrow_mut()); + LOGGER.with(|logger| { + let mut logger = logger.borrow_mut(); if !logger.initialized { logger.set_directives(directives); logger.backend = backend; logger.access_backend = access_backend; + logger.access_log_format = access_log_format.unwrap_or(AccessLogFormat::Ascii); logger.tag = tag; logger.pid = unsafe { libc::getpid() }; logger.initialized = true; @@ -83,106 +118,124 @@ impl Logger { }); } - pub fn log(&mut self, meta: &Metadata, args: Arguments) { - if self.enabled(meta) { - match self.backend { - LoggerBackend::Stdout(ref mut stdout) => { - let _ = stdout.write_fmt(args); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Unix(ref mut socket) => { - let _ = socket.send(format(args).as_bytes()).map_err(|e| { - println!("cannot write logs to Unix socket: {e:?}"); - }); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Udp(ref mut socket, ref address) => { - let _ = socket - .send_to(format(args).as_bytes(), address) - .map_err(|e| { - println!("cannot write logs to UDP socket: {e:?}"); - }); - } - LoggerBackend::Tcp(ref mut socket) => { - let _ = socket.write_fmt(args).map_err(|e| { - println!("cannot write logs to TCP socket: {e:?}"); - }); - } - LoggerBackend::File(ref mut file) => { - let _ = file.write_fmt(args).map_err(|e| { - println!("cannot write logs to file: {e:?}"); - }); - } + pub fn split(&mut self) -> (i32, &str, &mut InnerLogger) { + (self.pid, &self.tag, &mut self.inner) + } +} + +trait LoggerBuffer { + fn fmt Result>( + &mut self, + args: Arguments, + flush: F, + ) -> Result<(), IoError>; +} +impl LoggerBuffer for Vec { + fn fmt Result>( + &mut self, + args: Arguments, + flush: F, + ) -> Result<(), IoError> { + self.clear(); + self.write_fmt(args)?; + flush(self.as_slice())?; + Ok(()) + } +} + +impl InnerLogger { + pub fn log(&mut self, args: Arguments) { + let io_result = match &mut self.backend { + LoggerBackend::Stdout(stdout) => { + let _ = stdout.write_fmt(args); + return; } + LoggerBackend::Tcp(socket) => socket.write_fmt(args), + LoggerBackend::File(file) => file.write_fmt(args), + LoggerBackend::Unix(socket) => self.buffer.fmt(args, |bytes| socket.send(bytes)), + LoggerBackend::Udp(sock, addr) => self.buffer.fmt(args, |b| sock.send_to(b, *addr)), + }; + + if let Err(e) = io_result { + println!( + "Cannot write access log to {}: {e:?}", + self.backend.as_ref() + ); } } - pub fn log_access(&mut self, meta: &Metadata, args: Arguments) { - if self.enabled(meta) { - let backend = self.access_backend.as_mut().unwrap_or(&mut self.backend); - match *backend { - LoggerBackend::Stdout(ref mut stdout) => { - let _ = stdout.write_fmt(args); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Unix(ref mut socket) => { - let _ = socket.send(format(args).as_bytes()).map_err(|e| { - println!("cannot write logs to Unix socket: {e:?}"); - }); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Udp(ref mut socket, ref address) => { - let _ = socket - .send_to(format(args).as_bytes(), address) - .map_err(|e| { - println!("cannot write logs to UDP socket: {e:?}"); - }); - } - LoggerBackend::Tcp(ref mut socket) => { - let _ = socket.write_fmt(args).map_err(|e| { - println!("cannot write logs to TCP socket: {e:?}"); - }); + pub fn log_access(&mut self, pid: i32, tag: &str, level_tag: &str, log: RequestRecord) { + let (now, precise_time) = now(); + let backend = self.access_backend.as_mut().unwrap_or(&mut self.backend); + + let io_result = match self.access_log_format { + AccessLogFormat::Binary => { + let binary_log = unsafe { log.into_binary_access_log(precise_time, tag) }; + // println!("binary_log length: {:?}", binary_log); + let log_length = binary_log.encoded_len(); + let total_length = log_length + encoded_len_varint(log_length as u64); + self.buffer.clear(); + if self.buffer.capacity() < total_length { + self.buffer.reserve(total_length - self.buffer.capacity()); } - LoggerBackend::File(ref mut file) => { - let _ = file.write_fmt(args).map_err(|e| { - println!("cannot write logs to file: {e:?}"); - }); + + if let Err(e) = binary_log.encode_length_delimited(&mut self.buffer) { + Err(IoError::new(IoErrorKind::InvalidData, e)) + } else { + // println!("length: {}, {:02X?}", &self.buffer.len(), &self.buffer[..]); + let bytes = &self.buffer; + match backend { + LoggerBackend::Stdout(stdout) => { + let _ = stdout.write(bytes); + return; + } + LoggerBackend::Tcp(socket) => socket.write(bytes), + LoggerBackend::File(file) => file.write(bytes), + LoggerBackend::Unix(socket) => socket.send(bytes), + LoggerBackend::Udp(socket, address) => socket.send_to(bytes, *address), + } + .map(|_| ()) } } + AccessLogFormat::Ascii => bind_format_args! { + let args = ("{now} {precise_time} {pid} {tag} {level_tag} {log}"); + // TODO: delete or make it trace + println!("ascii access log length: {}", format!("{}", args).len()); + match backend { + LoggerBackend::Stdout(stdout) => { + let _ = stdout.write_fmt(args); + return Ok(()); + } + LoggerBackend::Tcp(socket) => socket.write_fmt(args), + LoggerBackend::File(file) => file.write_fmt(args), + LoggerBackend::Unix(socket) => self.buffer.fmt(args, |b| socket.send(b)), + LoggerBackend::Udp(sock, addr) => self.buffer.fmt(args, |b| sock.send_to(b, *addr)), + } + }, + }; + + if let Err(e) = io_result { + println!("Cannot write access log to {}: {:?}", backend.as_ref(), e); } } - pub fn compat_log(&mut self, meta: &log::Metadata, args: Arguments) { - if self.compat_enabled(meta) { - match self.backend { - LoggerBackend::Stdout(ref mut stdout) => { - let _ = stdout.write_fmt(args); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Unix(ref mut socket) => { - let _ = socket.send(format(args).as_bytes()).map_err(|e| { - println!("cannot write logs to Unix socket: {e:?}"); - }); - } - //FIXME: should have a buffer to write to instead of allocating a string - LoggerBackend::Udp(ref mut socket, ref address) => { - let _ = socket - .send_to(format(args).as_bytes(), address) - .map_err(|e| { - println!("cannot write logs to UDP socket: {e:?}"); - }); - } - LoggerBackend::Tcp(ref mut socket) => { - let _ = socket.write_fmt(args).map_err(|e| { - println!("cannot write logs to TCP socket: {e:?}"); - }); - } - LoggerBackend::File(ref mut file) => { - let _ = file.write_fmt(args).map_err(|e| { - println!("cannot write logs to file: {e:?}"); - }); - } + pub fn compat_log(&mut self, args: Arguments) { + let io_result = match &mut self.backend { + LoggerBackend::Stdout(stdout) => { + let _ = stdout.write_fmt(args); + return; } + LoggerBackend::Tcp(socket) => socket.write_fmt(args), + LoggerBackend::File(file) => file.write_fmt(args), + LoggerBackend::Unix(socket) => self.buffer.fmt(args, |b| socket.send(b)), + LoggerBackend::Udp(sock, addr) => self.buffer.fmt(args, |b| sock.send_to(b, *addr)), + }; + + if let Err(e) = io_result { + println!( + "Cannot write access log to {}: {e:?}", + self.backend.as_ref() + ); } } @@ -190,11 +243,11 @@ impl Logger { self.directives = directives; } - fn enabled(&self, meta: &Metadata) -> bool { + pub fn enabled(&self, meta: Metadata) -> bool { // Search for the longest match, the vector is assumed to be pre-sorted. for directive in self.directives.iter().rev() { match directive.name { - Some(ref name) if !meta.target.starts_with(&**name) => {} + Some(ref name) if !meta.target.starts_with(name) => {} Some(..) | None => return meta.level <= directive.level, } } @@ -205,11 +258,8 @@ impl Logger { // Search for the longest match, the vector is assumed to be pre-sorted. for directive in self.directives.iter().rev() { match directive.name { - Some(ref name) if !meta.target().starts_with(&**name) => {} - Some(..) | None => { - let lvl: LogLevel = meta.level().into(); - return lvl <= directive.level; - } + Some(ref name) if !meta.target().starts_with(name) => {} + Some(..) | None => return Into::::into(meta.level()) <= directive.level, } } false @@ -374,13 +424,11 @@ impl Ord for LogLevelFilter { impl FromStr for LogLevelFilter { type Err = (); fn from_str(level: &str) -> Result { - ok_or( - LOG_LEVEL_NAMES - .iter() - .position(|&name| name.eq_ignore_ascii_case(level)) - .map(|p| LogLevelFilter::from_usize(p).unwrap()), - (), - ) + LOG_LEVEL_NAMES + .iter() + .position(|&name| name.eq_ignore_ascii_case(level)) + .map(|p| LogLevelFilter::from_usize(p).unwrap()) + .ok_or(()) } } @@ -424,13 +472,6 @@ pub struct LogDirective { level: LogLevelFilter, } -fn ok_or(t: Option, e: E) -> Result { - match t { - Some(t) => Ok(t), - None => Err(e), - } -} - pub fn parse_logging_spec(spec: &str) -> Vec { let mut dirs = Vec::new(); @@ -494,6 +535,7 @@ pub fn setup_logging_with_config(config: &Config, tag: &str) { setup_logging( &config.log_target, config.log_access_target.as_deref(), + config.log_access_format.clone(), &config.log_level, tag, ) @@ -506,6 +548,7 @@ pub fn setup_logging_with_config(config: &Config, tag: &str) { pub fn setup_logging( log_target: &str, log_access_target: Option<&str>, + log_access_format: Option, log_level: &str, tag: &str, ) { @@ -513,9 +556,21 @@ pub fn setup_logging( let access_backend = log_access_target.map(target_to_backend); if let Ok(env_log_level) = env::var("RUST_LOG") { - Logger::init(tag.to_string(), &env_log_level, backend, access_backend); + Logger::init( + tag.to_string(), + &env_log_level, + backend, + access_backend, + log_access_format, + ); } else { - Logger::init(tag.to_string(), log_level, backend, access_backend); + Logger::init( + tag.to_string(), + log_level, + backend, + access_backend, + log_access_format, + ); } } @@ -575,183 +630,146 @@ pub fn target_to_backend(target: &str) -> LoggerBackend { } } +#[macro_export] +macro_rules! bind_format_args { + (let $args: ident = ($($f:tt)+); $($t:tt)*) => { + (|$args| { $($t)* })(format_args!($($f)+)) + }; +} + /// write a log with the custom logger (used in other macros, do not use directly) #[macro_export] macro_rules! log { - (__inner__ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, - [$($transformed_args:ident),*], [$first_ident:ident $(, $other_idents:ident)*], $first_arg:expr $(, $other_args:expr)*) => ({ - let $first_ident = &$first_arg; - log!(__inner__ $target, $lvl, $format, $level_tag, [$($transformed_args,)* $first_ident], [$($other_idents),*] $(, $other_args)*); - }); - - (__inner__ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, - [$($final_args:ident),*], [$($idents:ident),*]) => ({ - static _META: $crate::logging::Metadata = $crate::logging::Metadata { - level: $lvl, - target: module_path!(), - }; - { - $crate::logging::TAG.with(|tag| { - //let tag = t.borrow().tag; - $crate::logging::LOGGER.with(|l| { - let pid = l.borrow().pid; - + ($lvl:expr, $format:expr, $level_tag:expr $(, $args:expr)*) => { + log!(@ module_path!(), $lvl, $format, $level_tag, [] $(, $args)*) + }; + (@ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, + [$($ref_args:ident),*], $first_arg:expr $(, $other_args:expr)*) => {{ + let reg_arg = &$first_arg; + log!(@ $target, $lvl, $format, $level_tag, [$($ref_args,)* reg_arg] $(, $other_args)*); + }}; + (@ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, [$($final_args:ident),*]) => {{ + $crate::logging::LOGGER.with(|logger| { + let mut logger = logger.borrow_mut(); + if !logger.enabled($crate::logging::Metadata { + level: $lvl, + target: module_path!(), + }) { return; } + let (pid, tag, inner) = logger.split(); let (now, precise_time) = $crate::logging::now(); - l.borrow_mut().log( - &_META, - format_args!( + inner.log( + format_args!( concat!("{} {} {} {} {}\t", $format, '\n'), now, precise_time, pid, tag, $level_tag $(, $final_args)*) ); - }) - }); - } - }); - ($lvl:expr, $format:expr, $level_tag:expr $(, $args:expr)+) => { - log!(__inner__ module_path!(), $lvl, $format, $level_tag, [], [a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v] - $(, $args)+) - }; - ($lvl:expr, $format:expr, $level_tag:expr) => { - log!(__inner__ module_path!(), $lvl, $format, $level_tag, [], [a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v]) - }; + }) + }}; + } /// log a failure concerning an HTTP or TCP request #[macro_export] macro_rules! log_access { - (__inner__ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, - [$($transformed_args:ident),*], [$first_ident:ident $(, $other_idents:ident)*], $first_arg:expr $(, $other_args:expr)*) => ({ - let $first_ident = &$first_arg; - log_access!(__inner__ $target, $lvl, $format, $level_tag, [$($transformed_args,)* $first_ident], [$($other_idents),*] $(, $other_args)*); - }); - - (__inner__ $target:expr, $lvl:expr, $format:expr, $level_tag:expr, - [$($final_args:ident),*], [$($idents:ident),*]) => ({ - static _META: $crate::logging::Metadata = $crate::logging::Metadata { - level: $lvl, - target: module_path!(), - }; - { - $crate::logging::TAG.with(|tag| { - //let tag = t.borrow().tag; - $crate::logging::LOGGER.with(|l| { - let pid = l.borrow().pid; + ($lvl:expr, $level_tag:expr, $request_record:expr) => {{ + $crate::logging::LOGGER.with(|logger| { + let mut logger = logger.borrow_mut(); + if !logger.enabled($crate::logging::Metadata { + level: $lvl, + target: module_path!(), + }) { + return; + } + let (pid, tag, inner) = logger.split(); + inner.log_access(pid, tag, $level_tag, $request_record); + }) + }}; +} - let (now, precise_time) = $crate::logging::now(); - l.borrow_mut().log_access( - &_META, - format_args!( - concat!("{} {} {} {} {}\t", $format, '\n'), - now, precise_time, pid, tag, - $level_tag $(, $final_args)*) - ); - }) - }); - } - }); - ($lvl:expr, $format:expr, $level_tag:expr $(, $args:expr)+) => { - log_access!(__inner__ module_path!(), $lvl, $format, $level_tag, [], [a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v] - $(, $args)+) - }; - ($lvl:expr, $format:expr, $level_tag:expr) => { - log_access!(__inner__ module_path!(), $lvl, $format, $level_tag, [], [a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v]) +#[macro_export] +macro_rules! unwrap_or { + ({} {$($default:tt)*}) => { + $($default)* }; + ({$($value:tt)*} {$($default:tt)*}) => { + $($value)* + } } -/// log an error with Sōzu's custom log stack #[macro_export] -macro_rules! error { - ($format:expr, $($arg:tt)*) => { - log!($crate::logging::LogLevel::Error, $format, "ERROR", $($arg)*) - }; - ($format:expr) => { - log!($crate::logging::LogLevel::Error, $format, "ERROR") +macro_rules! structured_access_log { + ($($k:ident $(: $v:expr)?),* $(,)?) => { + $crate::logging::RequestRecord {$( + $k: &unwrap_or!({$($v)?} {$k}), + )*} }; } /// log a failure concerning an HTTP or TCP request #[macro_export] macro_rules! error_access { - ($format:expr, $($arg:tt)*) => { - log_access!($crate::logging::LogLevel::Error, $format, "ERROR", $($arg)*); + ($($request_record_fields:tt)*) => { + log_access!($crate::logging::LogLevel::Error, "ERROR", structured_access_log!($($request_record_fields)*)); }; - ($format:expr) => { - log_access!($crate::logging::LogLevel::Error, $format, "ERROR"); +} + +/// log the success of an HTTP or TCP request +#[macro_export] +macro_rules! info_access { + ($($request_record_fields:tt)*) => { + log_access!($crate::logging::LogLevel::Info, "INFO", structured_access_log!($($request_record_fields)*)); }; } -/// log a warning with Sōzu’s custom log stack +/// log an error with Sōzu's custom log stack #[macro_export] -macro_rules! warn { - ($format:expr, $($arg:tt)*) => { - log!($crate::logging::LogLevel::Warn, $format, "WARN", $($arg)*); +macro_rules! error { + ($format:expr $(, $args:expr)* $(,)?) => { + log!($crate::logging::LogLevel::Error, $format, "ERROR" $(, $args)*) }; - ($format:expr) => { - log!($crate::logging::LogLevel::Warn, $format, "WARN"); - } } -/// log an info with Sōzu’s custom log stack +/// log a warning with Sōzu’s custom log stack #[macro_export] -macro_rules! info { - ($format:expr, $($arg:tt)*) => { - log!($crate::logging::LogLevel::Info, $format, "INFO", $($arg)*); +macro_rules! warn { + ($format:expr $(, $args:expr)* $(,)?) => { + log!($crate::logging::LogLevel::Warn, $format, "WARN" $(, $args)*); }; - ($format:expr) => { - log!($crate::logging::LogLevel::Info, $format, "INFO"); - } } -/// log the success of an HTTP or TCP request +/// log an info with Sōzu’s custom log stack #[macro_export] -macro_rules! info_access { - ($format:expr, $($arg:tt)*) => { - log_access!($crate::logging::LogLevel::Info, $format, "INFO", $($arg)*); +macro_rules! info { + ($format:expr $(, $args:expr)* $(,)?) => { + log!($crate::logging::LogLevel::Info, $format, "INFO" $(, $args)*); }; - ($format:expr) => { - log_access!($crate::logging::LogLevel::Info, $format, "INFO"); - } } /// log a debug with Sōzu’s custom log stack #[macro_export] macro_rules! debug { - ($format:expr, $($arg:tt)*) => { + ($format:expr $(, $args:expr)* $(,)?) => { #[cfg(any(debug_assertions, feature = "logs-debug", feature = "logs-trace"))] log!($crate::logging::LogLevel::Debug, concat!("{}\t", $format), - "DEBUG", {module_path!()}, $($arg)*); + "DEBUG", module_path!() $(, $args)*); }; - ($format:expr) => { - #[cfg(any(debug_assertions, feature = "logs-debug", feature = "logs-trace"))] - log!($crate::logging::LogLevel::Debug, concat!("{}\t", $format), - "DEBUG", {module_path!()}); - } } /// log a trace with Sōzu’s custom log stack #[macro_export] macro_rules! trace { - ($format:expr, $($arg:tt)*) => ( + ($format:expr $(, $args:expr)* $(,)?) => ( #[cfg(any(debug_assertions, feature = "logs-trace"))] log!($crate::logging::LogLevel::Trace, concat!("{}\t", $format), - "TRACE", module_path!(), $($arg)*); + "TRACE", module_path!() $(, $args)*); ); - ($format:expr) => ( - #[cfg(any(debug_assertions, feature = "logs-trace"))] - log!($crate::logging::LogLevel::Trace, concat!("{}\t", $format), - "TRACE", module_path!()); - ) } /// write a log with a "FIXME" prefix on an info level #[macro_export] macro_rules! fixme { - () => { - log!($crate::logging::LogLevel::Info, "FIXME: {}:{} in {}", "INFO", file!(), line!(), module_path!()); - }; - ($($arg:tt)*) => { - log!($crate::logging::LogLevel::Info, "FIXME: {}:{} in {}: {}", "INFO", file!(), line!(), module_path!(), $($arg)*); + ($(, $args:expr)* $(,)?) => { + log!($crate::logging::LogLevel::Info, "FIXME: {}:{} in {}: {}", "INFO", file!(), line!(), module_path!() $(, $args)*); }; } @@ -775,24 +793,23 @@ impl log::Log for CompatLogger { } fn log(&self, record: &log::Record) { - TAG.with(|tag| { - LOGGER.with(|l| { - let pid = l.borrow().pid; - let (now, precise_time) = now(); - l.borrow_mut().compat_log( - record.metadata(), - format_args!( - concat!("{} {} {} {} {}\t{}\n"), - now, - precise_time, - pid, - tag, - record.level(), - record.args() - ), - ); - }) - }); + LOGGER.with(|l| { + let mut l = l.borrow_mut(); + if !l.compat_enabled(record.metadata()) { + return; + } + let (pid, tag, inner) = l.split(); + let (now, precise_time) = now(); + inner.compat_log(format_args!( + concat!("{} {} {} {} {}\t{}\n"), + now, + precise_time, + pid, + tag, + record.level(), + record.args() + )); + }) } fn flush(&self) {} @@ -807,6 +824,7 @@ macro_rules! setup_test_logger { "error", $crate::logging::LoggerBackend::Stdout(::std::io::stdout()), None, + None, ); }; } diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs index b123a0437..5168dca47 100644 --- a/command/src/proto/display.rs +++ b/command/src/proto/display.rs @@ -1,6 +1,6 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, - fmt::{Display, Formatter}, + fmt::{self, Display, Formatter}, net::SocketAddr, }; @@ -8,18 +8,23 @@ use prettytable::{cell, row, Row, Table}; use time::format_description; use x509_parser::time::ASN1Time; -use crate::proto::{ - command::{ - filtered_metrics, request::RequestType, response_content::ContentType, AggregatedMetrics, - AvailableMetrics, CertificateAndKey, CertificateSummary, CertificatesWithFingerprints, - ClusterMetrics, FilteredMetrics, ListOfCertificatesByAddress, ListedFrontends, - ListenersList, QueryCertificatesFilters, RequestCounts, Response, ResponseContent, - ResponseStatus, RunState, TlsVersion, WorkerInfos, WorkerMetrics, WorkerResponses, +use crate::{ + access_logs::{prepare_user_agent, EndpointRecord, LogContext, LogDuration, RequestRecord}, + logging::LoggerBackend, + proto::{ + command::{ + filtered_metrics, request::RequestType, response_content::ContentType, + AggregatedMetrics, AvailableMetrics, CertificateAndKey, CertificateSummary, + CertificatesWithFingerprints, ClusterMetrics, FilteredMetrics, + ListOfCertificatesByAddress, ListedFrontends, ListenersList, QueryCertificatesFilters, + RequestCounts, Response, ResponseContent, ResponseStatus, RunState, TlsVersion, + WorkerInfos, WorkerMetrics, WorkerResponses, + }, + DisplayError, }, - DisplayError, }; -use super::command::SocketAddress; +use super::command::{binary_endpoint, BinaryEndpoint, HttpEndpoint, SocketAddress, TcpEndpoint}; impl Display for CertificateAndKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -949,3 +954,169 @@ impl Display for SocketAddress { write!(f, "{}", SocketAddr::from(self.clone())) } } + +impl Display for BinaryEndpoint { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match &self.inner { + Some(binary_endpoint::Inner::Http(HttpEndpoint { + method, + authority, + path, + status, + .. + })) => write!( + f, + "{} {} {} -> {}", + authority.as_string_or("-"), + method.as_string_or("-"), + path.as_string_or("-"), + status.as_string_or("-"), + ), + Some(binary_endpoint::Inner::Tcp(TcpEndpoint { context })) => { + write!(f, "{}", context.as_string_or("-")) + } + None => Ok(()), + } + } +} + +pub trait AsString { + fn as_string_or(&self, default: &'static str) -> String; +} + +impl AsString for Option { + fn as_string_or(&self, default: &'static str) -> String { + match self { + None => default.to_string(), + Some(t) => t.to_string(), + } + } +} + +pub trait AsStr { + fn as_str_or(&self, default: &'static str) -> &str; +} + +impl> AsStr for Option { + fn as_str_or(&self, default: &'static str) -> &str { + match self { + None => default, + Some(s) => s.as_ref(), + } + } +} + +impl AsRef for LoggerBackend { + fn as_ref(&self) -> &str { + match self { + LoggerBackend::Stdout(_) => "stdout", + LoggerBackend::Unix(_) => "UNIX socket", + LoggerBackend::Udp(_, _) => "UDP socket", + LoggerBackend::Tcp(_) => "TCP socket", + LoggerBackend::File(_) => "file", + } + } +} + +impl Display for LogDuration { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + None => write!(f, "-"), + Some(duration) => { + let secs = duration.whole_seconds(); + if secs >= 10 { + return write!(f, "{secs}s"); + } + + let ms = duration.whole_milliseconds(); + if ms < 10 { + let us = duration.whole_microseconds(); + if us >= 10 { + return write!(f, "{us}μs"); + } + + let ns = duration.whole_nanoseconds(); + return write!(f, "{ns}ns"); + } + + write!(f, "{ms}ms") + } + } + } +} + +impl Display for LogContext<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{} {} {}", + self.request_id, + self.cluster_id.unwrap_or("-"), + self.backend_id.unwrap_or("-") + ) + } +} + +impl Display for EndpointRecord<'_> { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Self::Http { + method, + authority, + path, + status, + .. + } => write!( + f, + "{} {} {} -> {}", + authority.as_str_or("-"), + method.as_str_or("-"), + path.as_str_or("-"), + status.as_string_or("-"), + ), + Self::Tcp { context } => { + write!(f, "{}", context.as_str_or("-")) + } + } + } +} + +impl Display for RequestRecord<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let (tags, ua_separator, user_agent) = match (self.tags, &self.user_agent) { + (None, None) => ("-", "", String::new()), + (Some(tags), None) => (tags.concatenated.as_str(), "", String::new()), + (None, Some(ua)) => ("", "user-agent=", prepare_user_agent(ua)), + (Some(tags), Some(ua)) => ( + tags.concatenated.as_str(), + ", user-agent=", + prepare_user_agent(ua), + ), + }; + + write!( + f, + "{} \t{} -> {} \t{}/{}/{}/{} \t{} -> {} \t {}{}{} {} {}", + self.context, + self.session_address.as_string_or("X"), + self.backend_address.as_string_or("X"), + LogDuration(Some(*self.response_time)), + LogDuration(Some(*self.service_time)), + LogDuration(*self.client_rtt), + LogDuration(*self.server_rtt), + self.bytes_in, + self.bytes_out, + tags, + ua_separator, + user_agent, + self.protocol, + self.endpoint + )?; + + if let Some(message) = &self.error { + writeln!(f, " | {}", message) + } else { + writeln!(f) + } + } +} diff --git a/command/src/request.rs b/command/src/request.rs index 234d41f50..d29a05cbb 100644 --- a/command/src/request.rs +++ b/command/src/request.rs @@ -324,3 +324,9 @@ impl From for Uint128 { Uint128 { low, high } } } + +impl From for Uint128 { + fn from(value: i128) -> Self { + Uint128::from(value as u128) + } +} diff --git a/command/state.json b/command/state.json new file mode 100644 index 000000000..e69de29bb diff --git a/e2e/src/sozu/worker.rs b/e2e/src/sozu/worker.rs index cf16eea5a..54ee43470 100644 --- a/e2e/src/sozu/worker.rs +++ b/e2e/src/sozu/worker.rs @@ -147,7 +147,7 @@ impl Worker { println!("Setting up logging"); let server_job = thread::spawn(move || { - setup_logging("stdout", None, "error", &thread_name); + setup_logging("stdout", None, None, "error", &thread_name); let mut server = Server::try_new_from_config( cmd_worker_to_main, thread_scm_worker_to_main, diff --git a/e2e/src/tests/tests.rs b/e2e/src/tests/tests.rs index 36f2a171f..2b73528d2 100644 --- a/e2e/src/tests/tests.rs +++ b/e2e/src/tests/tests.rs @@ -631,7 +631,7 @@ pub fn try_hard_or_soft_stop(soft: bool) -> State { } fn try_http_behaviors() -> State { - setup_logging("stdout", None, "debug", "BEHAVE-OUT"); + setup_logging("stdout", None, None, "debug", "BEHAVE-OUT"); info!("starting up"); @@ -1069,7 +1069,7 @@ pub fn try_blue_geen() -> State { } pub fn try_keep_alive() -> State { - setup_logging("stdout", None, "debug", "KA-OUT"); + setup_logging("stdout", None, None, "debug", "KA-OUT"); let front_address = create_local_address(); diff --git a/lib/Cargo.toml b/lib/Cargo.toml index de43bdc15..e43a90611 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -51,7 +51,6 @@ socket2 = { version = "^0.5.5", features = ["all"] } thiserror = "^1.0.56" time = "^0.3.31" once_cell = "1.19.0" - sozu-command-lib = { path = "../command", version = "^0.15.19" } [dev-dependencies] diff --git a/lib/examples/http.rs b/lib/examples/http.rs index 81b2e77b1..224b5c20b 100644 --- a/lib/examples/http.rs +++ b/lib/examples/http.rs @@ -21,7 +21,7 @@ use sozu_command_lib::{ }; fn main() -> anyhow::Result<()> { - setup_logging("stdout", None, "info", "EXAMPLE"); + setup_logging("stdout", None, None, "info", "EXAMPLE"); info!("starting up"); diff --git a/lib/examples/https.rs b/lib/examples/https.rs index e0939e210..843349a9a 100644 --- a/lib/examples/https.rs +++ b/lib/examples/https.rs @@ -21,7 +21,7 @@ use sozu_command_lib::{ }; fn main() -> anyhow::Result<()> { - setup_logging("stdout", None, "info", "EXAMPLE"); + setup_logging("stdout", None, None, "info", "EXAMPLE"); info!("MAIN\tstarting up"); diff --git a/lib/examples/tcp.rs b/lib/examples/tcp.rs index fc255bd8c..227ed6045 100644 --- a/lib/examples/tcp.rs +++ b/lib/examples/tcp.rs @@ -18,7 +18,7 @@ use sozu_command_lib::{ }; fn main() -> anyhow::Result<()> { - setup_logging("stdout", None, "info", "EXAMPLE"); + setup_logging("stdout", None, None, "info", "EXAMPLE"); info!("starting up"); @@ -33,7 +33,7 @@ fn main() -> anyhow::Result<()> { address: SocketAddress::new_v4(127, 0, 0, 1, 8080), ..Default::default() }; - setup_logging("stdout", None, "debug", "TCP"); + setup_logging("stdout", None, None, "debug", "TCP"); sozu_lib::tcp::testing::start_tcp_worker(listener, max_buffers, buffer_size, channel); }); diff --git a/lib/src/http.rs b/lib/src/http.rs index 0b64695ce..dba8afb4a 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -17,7 +17,7 @@ use rusty_ulid::Ulid; use time::{Duration, Instant}; use sozu_command::{ - logging, + logging::{self, CachedTags}, proto::command::{ request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend, @@ -43,7 +43,7 @@ use crate::{ server::{ListenToken, SessionManager}, socket::server_bind, timer::TimeoutContainer, - AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, + AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder, StateResult, }; diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 7d3768411..5f6e8de8d 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -233,7 +233,7 @@ //! }; //! //! fn main() -> anyhow::Result<()> { -//! setup_logging("stdout", None, "info", "EXAMPLE"); +//! setup_logging("stdout", None, None, "info", "EXAMPLE"); //! //! info!("starting up"); //! @@ -321,7 +321,6 @@ extern crate quickcheck; pub mod util; #[macro_use] pub mod metrics; -mod logs; pub mod backends; pub mod features; @@ -361,7 +360,11 @@ use time::{Duration, Instant}; use tls::CertificateResolverError; use sozu_command::{ - proto::command::{Cluster, ListenerType, RequestHttpFrontend}, + logging::{CachedTags, LogContext}, + proto::{ + command::{Cluster, ListenerType, RequestHttpFrontend}, + display::AsStr, + }, ready::Ready, request::WorkerRequest, response::WorkerResponse, @@ -528,21 +531,6 @@ macro_rules! StateMachineBuilder { } } -pub struct CachedTags { - pub tags: BTreeMap, - pub concatenated: String, -} -impl CachedTags { - fn new(tags: BTreeMap) -> Self { - let concatenated = tags - .iter() - .map(|(k, v)| format!("{k}={v}")) - .collect::>() - .join(", "); - Self { tags, concatenated } - } -} - pub trait ListenerHandler { fn get_addr(&self) -> &SocketAddr; @@ -1051,6 +1039,41 @@ impl SessionMetrics { _ => None, } } + + pub fn register_end_of_session(&self, context: &LogContext) { + let response_time = self.response_time(); + let service_time = self.service_time(); + + if let Some(cluster_id) = context.cluster_id { + time!( + "response_time", + cluster_id, + response_time.whole_milliseconds() + ); + time!( + "service_time", + cluster_id, + service_time.whole_milliseconds() + ); + } + time!("response_time", response_time.whole_milliseconds()); + time!("service_time", service_time.whole_milliseconds()); + + if let Some(backend_id) = self.backend_id.as_ref() { + if let Some(backend_response_time) = self.backend_response_time() { + record_backend_metrics!( + context.cluster_id.as_str_or("-"), + backend_id, + backend_response_time.whole_milliseconds(), + self.backend_connection_time(), + self.backend_bin, + self.backend_bout + ); + } + } + + incr!("access_logs.count", context.cluster_id, context.backend_id); + } } /// exponentially weighted moving average with high sensibility to latency bursts diff --git a/lib/src/logs.rs b/lib/src/logs.rs deleted file mode 100644 index f28bfce04..000000000 --- a/lib/src/logs.rs +++ /dev/null @@ -1,271 +0,0 @@ -use std::{fmt, net::SocketAddr}; - -use rusty_ulid::Ulid; -use time::Duration; - -use crate::{protocol::http::parser::Method, SessionMetrics}; - -#[derive(Debug)] -pub struct LogContext<'a> { - pub request_id: Ulid, - pub cluster_id: Option<&'a str>, - pub backend_id: Option<&'a str>, -} - -impl fmt::Display for LogContext<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "{} {} {} \t", - self.request_id, - self.cluster_id.unwrap_or("-"), - self.backend_id.unwrap_or("-") - ) - } -} - -pub trait AsStr { - fn as_str_or(&self, default: &'static str) -> &str; -} -pub trait AsString { - fn as_str_or(&self, default: &'static str) -> String; -} - -impl AsStr for Option<&str> { - fn as_str_or(&self, default: &'static str) -> &str { - match self { - None => default, - Some(s) => s, - } - } -} -impl AsString for Option { - fn as_str_or(&self, default: &'static str) -> String { - match self { - None => default.to_string(), - Some(s) => s.to_string(), - } - } -} -impl AsString for Option { - fn as_str_or(&self, default: &'static str) -> String { - match self { - None => default.to_string(), - Some(s) => s.to_string(), - } - } -} -impl AsString for Option<&Method> { - fn as_str_or(&self, default: &'static str) -> String { - match self { - None => default.to_string(), - Some(s) => s.to_string(), - } - } -} - -pub struct LogDuration(pub Option); - -impl fmt::Display for LogDuration { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.0 { - None => write!(f, "-"), - Some(duration) => { - let secs = duration.whole_seconds(); - if secs >= 10 { - return write!(f, "{secs}s"); - } - - let ms = duration.whole_milliseconds(); - if ms < 10 { - let us = duration.whole_microseconds(); - if us >= 10 { - return write!(f, "{us}μs"); - } - - let ns = duration.whole_nanoseconds(); - return write!(f, "{ns}ns"); - } - - write!(f, "{ms}ms") - } - } - } -} - -#[derive(Debug)] -pub enum Endpoint<'a> { - Http { - method: Option<&'a Method>, - authority: Option<&'a str>, - path: Option<&'a str>, - status: Option, - reason: Option<&'a str>, - }, - Tcp { - context: Option<&'a str>, - }, -} - -impl fmt::Display for Endpoint<'_> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Endpoint::Http { - authority, - method, - path, - status, - .. - } => write!( - f, - "{} {} {} -> {}", - authority.as_str_or("-"), - method.as_str_or("-"), - path.as_str_or("-"), - status.as_str_or("-"), - ), - Endpoint::Tcp { context } => write!(f, "{}", context.as_str_or("-")), - } - } -} - -#[derive(Debug)] -pub struct RequestRecord<'a> { - pub error: Option<&'a str>, - pub context: LogContext<'a>, - pub session_address: Option, - pub backend_address: Option, - pub protocol: &'a str, - pub endpoint: Endpoint<'a>, - pub tags: Option<&'a str>, - pub client_rtt: Option, - pub server_rtt: Option, - pub metrics: &'a SessionMetrics, - pub user_agent: Option, -} - -impl RequestRecord<'_> { - pub fn log(self) { - let context = &self.context; - let cluster_id = context.cluster_id; - let tags = self.tags; - - let protocol = self.protocol; - let session_address = self.session_address; - let backend_address = self.backend_address; - let endpoint = &self.endpoint; - let mut user_agent = self.user_agent; - - let metrics = self.metrics; - // let backend_response_time = metrics.backend_response_time(); - // let backend_connection_time = metrics.backend_connection_time(); - // let backend_bin = metrics.backend_bin; - // let backend_bout = metrics.backend_bout; - let response_time = metrics.response_time(); - let service_time = metrics.service_time(); - // let wait_time = metrics.wait_time; - let client_rtt = self.client_rtt; - let server_rtt = self.server_rtt; - - if let Some(cluster_id) = cluster_id { - time!( - "response_time", - cluster_id, - response_time.whole_milliseconds() - ); - time!( - "service_time", - cluster_id, - service_time.whole_milliseconds() - ); - } - time!("response_time", response_time.whole_milliseconds()); - time!("service_time", service_time.whole_milliseconds()); - - if let Some(backend_id) = metrics.backend_id.as_ref() { - if let Some(backend_response_time) = metrics.backend_response_time() { - record_backend_metrics!( - cluster_id.as_str_or("-"), - backend_id, - backend_response_time.whole_milliseconds(), - metrics.backend_connection_time(), - metrics.backend_bin, - metrics.backend_bout - ); - } - } - - let (tags, ua_sep, user_agent) = match (tags, &mut user_agent) { - (None, None) => ("-", "", ""), - (Some(tags), None) => (tags, "", ""), - (None, Some(ua)) => { - prepare_user_agent(ua); - ("", "user-agent=", ua.as_str()) - } - (Some(tags), Some(ua)) => { - prepare_user_agent(ua); - (tags, ", user-agent=", ua.as_str()) - } - }; - - match self.error { - None => { - info_access!( - "{}{} -> {} \t{}/{}/{}/{} \t{} -> {} \t {}{}{} {} {}", - context, - session_address.as_str_or("X"), - backend_address.as_str_or("X"), - LogDuration(Some(response_time)), - LogDuration(Some(service_time)), - LogDuration(client_rtt), - LogDuration(server_rtt), - metrics.bin, - metrics.bout, - tags, - ua_sep, - user_agent, - protocol, - endpoint - ); - incr!( - "access_logs.count", - self.context.cluster_id, - self.context.backend_id - ); - } - Some(message) => error_access!( - "{}{} -> {} \t{}/{}/{}/{} \t{} -> {} \t {}{}{} {} {} | {}", - context, - session_address.as_str_or("X"), - backend_address.as_str_or("X"), - LogDuration(Some(response_time)), - LogDuration(Some(service_time)), - LogDuration(client_rtt), - LogDuration(server_rtt), - metrics.bin, - metrics.bout, - tags, - ua_sep, - user_agent, - protocol, - endpoint, - message - ), - } - } -} - -fn prepare_user_agent(ua: &mut String) { - let mut ua_bytes = std::mem::take(ua).into_bytes(); - for c in &mut ua_bytes { - if *c == b' ' { - *c = b'_'; - } - } - if let Some(last) = ua_bytes.last_mut() { - if *last == b',' { - *last = b'!' - } - } - *ua = unsafe { String::from_utf8_unchecked(ua_bytes) }; -} diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index 50eac5b2c..f0447134e 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -14,13 +14,13 @@ use mio::{net::TcpStream, Interest, Token}; use rusty_ulid::Ulid; use sozu_command::{ config::MAX_LOOP_ITERATIONS, + logging::EndpointRecord, proto::command::{Event, EventKind, ListenerType}, }; use time::{Duration, Instant}; use crate::{ backends::{Backend, BackendError}, - logs::{Endpoint, LogContext, RequestRecord}, pool::{Checkout, Pool}, protocol::{ http::{editor::HttpContext, parser::Method}, @@ -30,7 +30,7 @@ use crate::{ router::Route, server::{push_event, CONN_RETRIES}, socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol}, - sozu_command::ready::Ready, + sozu_command::{logging::LogContext, ready::Ready}, timer::TimeoutContainer, AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness, @@ -762,6 +762,20 @@ impl Http EndpointRecord { + let status = match self.status { + SessionStatus::Normal => self.context.status, + SessionStatus::DefaultAnswer(answers, ..) => Some(answers.into()), + }; + EndpointRecord::Http { + method: self.context.method.as_deref(), + authority: self.context.authority.as_deref(), + path: self.context.path.as_deref(), + reason: self.context.reason.as_deref(), + status, + } + } + pub fn get_session_address(&self) -> Option { self.context .session_address @@ -798,16 +812,7 @@ impl Http String { - format!( - "{}", - Endpoint::Http { - method: self.context.method.as_ref(), - authority: self.context.authority.as_deref(), - path: self.context.path.as_deref(), - status: self.context.status, - reason: self.context.reason.as_deref(), - } - ) + format!("{}", self.log_endpoint()) } pub fn log_request(&mut self, metrics: &SessionMetrics, message: Option<&str>) { @@ -817,34 +822,28 @@ impl Http host, Some((hostname, _)) => hostname, }; - listener.get_concatenated_tags(hostname) + listener.get_tags(hostname) }); - let status = match self.status { - SessionStatus::Normal => self.context.status, - SessionStatus::DefaultAnswer(answers, ..) => Some(answers.into()), - }; - let user_agent = self.context.user_agent.take(); - RequestRecord { + let context = self.log_context(); + metrics.register_end_of_session(&context); + + info_access! { error: message, - context: self.log_context(), + context, session_address: self.get_session_address(), backend_address: self.get_backend_address(), protocol: self.protocol_string(), - endpoint: Endpoint::Http { - method: self.context.method.as_ref(), - authority: self.context.authority.as_deref(), - path: self.context.path.as_deref(), - status, - reason: self.context.reason.as_deref(), - }, + endpoint: self.log_endpoint(), tags, client_rtt: socket_rtt(self.front_socket()), server_rtt: self.backend_socket.as_ref().and_then(socket_rtt), - metrics, - user_agent, - } - .log(); + service_time: metrics.service_time(), + response_time: metrics.response_time(), + bytes_in: metrics.bin, + bytes_out: metrics.bout, + user_agent: self.context.user_agent.clone(), + }; } pub fn log_request_success(&mut self, metrics: &SessionMetrics) { diff --git a/lib/src/protocol/kawa_h1/parser.rs b/lib/src/protocol/kawa_h1/parser.rs index 82356a953..e55c9f79d 100644 --- a/lib/src/protocol/kawa_h1/parser.rs +++ b/lib/src/protocol/kawa_h1/parser.rs @@ -1,6 +1,7 @@ use std::{ cmp::min, fmt::{self, Write}, + ops::Deref, str::from_utf8_unchecked, }; @@ -64,22 +65,36 @@ impl Method { } } -impl fmt::Display for Method { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { +impl AsRef for Method { + fn as_ref(&self) -> &str { match self { - Method::Get => write!(f, "GET"), - Method::Post => write!(f, "POST"), - Method::Head => write!(f, "HEAD"), - Method::Options => write!(f, "OPTIONS"), - Method::Put => write!(f, "PUT"), - Method::Delete => write!(f, "DELETE"), - Method::Trace => write!(f, "TRACE"), - Method::Connect => write!(f, "CONNECT"), - Method::Custom(s) => write!(f, "{s}"), + Self::Get => "GET", + Self::Post => "POST", + Self::Head => "HEAD", + Self::Options => "OPTIONS", + Self::Put => "PUT", + Self::Delete => "DELETE", + Self::Trace => "TRACE", + Self::Connect => "CONNECT", + Self::Custom(s) => &s, } } } +impl Deref for Method { + type Target = str; + + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl fmt::Display for Method { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + #[cfg(feature = "tolerant-http1-parser")] fn is_hostname_char(i: u8) -> bool { is_alphanumeric(i) || diff --git a/lib/src/protocol/pipe.rs b/lib/src/protocol/pipe.rs index de60906e7..a8ec6d14a 100644 --- a/lib/src/protocol/pipe.rs +++ b/lib/src/protocol/pipe.rs @@ -2,11 +2,13 @@ use std::{cell::RefCell, net::SocketAddr, rc::Rc}; use mio::{net::TcpStream, Token}; use rusty_ulid::Ulid; -use sozu_command::config::MAX_LOOP_ITERATIONS; +use sozu_command::{ + config::MAX_LOOP_ITERATIONS, + logging::{EndpointRecord, LogContext}, +}; use crate::{ backends::Backend, - logs::{Endpoint, LogContext, RequestRecord}, pool::Checkout, protocol::SessionState, socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol}, @@ -198,22 +200,25 @@ impl Pipe { pub fn log_request(&self, metrics: &SessionMetrics, message: Option<&str>) { let listener = self.listener.borrow(); - RequestRecord { + let context = self.log_context(); + let endpoint = self.log_endpoint(); + metrics.register_end_of_session(&context); + info_access!( error: message, - context: self.log_context(), + context, session_address: self.get_session_address(), backend_address: self.get_backend_address(), protocol: self.protocol_string(), - endpoint: Endpoint::Tcp { - context: self.websocket_context.as_deref(), - }, - tags: listener.get_concatenated_tags(&listener.get_addr().to_string()), + endpoint, + tags: listener.get_tags(&listener.get_addr().to_string()), client_rtt: socket_rtt(self.front_socket()), server_rtt: self.backend_socket.as_ref().and_then(socket_rtt), - metrics, - user_agent: None, - } - .log(); + service_time: metrics.service_time(), + response_time: metrics.response_time(), + bytes_in: metrics.bin, + bytes_out: metrics.bout, + user_agent: None + ); } pub fn log_request_success(&self, metrics: &SessionMetrics) { @@ -615,6 +620,12 @@ impl Pipe { backend_id: self.backend_id.as_deref(), } } + + fn log_endpoint(&self) -> EndpointRecord { + EndpointRecord::Tcp { + context: self.websocket_context.as_deref(), + } + } } impl SessionState for Pipe { diff --git a/lib/src/protocol/proxy_protocol/expect.rs b/lib/src/protocol/proxy_protocol/expect.rs index 80f9aef7a..be8a9b5b2 100644 --- a/lib/src/protocol/proxy_protocol/expect.rs +++ b/lib/src/protocol/proxy_protocol/expect.rs @@ -3,10 +3,9 @@ use std::{cell::RefCell, rc::Rc}; use mio::{net::TcpStream, *}; use nom::{Err, HexDisplay}; use rusty_ulid::Ulid; -use sozu_command::config::MAX_LOOP_ITERATIONS; +use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext}; use crate::{ - logs::LogContext, pool::Checkout, protocol::{pipe::Pipe, SessionResult, SessionState}, socket::{SocketHandler, SocketResult}, diff --git a/lib/src/protocol/rustls.rs b/lib/src/protocol/rustls.rs index 5299cc5ce..561d0c646 100644 --- a/lib/src/protocol/rustls.rs +++ b/lib/src/protocol/rustls.rs @@ -3,11 +3,11 @@ use std::{cell::RefCell, io::ErrorKind, net::SocketAddr, rc::Rc}; use mio::{net::TcpStream, Token}; use rustls::ServerConnection; use rusty_ulid::Ulid; -use sozu_command::config::MAX_LOOP_ITERATIONS; +use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext}; use crate::{ - logs::LogContext, protocol::SessionState, timer::TimeoutContainer, Readiness, Ready, - SessionMetrics, SessionResult, StateResult, + protocol::SessionState, timer::TimeoutContainer, Readiness, Ready, SessionMetrics, + SessionResult, StateResult, }; pub enum TlsState { diff --git a/lib/src/socket.rs b/lib/src/socket.rs index 25e278211..8a5a38614 100644 --- a/lib/src/socket.rs +++ b/lib/src/socket.rs @@ -633,6 +633,9 @@ pub mod stats { let info = socket_info(fd); assert!(info.is_some()); println!("{:#?}", info); - println!("rtt: {}", crate::logs::LogDuration(socket_rtt(&sock))); + println!( + "rtt: {}", + sozu_command::logging::LogDuration(socket_rtt(&sock)) + ); } } diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index 1323c412c..77bdb7706 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -14,11 +14,15 @@ use mio::{ use rusty_ulid::Ulid; use time::{Duration, Instant}; -use sozu_command::{config::MAX_LOOP_ITERATIONS, proto::command::request::RequestType, ObjectKind}; +use sozu_command::{ + config::MAX_LOOP_ITERATIONS, + logging::{EndpointRecord, LogContext}, + proto::command::request::RequestType, + ObjectKind, +}; use crate::{ backends::{Backend, BackendMap}, - logs::{Endpoint, LogContext, RequestRecord}, pool::{Checkout, Pool}, protocol::{ proxy_protocol::{ @@ -192,20 +196,24 @@ impl TcpSession { fn log_request(&self) { let listener = self.listener.borrow(); - RequestRecord { + let context = self.log_context(); + self.metrics.register_end_of_session(&context); + info_access!( error: None, - context: self.log_context(), + context, session_address: self.frontend_address, backend_address: None, protocol: "TCP", - endpoint: Endpoint::Tcp { context: None }, - tags: listener.get_concatenated_tags(&listener.get_addr().to_string()), + endpoint: EndpointRecord::Tcp { context: None }, + tags: listener.get_tags(&listener.get_addr().to_string()), client_rtt: socket_rtt(self.state.front_socket()), server_rtt: None, - metrics: &self.metrics, user_agent: None, - } - .log(); + service_time: self.metrics.service_time(), + response_time: self.metrics.response_time(), + bytes_in: self.metrics.bin, + bytes_out: self.metrics.bout + ); } fn front_hup(&mut self) -> SessionResult {