From 30f15ef987015f1d159860754a284a50f4c2227f Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 31 Jan 2025 17:40:49 +0500 Subject: [PATCH] patch transactions overlap in append_decided_leaves() --- sequencer/src/persistence/sql.rs | 398 ++++++++++++++++++------------- 1 file changed, 229 insertions(+), 169 deletions(-) diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index a736d1c0af..26b883f208 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -9,14 +9,14 @@ use espresso_types::{ BackoffParams, Leaf, NetworkConfig, Payload, }; use futures::stream::StreamExt; -use hotshot_query_service::data_source::storage::sql::Write; use hotshot_query_service::data_source::{ storage::{ pruning::PrunerCfg, - sql::{include_migrations, query_as, Config, SqlStorage, Transaction}, + sql::{include_migrations, query_as, Config, SqlStorage}, }, Transaction as _, VersionedDataSource, }; +use hotshot_types::traits::block_contents::BlockHeader; use hotshot_types::{ consensus::CommitmentMap, data::{DaProposal, QuorumProposal, VidDisperseShare}, @@ -329,6 +329,231 @@ impl Persistence { .await?; tx.commit().await } + + async fn generate_decide_events(&self, consumer: &impl EventConsumer) -> anyhow::Result<()> { + let mut last_processed_view: Option = self + .db + .read() + .await? + .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1") + .await? + .map(|row| row.get("last_processed_view")); + loop { + // overlapping read and write transactions can lead to database errors. To + // avoid this: + // - start a read transaction to query and collect all the necessary data. + // - Commit (or implicitly drop) the read transaction once the data is fetched. + // - use the collected data to generate a "decide" event for the consumer. + // - begin a write transaction to delete the data and update the event stream. + let mut tx = self.db.read().await?; + + // Collect a chain of consecutive leaves, starting from the first view after the last + // decide. This will correspond to a decide event, and defines a range of views which + // can be garbage collected. This may even include views for which there was no leaf, + // for which we might still have artifacts like proposals that never finalized. + let from_view = match last_processed_view { + Some(v) => v + 1, + None => 0, + }; + + let mut parent = None; + let mut rows = query("SELECT leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view") + .bind(from_view) + .fetch(tx.as_mut()); + let mut leaves = vec![]; + let mut final_qc = None; + while let Some(row) = rows.next().await { + let row = match row { + Ok(row) => row, + Err(err) => { + // If there's an error getting a row, try generating an event with the rows + // we do have. + tracing::warn!("error loading row: {err:#}"); + break; + } + }; + + let leaf_data: Vec = row.get("leaf"); + let leaf = bincode::deserialize::(&leaf_data)?; + let qc_data: Vec = row.get("qc"); + let qc = bincode::deserialize::>(&qc_data)?; + let height = leaf.block_header().block_number(); + + // Ensure we are only dealing with a consecutive chain of leaves. We don't want to + // garbage collect any views for which we missed a leaf or decide event; at least + // not right away, in case we need to recover that data later. + if let Some(parent) = parent { + if height != parent + 1 { + tracing::debug!( + height, + parent, + "ending decide event at non-consecutive leaf" + ); + break; + } + } + parent = Some(height); + leaves.push(leaf); + final_qc = Some(qc); + } + drop(rows); + + let Some(final_qc) = final_qc else { + // End event processing when there are no more decided views. + tracing::debug!(from_view, "no new leaves at decide"); + return Ok(()); + }; + + // Find the range of views encompassed by this leaf chain. All data in this range can be + // processed by the consumer and then deleted. + let from_view = leaves[0].view_number(); + let to_view = leaves[leaves.len() - 1].view_number(); + + // Collect VID shares for the decide event. + let mut vid_shares = tx + .fetch_all( + query("SELECT view, data FROM vid_share where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await? + .into_iter() + .map(|row| { + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let vid_proposal = bincode::deserialize::< + Proposal>, + >(&data)?; + Ok((view as u64, vid_proposal.data)) + }) + .collect::>>()?; + + // Collect DA proposals for the decide event. + let mut da_proposals = tx + .fetch_all( + query("SELECT view, data FROM da_proposal where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await? + .into_iter() + .map(|row| { + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let da_proposal = + bincode::deserialize::>>(&data)?; + Ok((view as u64, da_proposal.data)) + }) + .collect::>>()?; + + drop(tx); + + // Collate all the information by view number and construct a chain of leaves. + let leaf_chain = leaves + .into_iter() + // Go in reverse chronological order, as expected by Decide events. + .rev() + .map(|mut leaf| { + let view = leaf.view_number(); + + // Include the VID share if available. + let vid_share = vid_shares.remove(&view); + if vid_share.is_none() { + tracing::debug!(?view, "VID share not available at decide"); + } + + // Fill in the full block payload using the DA proposals we had persisted. + if let Some(proposal) = da_proposals.remove(&view) { + let payload = + Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); + leaf.fill_block_payload_unchecked(payload); + } else if view == ViewNumber::genesis() { + // We don't get a DA proposal for the genesis view, but we know what the + // payload always is. + leaf.fill_block_payload_unchecked(Payload::empty().0); + } else { + tracing::debug!(?view, "DA proposal not available at decide"); + } + + LeafInfo { + leaf, + vid_share: vid_share.map(Into::into), + // Note: the following fields are not used in Decide event processing, and + // should be removed. For now, we just default them. + state: Default::default(), + delta: Default::default(), + } + }) + .collect(); + + // Generate decide event for the consumer. + tracing::debug!(?to_view, ?final_qc, ?leaf_chain, "generating decide event"); + consumer + .handle_event(&Event { + view_number: to_view, + event: EventType::Decide { + leaf_chain: Arc::new(leaf_chain), + qc: Arc::new(final_qc), + block_size: None, + }, + }) + .await?; + + let mut tx = self.db.write().await?; + + // Now that we have definitely processed leaves up to `to_view`, we can update + // `last_processed_view` so we don't process these leaves again. We may still fail at + // this point, or shut down, and fail to complete this update. At worst this will lead + // to us sending a duplicate decide event the next time we are called; this is fine as + // the event consumer is required to be idempotent. + tx.upsert( + "event_stream", + ["id", "last_processed_view"], + ["id"], + [(1i32, to_view.u64() as i64)], + ) + .await?; + + // Delete the data that has been fully processed. + tx.execute( + query("DELETE FROM vid_share where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM da_proposal where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM quorum_proposals where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + tx.execute( + query("DELETE FROM quorum_certificate where view >= $1 AND view <= $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + + // Clean up leaves, but do not delete the most recent one (all leaves with a view number + // less than the given value). This is necessary to ensure that, in case of a restart, + // we can resume from the last decided leaf. + tx.execute( + query("DELETE FROM anchor_leaf WHERE view >= $1 AND view < $2") + .bind(from_view.u64() as i64) + .bind(to_view.u64() as i64), + ) + .await?; + + tx.commit().await?; + last_processed_view = Some(to_view.u64() as i64); + } + } } #[async_trait] @@ -401,13 +626,12 @@ impl SequencerPersistence for Persistence { // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer // need. - let consumer = dyn_clone::clone(consumer); - let tx = self.db.write().await?; - if let Err(err) = collect_garbage(tx, view, consumer).await { + if let Err(err) = self.generate_decide_events(consumer).await { // GC/event processing failure is not an error, since by this point we have at least // managed to persist the decided leaves successfully, and GC will just run again at the // next decide. Log an error but do not return it. tracing::warn!(?view, "GC/event processing failed: {err:#}"); + return Ok(()); } Ok(()) @@ -689,170 +913,6 @@ impl SequencerPersistence for Persistence { } } -async fn collect_garbage( - mut tx: Transaction, - view: ViewNumber, - consumer: impl EventConsumer, -) -> anyhow::Result<()> { - // Clean up and collect VID shares. - - let mut vid_shares = tx - .fetch_all( - query("DELETE FROM vid_share where view <= $1 RETURNING view, data") - .bind(view.u64() as i64), - ) - .await? - .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let data: Vec = row.get("data"); - let vid_proposal = - bincode::deserialize::>>(&data)?; - Ok((view as u64, vid_proposal.data)) - }) - .collect::>>()?; - - // Clean up and collect DA proposals. - let mut da_proposals = tx - .fetch_all( - query("DELETE FROM da_proposal where view <= $1 RETURNING view, data") - .bind(view.u64() as i64), - ) - .await? - .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let data: Vec = row.get("data"); - let da_proposal = - bincode::deserialize::>>(&data)?; - Ok((view as u64, da_proposal.data)) - }) - .collect::>>()?; - - // Clean up and collect leaves, except do not delete the most recent leaf: we need to remember - // this so that in case we restart, we can pick up from the last decided leaf. We still do - // include this leaf in the query results (the `UNION` clause) so we can include it in the - // decide event we send to the consumer. - let mut leaves = tx - .fetch_all( - query("SELECT view, leaf, qc FROM anchor_leaf WHERE view <= $1") - .bind(view.u64() as i64), - ) - .await? - .into_iter() - .map(|row| { - let view: i64 = row.get("view"); - let leaf_data: Vec = row.get("leaf"); - let leaf = bincode::deserialize::(&leaf_data)?; - let qc_data: Vec = row.get("qc"); - let qc = bincode::deserialize::>(&qc_data)?; - Ok((view as u64, (leaf, qc))) - }) - .collect::>>()?; - - tx.execute(query("DELETE FROM anchor_leaf WHERE view < $1").bind(view.u64() as i64)) - .await?; - - // Clean up old proposals. These are not part of the decide event we generate for the consumer, - // so we don't need to return them. - tx.execute(query("DELETE FROM quorum_proposals where view <= $1").bind(view.u64() as i64)) - .await?; - - // Exclude from the decide event any leaves which have definitely already been processed. We may - // have selected an already-processed leaf because the oldest leaf -- the last leaf processed in - // the previous decide event -- remained in the database to serve as the anchor leaf, so our - // query would have returned it. In fact, this will almost always be the case, but there are two - // cases where it might not be, and we must process this leaf after all: - // - // 1. The oldest leaf is the genesis leaf, and there _is_ no previous decide event - // 2. We previously stored some leaves in the database and then failed while processing the - // decide event, or shut down before generating the decide event, and so we are only now - // generating the decide event for those previous leaves. - // - // Since these cases (particularly case 2) are hard to account for explicitly, we just use a - // persistent value in the database to remember how far we have successfully processed the event - // stream. - let last_processed_view: Option = tx - .fetch_optional(query( - "SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1", - )) - .await? - .map(|row| row.get("last_processed_view")); - let leaves = if let Some(v) = last_processed_view { - let new_leaves = leaves.split_off(&((v as u64) + 1)); - if !leaves.is_empty() { - tracing::debug!( - v, - remaining_leaves = new_leaves.len(), - ?leaves, - "excluding already-processed leaves from decide event" - ); - } - new_leaves - } else { - leaves - }; - - // Generate a decide event for each leaf, to be processed by the event consumer. We make a - // separate event for each leaf because it is possible we have non-consecutive leaves in our - // storage, which would not be valid as a single decide with a single leaf chain. - for (view, (mut leaf, qc)) in leaves { - // Include the VID share if available. - let vid_share = vid_shares.remove(&view); - if vid_share.is_none() { - tracing::debug!(view, "VID share not available at decide"); - } - - // Fill in the full block payload using the DA proposals we had persisted. - if let Some(proposal) = da_proposals.remove(&view) { - let payload = Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); - leaf.fill_block_payload_unchecked(payload); - } else if view == ViewNumber::genesis().u64() { - // We don't get a DA proposal for the genesis view, but we know what the payload always - // is. - leaf.fill_block_payload_unchecked(Payload::empty().0); - } else { - tracing::debug!(view, "DA proposal not available at decide"); - } - - let leaf_info = LeafInfo { - leaf, - vid_share, - - // Note: the following fields are not used in Decide event processing, and - // should be removed. For now, we just default them. - state: Default::default(), - delta: Default::default(), - }; - tracing::debug!(?view, ?qc, ?leaf_info, "generating decide event"); - consumer - .handle_event(&Event { - view_number: ViewNumber::new(view), - event: EventType::Decide { - leaf_chain: Arc::new(vec![leaf_info]), - qc: Arc::new(qc), - block_size: None, - }, - }) - .await?; - } - - // Now that we have definitely processed leaves up to `view`, we can update - // `last_processed_view` so we don't process these leaves again. We may still fail at this - // point, or shut down, and fail to complete this update. At worst this will lead to us sending - // a duplicate decide event the next time we are called; this is fine as the event consumer is - // required to be idempotent. - tx.upsert( - "event_stream", - ["id", "last_processed_view"], - ["id"], - [(1i32, view.u64() as i64)], - ) - .await?; - - tx.commit().await -} - #[cfg(test)] mod testing { use hotshot_query_service::data_source::storage::sql::testing::TmpDb;