diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 41b372d..0051a43 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/examples/graceful_shutdown/src/main.rs b/examples/graceful_shutdown/src/main.rs index 7d406f6..e213e3a 100644 --- a/examples/graceful_shutdown/src/main.rs +++ b/examples/graceful_shutdown/src/main.rs @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { .await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/examples/multitask/src/main.rs b/examples/multitask/src/main.rs index 867009b..47322bb 100644 --- a/examples/multitask/src/main.rs +++ b/examples/multitask/src/main.rs @@ -155,7 +155,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Create the task queue. let queue = Queue::builder() diff --git a/examples/rag/src/main.rs b/examples/rag/src/main.rs index 08c73ee..94afb0d 100644 --- a/examples/rag/src/main.rs +++ b/examples/rag/src/main.rs @@ -152,7 +152,7 @@ async fn main() -> Result<(), Box> { let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set"); let pool = PgPool::connect(database_url).await?; - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; let openai_client = Client::new(); diff --git a/examples/scheduled/src/main.rs b/examples/scheduled/src/main.rs index d753e9e..c436eed 100644 --- a/examples/scheduled/src/main.rs +++ b/examples/scheduled/src/main.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(&database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/examples/step/src/main.rs b/examples/step/src/main.rs index fd5e557..80afe91 100644 --- a/examples/step/src/main.rs +++ b/examples/step/src/main.rs @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Create our job. let job = Job::builder() diff --git a/examples/tracing/src/main.rs b/examples/tracing/src/main.rs index b4c93d6..3f489d7 100644 --- a/examples/tracing/src/main.rs +++ b/examples/tracing/src/main.rs @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box> { let pool = PgPool::connect(database_url).await?; // Run migrations. - underway::MIGRATOR.run(&pool).await?; + underway::run_migrations(&pool).await?; // Build the job. let job = Job::builder() diff --git a/migrations/20240921151751_0.sql b/migrations/20240921151751_0.sql index 9bad654..52f1070 100644 --- a/migrations/20240921151751_0.sql +++ b/migrations/20240921151751_0.sql @@ -1,5 +1,12 @@ create schema if not exists underway; +-- Make sure we're in the Underway schema for the migration. +set search_path to underway; + +-- Manage Underway migrations within the Underway schema. +create table if not exists underway._sqlx_migrations +(like public._sqlx_migrations including all); + create table underway.task_queue ( name text not null, dlq_name text, diff --git a/migrations/20241024174106_1.sql b/migrations/20241024174106_1.sql index 0fed87d..53a19b7 100644 --- a/migrations/20241024174106_1.sql +++ b/migrations/20241024174106_1.sql @@ -1,3 +1,6 @@ +-- Make sure we're in the Underway schema for the migration. +set search_path to underway; + -- function to notify about task changes create or replace function underway.task_change_notify() returns trigger as $$ diff --git a/src/lib.rs b/src/lib.rs index daf36d1..adb27b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -265,7 +265,7 @@ #![warn(clippy::all, nonstandard_style, future_incompatible, missing_docs)] -use sqlx::migrate::Migrator; +use sqlx::{migrate::Migrator, PgPool}; pub use crate::{ job::{Job, To}, @@ -281,7 +281,16 @@ mod scheduler; pub mod task; pub mod worker; -/// A SQLx [`Migrator`] which provides Underway's schema migrations. +static MIGRATOR: Migrator = sqlx::migrate!(); + +/// Runs Underway migrations. +/// +/// A transaction is acquired via the provided pool and migrations are run via +/// this transaction. +/// +/// As no direct support for specifying the schema under which the migrations +/// table will live, we manually specify this via the search path. This ensures +/// that migrations are isolated to underway._sqlx_migrations. /// /// These migrations must be applied before queues, tasks, and workers can be /// run. @@ -304,8 +313,21 @@ pub mod worker; /// let pool = PgPool::connect(database_url).await?; /// /// // Run migrations. -/// underway::MIGRATOR.run(&pool).await?; +/// underway::run_migrations(&pool).await?; /// # Ok::<(), Box>(()) /// # }); /// # } -pub static MIGRATOR: Migrator = sqlx::migrate!(); +pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> { + let mut tx = pool.begin().await?; + + // Manually specify "underway" search path to ensure the right migrations table + // is used. + sqlx::raw_sql("create schema if not exists underway; set search_path to underway;") + .execute(&mut *tx) + .await?; + MIGRATOR.run(&mut *tx).await?; + + tx.commit().await?; + + Ok(()) +}