Skip to content

Commit

Permalink
fix(backend): Fix env var for multiple queues
Browse files Browse the repository at this point in the history
  • Loading branch information
amaury1093 committed Nov 28, 2024
1 parent 976e8f0 commit ed19166
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 48 deletions.
125 changes: 78 additions & 47 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,50 @@ impl BackendConfig {
}
}

/// Attempt connection to the Postgres database and RabbitMQ. Also populates
/// the internal `pg_pool1, `check_email_channel` and `preprocess_channel`
/// fields with the connections.
pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
let pg_pool = if self.worker.enable {
let db_url = self
.worker
.postgres
.as_ref()
.map(|c| &c.db_url)
.ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the postgres configuration")
})?;
Some(create_db(db_url).await?)
} else if let Ok(db_url) = env::var("DATABASE_URL") {
// For legacy reasons, we also support the DATABASE_URL environment variable:
Some(create_db(&db_url).await?)
} else {
None
};
self.pg_pool = pg_pool;

#[cfg(feature = "worker")]
{
let (check_email_channel, preprocess_channel) = if self.worker.enable {
let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
})?;
let (check_email_channel, preprocess_channel) =
setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
(
Some(Arc::new(check_email_channel)),
Some(Arc::new(preprocess_channel)),
)
} else {
(None, None)
};
self.check_email_channel = check_email_channel;
self.preprocess_channel = preprocess_channel;
}

Ok(())
}

pub fn get_pg_pool(&self) -> Option<PgPool> {
self.pg_pool.clone()
}
Expand Down Expand Up @@ -272,7 +316,7 @@ pub struct RabbitMQConfig {
/// Queue names that the worker can consume from. Each email is routed to a
/// one and only one queue, based on the email provider. A single worker can
/// consume from multiple queues.
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum Queue {
Gmail,
HotmailB2B,
Expand Down Expand Up @@ -364,58 +408,45 @@ impl ThrottleConfig {
/// Load the worker configuration from the worker_config.toml file and from the
/// environment.
pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {
let cfg = Config::builder()
let mut cfg = Config::builder()
.add_source(config::File::with_name("backend_config"))
.add_source(
config::Environment::with_prefix("RCH")
.separator("__")
.try_parsing(true)
.list_separator(","),
)
.build()?;

let mut cfg = cfg.try_deserialize::<BackendConfig>()?;
.add_source(config::Environment::with_prefix("RCH").separator("__"));

// The RCH__WORKER__RABBITMQ__QUEUES is always read as a str, whereas we
// sometimes want to parse it as a list of strings, if it's not "all". We
// handle this case separately.
if let Ok(queues) = env::var("RCH__WORKER__RABBITMQ__QUEUES") {
if queues != "all" {
let queues: Vec<String> = queues.split(',').map(String::from).collect();
cfg = cfg.set_override("worker.rabbitmq.queues", queues)?;
}
}
let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;

if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
warn!(target: LOG_TARGET, "worker.enable is set to false, ignoring throttling and concurrency settings.")
}

let pg_pool = if cfg.worker.enable {
let db_url = cfg
.worker
.postgres
.as_ref()
.map(|c| &c.db_url)
.ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the postgres configuration")
})?;
Some(create_db(db_url).await?)
} else if let Ok(db_url) = env::var("DATABASE_URL") {
// For legacy reasons, we also support the DATABASE_URL environment variable:
Some(create_db(&db_url).await?)
} else {
None
};
cfg.pg_pool = pg_pool;
Ok(cfg)
}

#[cfg(feature = "worker")]
{
let (check_email_channel, preprocess_channel) = if cfg.worker.enable {
let rabbitmq_config = cfg.worker.rabbitmq.as_ref().ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
})?;
let (check_email_channel, preprocess_channel) =
setup_rabbit_mq(&cfg.backend_name, rabbitmq_config).await?;
(
Some(Arc::new(check_email_channel)),
Some(Arc::new(preprocess_channel)),
)
} else {
(None, None)
};
cfg.check_email_channel = check_email_channel;
cfg.preprocess_channel = preprocess_channel;
#[cfg(test)]
mod tests {
use super::{load_config, Queue};
use std::env;

#[tokio::test]
async fn test_env_vars() {
env::set_var("RCH__BACKEND_NAME", "test-backend");
env::set_var(
"RCH__WORKER__RABBITMQ__QUEUES",
"check.gmail,check.hotmailb2b",
);
let cfg = load_config().await.unwrap();
assert_eq!(cfg.backend_name, "test-backend");
assert_eq!(
cfg.worker.rabbitmq.unwrap().queues.to_queues(),
vec![Queue::Gmail, Queue::HotmailB2B]
);
}

Ok(cfg)
}
3 changes: 2 additions & 1 deletion backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ async fn main() -> Result<(), anyhow::Error> {
// Initialize logging.
tracing_subscriber::fmt::init();
info!(target: LOG_TARGET, version=?CARGO_PKG_VERSION, "Running Reacher");
let config = load_config().await?;
let mut config = load_config().await?;
config.connect().await?;
debug!(target: LOG_TARGET, "{:#?}", config);

// Setup sentry bug tracking.
Expand Down

0 comments on commit ed19166

Please sign in to comment.