Skip to content

Commit

Permalink
Subgraph composition: Rework generation of the VID
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 authored and zorancv committed Jan 31, 2025
1 parent cca4d6f commit 8aa3585
Show file tree
Hide file tree
Showing 24 changed files with 530 additions and 279 deletions.
1 change: 1 addition & 0 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ where
state.entity_cache.set(
key,
entity,
block.number,
Some(&mut state.write_capacity_remaining),
)?;
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
Expand All @@ -1684,11 +1685,12 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi, None)
entity_cache.set(key, poi, block, None)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
Expand Down Expand Up @@ -1724,6 +1726,7 @@ async fn update_proof_of_indexing(
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

Expand Down
32 changes: 27 additions & 5 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
Expand All @@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};

pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;

// Number of VIDs that are reserved outside of the generated ones here.
// Currently none are used, but let's reserve a few anyway.
const RESERVED_VIDS: u32 = 100;

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
Expand Down Expand Up @@ -105,6 +109,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

// Sequence number of the next VID value for this block. The value written
// to the database consists of a block number and this sequence number.
pub vid_seq: u32,
}

impl Debug for EntityCache {
Expand Down Expand Up @@ -132,6 +140,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand All @@ -152,6 +161,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand Down Expand Up @@ -278,7 +288,7 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity) =>
{
Ok(Some(entity.clone()))
}
Expand Down Expand Up @@ -353,14 +363,14 @@ impl EntityCache {
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
write_capacity_remaining: Option<&mut usize>,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();

if let Some(write_capacity_remaining) = write_capacity_remaining {
let weight = entity.weight();

if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
return Err(anyhow!(
"exceeded block write limit when writing entity `{}`",
Expand All @@ -371,7 +381,19 @@ impl EntityCache {
*write_capacity_remaining -= weight;
}

self.entity_op(key.clone(), EntityOp::Update(entity));
// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = old_vid {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
}

// The updates we were given are not valid by themselves; force a
// lookup in the database and check again with an entity that merges
Expand Down Expand Up @@ -507,7 +529,7 @@ impl EntityCache {
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
Expand Down
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl ProofOfIndexing {
/// Consume this proof of indexing and hand back the underlying map of
/// per-causality-region event streams.
pub fn take(self) -> HashMap<Id, BlockEventStream> {
    self.per_causality_region
}

/// The number of the block this proof of indexing is being built for.
pub fn get_block(&self) -> BlockNumber {
    self.block_number
}
}

pub struct ProofOfIndexingFinisher {
Expand Down
25 changes: 24 additions & 1 deletion graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
derive::CacheWeight,
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
runtime::gas::{Gas, GasSizeOf},
schema::{EntityKey, EntityType},
schema::{input::VID_FIELD, EntityKey, EntityType},
util::intern::{self, AtomPool},
util::intern::{Error as InternError, NullValue, Object},
};
Expand Down Expand Up @@ -910,6 +910,29 @@ impl Entity {
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
}

/// Return the VID of this entity.
///
/// # Panics
///
/// Panics if the VID is missing or is of a type other than `i64`.
pub fn vid(&self) -> i64 {
    let value = self.get(VID_FIELD).expect("the vid is set");
    value.as_int8().expect("the vid is set to a valid value")
}

/// Sets the VID of the entity to `value`.
///
/// Returns the previously stored VID value, if any, or an error if the
/// field could not be interned into the entity's backing object.
pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
    self.0.insert(VID_FIELD, value.into())
}

/// Assign a placeholder VID (100) when the entity does not have one yet.
/// Existing VIDs are left untouched. Should be used only for tests.
#[cfg(debug_assertions)]
pub fn set_vid_if_empty(&mut self) {
    if self.get(VID_FIELD).is_none() {
        let _ = self.set_vid(100).expect("the vid should be set");
    }
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
Expand Down
7 changes: 5 additions & 2 deletions graph/src/data/subgraph/api_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
// Enables eth call declarations and indexed arguments(topics) filtering in manifest
pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);

// Enables subgraphs as datasource
// Enables subgraphs as datasource.
// Changes the way the VID field is generated. It used to be autoincrement. Now it's
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);

// The latest spec version available
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0;

pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);

Expand Down
3 changes: 3 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
const POI_DIGEST: &str = "digest";
/// The name of the PoI attribute for storing the block time
const POI_BLOCK_TIME: &str = "blockTime";
pub(crate) const VID_FIELD: &str = "vid";

pub mod kw {
pub const ENTITY: &str = "entity";
Expand Down Expand Up @@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
pool.intern(POI_DIGEST);
pool.intern(POI_BLOCK_TIME);

pool.intern(VID_FIELD);

for definition in &document.definitions {
match definition {
s::Definition::TypeDefinition(typedef) => match typedef {
Expand Down
2 changes: 1 addition & 1 deletion graph/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod ast;
mod entity_key;
mod entity_type;
mod fulltext;
mod input;
pub(crate) mod input;

pub use api::{is_introspection_field, APISchemaError, INTROSPECTION_QUERY_TYPE};

Expand Down
12 changes: 6 additions & 6 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ async fn test_ipfs_block() {
// The user_data value we use with calls to ipfs_map
const USER_DATA: &str = "user_data";

fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
lazy_static! {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
Expand Down Expand Up @@ -553,8 +553,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
let subgraph_id = "ipfsMap";

// Try it with two valid objects
let (str1, thing1) = make_thing("one", "eins");
let (str2, thing2) = make_thing("two", "zwei");
let (str1, thing1) = make_thing("one", "eins", 100);
let (str2, thing2) = make_thing("two", "zwei", 100);
let ops = run_ipfs_map(
subgraph_id,
format!("{}\n{}", str1, str2),
Expand Down Expand Up @@ -1001,8 +1001,8 @@ async fn test_entity_store(api_version: Version) {

let schema = store.input_schema(&deployment.hash).unwrap();

let alex = entity! { schema => id: "alex", name: "Alex" };
let steve = entity! { schema => id: "steve", name: "Steve" };
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 };
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64 };
let user_type = schema.entity_type("User").unwrap();
test_store::insert_entities(
&deployment,
Expand Down
9 changes: 6 additions & 3 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,12 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;
state.entity_cache.set(
key,
entity,
block,
Some(&mut state.write_capacity_remaining),
)?;

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,27 @@ impl Table {
Ok(cols)
}

// Currently the aggregation entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
"bigint"
} else {
"bigserial"
};

if self.immutable {
writeln!(
out,
"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block} int not null,\n\
{cols},
unique({id})
);",
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block = BLOCK_COLUMN,
id = self.primary_key().name
)
Expand All @@ -137,13 +145,14 @@ impl Table {
out,
r#"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block_range} int4range not null,
{cols}
);"#,
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block_range = BLOCK_RANGE_COLUMN
)?;

Expand Down
Loading

0 comments on commit 8aa3585

Please sign in to comment.