Skip to content

Commit

Permalink
isolate underway migrations
Browse files Browse the repository at this point in the history
This is something of a hack to work around the fact that SQLx migrations
do not currently support specifying a schema under which the migrations
table will live.

Here we provide a search path throughout our migrations and in the
transaction that will run the migrations to ensure that migrations are
applied to `underway._sqlx_migrations`. Note that this assumes a
`public._sqlx_migrations` exists.

In the future we should be able to use `sqlx.toml` to address this more
robustly. That's expected as part of the `0.9.0` release of SQLx. Please
see: launchbadge/sqlx#3383

Closes #11
  • Loading branch information
maxcountryman committed Oct 31, 2024
1 parent 707f284 commit b1691ad
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/graceful_shutdown/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/multitask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create the task queue.
let queue = Queue::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/rag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

let openai_client = Client::new();

Expand Down
2 changes: 1 addition & 1 deletion examples/scheduled/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(&database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/step/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Create our job.
let job = Job::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;
underway::run_migrations(&pool).await?;

// Build the job.
let job = Job::builder()
Expand Down
7 changes: 7 additions & 0 deletions migrations/20240921151751_0.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
create schema if not exists underway;

-- Make sure we're in the Underway schema for the migration.
set search_path to underway;

-- Manage Underway migrations within the Underway schema.
create table if not exists underway._sqlx_migrations
(like public._sqlx_migrations including all);

create table underway.task_queue (
name text not null,
dlq_name text,
Expand Down
3 changes: 3 additions & 0 deletions migrations/20241024174106_1.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
-- Make sure we're in the Underway schema for the migration.
set search_path to underway;

-- function to notify about task changes
create or replace function underway.task_change_notify()
returns trigger as $$
Expand Down
30 changes: 26 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@

#![warn(clippy::all, nonstandard_style, future_incompatible, missing_docs)]

use sqlx::migrate::Migrator;
use sqlx::{migrate::Migrator, PgPool};

pub use crate::{
job::{Job, To},
Expand All @@ -281,7 +281,16 @@ mod scheduler;
pub mod task;
pub mod worker;

/// A SQLx [`Migrator`] which provides Underway's schema migrations.
static MIGRATOR: Migrator = sqlx::migrate!();

/// Runs Underway migrations.
///
/// A transaction is acquired via the provided pool and migrations are run via
/// this transaction.
///
/// As no direct support for specifying the schema under which the migrations
/// table will live, we manually specify this via the search path. This ensures
/// that migrations are isolated to underway._sqlx_migrations.
///
/// These migrations must be applied before queues, tasks, and workers can be
/// run.
Expand All @@ -304,8 +313,21 @@ pub mod worker;
/// let pool = PgPool::connect(database_url).await?;
///
/// // Run migrations.
/// underway::MIGRATOR.run(&pool).await?;
/// underway::run_migrations(&pool).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
pub static MIGRATOR: Migrator = sqlx::migrate!();
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;

// Manually specify "underway" search path to ensure the right migrations table
// is used.
sqlx::raw_sql("create schema if not exists underway; set search_path to underway;")
.execute(&mut *tx)
.await?;
MIGRATOR.run(&mut *tx).await?;

tx.commit().await?;

Ok(())
}

0 comments on commit b1691ad

Please sign in to comment.