From 950a0fd2d3dbe39a97855ee7835d91689d09037d Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Thu, 16 Jan 2025 16:12:08 +0100 Subject: [PATCH] remove VID wrapping --- graph/src/components/store/entity_cache.rs | 31 +++---- graph/src/components/store/mod.rs | 4 +- graph/src/components/store/write.rs | 29 ++---- graph/src/data/store/mod.rs | 38 +++----- runtime/test/src/test.rs | 4 +- server/index-node/src/resolver.rs | 2 +- store/postgres/src/relational.rs | 5 +- store/postgres/src/relational_queries.rs | 11 ++- store/test-store/src/store.rs | 7 +- store/test-store/tests/graph/entity_cache.rs | 91 +++++++++++-------- store/test-store/tests/graphql/query.rs | 7 +- .../test-store/tests/postgres/aggregation.rs | 18 ++-- store/test-store/tests/postgres/graft.rs | 10 +- store/test-store/tests/postgres/relational.rs | 6 +- .../tests/postgres/relational_bytes.rs | 10 +- store/test-store/tests/postgres/store.rs | 60 ++++++------ store/test-store/tests/postgres/writable.rs | 27 +++--- 17 files changed, 179 insertions(+), 181 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 6a539203b45..2bb4c7b1791 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::components::store::{self as s, Entity, EntityOperation}; -use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator}; +use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator}; use crate::prelude::ENV_VARS; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; @@ -33,8 +33,8 @@ pub enum GetScope { #[derive(Debug, Clone)] enum EntityOp { Remove, - Update(EntityV), - Overwrite(EntityV), + Update(Entity), + Overwrite(Entity), } impl EntityOp { @@ -45,7 +45,7 @@ impl EntityOp { use EntityOp::*; match (self, entity) { (Remove, _) => Ok(None), - (Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)), + (Overwrite(new), _) | (Update(new), None) => Ok(Some(new)), (Update(updates), Some(entity)) => { let mut e = entity.borrow().clone(); e.merge_remove_null_fields(updates)?; @@ -69,7 +69,7 @@ impl EntityOp { match self { // This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`. Remove => *self = Overwrite(update), - Update(current) | Overwrite(current) => current.e.merge(update.e), + Update(current) | Overwrite(current) => current.merge(update), } } } @@ -288,9 +288,9 @@ impl EntityCache { ) -> Result, anyhow::Error> { match op { EntityOp::Update(entity) | EntityOp::Overwrite(entity) - if query.matches(key, &entity.e) => + if query.matches(key, &entity) => { - Ok(Some(entity.e.clone())) + Ok(Some(entity.clone())) } EntityOp::Remove => Ok(None), _ => Ok(None), @@ -371,7 +371,10 @@ impl EntityCache { // The next VID is based on a block number and a sequence within the block let vid = ((block as i64) << 32) + self.vid_seq as i64; self.vid_seq += 1; - let entity = EntityV::new(entity, vid); + let mut entity = entity; + let old_vid = entity.set_vid(vid).expect("the vid should be set"); + // Make sure that there was no VID previously set for this entity. + assert!(old_vid.is_none()); self.entity_op(key.clone(), EntityOp::Update(entity)); @@ -478,22 +481,19 @@ impl EntityCache { // Entity was created (None, EntityOp::Update(mut updates)) | (None, EntityOp::Overwrite(mut updates)) => { - let vid = updates.vid; - updates.e.remove_null_fields(); - let data = Arc::new(updates.e.clone()); + updates.remove_null_fields(); + let data = Arc::new(updates); self.current.insert(key.clone(), Some(data.cheap_clone())); Some(Insert { key, data, block, end: None, - vid, }) } // Entity may have been changed (Some(current), EntityOp::Update(updates)) => { let mut data = current.as_ref().clone(); - let vid = updates.vid; data.merge_remove_null_fields(updates) .map_err(|e| key.unknown_attribute(e))?; let data = Arc::new(data); @@ -504,7 +504,6 @@ impl EntityCache { data, block, end: None, - vid, }) } else { None @@ -512,8 +511,7 @@ impl EntityCache { } // Entity was removed and then updated, so it will be overwritten (Some(current), EntityOp::Overwrite(data)) => { - let vid = data.vid; - let data = Arc::new(data.e.clone()); + let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); if current != data { Some(Overwrite { @@ -521,7 +519,6 @@ impl EntityCache { data, block, end: None, - vid, }) } else { None diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 9713b78c150..31b0e62cfae 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::constraint_violation; use crate::data::store::scalar::Bytes; -use crate::data::store::{EntityV, Id, IdList, Value}; +use crate::data::store::{Id, IdList, Value}; use crate::data::value::Word; use crate::data_source::CausalityRegion; use crate::derive::CheapClone; @@ -829,7 +829,7 @@ where pub enum EntityOperation { /// Locates the entity specified by `key` and sets its attributes according to the contents of /// `data`. If no entity exists with this key, creates a new entity. - Set { key: EntityKey, data: EntityV }, + Set { key: EntityKey, data: Entity }, /// Removes an entity with the specified key, if one exists. Remove { key: EntityKey }, diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 6dd2cda472b..d4632c8410f 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -45,7 +45,6 @@ pub enum EntityModification { data: Arc, block: BlockNumber, end: Option, - vid: i64, }, /// Update the entity by overwriting it Overwrite { @@ -53,7 +52,6 @@ pub enum EntityModification { data: Arc, block: BlockNumber, end: Option, - vid: i64, }, /// Remove the entity Remove { key: EntityKey, block: BlockNumber }, @@ -69,7 +67,6 @@ pub struct EntityWrite<'a> { // The end of the block range for which this write is valid. The value // of `end` itself is not included in the range pub end: Option, - pub vid: i64, } impl std::fmt::Display for EntityWrite<'_> { @@ -92,28 +89,24 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> { data, block, end, - vid, } => Ok(EntityWrite { id: &key.entity_id, entity: data, causality_region: key.causality_region, block: *block, end: *end, - vid: *vid, }), EntityModification::Overwrite { key, data, block, end, - vid, } => Ok(EntityWrite { id: &key.entity_id, entity: &data, causality_region: key.causality_region, block: *block, end: *end, - vid: *vid, }), EntityModification::Remove { .. } => Err(()), @@ -220,13 +213,11 @@ impl EntityModification { data, block, end, - vid, } => Ok(Insert { key, data, block, end, - vid, }), Remove { key, .. } => { return Err(constraint_violation!( @@ -280,23 +271,21 @@ impl EntityModification { } impl EntityModification { - pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { + pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self { EntityModification::Insert { key, data: Arc::new(data), block, end: None, - vid, } } - pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { + pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self { EntityModification::Overwrite { key, data: Arc::new(data), block, end: None, - vid, } } @@ -1028,36 +1017,32 @@ mod test { let value = value.clone(); let key = THING_TYPE.parse_key("one").unwrap(); - let vid = 0; + let vid = 0i64; match value { Ins(block) => EntityModification::Insert { key, - data: Arc::new(entity! { SCHEMA => id: "one", count: block }), + data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }), block, end: None, - vid, }, Ovw(block) => EntityModification::Overwrite { key, - data: Arc::new(entity! { SCHEMA => id: "one", count: block }), + data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }), block, end: None, - vid, }, Rem(block) => EntityModification::Remove { key, block }, InsC(block, end) => EntityModification::Insert { key, - data: Arc::new(entity! { SCHEMA => id: "one", count: block }), + data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }), block, end: Some(end), - vid, }, OvwC(block, end) => EntityModification::Overwrite { key, - data: Arc::new(entity! { SCHEMA => id: "one", count: block }), + data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }), block, end: Some(end), - vid, }, } } diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index 86e255b23e7..cd16c8ee5e7 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -735,9 +735,6 @@ where lazy_static! { /// The name of the id attribute, `"id"` pub static ref ID: Word = Word::from("id"); - - /// The name of the vid attribute, `"vid"` - pub static ref VID: Word = Word::from("vid"); } /// An entity is represented as a map of attribute names to values. @@ -922,14 +919,18 @@ impl Entity { .expect("the vid is set to a valid value") } - /// This version of the function returns 0 if the VID is not set. It should be - /// used only in the testing code for more lenient definition of entities. + /// Sets the VID of the entity. The previous one is returned. + pub fn set_vid(&mut self, value: i64) -> Result, InternError> { + self.0.insert("vid", value.into()) + } + + /// Sets the VID if not set. Should be used only for tests. #[cfg(debug_assertions)] - pub fn vid_or_default(&self) -> i64 { - self.get("vid") - .unwrap_or(&Value::Int8(100)) - .as_int8() - .expect("the vid is set to a valid value") + pub fn set_vid_if_empty(&mut self) { + let vid = self.get("vid"); + if vid.is_none() { + let _ = self.set_vid(100).expect("the vid should be set"); + } } /// Merges an entity update `update` into this entity. @@ -946,8 +947,8 @@ impl Entity { /// If a key exists in both entities, the value from `update` is chosen. /// If a key only exists on one entity, the value from that entity is chosen. /// If a key is set to `Value::Null` in `update`, the key/value pair is removed. - pub fn merge_remove_null_fields(&mut self, update: EntityV) -> Result<(), InternError> { - for (key, value) in update.e.0.into_iter() { + pub fn merge_remove_null_fields(&mut self, update: Entity) -> Result<(), InternError> { + for (key, value) in update.into_iter() { match value { Value::Null => self.0.remove(&key), _ => self.0.insert(&key, value)?, @@ -1098,19 +1099,6 @@ impl std::fmt::Debug for Entity { } } -/// An entity wrapper that has VID too. -#[derive(Debug, Clone, CacheWeight, PartialEq, Eq, Serialize)] -pub struct EntityV { - pub e: Entity, - pub vid: i64, -} - -impl EntityV { - pub fn new(e: Entity, vid: i64) -> Self { - Self { e, vid } - } -} - /// An object that is returned from a query. It's a an `r::Value` which /// carries the attributes of the object (`__typename`, `id` etc.) and /// possibly a pointer to its parent if the query that constructed it is one diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 8acbf48b5e8..b926562e0d4 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -482,11 +482,11 @@ fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) { static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter"); static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); } - let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA}; + let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid }; let key = THING_TYPE.parse_key(id).unwrap(); ( format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value), - EntityModification::insert(key, data, 0, vid), + EntityModification::insert(key, data, 0), ) } diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 870d319dcf1..fb3937afdc2 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -768,7 +768,7 @@ fn entity_changes_to_graphql(entity_changes: Vec) -> r::Value { .push(key.entity_id); } EntityOperation::Set { key, data } => { - updates.entry(key.entity_type).or_default().push(data.e); + updates.entry(key.entity_type).or_default().push(data); } } } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 3fff8c8dae7..f6a14c3a5fa 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -64,7 +64,7 @@ use crate::{ }, }; use graph::components::store::DerivedEntityQuery; -use graph::data::store::{EntityV, Id, IdList, IdType, BYTES_SCALAR}; +use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR}; use graph::data::subgraph::schema::POI_TABLE; use graph::prelude::{ anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityOperation, Logger, @@ -697,10 +697,9 @@ impl Layout { let entity_id = data.id(); processed_entities.insert((entity_type.clone(), entity_id.clone())); - let vid = data.vid(); changes.push(EntityOperation::Set { key: entity_type.key_in(entity_id, CausalityRegion::from_entity(&data)), - data: EntityV::new(data, vid), + data, }); } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 681266f6a0e..b9fb1088924 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -539,7 +539,14 @@ impl EntityData { // table column; those will be things like the // block_range that `select *` pulls in but that we // don't care about here - if let Some(column) = table.column(&SqlName::verbatim(key)) { + if key == "vid" { + // VID is not in the input schema but we need it so deserialize it too + match T::Value::from_column_value(&ColumnType::Int8, json) { + Ok(value) if value.is_null() => None, + Ok(value) => Some(Ok((Word::from("vid"), value))), + Err(e) => Some(Err(e)), + } + } else if let Some(column) = table.column(&SqlName::verbatim(key)) { match T::Value::from_column_value(&column.column_type, json) { Ok(value) if value.is_null() => None, Ok(value) => Some(Ok((Word::from(column.field.to_string()), value))), @@ -2450,7 +2457,7 @@ impl<'a> InsertRow<'a> { } let br_value = BlockRangeValue::new(table, row.block, row.end); let causality_region = row.causality_region; - let vid = row.vid; + let vid = row.entity.vid(); Ok(Self { values, br_value, diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 30017db3d21..5475a29db2c 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -6,7 +6,6 @@ use graph::components::store::BlockStore; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::QueryResults; use graph::data::query::QueryTarget; -use graph::data::store::EntityV; use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError}; use graph::data::subgraph::SubgraphFeature; use graph::data_source::DataSource; @@ -423,11 +422,11 @@ pub async fn insert_entities( deployment: &DeploymentLocator, entities: Vec<(EntityType, Entity)>, ) -> Result<(), StoreError> { - let insert_ops = entities.into_iter().map(|(entity_type, data)| { - let vid = data.vid_or_default(); + let insert_ops = entities.into_iter().map(|(entity_type, mut data)| { + data.set_vid_if_empty(); EntityOperation::Set { key: entity_type.key(data.id()), - data: EntityV::new(data, vid), + data, } }); diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 2d210ea25ce..ab3cc5ed02e 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -4,7 +4,7 @@ use graph::components::store::{ DeploymentCursorTracker, DerivedEntityQuery, GetScope, LoadRelatedRequest, ReadStore, StoredDynamicDataSource, WritableStore, }; -use graph::data::store::{EntityV, Id}; +use graph::data::store::Id; use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth}; use graph::data_source::CausalityRegion; use graph::schema::{EntityKey, EntityType, InputSchema}; @@ -207,24 +207,27 @@ fn insert_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; + let mut mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; let mogwai_key = make_band_key("mogwai"); cache .set(mogwai_key.clone(), mogwai_data.clone(), 0) .unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; + let mut sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; let sigurros_key = make_band_key("sigurros"); cache .set(sigurros_key.clone(), sigurros_data.clone(), 0) .unwrap(); + mogwai_data.set_vid(100).unwrap(); + sigurros_data.set_vid(101).unwrap(); + let result = cache.as_modifications(0); assert_eq!( sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![ - EntityModification::insert(mogwai_key, mogwai_data, 0, 100), - EntityModification::insert(sigurros_key, sigurros_data, 0, 101) + EntityModification::insert(mogwai_key, mogwai_data, 0), + EntityModification::insert(sigurros_key, sigurros_data, 0) ]) ); } @@ -253,24 +256,27 @@ fn overwrite_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; + let mut mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; let mogwai_key = make_band_key("mogwai"); cache .set(mogwai_key.clone(), mogwai_data.clone(), 0) .unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 }; + let mut sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994}; let sigurros_key = make_band_key("sigurros"); cache .set(sigurros_key.clone(), sigurros_data.clone(), 0) .unwrap(); + mogwai_data.set_vid(100).unwrap(); + sigurros_data.set_vid(101).unwrap(); + let result = cache.as_modifications(0); assert_eq!( sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![ - EntityModification::overwrite(mogwai_key, mogwai_data, 0, 100), - EntityModification::overwrite(sigurros_key, sigurros_data, 0, 101) + EntityModification::overwrite(mogwai_key, mogwai_data, 0), + EntityModification::overwrite(sigurros_key, sigurros_data, 0) ]) ); } @@ -307,9 +313,8 @@ fn consecutive_modifications() { sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![EntityModification::overwrite( update_key, - entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }, + entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995, vid: 101i64 }, 0, - 100 )]) ); } @@ -465,20 +470,26 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator fn create_account_entity(id: &str, name: &str, email: &str, age: i32, vid: i64) -> EntityOperation { let test_entity = - entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age }; + entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age, vid: vid}; EntityOperation::Set { key: ACCOUNT_TYPE.parse_key(id).unwrap(), - data: EntityV::new(test_entity, vid), + data: test_entity, } } -fn create_wallet_entity(id: &str, account_id: &Id, balance: i32) -> Entity { +fn create_wallet_entity(id: &str, account_id: &Id, balance: i32, vid: i64) -> Entity { let account_id = Value::from(account_id.clone()); - entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance } + entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance, vid: vid} } + +fn create_wallet_entity_no_vid(id: &str, account_id: &Id, balance: i32) -> Entity { + let account_id = Value::from(account_id.clone()); + entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance} +} + fn create_wallet_operation(id: &str, account_id: &Id, balance: i32, vid: i64) -> EntityOperation { - let test_wallet = EntityV::new(create_wallet_entity(id, account_id, balance), vid); + let test_wallet = create_wallet_entity(id, account_id, balance, vid); EntityOperation::Set { key: WALLET_TYPE.parse_key(id).unwrap(), data: test_wallet, @@ -496,9 +507,9 @@ fn check_for_account_with_multiple_wallets() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 1); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -516,7 +527,7 @@ fn check_for_account_with_single_wallet() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 4); let expeted_vec = vec![wallet_1]; assert_eq!(result, expeted_vec); @@ -600,9 +611,9 @@ fn check_for_insert_async_store() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); - let wallet_2 = create_wallet_entity("5", &account_id, 79_i32); - let wallet_3 = create_wallet_entity("6", &account_id, 200_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 4); + let wallet_2 = create_wallet_entity("5", &account_id, 79_i32, 12); + let wallet_3 = create_wallet_entity("6", &account_id, 200_i32, 13); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -632,9 +643,9 @@ fn check_for_insert_async_not_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 1); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -652,7 +663,7 @@ fn check_for_update_async_related() { EntityOperation::Set { ref data, .. } => data.clone(), _ => unreachable!(), }; - assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data.e); + assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data); // insert a new wallet transact_entity_operations( &store, @@ -670,9 +681,9 @@ fn check_for_update_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); - let expeted_vec = vec![new_data.e, wallet_2, wallet_3]; + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); + let expeted_vec = vec![new_data, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); }); @@ -700,8 +711,8 @@ fn check_for_delete_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -713,26 +724,30 @@ fn scoped_get() { // Key for an existing entity that is in the store let account1 = ACCOUNT_TYPE.parse_id("1").unwrap(); let key1 = WALLET_TYPE.parse_key("1").unwrap(); - let wallet1 = create_wallet_entity("1", &account1, 67); + let wallet1 = create_wallet_entity_no_vid("1", &account1, 67); // Create a new entity that is not in the store let account5 = ACCOUNT_TYPE.parse_id("5").unwrap(); - let wallet5 = create_wallet_entity("5", &account5, 100); + let mut wallet5 = create_wallet_entity_no_vid("5", &account5, 100); let key5 = WALLET_TYPE.parse_key("5").unwrap(); cache.set(key5.clone(), wallet5.clone(), 0).unwrap(); + wallet5.set_vid(100).unwrap(); // For the new entity, we can retrieve it with either scope let act5 = cache.get(&key5, GetScope::InBlock).unwrap(); assert_eq!(Some(&wallet5), act5.as_ref().map(|e| e.as_ref())); let act5 = cache.get(&key5, GetScope::Store).unwrap(); assert_eq!(Some(&wallet5), act5.as_ref().map(|e| e.as_ref())); + let mut wallet1a = wallet1.clone(); + wallet1a.set_vid(1).unwrap(); // For an entity in the store, we can not get it `InBlock` but with // `Store` let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); assert_eq!(None, act1); let act1 = cache.get(&key1, GetScope::Store).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); + // Even after reading from the store, the entity is not visible with // `InBlock` let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); @@ -741,10 +756,12 @@ fn scoped_get() { let mut wallet1 = wallet1; wallet1.set("balance", 70).unwrap(); cache.set(key1.clone(), wallet1.clone(), 0).unwrap(); + wallet1a = wallet1; + wallet1a.set_vid(101).unwrap(); let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); let act1 = cache.get(&key1, GetScope::Store).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); }) } diff --git a/store/test-store/tests/graphql/query.rs b/store/test-store/tests/graphql/query.rs index 5358249040d..d7e7dec8f55 100644 --- a/store/test-store/tests/graphql/query.rs +++ b/store/test-store/tests/graphql/query.rs @@ -1,7 +1,6 @@ use graph::blockchain::{Block, BlockTime}; use graph::data::query::Trace; use graph::data::store::scalar::Timestamp; -use graph::data::store::EntityV; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::subgraph::LATEST_VERSION; use graph::entity; @@ -425,11 +424,11 @@ async fn insert_test_entities( .into_iter() .map(|(typename, entities)| { let entity_type = schema.entity_type(typename).unwrap(); - entities.into_iter().map(move |data| { - let vid = data.vid_or_default(); + entities.into_iter().map(move |mut data| { + data.set_vid_if_empty(); EntityOperation::Set { key: entity_type.key(data.id()), - data: EntityV::new(data, vid), + data, } }) }) diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index de47b88d6a1..5c0420a495a 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -1,7 +1,6 @@ use std::fmt::Write; use std::{future::Future, sync::Arc}; -use graph::data::store::EntityV; use graph::{ blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime}, components::{ @@ -80,14 +79,11 @@ pub async fn insert( let schema = ReadStore::input_schema(store); let ops = entities .into_iter() - .map(|data| { + .map(|mut data| { let data_type = schema.entity_type("Data").unwrap(); let key = data_type.key(data.id()); - let vid = data.vid_or_default(); - EntityOperation::Set { - data: EntityV::new(data, vid), - key, - } + let _ = data.set_vid_if_empty(); + EntityOperation::Set { data, key } }) .collect(); @@ -178,10 +174,10 @@ fn stats_hour(schema: &InputSchema) -> Vec> { let block2 = vec![ entity! { schema => id: 11i64, timestamp: ts2, token: TOKEN1.clone(), sum: bd(3), sum_sq: bd(5), max: bd(10), first: bd(10), last: bd(2), - value: bd(14), totalValue: bd(14) }, + value: bd(14), totalValue: bd(14), vid: 1i64 }, entity! { schema => id: 12i64, timestamp: ts2, token: TOKEN2.clone(), sum: bd(3), sum_sq: bd(5), max: bd(20), first: bd(1), last: bd(20), - value: bd(41), totalValue: bd(41) }, + value: bd(41), totalValue: bd(41), vid: 2i64 }, ]; let ts3 = BlockTime::since_epoch(3600, 0); @@ -191,10 +187,10 @@ fn stats_hour(schema: &InputSchema) -> Vec> { let mut v2 = vec![ entity! { schema => id: 21i64, timestamp: ts3, token: TOKEN1.clone(), sum: bd(3), sum_sq: bd(9), max: bd(30), first: bd(30), last: bd(30), - value: bd(90), totalValue: bd(104) }, + value: bd(90), totalValue: bd(104), vid: 3i64 }, entity! { schema => id: 22i64, timestamp: ts3, token: TOKEN2.clone(), sum: bd(3), sum_sq: bd(9), max: bd(3), first: bd(3), last: bd(3), - value: bd(9), totalValue: bd(50)}, + value: bd(9), totalValue: bd(50), vid: 4i64 }, ]; v1.append(&mut v2); v1 diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 59cbe5e3f0a..0d741b1b26c 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -9,7 +9,7 @@ use graph::components::store::{ DeploymentLocator, EntityOrder, EntityQuery, PruneReporter, PruneRequest, PruningStrategy, VersionStats, }; -use graph::data::store::{scalar, EntityV, Id}; +use graph::data::store::{scalar, Id}; use graph::data::subgraph::schema::*; use graph::data::subgraph::*; use graph::semver::Version; @@ -257,13 +257,14 @@ fn create_test_entity( seconds_age: age * 31557600, weight: Value::BigDecimal(weight.into()), coffee: coffee, - favorite_color: favorite_color + favorite_color: favorite_color, + vid: vid, }; let entity_type = TEST_SUBGRAPH_SCHEMA.entity_type(entity_type).unwrap(); EntityOperation::Set { key: entity_type.parse_key(id).unwrap(), - data: EntityV::new(test_entity, vid), + data: test_entity, } } @@ -329,9 +330,10 @@ async fn check_graft( // Make our own entries for block 2 shaq.set("email", "shaq@gmail.com").unwrap(); + let _ = shaq.set_vid(3); let op = EntityOperation::Set { key: user_type.parse_key("3").unwrap(), - data: EntityV::new(shaq, 3), + data: shaq, }; transact_and_wait(&store, &deployment, BLOCKS[2].clone(), vec![op]) .await diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index 8a2e5a2f3d9..b7568b492a1 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -205,11 +205,13 @@ lazy_static! { bigInt: big_int.clone(), bigIntArray: vec![big_int.clone(), (big_int + 1.into())], color: "yellow", + vid: 0i64, } }; static ref EMPTY_NULLABLESTRINGS_ENTITY: Entity = { entity! { THINGS_SCHEMA => id: "one", + vid: 0i64, } }; static ref SCALAR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Scalar").unwrap(); @@ -495,7 +497,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { fn scrub(entity: &Entity) -> Entity { let mut scrubbed = entity.clone(); scrubbed.remove_null_fields(); - scrubbed.remove("vid"); + // scrubbed.remove("vid"); scrubbed } @@ -756,7 +758,6 @@ fn serialize_bigdecimal() { ) .expect("Failed to read Scalar[one]") .unwrap(); - entity.remove("vid"); assert_entity_eq!(entity, actual); } }); @@ -918,7 +919,6 @@ fn conflicting_entity() { data: fred, block: 2, end: None, - vid: 0, }, 2, ) diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index 6897dc9e914..c60a59335c3 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -84,9 +84,9 @@ pub fn row_group_update( ) -> RowGroup { let mut group = RowGroup::new(entity_type.clone(), false); for (key, data) in data { - let vid = data.vid_or_default(); + // let vid = data.vid_or_default(); group - .push(EntityModification::overwrite(key, data, block, vid), block) + .push(EntityModification::overwrite(key, data, block), block) .unwrap(); } group @@ -99,9 +99,9 @@ pub fn row_group_insert( ) -> RowGroup { let mut group = RowGroup::new(entity_type.clone(), false); for (key, data) in data { - let vid = data.vid_or_default(); + // let vid = data.vid_or_default(); group - .push(EntityModification::insert(key, data, block, vid), block) + .push(EntityModification::insert(key, data, block), block) .unwrap(); } group @@ -162,7 +162,7 @@ fn create_schema(conn: &mut PgConnection) -> Layout { fn scrub(entity: &Entity) -> Entity { let mut scrubbed = entity.clone(); scrubbed.remove_null_fields(); - scrubbed.remove("vid"); + // scrubbed.remove("vid"); scrubbed } diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs index 22e473e3810..be7f3cf550b 100644 --- a/store/test-store/tests/postgres/store.rs +++ b/store/test-store/tests/postgres/store.rs @@ -2,7 +2,6 @@ use graph::blockchain::block_stream::FirehoseCursor; use graph::blockchain::BlockTime; use graph::data::graphql::ext::TypeDefinitionExt; use graph::data::query::QueryTarget; -use graph::data::store::EntityV; use graph::data::subgraph::schema::DeploymentCreate; use graph::data_source::common::MappingABI; use graph::futures01::{future, Stream}; @@ -290,11 +289,12 @@ fn create_test_entity( weight: Value::BigDecimal(weight.into()), coffee: coffee, favorite_color: favorite_color, + vid: vid, }; EntityOperation::Set { key: entity_type.parse_key(id).unwrap(), - data: EntityV::new(test_entity, vid), + data: test_entity, } } @@ -358,6 +358,7 @@ fn get_entity_1() { seconds_age: Value::BigInt(BigInt::from(2114359200)), weight: Value::BigDecimal(184.4.into()), coffee: false, + vid: 0i64 }; // "favorite_color" was set to `Null` earlier and should be absent @@ -383,6 +384,7 @@ fn get_entity_3() { seconds_age: Value::BigInt(BigInt::from(883612800)), weight: Value::BigDecimal(111.7.into()), coffee: false, + vid: 3_i64, }; // "favorite_color" was set to `Null` earlier and should be absent @@ -444,7 +446,7 @@ fn update_existing() { }; // Verify that the entity before updating is different from what we expect afterwards - assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data.e); + assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data); // Set test entity; as the entity already exists an update should be performed let count = get_entity_count(store.clone(), &deployment.hash); @@ -459,16 +461,13 @@ fn update_existing() { assert_eq!(count, get_entity_count(store.clone(), &deployment.hash)); // Verify that the entity in the store has changed to what we have set. - let bin_name = match new_data.e.get("bin_name") { + let bin_name = match new_data.get("bin_name") { Some(Value::Bytes(bytes)) => bytes.clone(), _ => unreachable!(), }; - new_data - .e - .insert("bin_name", Value::Bytes(bin_name)) - .unwrap(); - assert_eq!(writable.get(&entity_key).unwrap(), Some(new_data.e)); + new_data.insert("bin_name", Value::Bytes(bin_name)).unwrap(); + assert_eq!(writable.get(&entity_key).unwrap(), Some(new_data)); }) } @@ -478,7 +477,8 @@ fn partially_update_existing() { let entity_key = USER_TYPE.parse_key("1").unwrap(); let schema = writable.input_schema(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 11i64 }; let original_entity = writable .get(&entity_key) @@ -492,7 +492,7 @@ fn partially_update_existing() { TEST_BLOCK_3_PTR.clone(), vec![EntityOperation::Set { key: entity_key.clone(), - data: EntityV::new(partial_entity.clone(), 11), + data: partial_entity.clone(), }], ) .await @@ -1088,7 +1088,8 @@ fn revert_block_with_partial_update() { let entity_key = USER_TYPE.parse_key("1").unwrap(); let schema = writable.input_schema(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 }; let original_entity = writable.get(&entity_key).unwrap().expect("missing entity"); @@ -1099,7 +1100,7 @@ fn revert_block_with_partial_update() { TEST_BLOCK_3_PTR.clone(), vec![EntityOperation::Set { key: entity_key.clone(), - data: EntityV::new(partial_entity.clone(), 5), + data: partial_entity, }], ) .await @@ -1183,7 +1184,8 @@ fn revert_block_with_dynamic_data_source_operations() { // Create operations to add a user let user_key = USER_TYPE.parse_key("1").unwrap(); - let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null }; + let partial_entity = + entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 }; // Get the original user for comparisons let original_user = writable.get(&user_key).unwrap().expect("missing entity"); @@ -1194,7 +1196,7 @@ fn revert_block_with_dynamic_data_source_operations() { let ops = vec![EntityOperation::Set { key: user_key.clone(), - data: EntityV::new(partial_entity.clone(), 5), + data: partial_entity.clone(), }]; // Add user and dynamic data source to the store @@ -1315,9 +1317,13 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { TEST_BLOCK_1_PTR.clone(), added_entities .iter() - .map(|(id, data)| EntityOperation::Set { - key: USER_TYPE.parse_key(id.as_str()).unwrap(), - data: EntityV::new(data.clone(), data.vid_or_default()), + .map(|(id, data)| { + let mut data = data.clone(); + data.set_vid_if_empty(); + EntityOperation::Set { + key: USER_TYPE.parse_key(id.as_str()).unwrap(), + data, + } }) .collect(), ) @@ -1325,10 +1331,10 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { .unwrap(); // Update an entity in the store - let updated_entity = entity! { schema => id: "1", name: "Johnny" }; + let updated_entity = entity! { schema => id: "1", name: "Johnny", vid: 7i64 }; let update_op = EntityOperation::Set { key: USER_TYPE.parse_key("1").unwrap(), - data: EntityV::new(updated_entity.clone(), 7), + data: updated_entity.clone(), }; // Delete an entity in the store @@ -1523,11 +1529,11 @@ fn handle_large_string_with_index() { block: BlockNumber, vid: i64, ) -> EntityModification { - let data = entity! { schema => id: id, name: name }; + let data = entity! { schema => id: id, name: name, vid: vid }; let key = USER_TYPE.parse_key(id).unwrap(); - EntityModification::insert(key, data, block, vid) + EntityModification::insert(key, data, block) } run_test(|store, writable, deployment| async move { @@ -1568,6 +1574,7 @@ fn handle_large_string_with_index() { ) .await .expect("Failed to insert large text"); + writable.flush().await.unwrap(); let query = user_query() @@ -1623,11 +1630,11 @@ fn handle_large_bytea_with_index() { block: BlockNumber, vid: i64, ) -> EntityModification { - let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name) }; + let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name), vid: vid }; let key = USER_TYPE.parse_key(id).unwrap(); - EntityModification::insert(key, data, block, vid) + EntityModification::insert(key, data, block) } run_test(|store, writable, deployment| async move { @@ -1832,11 +1839,12 @@ fn window() { age: i32, vid: i64, ) -> EntityOperation { - let entity = entity! { TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color }; + let entity = + entity! { TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color, vid: vid }; EntityOperation::Set { key: entity_type.parse_key(id).unwrap(), - data: EntityV::new(entity, vid), + data: entity, } } diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index dbc66e5e401..2228c148d25 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -1,5 +1,4 @@ use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; -use graph::data::store::EntityV; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::value::Word; use graph::data_source::CausalityRegion; @@ -143,12 +142,13 @@ async fn insert_count( let count_key_local = |counter_type: &EntityType, id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => id: "1", - count: count as i32 + count: count as i32, + vid: block as i64, }; let entity_op = if block != 3 && block != 5 && block != 7 { EntityOperation::Set { key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), - data: EntityV::new(data, block.into()), + data, } } else { EntityOperation::Remove { @@ -164,10 +164,11 @@ async fn insert_count( let data = entity! { TEST_SUBGRAPH_SCHEMA => id: &block.to_string(), count :count as i32, + vid: block as i64, }; let entity_op = EntityOperation::Set { key: count_key_local(&COUNTER2_TYPE, &data.get("id").unwrap().to_string()), - data: EntityV::new(data, block.into()), + data, }; ops.push(entity_op); } @@ -296,7 +297,7 @@ fn restart() { // Cause an error by leaving out the non-nullable `count` attribute let entity_ops = vec![EntityOperation::Set { key: count_key("1"), - data: EntityV::new(entity! { schema => id: "1"}, 0), + data: entity! { schema => id: "1", vid: 0i64}, }]; transact_entity_operations( &subgraph_store, @@ -320,7 +321,7 @@ fn restart() { // Retry our write with correct data let entity_ops = vec![EntityOperation::Set { key: count_key("1"), - data: EntityV::new(entity! { schema => id: "1", count: 1}, 0), + data: entity! { schema => id: "1", count: 1, vid: 0i64}, }]; // `SubgraphStore` caches the correct writable so that this call // uses the restarted writable, and is equivalent to using @@ -342,13 +343,13 @@ fn restart() { fn read_range_test() { run_test(|store, writable, sourceable, deployment| async move { let result_entities = vec![ - r#"(1, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"#, - r#"(2, [EntitySourceOperation { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"#, - r#"(3, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"#, - r#"(4, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"#, - r#"(5, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"#, - r#"(6, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 6 }])"#, - r#"(7, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 6 }])"#, + r#"(1, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1"), vid: Int8(1) }, vid: 1 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1"), vid: Int8(1) }, vid: 1 }])"#, + r#"(2, [EntitySourceOperation { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1"), vid: Int8(2) }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2"), vid: Int8(2) }, vid: 2 }])"#, + r#"(3, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1"), vid: Int8(2) }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3"), vid: Int8(3) }, vid: 3 }])"#, + r#"(4, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1"), vid: Int8(4) }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4"), vid: Int8(4) }, vid: 4 }])"#, + r#"(5, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1"), vid: Int8(4) }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5"), vid: Int8(5) }, vid: 5 }])"#, + r#"(6, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1"), vid: Int8(6) }, vid: 6 }])"#, + r#"(7, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1"), vid: Int8(6) }, vid: 6 }])"#, ]; let subgraph_store = store.subgraph_store(); writable.deployment_synced().unwrap();