Changes required for Parseable Enterprise #1215

Open · wants to merge 3 commits into main
7 changes: 7 additions & 0 deletions src/catalog/mod.rs
@@ -340,6 +340,12 @@ pub async fn remove_manifest_from_snapshot(
            Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
        }
        Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
        Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Can't remove manifest from within Index server",
            ),
        ))),
    }
}

@@ -350,6 +356,7 @@ pub async fn get_first_event(
) -> Result<Option<String>, ObjectStorageError> {
    let mut first_event_at: String = String::default();
    match PARSEABLE.options.mode {
        Mode::Index => unimplemented!(),
        Mode::All | Mode::Ingest => {
            // get current snapshot
            let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
1 change: 1 addition & 0 deletions src/enterprise/mod.rs
@@ -0,0 +1 @@
pub mod utils;
159 changes: 159 additions & 0 deletions src/enterprise/utils.rs
@@ -0,0 +1,159 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::query::stream_schema_provider::extract_primary_filter;
use crate::{
    catalog::{
        manifest::{File, Manifest},
        snapshot, Snapshot,
    },
    event,
    parseable::PARSEABLE,
    query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
    utils::time::TimeRange,
};

pub fn create_time_filter(
    time_range: &TimeRange,
    time_partition: Option<String>,
    table_name: &str,
) -> Vec<Expr> {
    let mut new_filters = vec![];
    let start_time = time_range.start.naive_utc();
    let end_time = time_range.end.naive_utc();
    let mut _start_time_filter: Expr;
    let mut _end_time_filter: Expr;

    match time_partition {
        Some(time_partition) => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    time_partition.clone(),
                )));
            _end_time_filter =
                PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
                    Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
                );
        }
        None => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
            _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
        }
    }

    new_filters.push(_start_time_filter);
    new_filters.push(_end_time_filter);

    new_filters
}

pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await?;

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await?;

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();

            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

            let date = RelativePathBuf::from_iter(date);
Comment on lines +125 to +127
⚠️ Potential issue

Check for potential out-of-bounds slice access.

Splitting on "/" and calling date.as_slice()[1..4] can panic if the path has fewer than four segments. Handle or verify the path structure before slicing to prevent a runtime panic.

Possible fix:

- let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+ if date.len() < 4 {
+     // Decide whether to skip this file or handle the error
+     return Err(
+         ObjectStorageError::Generic(format!("Unexpected path format: {:?}", date))
+     );
+ }
+ let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
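A note on the suggested fix: the slice runs inside the closure passed to .map(..), whose output fetch_parquet_file_paths immediately discards via .for_each(|_| {}), so an early return Err(...) from that closure would not typecheck or propagate. A hedged alternative sketch, replacing the iterator chain with a plain loop that skips malformed paths (the warn! message here is illustrative):

for file in selected_files {
    let segments = file.file_path.split('/').collect_vec();

    // get(1..4) returns None instead of panicking when the path has
    // fewer than four segments (the original indexes segments 1..4).
    let Some(date_segments) = segments.get(1..4) else {
        tracing::warn!("Unexpected path format: {}", file.file_path);
        continue;
    };

    let date = RelativePathBuf::from_iter(date_segments.iter().map(|s| s.to_string()));
    parquet_files.entry(date).or_default().push(file);
}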


            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});

    Ok(parquet_files)
}

async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, ObjectStorageError> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await??;
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
        .collect())
}
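collect_manifest_files spawns one Tokio task per manifest and awaits them in submission order; the double ? in task.await?? first surfaces a JoinError (covered by the new ObjectStorageError::JoinError variant in src/storage/mod.rs below) and then the storage error itself. If multi-threaded execution of the fetches is not required, a hedged alternative sketch that stays on the current task and needs no JoinError conversion, assuming the same trait bounds:

use futures::future::try_join_all;

// Hypothetical variant: drive all fetches concurrently without spawning,
// so there is no JoinHandle and no JoinError to convert.
async fn collect_manifest_files_alt(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, ObjectStorageError> {
    let fetches = manifest_urls.into_iter().map(|path| {
        let storage = Arc::clone(&storage);
        async move {
            let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
            storage.get_object(&path).await
        }
    });

    // try_join_all polls every future and short-circuits on the first error.
    let bodies = try_join_all(fetches).await?;

    Ok(bodies
        .iter()
        .map(|res| serde_json::from_slice(res).expect("Data is invalid for Manifest"))
        .collect())
}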
19 changes: 19 additions & 0 deletions src/handlers/http/middleware.rs
@@ -357,6 +357,25 @@ where
                    Ok(res)
                })
            }

            Mode::Index => {
                let accessable_endpoints = ["create", "delete"];
                let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
                if !cond {
                    Box::pin(async {
                        Err(actix_web::error::ErrorUnauthorized(
                            "Only Index API can be accessed in Index Mode",
                        ))
                    })
                } else {
                    let fut = self.service.call(req);

                    Box::pin(async move {
                        let res = fut.await?;
                        Ok(res)
                    })
                }
            }
        }
    }
}
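Note that this gate admits any request whose path contains a create or delete segment, regardless of position. A minimal sketch of the predicate in isolation (the function name and sample paths are illustrative, not actual Parseable routes):

// Standalone version of the Index-mode gate above; the endpoint list
// mirrors the middleware, the sample paths are made up.
fn index_mode_allows(path: &str) -> bool {
    let accessable_endpoints = ["create", "delete"];
    path.split('/').any(|x| accessable_endpoints.contains(&x))
}

fn main() {
    assert!(index_mode_allows("/api/v1/index/create"));
    assert!(index_mode_allows("/api/v1/index/delete/123"));
    // Paths without a create/delete segment are rejected as 401 Unauthorized.
    assert!(!index_mode_allows("/api/v1/query"));
}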
7 changes: 4 additions & 3 deletions src/lib.rs
@@ -21,11 +21,12 @@ pub mod alerts;
pub mod analytics;
pub mod audit;
pub mod banner;
-mod catalog;
+pub mod catalog;
mod cli;
#[cfg(feature = "kafka")]
pub mod connectors;
pub mod correlation;
+pub mod enterprise;
mod event;
pub mod handlers;
pub mod hottier;
@@ -37,15 +38,15 @@ mod oidc;
pub mod option;
pub mod otel;
pub mod parseable;
-mod query;
+pub mod query;
pub mod rbac;
mod response;
mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod users;
-mod utils;
+pub mod utils;
mod validator;

use std::time::Duration;
6 changes: 6 additions & 0 deletions src/main.rs
@@ -1,3 +1,5 @@
use std::process::exit;

/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
@@ -37,6 +39,10 @@ async fn main() -> anyhow::Result<()> {
    let server: Box<dyn ParseableServer> = match &PARSEABLE.options.mode {
        Mode::Query => Box::new(QueryServer),
        Mode::Ingest => Box::new(IngestServer),
        Mode::Index => {
            println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!");
            exit(0)
        }
        Mode::All => Box::new(Server),
    };

2 changes: 2 additions & 0 deletions src/option.rs
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
pub enum Mode {
    Query,
    Ingest,
    Index,
    #[default]
    All,
}
@@ -128,6 +129,7 @@ pub mod validation {
"query" => Ok(Mode::Query),
"ingest" => Ok(Mode::Ingest),
"all" => Ok(Mode::All),
"index" => Ok(Mode::Index),
_ => Err("Invalid MODE provided".to_string()),
}
}
1 change: 1 addition & 0 deletions src/parseable/mod.rs
@@ -243,6 +243,7 @@ impl Parseable {
        match self.options.mode {
            Mode::Query => "Distributed (Query)",
            Mode::Ingest => "Distributed (Ingest)",
            Mode::Index => "Distributed (Index)",
            Mode::All => "Standalone",
        }
    }
5 changes: 1 addition & 4 deletions src/parseable/streams.rs
@@ -513,10 +513,7 @@ impl Stream {
            let file_size = match file.metadata() {
                Ok(meta) => meta.len(),
                Err(err) => {
-                    warn!(
-                        "File ({}) not found; Error = {err}",
-                        file.display()
-                    );
+                    warn!("File ({}) not found; Error = {err}", file.display());
                    continue;
                }
            };
2 changes: 1 addition & 1 deletion src/query/stream_schema_provider.rs
@@ -894,7 +894,7 @@ pub fn extract_primary_filter(
        .collect()
}

-trait ManifestExt: ManifestFile {
+pub trait ManifestExt: ManifestFile {
    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
        let name = match partial_filter {
            Expr::BinaryExpr(binary_expr) => {
23 changes: 22 additions & 1 deletion src/storage/azure_blob.rs
@@ -35,9 +35,10 @@ use datafusion::{
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use object_store::{
    azure::{MicrosoftAzure, MicrosoftAzureBuilder},
+    buffered::BufReader,
    limit::LimitStore,
    path::Path as StorePath,
-    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
+    BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
};
use relative_path::{RelativePath, RelativePathBuf};
use tracing::{error, info};
@@ -423,6 +424,26 @@ impl BlobStore {

#[async_trait]
impl ObjectStorage for BlobStore {
    async fn get_buffered_reader(
        &self,
        _path: &RelativePath,
    ) -> Result<BufReader, ObjectStorageError> {
        Err(ObjectStorageError::UnhandledError(Box::new(
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Buffered reader not implemented for Blob Storage yet",
            ),
        )))
    }
    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
        Err(ObjectStorageError::UnhandledError(Box::new(
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Head operation not implemented for Blob Storage yet",
            ),
        )))
    }

    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
        Ok(self._get_object(path).await?)
    }
20 changes: 20 additions & 0 deletions src/storage/localfs.rs
@@ -28,6 +28,7 @@ use bytes::Bytes;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use fs_extra::file::CopyOptions;
use futures::{stream::FuturesUnordered, TryStreamExt};
use object_store::{buffered::BufReader, ObjectMeta};
use relative_path::{RelativePath, RelativePathBuf};
use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;
@@ -103,6 +104,25 @@ impl LocalFS {

#[async_trait]
impl ObjectStorage for LocalFS {
    async fn get_buffered_reader(
        &self,
        _path: &RelativePath,
    ) -> Result<BufReader, ObjectStorageError> {
        Err(ObjectStorageError::UnhandledError(Box::new(
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Buffered reader not implemented for LocalFS yet",
            ),
        )))
    }
    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
        Err(ObjectStorageError::UnhandledError(Box::new(
            std::io::Error::new(
                std::io::ErrorKind::Unsupported,
                "Head operation not implemented for LocalFS yet",
            ),
        )))
    }
    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
        let time = Instant::now();
        let file_path = self.path_in_root(path);
4 changes: 4 additions & 0 deletions src/storage/mod.rs
@@ -20,6 +20,7 @@ use chrono::Local;
use object_store::path::Path;
use relative_path::RelativePath;
use serde::{Deserialize, Serialize};
use tokio::task::JoinError;

use crate::{
    catalog::snapshot::Snapshot,
@@ -254,6 +255,9 @@ pub enum ObjectStorageError {

#[error("{0}")]
StandaloneWithDistributed(#[from] StandaloneWithDistributed),

#[error("JoinError: {0}")]
JoinError(#[from] JoinError),
}

pub fn to_object_store_path(path: &RelativePath) -> Path {
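The JoinError variant exists so the task.await?? pattern in src/enterprise/utils.rs can apply ? directly to the JoinHandle. A minimal, self-contained illustration of the same thiserror pattern (a toy example, not project code):

use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
enum DemoError {
    #[error("JoinError: {0}")]
    Join(#[from] JoinError),
}

#[tokio::main]
async fn main() -> Result<(), DemoError> {
    // The first ? converts a panicked or cancelled task's JoinError into
    // DemoError through the From impl generated by #[from].
    let value = tokio::spawn(async { 21 * 2 }).await?;
    assert_eq!(value, 42);
    Ok(())
}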