Skip to content

Commit

Permalink
feat: publish message in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Oct 9, 2023
1 parent b0462d8 commit 02afa13
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn main() {
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher,
publisher: messenger_kafka_publisher.into(),
rx_actions_channel,
tx_feedback_channel,
};
Expand Down
1 change: 0 additions & 1 deletion packages/talos_messenger_actions/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use talos_certifier_adapters::kafka::utils::build_kafka_headers;
use talos_rdkafka_utils::kafka_config::KafkaConfig;

// Kafka Producer
// #[derive(Clone)]
pub struct KafkaProducer<C: ProducerContext + 'static = DefaultProducerContext> {
producer: ThreadedProducer<C>,
topic: String,
Expand Down
12 changes: 9 additions & 3 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use log::{error, info};
use tokio::sync::mpsc;
Expand All @@ -11,8 +13,8 @@ use talos_messenger_core::{
use super::models::KafkaAction;

#[derive(Debug)]
pub struct KafkaActionService<M: MessengerPublisher<Payload = KafkaAction> + Send + Sync> {
pub publisher: M,
pub struct KafkaActionService<M: MessengerPublisher<Payload = KafkaAction> + Send + Sync + 'static> {
pub publisher: Arc<M>,
pub rx_actions_channel: mpsc::Receiver<MessengerCommitActions>,
pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}
Expand All @@ -37,9 +39,13 @@ where
Ok(actions) => {

let total_len = actions.len() as u32;

for action in actions {
let publisher = self.publisher.clone();
// Publish the message
self.publisher.send(version, action, total_len ).await;
tokio::spawn(async move {
publisher.send(version, action, total_len ).await;
});

}
},
Expand Down

0 comments on commit 02afa13

Please sign in to comment.