Skip to content

Commit

Permalink
Extend server configuration with compression config (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex authored Nov 7, 2023
1 parent f6e0728 commit ab5f3c9
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 20 deletions.
27 changes: 9 additions & 18 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,9 @@
"address": "0.0.0.0:3000",
"cors": {
"enabled": true,
"allowed_methods": [
"GET",
"POST",
"PUT",
"DELETE"
],
"allowed_origins": [
"*"
],
"allowed_headers": [
"content-type"
],
"allowed_methods": ["GET", "POST", "PUT", "DELETE"],
"allowed_origins": ["*"],
"allowed_headers": ["content-type"],
"exposed_headers": [],
"allow_credentials": false,
"allow_private_network": false
Expand All @@ -24,12 +15,8 @@
"algorithm": "HS256",
"issuer": "iggy.rs",
"audience": "iggy.rs",
"valid_issuers": [
"iggy.rs"
],
"valid_audiences": [
"iggy.rs"
],
"valid_issuers": ["iggy.rs"],
"valid_audiences": ["iggy.rs"],
"access_token_expiry": 3600,
"refresh_token_expiry": 86400,
"clock_skew": 5,
Expand Down Expand Up @@ -108,6 +95,10 @@
"enabled": false,
"key": ""
},
"compression": {
"allow_override": false,
"default_algorithm": "none"
},
"stream": {
"path": "streams"
},
Expand Down
4 changes: 4 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ size = "4GB"
enabled = false
key = ""

[system.compression]
allow_override = false
default_algorithm = "none"

[system.stream]
path = "streams"

Expand Down
178 changes: 178 additions & 0 deletions iggy/src/compression/compression_algorithm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use serde::{
de::{self, Deserializer, Visitor},
Deserialize, Serialize, Serializer,
};
use std::{
fmt::{Display, Formatter},
str::FromStr,
};

use crate::error::Error;

// for now only those, in the future will add snappy, lz4, zstd (same as in confluent kafka) in addition to that
// we should consider brotli as well.
#[derive(Debug, PartialEq, Clone)]
pub enum CompressionAlgorithm {
None,
Gzip,
}
impl FromStr for CompressionAlgorithm {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"gzip" => Ok(CompressionAlgorithm::Gzip),
"none" => Ok(CompressionAlgorithm::None),
_ => Err(format!("Unknown compression type: {}", s)),
}
}
}

impl CompressionAlgorithm {
pub fn as_code(&self) -> u8 {
match self {
CompressionAlgorithm::None => 1,
CompressionAlgorithm::Gzip => 2,
}
}

pub fn from_code(code: u8) -> Result<Self, Error> {
match code {
1 => Ok(CompressionAlgorithm::None),
2 => Ok(CompressionAlgorithm::Gzip),
_ => Err(Error::InvalidCommand),
}
}
}

impl Display for CompressionAlgorithm {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
CompressionAlgorithm::None => write!(f, "none"),
CompressionAlgorithm::Gzip => write!(f, "gzip"),
}
}
}

impl Serialize for CompressionAlgorithm {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
CompressionAlgorithm::None => serializer.serialize_str("none"),
CompressionAlgorithm::Gzip => serializer.serialize_str("gzip"),
}
}
}

impl From<CompressionAlgorithm> for String {
fn from(value: CompressionAlgorithm) -> Self {
match value {
CompressionAlgorithm::None => "none".to_string(),
CompressionAlgorithm::Gzip => "gzip".to_string(),
}
}
}
struct CompressionKindVisitor;

impl<'de> Visitor<'de> for CompressionKindVisitor {
type Value = CompressionAlgorithm;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a valid compression type, check documentation for more information.")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
CompressionAlgorithm::from_str(value).map_err(de::Error::custom)
}
}

impl<'de> Deserialize<'de> for CompressionAlgorithm {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(CompressionKindVisitor)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_from() {
let none_alg = CompressionAlgorithm::from_str("none");
assert!(none_alg.is_ok());
assert_eq!(none_alg.unwrap(), CompressionAlgorithm::None);

let none_alg = CompressionAlgorithm::from_str("None");
assert!(none_alg.is_ok());
assert_eq!(none_alg.unwrap(), CompressionAlgorithm::None);

let gzip_alg = CompressionAlgorithm::from_str("gzip");
assert!(gzip_alg.is_ok());
assert_eq!(gzip_alg.unwrap(), CompressionAlgorithm::Gzip);

let gzip_alg = CompressionAlgorithm::from_str("Gzip");
assert!(gzip_alg.is_ok());
assert_eq!(gzip_alg.unwrap(), CompressionAlgorithm::Gzip);
}

#[test]
fn test_from_invalid_input() {
let invalid_compression_kind = CompressionAlgorithm::from_str("invalid");
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_str("gzipp");
assert!(invalid_compression_kind.is_err());
}

#[test]
fn test_into() {
let none: CompressionAlgorithm = CompressionAlgorithm::None;
let none_string: String = none.into();

assert_eq!(none_string, "none".to_string());

let gzip: CompressionAlgorithm = CompressionAlgorithm::Gzip;
let gzip_string: String = gzip.into();

assert_eq!(gzip_string, "gzip".to_string());
}
#[test]
fn test_as_code() {
let none = CompressionAlgorithm::None;
let none_code = none.as_code();
assert_eq!(none_code, 1);

let gzip = CompressionAlgorithm::Gzip;
let gzip_code = gzip.as_code();
assert_eq!(gzip_code, 2);
}
#[test]
fn test_from_code() {
let none = CompressionAlgorithm::from_code(1);
assert!(none.is_ok());
assert_eq!(none.unwrap(), CompressionAlgorithm::None);

let gzip = CompressionAlgorithm::from_code(2);
assert!(gzip.is_ok());
assert_eq!(gzip.unwrap(), CompressionAlgorithm::Gzip);
}
#[test]
fn test_from_code_invalid_input() {
let invalid_compression_kind = CompressionAlgorithm::from_code(0);
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_code(69);
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_code(255);
assert!(invalid_compression_kind.is_err());
}
}
1 change: 1 addition & 0 deletions iggy/src/compression/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod compression_algorithm;
1 change: 1 addition & 0 deletions iggy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod clients;
#[cfg(feature = "iggy-cmd")]
pub mod cmd;
pub mod command;
pub mod compression;
pub mod consumer;
pub mod consumer_groups;
pub mod consumer_offsets;
Expand Down
14 changes: 12 additions & 2 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::configs::server::{
PersonalAccessTokenConfig, ServerConfig,
};
use crate::configs::system::{
CacheConfig, DatabaseConfig, EncryptionConfig, LoggingConfig, PartitionConfig, SegmentConfig,
StreamConfig, SystemConfig, TopicConfig,
CacheConfig, CompressionConfig, DatabaseConfig, EncryptionConfig, LoggingConfig,
PartitionConfig, SegmentConfig, StreamConfig, SystemConfig, TopicConfig,
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
Expand Down Expand Up @@ -126,6 +126,7 @@ impl Default for SystemConfig {
topic: TopicConfig::default(),
partition: PartitionConfig::default(),
segment: SegmentConfig::default(),
compression: CompressionConfig::default(),
}
}
}
Expand All @@ -138,6 +139,15 @@ impl Default for DatabaseConfig {
}
}

impl Default for CompressionConfig {
fn default() -> Self {
CompressionConfig {
allow_override: false,
default_algorithm: "none".parse().unwrap(),
}
}
}

impl Default for LoggingConfig {
fn default() -> LoggingConfig {
LoggingConfig {
Expand Down
11 changes: 11 additions & 0 deletions server/src/configs/displays.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
use crate::configs::system::CompressionConfig;
use crate::configs::{
http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig},
resource_quota::MemoryResourceQuota,
Expand Down Expand Up @@ -99,6 +100,16 @@ impl Display for MemoryResourceQuota {
}
}

impl Display for CompressionConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{ allowed_override: {}, default_algorithm: {} }}",
self.allow_override, self.default_algorithm
)
}
}

impl Display for ServerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
8 changes: 8 additions & 0 deletions server/src/configs/system.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::configs::resource_quota::MemoryResourceQuota;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -12,13 +13,20 @@ pub struct SystemConfig {
pub partition: PartitionConfig,
pub segment: SegmentConfig,
pub encryption: EncryptionConfig,
pub compression: CompressionConfig,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct DatabaseConfig {
pub path: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct CompressionConfig {
pub allow_override: bool,
pub default_algorithm: CompressionAlgorithm,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct LoggingConfig {
pub path: String,
Expand Down
18 changes: 18 additions & 0 deletions server/src/configs/validators.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
extern crate sysinfo;

use super::server::{MessageCleanerConfig, MessageSaverConfig};
use super::system::CompressionConfig;
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
use crate::configs::system::{CacheConfig, SegmentConfig};
use crate::server_error::ServerError;
use crate::streaming::segments::segment;
use byte_unit::{Byte, ByteUnit};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::validatable::Validatable;
use sysinfo::SystemExt;
use tracing::{error, info, warn};
Expand All @@ -14,12 +16,28 @@ impl Validatable<ServerError> for ServerConfig {
fn validate(&self) -> Result<(), ServerError> {
self.system.segment.validate()?;
self.system.cache.validate()?;
self.system.compression.validate()?;
self.personal_access_token.validate()?;

Ok(())
}
}

impl Validatable<ServerError> for CompressionConfig {
fn validate(&self) -> Result<(), ServerError> {
let compression_alg = &self.default_algorithm;
if *compression_alg != CompressionAlgorithm::None {
// TODO(numinex): Change this message once server side compression is fully developed.
warn!(
"Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
compression_alg
);
}

Ok(())
}
}

impl Validatable<ServerError> for CacheConfig {
fn validate(&self) -> Result<(), ServerError> {
let limit_bytes = self.size.clone().into();
Expand Down

0 comments on commit ab5f3c9

Please sign in to comment.