From b2b2596a0fde4138e773f4ce3364638b0f79ac83 Mon Sep 17 00:00:00 2001 From: Alex Parrill Date: Thu, 3 Oct 2024 16:37:39 -0400 Subject: [PATCH] Change sqlx-sqlite to check for uniqueness error instead of checking for existence (#52) Currently, the sqlx-sqlite store has an issue with databases running using the WAL journal if multiple requests come in at the same time, causing a "database is locked" error. The sequence is roughly this: 1. Two connections call `SqliteStore::create`, at the same time. 2. Concurrently, they both call `SqliteStore::id_exists`. Both lock the database for reading and sets their "end mark" to the current database state. 3. Connection 1 calls `save_with_conn` first and commits its transaction. This upgrades its lock to a write lock and advances the database state - connection 2's end mark is still before this update. 4. Connection 2 calls `save_with_conn`. It attempts to upgrade its read lock to a write lock, but fails since the database has been modified since the read lock was acquired, and sqlite returns a "database is locked" error. This patch fixes the issue by removing the `if_exists` check and instead attempts to insert the ID, check if the insert fails with a uniqueness error, and regenerate the ID if so. With this, all connections immediately lock the database for writing and will wait for each other to finish before inserting. It should also be more performant in the common case of the generated ID being unique. --- sqlx-store/src/sqlite_store.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/sqlx-store/src/sqlite_store.rs b/sqlx-store/src/sqlite_store.rs index 5382e77..de21995 100644 --- a/sqlx-store/src/sqlite_store.rs +++ b/sqlx-store/src/sqlite_store.rs @@ -68,19 +68,30 @@ impl SqliteStore { Ok(()) } - async fn id_exists(&self, conn: &mut SqliteConnection, id: &Id) -> session_store::Result { + async fn try_create_with_conn( + &self, + conn: &mut SqliteConnection, + record: &Record, + ) -> session_store::Result { let query = format!( r#" - select exists(select 1 from {table_name} where id = ?) + insert or abort into {table_name} + (id, data, expiry_date) values (?, ?, ?) "#, table_name = self.table_name ); + let res = sqlx::query(&query) + .bind(record.id.to_string()) + .bind(rmp_serde::to_vec(record).map_err(SqlxStoreError::Encode)?) + .bind(record.expiry_date) + .execute(conn) + .await; - Ok(sqlx::query_scalar(&query) - .bind(id.to_string()) - .fetch_one(conn) - .await - .map_err(SqlxStoreError::Sqlx)?) + match res { + Ok(_) => Ok(true), + Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(false), + Err(e) => Err(SqlxStoreError::Sqlx(e).into()), + } } async fn save_with_conn( @@ -133,10 +144,9 @@ impl SessionStore for SqliteStore { async fn create(&self, record: &mut Record) -> session_store::Result<()> { let mut tx = self.pool.begin().await.map_err(SqlxStoreError::Sqlx)?; - while self.id_exists(&mut tx, &record.id).await? { + while !self.try_create_with_conn(&mut tx, record).await? { record.id = Id::default(); // Generate a new ID } - self.save_with_conn(&mut tx, record).await?; tx.commit().await.map_err(SqlxStoreError::Sqlx)?;