Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
fix(sqlite-db): Roll back if rollover::insert fails
Browse files Browse the repository at this point in the history
Previously we were allowing the following scenario:

- `RolloverCompleted` event is appended to the `events` table.
- `RolloverCompleted` event data fails to be appended to
  `rollover_completed_event_data` for whatever reason.

That scenario would leave us in a situation where the state between
the two tables would be inconsistent.

To fix it, we ensure that the rollback logic doesn't just cover
`rollover::insert::insert`, but rather the entirety of
`append_event`.
  • Loading branch information
luckysori committed Aug 17, 2022
1 parent 7009819 commit 3b78f61
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 42 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Drop connection to peer if we get a `yamux::ConnectionError` when opening a substream to them.
Dropping the connection should lead to a reconnect on the `taker`, which should fix problems where the `taker` has a broken connection to the `maker` for a long time.

### Fixed

- Ensure that rollover data is saved to the database atomically.
This fixes bugs related to accessing rollover data that is incomplete, either because the insertion hadn't finished yet or because part of the insertion had failed.

## [0.5.4] - 2022-08-05

### Added
Expand Down
7 changes: 5 additions & 2 deletions sqlite-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl Connection {
/// implements `Into<Option>` event.
pub async fn append_event(&self, event: impl Into<Option<CfdEvent>>) -> Result<()> {
let mut conn = self.inner.acquire().await?;
let mut db_tx = conn.begin().await?;

let event = match event.into() {
Some(event) => event,
Expand All @@ -252,7 +253,7 @@ impl Connection {
.bind(&event_name)
.bind(&event_data)
.bind(&timestamp)
.execute(&mut conn)
.execute(&mut db_tx)
.await?;

if query_result.rows_affected() != 1 {
Expand All @@ -261,9 +262,11 @@ impl Connection {

// if we have a rollover completed event we store it additionally in its own table
if let RolloverCompleted { .. } = event.event {
rollover::insert(&mut conn, query_result.last_insert_rowid(), event).await?;
rollover::insert(&mut db_tx, query_result.last_insert_rowid(), event).await?;
}

db_tx.commit().await?;

tracing::info!(event = %event_name, %order_id, "Appended event to database");

Ok(())
Expand Down
14 changes: 5 additions & 9 deletions sqlite-db/src/rollover/delete.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
use crate::models::OrderId;
use anyhow::Context;
use anyhow::Result;
use sqlx::Sqlite;
use sqlx::Transaction;
use sqlx::SqliteConnection;

pub(crate) async fn delete(
inner_transaction: &mut Transaction<'_, Sqlite>,
order_id: OrderId,
) -> Result<()> {
pub(crate) async fn delete(conn: &mut SqliteConnection, order_id: OrderId) -> Result<()> {
sqlx::query!(
r#"
delete from rollover_completed_event_data where cfd_id = (select id from cfds where cfds.order_id = $1)
"#,
order_id
)
.execute(&mut *inner_transaction)
.execute(&mut *conn)
.await
.with_context(|| format!("Failed to delete from rollover_completed_event_data for {order_id}"))?;

Expand All @@ -24,7 +20,7 @@ pub(crate) async fn delete(
"#,
order_id
)
.execute(&mut *inner_transaction)
.execute(&mut *conn)
.await
.with_context(|| format!("Failed to delete from revoked_commit_transactions for {order_id}"))?;

Expand All @@ -34,7 +30,7 @@ pub(crate) async fn delete(
"#,
order_id
)
.execute(&mut *inner_transaction)
.execute(&mut *conn)
.await
.with_context(|| format!("Failed to delete from open_cets for {order_id}"))?;

Expand Down
43 changes: 12 additions & 31 deletions sqlite-db/src/rollover/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,20 @@ use model::EventKind;
use model::FundingFee;
use model::RevokedCommit;
use models::BitMexPriceEventId;
use sqlx::pool::PoolConnection;
use sqlx::Connection as SqlxConnection;
use sqlx::Sqlite;
use sqlx::Transaction;
use sqlx::SqliteConnection;

pub async fn insert(
connection: &mut PoolConnection<Sqlite>,
event_id: i64,
event: CfdEvent,
) -> Result<()> {
pub async fn insert(conn: &mut SqliteConnection, event_id: i64, event: CfdEvent) -> Result<()> {
let event_kind = event.event;
match event_kind {
EventKind::RolloverCompleted {
dlc: Some(dlc),
funding_fee,
complete_fee,
} => {
let mut inner_transaction = connection.begin().await?;

crate::rollover::delete::delete(&mut inner_transaction, event.id.into()).await?;
crate::rollover::delete::delete(&mut *conn, event.id.into()).await?;

insert_rollover_completed_event_data(
&mut inner_transaction,
&mut *conn,
event_id,
&dlc,
funding_fee,
Expand All @@ -43,24 +34,14 @@ pub async fn insert(
.await?;

for revoked in dlc.revoked_commit {
insert_revoked_commit_transaction(&mut inner_transaction, event.id.into(), revoked)
.await?;
insert_revoked_commit_transaction(&mut *conn, event.id.into(), revoked).await?;
}

for (event_id, cets) in dlc.cets {
for cet in cets {
insert_cet(
&mut inner_transaction,
event_id.into(),
event.id.into(),
cet,
)
.await?;
insert_cet(&mut *conn, event_id.into(), event.id.into(), cet).await?;
}
}

// Commit the transaction to either write all or rollback
inner_transaction.commit().await?;
}
EventKind::RolloverCompleted { dlc: None, .. } => {
// We ignore rollover completed events without DLC data as we don't need to store
Expand All @@ -79,7 +60,7 @@ pub async fn insert(

/// Inserts RolloverCompleted data and returns the resulting rowid
async fn insert_rollover_completed_event_data(
inner_transaction: &mut Transaction<'_, Sqlite>,
conn: &mut SqliteConnection,
event_id: i64,
dlc: &Dlc,
funding_fee: FundingFee,
Expand Down Expand Up @@ -179,7 +160,7 @@ async fn insert_rollover_completed_event_data(
complete_fee,
complete_fee_flow,
)
.execute(&mut *inner_transaction)
.execute(&mut *conn)
.await?;

if query_result.rows_affected() != 1 {
Expand All @@ -189,7 +170,7 @@ async fn insert_rollover_completed_event_data(
}

async fn insert_revoked_commit_transaction(
inner_transaction: &mut Transaction<'_, Sqlite>,
conn: &mut SqliteConnection,
order_id: models::OrderId,
revoked: RevokedCommit,
) -> Result<()> {
Expand Down Expand Up @@ -231,7 +212,7 @@ async fn insert_revoked_commit_transaction(
complete_fee_flow,
revocation_sk_ours
)
.execute(&mut *inner_transaction)
.execute(&mut *conn)
.await?;

if query_result.rows_affected() != 1 {
Expand All @@ -241,7 +222,7 @@ async fn insert_revoked_commit_transaction(
}

async fn insert_cet(
db_transaction: &mut Transaction<'_, Sqlite>,
conn: &mut SqliteConnection,
event_id: BitMexPriceEventId,
order_id: models::OrderId,
cet: Cet,
Expand Down Expand Up @@ -278,7 +259,7 @@ async fn insert_cet(
range_end,
txid,
)
.execute(&mut *db_transaction)
.execute(&mut *conn)
.await?;

if query_result.rows_affected() != 1 {
Expand Down

0 comments on commit 3b78f61

Please sign in to comment.