Skip to content

Commit

Permalink
Replace log with tracing and add JSON logging option
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jan 24, 2024
1 parent 4907a35 commit 5442f5c
Show file tree
Hide file tree
Showing 15 changed files with 450 additions and 351 deletions.
670 changes: 369 additions & 301 deletions Cargo.lock

Large diffs are not rendered by default.

49 changes: 34 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
[workspace]
members = ["clade"]

[workspace.dependencies]
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-csv = "49.0.0"
arrow-flight = "49.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "49.0.0"
arrow-schema = "49.0.0"
async-trait = "0.1.64"

datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
tracing = { version = "0.1", features = ["log"] }

[package]
name = "seafowl"
build = "build.rs"
Expand Down Expand Up @@ -49,16 +69,16 @@ arrow-select = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-wi
arrow-string = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }

[dependencies]
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-csv = "49.0.0"
arrow-flight = { version = "49.0.0", optional = true }
arrow = { workspace = true }
arrow-buffer = { workspace = true }
arrow-csv = { workspace = true }
arrow-flight = { workspace = true, optional = true }
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "49.0.0"
arrow-schema = "49.0.0"
async-trait = "0.1.64"
arrow-integration-test = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
base64 = "0.21.0"

bytes = "1.4.0"
Expand All @@ -73,24 +93,22 @@ convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch

dashmap = "5.4.0"

datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9264edea89a2fc1c35f4a6b9faab125748ff3651", features = ["s3-native-tls", "datafusion-ext"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = ">=0.10.0"
itertools = { workspace = true }
lazy_static = ">=1.4.0"
log = "0.4"
moka = { version = "0.12.2", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.8"
parking_lot = "0.12.1"
percent-encoding = "2.2.0"
pretty_env_logger = "0.4"
prost = "0.12.1"

# Needs to be in non-dev because repository::testutils can't be
Expand All @@ -111,8 +129,10 @@ strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
thiserror = "1"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
tokio = { workspace = true }
tonic = { version = "0.10.0", optional = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
url = "2.5"
uuid = "1.2.1"
warp = "0.3.6"
Expand All @@ -132,7 +152,6 @@ wiremock = "0.5"

[build-dependencies]
anyhow = "1.0.63" # for build.rs
prost-build = "0.12.1"
vergen = "7"

[profile.release]
Expand Down
20 changes: 10 additions & 10 deletions datafusion_remote_tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-schema = "49.0.0"
async-trait = "0.1.64"
arrow = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-34-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"
itertools = ">=0.10.0"
log = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
itertools = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
rstest = "*"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use log::debug;
use std::any::Any;
use std::ops::Deref;
use std::sync::Arc;
use tokio::task;
use tracing::debug;

// Implementation of a remote table, capable of querying Postgres, MySQL, SQLite, etc...
pub struct RemoteTable {
Expand Down
2 changes: 1 addition & 1 deletion src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::object_store::cache::{
};
use config::{Config, ConfigError, Environment, File, FileFormat, Map};
use hex::encode;
use log::{info, warn};
use object_store::DynObjectStore;
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use sqlx::sqlite::SqliteJournalMode;
use tempfile::TempDir;
use tracing::{info, warn};

pub const DEFAULT_DATA_DIR: &str = "seafowl-data";
pub const DEFAULT_SQLITE_DB: &str = "seafowl.sqlite";
Expand Down
2 changes: 1 addition & 1 deletion src/context/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::create_add;
use deltalake::DeltaTable;
use futures::{StreamExt, TryStreamExt};
use log::{debug, info, warn};
use object_store::path::Path;
use std::collections::HashMap;
use std::fs::File;
Expand All @@ -33,6 +32,7 @@ use tempfile::{NamedTempFile, TempPath};
use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};
use uuid::Uuid;

// Max Parquet row group size, in rows. This is what the ArrowWriter uses to determine how many
Expand Down
2 changes: 1 addition & 1 deletion src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use datafusion_common::TableReference;
use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use deltalake::DeltaTable;
use itertools::Itertools;
use log::debug;
use sqlparser::ast::{
AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, FunctionDefinition,
ObjectType, Query, Statement, TableFactor, TableWithJoins, VisitMut,
};
use std::borrow::Cow;
use std::sync::Arc;
use tracing::debug;

pub fn is_read_only(plan: &LogicalPlan) -> bool {
!matches!(
Expand Down
2 changes: 1 addition & 1 deletion src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
use log::info;
use object_store::path::Path;
use std::borrow::Cow;
use std::ops::Deref;
use std::ops::Not;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;
use url::Url;

/// Create an ExecutionPlan that doesn't produce any results.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use deltalake::parquet::data_type::AsBytes;
use deltalake::DeltaTable;
use futures::{future, StreamExt};
use hex::encode;
use log::{debug, info, warn};
use percent_encoding::{percent_decode_str, utf8_percent_encode, NON_ALPHANUMERIC};
use serde::Deserialize;
use serde_json::json;
use sha2::{Digest, Sha256};
use tokio::sync::broadcast::Receiver;
use tracing::{debug, info, warn};
use warp::http::HeaderValue;
use warp::multipart::{FormData, Part};
use warp::reply::{with_header, Response};
Expand Down
40 changes: 26 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use clap::Parser;

use futures::{future::join_all, Future, FutureExt};

use pretty_env_logger::env_logger;
#[cfg(feature = "frontend-arrow-flight")]
use seafowl::frontend::flight::run_flight_server;
use seafowl::{
Expand All @@ -29,14 +28,13 @@ use tokio::signal::ctrl_c;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast::{channel, Sender};
use tokio::time::{interval, Duration};
use tracing::level_filters::LevelFilter;
use tracing::{error, info, subscriber, warn};
use tracing_subscriber::{filter::EnvFilter, FmtSubscriber};

#[cfg(feature = "frontend-postgres")]
use seafowl::frontend::postgres::run_pg_server;

extern crate pretty_env_logger;
#[macro_use]
extern crate log;

const DEFAULT_CONFIG_PATH: &str = "seafowl.toml";

#[derive(Debug, Parser)]
Expand All @@ -62,6 +60,28 @@ struct Args {
takes_value = false
)]
cli: bool,

#[clap(long, help = "Enable JSON logging", takes_value = false)]
json_logs: bool,
}

fn prepare_tracing(json_logs: bool) {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
.add_directive("sqlx=info".parse().unwrap());

let sub = FmtSubscriber::builder()
.with_thread_ids(true)
.with_thread_names(true)
.with_env_filter(env_filter);

if json_logs {
subscriber::set_global_default(sub.json().finish())
} else {
subscriber::set_global_default(sub.finish())
}
.expect("Global logging config set");
}

fn prepare_frontends(
Expand Down Expand Up @@ -167,15 +187,7 @@ async fn main() {
}

if !args.cli {
let mut builder = pretty_env_logger::formatted_timed_builder();

builder
.parse_filters(
env::var(env_logger::DEFAULT_FILTER_ENV)
.unwrap_or_else(|_| "sqlx=warn,info".to_string())
.as_str(),
)
.init();
prepare_tracing(args.json_logs)
}

info!("Starting Seafowl {}", env!("VERGEN_BUILD_SEMVER"));
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use crate::config::schema::str_to_hex_hash;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes};
use futures::stream::BoxStream;
use log::{debug, error, warn};
use moka::future::{Cache, CacheBuilder, FutureExt};
use moka::notification::RemovalCause;
use object_store::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
PutResult,
};
use tracing::{debug, error, warn};

use std::fmt::Display;
use std::fmt::{Debug, Formatter};
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::object_store::cache::{
};
use datafusion::prelude::SessionContext;
use lazy_static::lazy_static;
use log::warn;
use regex::Regex;
use reqwest::{header, Client, ClientBuilder, RequestBuilder, Response, StatusCode};
use std::env;
Expand All @@ -32,6 +31,7 @@ use std::ops::Range;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::AsyncWrite;
use tracing::warn;
use url::Url;
use warp::hyper::header::{
IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE,
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/wrapped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use crate::config::schema;
use crate::config::schema::{Local, GCS, S3};
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt, TryFutureExt};
use log::debug;
use object_store::{
path::Path, Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use tokio::io::AsyncWrite;
use tracing::debug;

use tokio::fs::{copy, create_dir_all, remove_file, rename};

Expand Down
2 changes: 1 addition & 1 deletion src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use datafusion_common::DFSchema;
use datafusion_expr::{expr::Alias, Expr};
use deltalake::DeltaTable;

use log::warn;
use parking_lot::RwLock;
use tracing::warn;

use crate::repository::interface::FunctionId;
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use arrow::json::LineDelimitedWriter;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use hex::encode;
use log::{info, warn};
use sha2::{Digest, Sha256};
use tokio::{fs::File, io::AsyncWrite};
use tracing::{info, warn};

use crate::context::SeafowlContext;
use crate::repository::interface::DroppedTableDeletionStatus;
Expand Down

0 comments on commit 5442f5c

Please sign in to comment.