Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: graphprotocol/graph-node
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 799a3e9dd029f4f83c077f81a7770d8210907cc2
Choose a base ref
..
head repository: graphprotocol/graph-node
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b98760f4b5805127daae80892725321e62d34aa6
Choose a head ref
Showing with 56 additions and 7 deletions.
  1. +4 −0 graph/src/schema/entity_type.rs
  2. +7 −0 graph/src/schema/input/mod.rs
  3. +28 −2 store/postgres/src/relational/prune.rs
  4. +17 −5 store/postgres/src/relational_queries.rs
4 changes: 4 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
@@ -150,6 +150,10 @@ impl EntityType {
pub fn is_object_type(&self) -> bool {
self.schema.is_object_type(self.atom)
}

pub fn new_vid_form(&self) -> bool {
self.schema.new_vid_form()
}
}

impl fmt::Display for EntityType {
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ use crate::data::graphql::{DirectiveExt, DocumentExt, ObjectTypeExt, TypeExt, Va
use crate::data::store::{
self, EntityValidationError, IdType, IntoEntityIterator, TryIntoEntityIterator, ValueType, ID,
};
use crate::data::subgraph::SPEC_VERSION_1_3_0;
use crate::data::value::Word;
use crate::derive::CheapClone;
use crate::prelude::q::Value;
@@ -955,6 +956,7 @@ pub struct Inner {
pool: Arc<AtomPool>,
/// A list of all timeseries types by interval
agg_mappings: Box<[AggregationMapping]>,
spec_version: Version,
}

impl InputSchema {
@@ -1042,6 +1044,7 @@ impl InputSchema {
enum_map,
pool,
agg_mappings,
spec_version: spec_version.clone(),
}),
})
}
@@ -1585,6 +1588,10 @@ impl InputSchema {
}?;
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

pub fn new_vid_form(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
}

/// Create a new pool that contains the names of all the types defined
30 changes: 28 additions & 2 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,12 @@ use graph::{
};
use itertools::Itertools;

use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table};
use crate::{
catalog,
copy::AdaptiveBatchSize,
deployment,
relational::{Table, VID_COLUMN},
};

use super::{Catalog, Layout, Namespace};

@@ -68,6 +73,7 @@ struct TablePair {
// has the same name as `src` but is in a different namespace
dst: Arc<Table>,
src_nsp: Namespace,
dst_nsp: Namespace,
}

impl TablePair {
@@ -94,7 +100,12 @@ impl TablePair {
}
conn.batch_execute(&query)?;

Ok(TablePair { src, dst, src_nsp })
Ok(TablePair {
src,
dst,
src_nsp,
dst_nsp,
})
}

/// Copy all entity versions visible between `earliest_block` and
@@ -228,6 +239,12 @@ impl TablePair {
let src_qname = &self.src.qualified_name;
let dst_qname = &self.dst.qualified_name;
let src_nsp = &self.src_nsp;
let dst_nsp = &self.dst_nsp;

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.new_vid_form();

let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
@@ -237,6 +254,15 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
)?;
}

writeln!(query, "drop table {src_qname};")?;
writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?;
conn.transaction(|conn| conn.batch_execute(&query))?;
22 changes: 17 additions & 5 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
@@ -2519,6 +2519,8 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.new_vid_form();

// Construct a query
// insert into schema.table(column, ...)
// values
@@ -2544,7 +2546,9 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}
out.push_sql(") values\n");

for (i, row) in self.rows.iter().enumerate() {
@@ -2562,8 +2566,10 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
if new_vid_form {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
out.push_sql(")");
}

@@ -5061,6 +5067,8 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.new_vid_form();

// Construct a query
// insert into {dst}({columns})
// select {columns} from {src}
@@ -5081,7 +5089,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(")\nselect ");
for column in &self.columns {
@@ -5147,7 +5157,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(" from ");
out.push_sql(self.src.qualified_name.as_str());