New job queue: worker registration and leader election #3307

Open
wants to merge 5 commits into
base: main
46 changes: 8 additions & 38 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
@@ -338,3 +338,10 @@ rayon.opt-level = 3
 regalloc2.opt-level = 3
 sha2.opt-level = 3
 sqlx-macros.opt-level = 3
+
+[patch.crates-io]
+sqlx = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
+sqlx-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
+sqlx-macros = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
+sqlx-macros-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
+sqlx-postgres = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
2 changes: 2 additions & 0 deletions crates/cli/src/commands/server.rs
@@ -172,6 +172,8 @@ impl Options {
     &mailer,
     homeserver_connection.clone(),
     url_builder.clone(),
+    shutdown.soft_shutdown_token(),
+    shutdown.task_tracker(),
 )
 .await?;

39 changes: 34 additions & 5 deletions crates/cli/src/commands/worker.rs
@@ -17,15 +17,20 @@ use rand::{
 };
 use tracing::{info, info_span};
 
-use crate::util::{
-    database_pool_from_config, mailer_from_config, site_config_from_config, templates_from_config,
+use crate::{
+    shutdown::ShutdownManager,
+    util::{
+        database_pool_from_config, mailer_from_config, site_config_from_config,
+        templates_from_config,
+    },
 };
 
 #[derive(Parser, Debug, Default)]
 pub(super) struct Options {}
 
 impl Options {
     pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
+        let shutdown = ShutdownManager::new()?;
         let span = info_span!("cli.worker.init").entered();
         let config = AppConfig::extract(figment)?;

@@ -71,11 +76,35 @@ impl Options {
         let worker_name = Alphanumeric.sample_string(&mut rng, 10);
 
         info!(worker_name, "Starting task scheduler");
-        let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn, url_builder).await?;
+
+        let monitor = mas_tasks::init(
+            &worker_name,
+            &pool,
+            &mailer,
+            conn,
+            url_builder,
+            shutdown.soft_shutdown_token(),
+            shutdown.task_tracker(),
+        )
+        .await?;
+
+        // XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
+        // ideally we'd just give it a cancellation token
+        let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
+        shutdown.task_tracker().spawn(async move {
+            if let Err(e) = monitor
+                .run_with_signal(async move {
+                    shutdown_future.await;
+                    Ok(())
+                })
+                .await
+            {
+                tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
+            }
+        });
         span.exit();
 
-        monitor.run().await?;
+        shutdown.run().await;
 
         Ok(ExitCode::SUCCESS)
     }
 }
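
The shape of this change (hand every background task a shared soft-shutdown signal, track the tasks, then wait for all of them before exiting) can be illustrated without the async runtime. The following is a minimal, std-only sketch of that pattern using threads. `SoftShutdown`, `Tracker`, and `run_worker` are hypothetical names invented for this illustration; they are not the `ShutdownManager` API from this PR, whose internals are not shown in the diff.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a cancellation token: a shared flag plus a
/// condition variable so sleeping workers wake up as soon as it is set.
#[derive(Clone, Default)]
pub struct SoftShutdown {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl SoftShutdown {
    /// Request shutdown and wake every waiter.
    pub fn cancel(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    pub fn is_cancelled(&self) -> bool {
        *self.inner.0.lock().unwrap()
    }

    /// Sleep for at most `timeout`, returning early if shutdown is requested.
    /// Returns `true` when shutdown has been requested.
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        let (flag, cvar) = &*self.inner;
        let guard = flag.lock().unwrap();
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, timeout, |cancelled| !*cancelled)
            .unwrap();
        *guard
    }
}

/// Hypothetical stand-in for a task tracker: remembers every spawned
/// worker so the caller can wait for all of them to finish.
#[derive(Default)]
pub struct Tracker {
    handles: Vec<JoinHandle<u32>>,
}

impl Tracker {
    pub fn spawn(&mut self, f: impl FnOnce() -> u32 + Send + 'static) {
        self.handles.push(thread::spawn(f));
    }

    /// Block until every tracked worker has returned.
    pub fn wait(self) -> Vec<u32> {
        self.handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    }
}

/// A worker loop that does one unit of "work" per tick until shutdown is
/// requested, then returns the number of ticks it completed.
pub fn run_worker(token: SoftShutdown) -> u32 {
    let mut ticks = 0;
    while !token.wait_timeout(Duration::from_millis(5)) {
        ticks += 1;
    }
    ticks
}
```

A caller would clone the token into each worker via `tracker.spawn(move || run_worker(token_clone))`, call `token.cancel()` when a signal arrives, and then call `tracker.wait()`. That mirrors how the diff passes `soft_shutdown_token()` and `task_tracker()` into `mas_tasks::init` and ends with `shutdown.run().await`.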

37 changes: 37 additions & 0 deletions crates/storage-pg/migrations/20241004075132_queue_worker.sql
@@ -0,0 +1,37 @@
+-- Copyright 2024 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
+-- This table stores information about workers, mostly to track their health
+CREATE TABLE queue_workers (
+  queue_worker_id UUID NOT NULL PRIMARY KEY,
+
+  -- When the worker was registered
+  registered_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
+  -- When the worker was last seen
+  last_seen_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
+  -- When the worker was shut down
+  shutdown_at TIMESTAMP WITH TIME ZONE
+);
+
+-- This single-row table stores the leader of the queue
+-- The leader is responsible for running maintenance tasks
+CREATE UNLOGGED TABLE queue_leader (
+  -- This makes the row unique
+  active BOOLEAN NOT NULL DEFAULT TRUE UNIQUE,
Contributor


I know it sounds silly, but I'd make this a PRIMARY KEY. Maybe this sounds dogmatic, but a handful of tools are not happy with tables that don't have a primary key (logical replication in Postgres by default, for example), so I'd say it's worth always using one instead of UNIQUE etc.


+  -- When the leader was elected
+  elected_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
+  -- Until when the lease is valid
+  expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
+  -- The worker ID of the leader
+  queue_worker_id UUID NOT NULL REFERENCES queue_workers (queue_worker_id),
+
+  -- This, combined with the unique constraint, makes sure we only ever have a single row
+  CONSTRAINT queue_leader_active CHECK (active IS TRUE)
+);
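
The queries that read and write `queue_leader` are not part of the visible diff, so the following is only a guess at the lease semantics the schema suggests: at most one row exists, its holder may renew it, and another worker may take over only once `expires_at` has passed. The sketch models that single row in memory as an `Option`. `LeaderRow` and `try_acquire` are hypothetical names, and the worker ID is modelled as a `u128` rather than a real UUID type.

```rust
use std::time::{Duration, SystemTime};

/// In-memory model of the single row of `queue_leader` (assumption: the
/// real implementation keeps this state in Postgres, not in memory).
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderRow {
    /// Stands in for the `queue_worker_id UUID` column.
    pub queue_worker_id: u128,
    pub elected_at: SystemTime,
    pub expires_at: SystemTime,
}

/// Try to become (or stay) leader at time `now` with a lease of `lease`.
/// Returns `true` if `worker` holds the lease after the call.
pub fn try_acquire(
    slot: &mut Option<LeaderRow>,
    worker: u128,
    now: SystemTime,
    lease: Duration,
) -> bool {
    if let Some(row) = slot.as_mut() {
        if row.queue_worker_id == worker {
            // The current leader renews: keep `elected_at`, extend the lease.
            row.expires_at = now + lease;
            return true;
        }
        if row.expires_at > now {
            // Another worker still holds a valid lease.
            return false;
        }
    }
    // No leader yet, or the previous lease has expired: take over.
    *slot = Some(LeaderRow {
        queue_worker_id: worker,
        elected_at: now,
        expires_at: now + lease,
    });
    true
}
```

In SQL, the same decision would need to happen atomically in a single statement or transaction so that two workers cannot both take over; the constraints on `active` only guarantee that a second row can never be inserted.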