Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend server configuration with compression config #257

Merged
merged 6 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,9 @@
"address": "0.0.0.0:3000",
"cors": {
"enabled": true,
"allowed_methods": [
"GET",
"POST",
"PUT",
"DELETE"
],
"allowed_origins": [
"*"
],
"allowed_headers": [
"content-type"
],
"allowed_methods": ["GET", "POST", "PUT", "DELETE"],
"allowed_origins": ["*"],
"allowed_headers": ["content-type"],
"exposed_headers": [],
"allow_credentials": false,
"allow_private_network": false
Expand All @@ -24,12 +15,8 @@
"algorithm": "HS256",
"issuer": "iggy.rs",
"audience": "iggy.rs",
"valid_issuers": [
"iggy.rs"
],
"valid_audiences": [
"iggy.rs"
],
"valid_issuers": ["iggy.rs"],
"valid_audiences": ["iggy.rs"],
"access_token_expiry": 3600,
"refresh_token_expiry": 86400,
"clock_skew": 5,
Expand Down Expand Up @@ -108,6 +95,10 @@
"enabled": false,
"key": ""
},
"compression": {
"allow_override": false,
"default_algorithm": "none"
},
"stream": {
"path": "streams"
},
Expand Down
4 changes: 4 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ size = "4GB"
enabled = false
key = ""

[system.compression]
allow_override = false
default_algorithm = "none"

[system.stream]
path = "streams"

Expand Down
178 changes: 178 additions & 0 deletions iggy/src/compression/compression_algorithm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use serde::{
de::{self, Deserializer, Visitor},
Deserialize, Serialize, Serializer,
};
use std::{
fmt::{Display, Formatter},
str::FromStr,
};

use crate::error::Error;

// for now only those, in the future will add snappy, lz4, zstd (same as in confluent kafka) in addition to that
// we should consider brotli as well.
#[derive(Debug, PartialEq, Clone)]
pub enum CompressionAlgorithm {
None,
Gzip,
}
impl FromStr for CompressionAlgorithm {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"gzip" => Ok(CompressionAlgorithm::Gzip),
"none" => Ok(CompressionAlgorithm::None),
_ => Err(format!("Unknown compression type: {}", s)),
}
}
}

impl CompressionAlgorithm {
pub fn as_code(&self) -> u8 {
match self {
CompressionAlgorithm::None => 1,
CompressionAlgorithm::Gzip => 2,
}
}

pub fn from_code(code: u8) -> Result<Self, Error> {
match code {
1 => Ok(CompressionAlgorithm::None),
2 => Ok(CompressionAlgorithm::Gzip),
_ => Err(Error::InvalidCommand),
}
}
}

impl Display for CompressionAlgorithm {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
CompressionAlgorithm::None => write!(f, "none"),
CompressionAlgorithm::Gzip => write!(f, "gzip"),
}
}
}

impl Serialize for CompressionAlgorithm {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
CompressionAlgorithm::None => serializer.serialize_str("none"),
CompressionAlgorithm::Gzip => serializer.serialize_str("gzip"),
}
}
}

impl From<CompressionAlgorithm> for String {
fn from(value: CompressionAlgorithm) -> Self {
match value {
CompressionAlgorithm::None => "none".to_string(),
CompressionAlgorithm::Gzip => "gzip".to_string(),
}
}
}
struct CompressionKindVisitor;

impl<'de> Visitor<'de> for CompressionKindVisitor {
type Value = CompressionAlgorithm;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a valid compression type, check documentation for more information.")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
CompressionAlgorithm::from_str(value).map_err(de::Error::custom)
}
}

impl<'de> Deserialize<'de> for CompressionAlgorithm {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(CompressionKindVisitor)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_from() {
let none_alg = CompressionAlgorithm::from_str("none");
assert!(none_alg.is_ok());
assert_eq!(none_alg.unwrap(), CompressionAlgorithm::None);

let none_alg = CompressionAlgorithm::from_str("None");
assert!(none_alg.is_ok());
assert_eq!(none_alg.unwrap(), CompressionAlgorithm::None);

let gzip_alg = CompressionAlgorithm::from_str("gzip");
assert!(gzip_alg.is_ok());
assert_eq!(gzip_alg.unwrap(), CompressionAlgorithm::Gzip);

let gzip_alg = CompressionAlgorithm::from_str("Gzip");
assert!(gzip_alg.is_ok());
assert_eq!(gzip_alg.unwrap(), CompressionAlgorithm::Gzip);
}

#[test]
fn test_from_invalid_input() {
let invalid_compression_kind = CompressionAlgorithm::from_str("invalid");
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_str("gzipp");
assert!(invalid_compression_kind.is_err());
}

#[test]
fn test_into() {
let none: CompressionAlgorithm = CompressionAlgorithm::None;
let none_string: String = none.into();

assert_eq!(none_string, "none".to_string());

let gzip: CompressionAlgorithm = CompressionAlgorithm::Gzip;
let gzip_string: String = gzip.into();

assert_eq!(gzip_string, "gzip".to_string());
}
#[test]
fn test_as_code() {
let none = CompressionAlgorithm::None;
let none_code = none.as_code();
assert_eq!(none_code, 1);

let gzip = CompressionAlgorithm::Gzip;
let gzip_code = gzip.as_code();
assert_eq!(gzip_code, 2);
}
#[test]
fn test_from_code() {
let none = CompressionAlgorithm::from_code(1);
assert!(none.is_ok());
assert_eq!(none.unwrap(), CompressionAlgorithm::None);

let gzip = CompressionAlgorithm::from_code(2);
assert!(gzip.is_ok());
assert_eq!(gzip.unwrap(), CompressionAlgorithm::Gzip);
}
#[test]
fn test_from_code_invalid_input() {
let invalid_compression_kind = CompressionAlgorithm::from_code(0);
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_code(69);
assert!(invalid_compression_kind.is_err());

let invalid_compression_kind = CompressionAlgorithm::from_code(255);
assert!(invalid_compression_kind.is_err());
}
}
1 change: 1 addition & 0 deletions iggy/src/compression/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod compression_algorithm;
1 change: 1 addition & 0 deletions iggy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod clients;
#[cfg(feature = "iggy-cmd")]
pub mod cmd;
pub mod command;
pub mod compression;
pub mod consumer;
pub mod consumer_groups;
pub mod consumer_offsets;
Expand Down
14 changes: 12 additions & 2 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::configs::server::{
PersonalAccessTokenConfig, ServerConfig,
};
use crate::configs::system::{
CacheConfig, DatabaseConfig, EncryptionConfig, LoggingConfig, PartitionConfig, SegmentConfig,
StreamConfig, SystemConfig, TopicConfig,
CacheConfig, CompressionConfig, DatabaseConfig, EncryptionConfig, LoggingConfig,
PartitionConfig, SegmentConfig, StreamConfig, SystemConfig, TopicConfig,
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
Expand Down Expand Up @@ -126,6 +126,7 @@ impl Default for SystemConfig {
topic: TopicConfig::default(),
partition: PartitionConfig::default(),
segment: SegmentConfig::default(),
compression: CompressionConfig::default(),
}
}
}
Expand All @@ -138,6 +139,15 @@ impl Default for DatabaseConfig {
}
}

impl Default for CompressionConfig {
fn default() -> Self {
CompressionConfig {
allow_override: false,
default_algorithm: "none".parse().unwrap(),
}
}
}

impl Default for LoggingConfig {
fn default() -> LoggingConfig {
LoggingConfig {
Expand Down
11 changes: 11 additions & 0 deletions server/src/configs/displays.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
use crate::configs::system::CompressionConfig;
use crate::configs::{
http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig},
resource_quota::MemoryResourceQuota,
Expand Down Expand Up @@ -99,6 +100,16 @@ impl Display for MemoryResourceQuota {
}
}

impl Display for CompressionConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{ allowed_override: {}, default_algorithm: {} }}",
self.allow_override, self.default_algorithm
)
}
}

impl Display for ServerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
8 changes: 8 additions & 0 deletions server/src/configs/system.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::configs::resource_quota::MemoryResourceQuota;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -12,13 +13,20 @@ pub struct SystemConfig {
pub partition: PartitionConfig,
pub segment: SegmentConfig,
pub encryption: EncryptionConfig,
pub compression: CompressionConfig,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct DatabaseConfig {
pub path: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct CompressionConfig {
pub allow_override: bool,
pub default_algorithm: CompressionAlgorithm,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct LoggingConfig {
pub path: String,
Expand Down
18 changes: 18 additions & 0 deletions server/src/configs/validators.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
extern crate sysinfo;

use super::server::{MessageCleanerConfig, MessageSaverConfig};
use super::system::CompressionConfig;
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
use crate::configs::system::{CacheConfig, SegmentConfig};
use crate::server_error::ServerError;
use crate::streaming::segments::segment;
use byte_unit::{Byte, ByteUnit};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::validatable::Validatable;
use sysinfo::SystemExt;
use tracing::{error, info, warn};
Expand All @@ -14,12 +16,28 @@ impl Validatable<ServerError> for ServerConfig {
fn validate(&self) -> Result<(), ServerError> {
self.system.segment.validate()?;
self.system.cache.validate()?;
self.system.compression.validate()?;
self.personal_access_token.validate()?;

Ok(())
}
}

impl Validatable<ServerError> for CompressionConfig {
fn validate(&self) -> Result<(), ServerError> {
let compression_alg = &self.default_algorithm;
if *compression_alg != CompressionAlgorithm::None {
// TODO(numinex): Change this message once server side compression is fully developed.
warn!(
"Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
compression_alg
);
}

Ok(())
}
}

impl Validatable<ServerError> for CacheConfig {
fn validate(&self) -> Result<(), ServerError> {
let limit_bytes = self.size.clone().into();
Expand Down