Commit

Changes required for Parseable Enterprise
parmesant committed Feb 28, 2025
1 parent 1b4ea73 commit 48bd263
Showing 9 changed files with 214 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/enterprise/mod.rs
@@ -0,0 +1 @@
pub mod utils;
160 changes: 160 additions & 0 deletions src/enterprise/utils.rs
@@ -0,0 +1,160 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::query::stream_schema_provider::extract_primary_filter;
use crate::{
    catalog::{
        manifest::{File, Manifest},
        snapshot, Snapshot,
    },
    event,
    parseable::PARSEABLE,
    query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
    utils::time::TimeRange,
};

pub fn create_time_filter(
    time_range: &TimeRange,
    time_partition: Option<String>,
    table_name: &str,
) -> Vec<Expr> {
    let mut new_filters = vec![];
    let start_time = time_range.start.naive_utc();
    let end_time = time_range.end.naive_utc();
    let mut _start_time_filter: Expr;
    let mut _end_time_filter: Expr;

    match time_partition {
        Some(time_partition) => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    time_partition.clone(),
                )));
            _end_time_filter =
                PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
                    Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
                );
        }
        None => {
            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
            _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
                .binary_expr(Expr::Column(Column::new(
                    Some(table_name.to_owned()),
                    event::DEFAULT_TIMESTAMP_KEY,
                )));
        }
    }

    new_filters.push(_start_time_filter);
    new_filters.push(_end_time_filter);

    new_filters
}
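
For orientation, the pair of expressions returned here forms a half-open range predicate on the stream's time column. Below is a rough sketch of the equivalent shape using DataFusion's expression builders; it is illustrative only, the column name assumes the default timestamp key is p_timestamp, and the bounds are shown as plain integer literals rather than the timestamp literals that PartialTimeFilter::binary_expr actually produces.

use datafusion::prelude::{col, lit, Expr};

// Illustrative: a `>= start AND < end` pair on a qualified timestamp column,
// mirroring the filters that create_time_filter builds.
fn example_range_filters(table: &str, start_millis: i64, end_millis: i64) -> Vec<Expr> {
    let qualified = format!("{table}.p_timestamp");
    vec![
        col(qualified.as_str()).gt_eq(lit(start_millis)), // PartialTimeFilter::Low(Included(start))
        col(qualified.as_str()).lt(lit(end_millis)),      // PartialTimeFilter::High(Excluded(end))
    ]
}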

pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                let snapshot = object_store_format.snapshot;
                for manifest in snapshot.manifest_list {
                    merged_snapshot.manifest_list.push(manifest);
                }
            }
        }
    }

    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await
    .unwrap();

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter))
    }

    selected_files
        .into_iter()
        .map(|file| {
            let date = file.file_path.split("/").collect_vec();

            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

            let date = RelativePathBuf::from_iter(date);

            parquet_files.entry(date).or_default().push(file);
        })
        .for_each(|_| {});

    Ok(parquet_files)
}
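
A hypothetical call site for this helper, only to show the shape of the returned map: the keys are RelativePathBuf prefixes built from path components 1..4 of each parquet file path (the exact date=/hour=/minute= layout is an assumption here).

// Hypothetical caller inside the crate; assumes a TimeRange value is already in hand.
async fn print_selected_files(stream: &str, range: &TimeRange) -> Result<(), ObjectStorageError> {
    let files_by_prefix = fetch_parquet_file_paths(stream, range).await?;
    for (prefix, files) in &files_by_prefix {
        // e.g. prefix = "date=2025-02-28/hour=10/minute=05" (assumed layout)
        println!("{prefix}: {} parquet file(s)", files.len());
    }
    Ok(())
}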

async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, object_store::Error> {
    let mut tasks = Vec::new();
    manifest_urls.into_iter().for_each(|path| {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(async move {
            storage.get_object(&path).await.unwrap()
        }));
    });

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await.unwrap();
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).unwrap())
        .collect())
}
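
For comparison, here is a minimal sketch of the same concurrent fan-out written so that fetch and parse failures propagate as errors instead of panicking. It is not part of this commit, and it assumes anyhow and futures are available and that ObjectStorageError implements std::error::Error.

use anyhow::Context;
use futures::future::try_join_all;

// Illustrative alternative: fetch and deserialize all manifests concurrently,
// returning the first error instead of unwrapping.
async fn collect_manifest_files_checked(
    storage: Arc<dyn ObjectStorage>,
    manifest_paths: Vec<RelativePathBuf>,
) -> anyhow::Result<Vec<Manifest>> {
    let fetches = manifest_paths.into_iter().map(|path| {
        let storage = Arc::clone(&storage);
        async move {
            let bytes = storage
                .get_object(&path)
                .await
                .with_context(|| format!("fetching manifest {path}"))?;
            serde_json::from_slice::<Manifest>(&bytes).context("parsing manifest JSON")
        }
    });
    try_join_all(fetches).await
}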
7 changes: 4 additions & 3 deletions src/lib.rs
@@ -21,11 +21,12 @@ pub mod alerts;
pub mod analytics;
pub mod audit;
pub mod banner;
-mod catalog;
+pub mod catalog;
mod cli;
#[cfg(feature = "kafka")]
pub mod connectors;
pub mod correlation;
+pub mod enterprise;
mod event;
pub mod handlers;
pub mod hottier;
@@ -37,15 +38,15 @@ mod oidc;
pub mod option;
pub mod otel;
pub mod parseable;
-mod query;
+pub mod query;
pub mod rbac;
mod response;
mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod users;
-mod utils;
+pub mod utils;
mod validator;

use std::time::Duration;
5 changes: 1 addition & 4 deletions src/parseable/streams.rs
@@ -513,10 +513,7 @@ impl Stream {
            let file_size = match file.metadata() {
                Ok(meta) => meta.len(),
                Err(err) => {
-                    warn!(
-                        "File ({}) not found; Error = {err}",
-                        file.display()
-                    );
+                    warn!("File ({}) not found; Error = {err}", file.display());
                    continue;
                }
            };
2 changes: 1 addition & 1 deletion src/query/stream_schema_provider.rs
@@ -894,7 +894,7 @@ pub fn extract_primary_filter(
        .collect()
}

-trait ManifestExt: ManifestFile {
+pub trait ManifestExt: ManifestFile {
    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
        let name = match partial_filter {
            Expr::BinaryExpr(binary_expr) => {
13 changes: 12 additions & 1 deletion src/storage/azure_blob.rs
@@ -35,9 +35,10 @@ use datafusion::{
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use object_store::{
    azure::{MicrosoftAzure, MicrosoftAzureBuilder},
+    buffered::BufReader,
    limit::LimitStore,
    path::Path as StorePath,
-    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
+    BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
};
use relative_path::{RelativePath, RelativePathBuf};
use tracing::{error, info};
@@ -423,6 +424,16 @@ impl BlobStore {

#[async_trait]
impl ObjectStorage for BlobStore {
+    async fn get_buffered_reader(
+        &self,
+        _path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        unimplemented!()
+    }
+    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        unimplemented!()
+    }
+
    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
        Ok(self._get_object(path).await?)
    }
10 changes: 10 additions & 0 deletions src/storage/localfs.rs
@@ -28,6 +28,7 @@ use bytes::Bytes;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use fs_extra::file::CopyOptions;
use futures::{stream::FuturesUnordered, TryStreamExt};
+use object_store::{buffered::BufReader, ObjectMeta};
use relative_path::{RelativePath, RelativePathBuf};
use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;
@@ -103,6 +104,15 @@ impl LocalFS {

#[async_trait]
impl ObjectStorage for LocalFS {
+    async fn get_buffered_reader(
+        &self,
+        _path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        unimplemented!()
+    }
+    async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        unimplemented!()
+    }
    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
        let time = Instant::now();
        let file_path = self.path_in_root(path);
7 changes: 7 additions & 0 deletions src/storage/object_storage.rs
@@ -33,6 +33,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
+use object_store::buffered::BufReader;
+use object_store::ObjectMeta;
use once_cell::sync::OnceCell;
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
@@ -74,6 +76,11 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync

#[async_trait]
pub trait ObjectStorage: Debug + Send + Sync + 'static {
+    async fn get_buffered_reader(
+        &self,
+        path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError>;
+    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError>;
    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
    // TODO: make the filter function optional as we may want to get all objects
    async fn get_objects(
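
A hypothetical caller of the two new trait methods, to show the intended flow: head() for the object's metadata, then a seekable buffered reader over the same object. The function name is made up; it assumes only what the trait above declares.

// Hypothetical: fetch object metadata, then open a buffered, seekable async reader over it.
async fn open_buffered(
    store: std::sync::Arc<dyn ObjectStorage>,
    path: &RelativePath,
) -> Result<(ObjectMeta, BufReader), ObjectStorageError> {
    let meta = store.head(path).await?; // size, e_tag, last_modified
    let reader = store.get_buffered_reader(path).await?; // reads are served as ranged GETs
    Ok((meta, reader))
}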
23 changes: 18 additions & 5 deletions src/storage/s3.rs
@@ -37,9 +37,10 @@ use datafusion::{
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use object_store::{
    aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
+    buffered::BufReader,
    limit::LimitStore,
    path::Path as StorePath,
-    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
+    BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
};
use relative_path::{RelativePath, RelativePathBuf};
use tracing::{error, info};
@@ -310,9 +311,6 @@ impl ObjectStorageProvider for S3Config {
    fn construct_client(&self) -> Arc<dyn ObjectStorage> {
        let s3 = self.get_default_builder().build().unwrap();

-        // limit objectstore to a concurrent request limit
-        let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
-
        Arc::new(S3 {
            client: s3,
            bucket: self.bucket_name.clone(),
@@ -331,7 +329,7 @@

#[derive(Debug)]
pub struct S3 {
-    client: LimitStore<AmazonS3>,
+    client: AmazonS3,
    bucket: String,
    root: StorePath,
}
@@ -557,6 +555,21 @@ impl S3 {

#[async_trait]
impl ObjectStorage for S3 {
+    async fn get_buffered_reader(
+        &self,
+        path: &RelativePath,
+    ) -> Result<BufReader, ObjectStorageError> {
+        let path = &to_object_store_path(path);
+        let meta = self.client.head(path).await.unwrap();
+
+        let store: Arc<dyn ObjectStore> = Arc::new(self.client.clone());
+        let buf = object_store::buffered::BufReader::new(store, &meta);
+        Ok(buf)
+    }
+    async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
+        Ok(self.client.head(&to_object_store_path(path)).await.unwrap())
+    }
+
    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
        Ok(self._get_object(path).await?)
    }
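
And a sketch of what a consumer might do with the BufReader returned above, for example seeking to the end of a parquet object and reading its 8-byte footer. This is not part of the commit; it only assumes that object_store's buffered BufReader implements tokio's AsyncRead/AsyncSeek traits.

use object_store::buffered::BufReader;
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};

// Hypothetical: read the trailing 8 bytes (metadata length + "PAR1" magic) of a parquet
// object through the buffered reader returned by get_buffered_reader.
async fn read_parquet_footer(mut reader: BufReader, object_size: u64) -> std::io::Result<[u8; 8]> {
    let mut footer = [0u8; 8];
    reader
        .seek(SeekFrom::Start(object_size.saturating_sub(8)))
        .await?;
    reader.read_exact(&mut footer).await?;
    Ok(footer)
}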
