
Commit

Update deps including librdkafka
vertexclique committed Dec 20, 2023
1 parent 0da10c5 commit d8a3ff6
Showing 11 changed files with 121 additions and 433 deletions.
95 changes: 42 additions & 53 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 3 additions & 6 deletions callysto-avro/src/avro.rs
@@ -7,7 +7,7 @@ use callysto::nuclei;
 use callysto::nuclei::Task;
 use callysto::prelude::message::OwnedMessage;
 use callysto::prelude::producer::FutureRecord;
-use callysto::prelude::{CStream, ClientConfig};
+use callysto::prelude::{CProducer, CStream, ClientConfig};
 use callysto::rdkafka::Message;
 use crossbeam_channel::Sender;
 use cuneiform_fields::prelude::ArchPadding;
@@ -205,15 +205,12 @@ where
             Schema::parse_str(&*schema).map_err(|e| CallystoError::GeneralError(e.to_string()))?;
         let schema = sch.clone();
         let data_sink = nuclei::spawn(async move {
-            let producer = Callysto::<()>::producer(cc);
+            let producer = CProducer::new(cc);
             while let Ok(item) = rx.recv() {
                 let mut writer = Writer::new(&sch, Vec::new());
                 writer.append_ser(item).unwrap();
                 let encoded = writer.into_inner().unwrap();
-                let rec = FutureRecord::to(&topic).payload(&encoded);
-                producer
-                    .send::<Vec<u8>, _, _>(rec, Duration::from_secs(0))
-                    .await;
+                producer.send_value(&topic, encoded).await;
                 trace!("CAvroSink - Ingestion - Sink received an element.");
             }
         });
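For reference, here is the new send path in isolation: the refactor drops the hand-rolled FutureRecord plus FutureProducer::send call in favour of callysto's CProducer wrapper. A minimal sketch with an illustrative function name and parameters; CProducer::new(ClientConfig) and send_value(topic, bytes) are assumed from the hunk above rather than from separately verified API docs:

```rust
use callysto::prelude::{CProducer, ClientConfig};
use crossbeam_channel::Receiver;

// Channel-driven sink loop mirroring the new avro.rs code: every received,
// already-encoded Avro payload is forwarded to Kafka through CProducer.
async fn run_sink(cc: ClientConfig, topic: String, rx: Receiver<Vec<u8>>) {
    let producer = CProducer::new(cc);
    while let Ok(encoded) = rx.recv() {
        // send_value replaces manual FutureRecord construction and the explicit
        // send with a zero timeout.
        producer.send_value(&topic, encoded).await;
    }
}
```

In avro.rs this loop runs inside nuclei::spawn, and each item is serialized with the Avro Writer before being handed to send_value.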
28 changes: 14 additions & 14 deletions callysto/Cargo.toml
@@ -64,33 +64,33 @@ sink_postgres = ["tokio", "deadpool-postgres", "deadpool"]
 #nuclei = { version = "0.2", default-features = false, features = ["epoll", "async-exec"] }
 nuclei = "0.2.1"
 lightproc = "0.3.5"
-lever = "0.1.3"
-thiserror = "1.0.37"
-async-trait = "0.1.58"
+lever = "0.1.4"
+thiserror = "1.0.51"
+async-trait = "0.1.74"
 futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
 futures-timer = "3.0.2"
-crossbeam-channel = "0.5.6"
-rdkafka = { version = "0.28.0", default-features = false, features = ["libz"] }
-tracing = "0.1.37"
-url = "2.3.1"
-libc = "0.2.135"
+crossbeam-channel = "0.5.8"
+rdkafka = { version = "0.36.0", default-features = false, features = ["libz"] }
+tracing = "0.1.40"
+url = "2.5.0"
+libc = "0.2.151"
 cuneiform-fields = "0.1.1"
-serde = { version = "1.0.147", features = ["derive"] }
-serde_json = "1.0.87"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
 bincode = "1.3.3"
 http-types = "2.12.0"
-async-h1 = "2.3.3"
+async-h1 = "2.3.4"
 pin-project-lite = "0.2.13"
-futures-lite = "1.12.0"
+futures-lite = "1.13.0"

 # Optionals
 rocksdb = { version = "0.19.0", optional = true }
 elasticsearch = { version = "7.14.0-alpha.1", optional = true }
-deadpool-postgres = { version = "0.10.2", features = [
+deadpool-postgres = { version = "0.10.5", features = [
     "serde",
 ], optional = true }
 deadpool = { version = "0.9.5", optional = true }
-async-global-executor = "2.3.0"
+async-global-executor = "2.4.1"

 [dev-dependencies]
 dirs = "4.0.0"
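The most consequential bump above is rdkafka 0.28.0 → 0.36.0, which pulls in the newer librdkafka named in the commit message. To the best of my knowledge the FutureRecord/FutureProducer::send pattern used by the old avro.rs code is unchanged across these versions, so the switch to CProducer is a refactor rather than a forced migration. A minimal sketch, assuming callysto::rdkafka re-exports the rdkafka crate as the existing imports suggest:

```rust
use std::time::Duration;

use callysto::rdkafka::producer::{FutureProducer, FutureRecord};

// Sends an already-encoded payload with the pre-refactor pattern; the explicit
// `Vec<u8>` turbofish names the key type because the record carries no key.
async fn send_encoded(producer: &FutureProducer, topic: &str, encoded: Vec<u8>) {
    let rec = FutureRecord::to(topic).payload(&encoded);
    let _ = producer
        .send::<Vec<u8>, _, _>(rec, Duration::from_secs(0))
        .await;
}
```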
6 changes: 0 additions & 6 deletions callysto/src/app.rs
@@ -514,12 +514,6 @@ where
         cc
     }

-    ///
-    /// Create default producer.
-    pub fn producer(cc: ClientConfig) -> FutureProducer {
-        cc.create().expect("Producer creation error.")
-    }
-
     pub fn table<T: AsRef<str>>(&self, table_name: T) -> CTable<State> {
         if self.storage_url.is_none() {
             panic!("Tables can't be used without storage backend. Bailing...");
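With the helper gone, any caller that relied on Callysto::producer can inline the same one-liner; the body below is exactly the deleted method's body, and only the import paths are assumed re-exports. Alternatively, switch to CProducer as callysto-avro now does.

```rust
use callysto::prelude::ClientConfig;
use callysto::rdkafka::producer::FutureProducer;

// Inline equivalent of the removed `producer` helper: build the raw rdkafka
// FutureProducer straight from the ClientConfig.
fn default_producer(cc: ClientConfig) -> FutureProducer {
    cc.create().expect("Producer creation error.")
}
```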