Skip to content

Commit

Permalink
Subgraph composition: Rework generation of the VID
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Jan 31, 2025
1 parent 9658d8c commit 1cb3401
Show file tree
Hide file tree
Showing 24 changed files with 536 additions and 277 deletions.
2 changes: 1 addition & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(key, entity, block.number)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
5 changes: 4 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
Expand All @@ -1617,11 +1618,12 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, block)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
Expand Down Expand Up @@ -1657,6 +1659,7 @@ async fn update_proof_of_indexing(
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

Expand Down
38 changes: 34 additions & 4 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
Expand All @@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};

pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;

// Number of VIDs that are reserved outside of the generated ones here.
// Currently none are used, but let's reserve a few anyway.
const RESERVED_VIDS: u32 = 100;

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
Expand Down Expand Up @@ -105,6 +109,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

// Sequence number of the next VID value for this block. The value written
// in the database consists of a block number and this sequence number.
pub vid_seq: u32,
}

impl Debug for EntityCache {
Expand Down Expand Up @@ -132,6 +140,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand All @@ -152,6 +161,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand Down Expand Up @@ -278,7 +288,7 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity) =>
{
Ok(Some(entity.clone()))
}
Expand Down Expand Up @@ -349,10 +359,30 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
) -> Result<(), anyhow::Error> {
// Check the validity of the derived fields
let is_valid = entity.validate(&key).is_ok();

// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = old_vid {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
}

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
Expand Down Expand Up @@ -489,7 +519,7 @@ impl EntityCache {
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
Expand Down
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl ProofOfIndexing {
pub fn take(self) -> HashMap<Id, BlockEventStream> {
self.per_causality_region
}

pub fn get_block(&self) -> BlockNumber {
self.block_number
}
}

pub struct ProofOfIndexingFinisher {
Expand Down
25 changes: 24 additions & 1 deletion graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
derive::CacheWeight,
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
runtime::gas::{Gas, GasSizeOf},
schema::{EntityKey, EntityType},
schema::{input::VID_FIELD, EntityKey, EntityType},
util::intern::{self, AtomPool},
util::intern::{Error as InternError, NullValue, Object},
};
Expand Down Expand Up @@ -910,6 +910,29 @@ impl Entity {
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
}

/// Return the VID of this entity and if its missing or of a type different than
/// i64 it panics.
pub fn vid(&self) -> i64 {
self.get(VID_FIELD)
.expect("the vid is set")
.as_int8()
.expect("the vid is set to a valid value")
}

/// Sets the VID of the entity. The previous one is returned.
pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
self.0.insert(VID_FIELD, value.into())
}

/// Sets the VID if it's not already set. Should be used only for tests.
#[cfg(debug_assertions)]
pub fn set_vid_if_empty(&mut self) {
let vid = self.get(VID_FIELD);
if vid.is_none() {
let _ = self.set_vid(100).expect("the vid should be set");
}
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
Expand Down
7 changes: 5 additions & 2 deletions graph/src/data/subgraph/api_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
// Enables eth call declarations and indexed arguments(topics) filtering in manifest
pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);

// Enables subgraphs as datasource
// Enables subgraphs as datasource.
// Changes the way the VID field is generated. It used to be autoincrement. Now it's
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);

// The latest spec version available
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0;

pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);

Expand Down
3 changes: 3 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
const POI_DIGEST: &str = "digest";
/// The name of the PoI attribute for storing the block time
const POI_BLOCK_TIME: &str = "blockTime";
pub(crate) const VID_FIELD: &str = "vid";

pub mod kw {
pub const ENTITY: &str = "entity";
Expand Down Expand Up @@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
pool.intern(POI_DIGEST);
pool.intern(POI_BLOCK_TIME);

pool.intern(VID_FIELD);

for definition in &document.definitions {
match definition {
s::Definition::TypeDefinition(typedef) => match typedef {
Expand Down
2 changes: 1 addition & 1 deletion graph/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod ast;
mod entity_key;
mod entity_type;
mod fulltext;
mod input;
pub(crate) mod input;

pub use api::{is_introspection_field, APISchemaError, INTROSPECTION_QUERY_TYPE};

Expand Down
12 changes: 6 additions & 6 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,13 @@ async fn test_ipfs_block() {
// The user_data value we use with calls to ipfs_map
const USER_DATA: &str = "user_data";

fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
lazy_static! {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
Expand Down Expand Up @@ -552,8 +552,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
let subgraph_id = "ipfsMap";

// Try it with two valid objects
let (str1, thing1) = make_thing("one", "eins");
let (str2, thing2) = make_thing("two", "zwei");
let (str1, thing1) = make_thing("one", "eins", 100);
let (str2, thing2) = make_thing("two", "zwei", 100);
let ops = run_ipfs_map(
ipfs.clone(),
subgraph_id,
Expand Down Expand Up @@ -1022,8 +1022,8 @@ async fn test_entity_store(api_version: Version) {

let schema = store.input_schema(&deployment.hash).unwrap();

let alex = entity! { schema => id: "alex", name: "Alex" };
let steve = entity! { schema => id: "steve", name: "Steve" };
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 };
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64 };
let user_type = schema.entity_type("User").unwrap();
test_store::insert_entities(
&deployment,
Expand Down
2 changes: 1 addition & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(key, entity, block)?;

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,27 @@ impl Table {
Ok(cols)
}

// Currently the aggregation entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
"bigint"
} else {
"bigserial"
};

if self.immutable {
writeln!(
out,
"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block} int not null,\n\
{cols},
unique({id})
);",
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block = BLOCK_COLUMN,
id = self.primary_key().name
)
Expand All @@ -137,13 +145,14 @@ impl Table {
out,
r#"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block_range} int4range not null,
{cols}
);"#,
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block_range = BLOCK_RANGE_COLUMN
)?;

Expand Down
Loading

0 comments on commit 1cb3401

Please sign in to comment.