From c00ffa67f7fc06b198a2a39d1ff8a123b2536f29 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 5 Sep 2024 10:52:24 +0100 Subject: [PATCH] agent: sourceCapture configuration for schema and delta updates --- .../agent/src/controllers/materialization.rs | 19 ++++-- crates/agent/src/controllers/mod.rs | 5 ++ ...st__materialization-status-round-trip.snap | 16 ++++- ...controllers__test__status_json_schema.snap | 55 ++++++++++++++++ crates/agent/src/integration_tests/harness.rs | 2 +- .../src/integration_tests/source_captures.rs | 28 +++++++- crates/agent/src/publications/specs.rs | 2 +- crates/agent/src/resource_configs.rs | 32 ++++++++-- crates/models/src/lib.rs | 8 ++- crates/models/src/materializations.rs | 17 +++-- crates/models/src/source_capture.rs | 64 +++++++++++++++++++ 11 files changed, 222 insertions(+), 26 deletions(-) create mode 100644 crates/models/src/source_capture.rs diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index 108faefc3a..25ab0357db 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -9,7 +9,7 @@ use crate::{ }; use anyhow::Context; use itertools::Itertools; -use models::{ModelDef, OnIncompatibleSchemaChange}; +use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef}; use proto_flow::materialize::response::validated::constraint::Type as ConstraintType; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -57,14 +57,17 @@ impl MaterializationStatus { } } - if let Some(source_capture_name) = &model.source_capture { + if let Some(source_capture) = &model.source_capture { // If the source capture has been deleted, we will have already handled that as a // part of `handle_deleted_dependencies`. if let Some(source_capture_model) = - dependencies.live.captures.get_by_key(source_capture_name) + dependencies.live.captures.get_by_key(&source_capture.capture_name()) { if self.source_capture.is_none() { - self.source_capture = Some(SourceCaptureStatus::default()); + self.source_capture = Some(SourceCaptureStatus { + source_capture: source_capture.def(), + ..Default::default() + }); } let source_capture_status = self.source_capture.as_mut().unwrap(); // Source capture errors are terminal @@ -259,10 +262,10 @@ fn handle_deleted_dependencies( if drafted .source_capture .as_ref() - .map(|sc| deleted.contains(sc.as_str())) + .map(|sc| deleted.contains(sc.capture_name().as_str())) .unwrap_or(false) { - let capture_name = drafted.source_capture.take().unwrap(); + let capture_name = drafted.source_capture.take().unwrap().capture_name(); source_capture.take(); descriptions.push(format!( r#"removed sourceCapture: "{capture_name}" because the capture was deleted"# @@ -284,6 +287,7 @@ fn is_false(b: &bool) -> bool { /// Status information about the `sourceCapture` #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)] pub struct SourceCaptureStatus { + pub source_capture: SourceCaptureDef, /// Whether the materialization bindings are up-to-date with respect to /// the `sourceCapture` bindings. In normal operation, this should always /// be `true`. Otherwise, there will be a controller `error` and the @@ -363,6 +367,7 @@ impl SourceCaptureStatus { // Failures here are terminal update_linked_materialization( + &self.source_capture, resource_spec_pointers, &self.add_bindings, draft_row.model.as_mut().unwrap(), @@ -391,6 +396,7 @@ fn get_bindings_to_add( } fn update_linked_materialization( + source_capture: &SourceCaptureDef, resource_spec_pointers: ResourceSpecPointers, bindings_to_add: &BTreeSet, materialization: &mut models::MaterializationDef, @@ -398,6 +404,7 @@ fn update_linked_materialization( for collection_name in bindings_to_add { let mut resource_spec = serde_json::json!({}); crate::resource_configs::update_materialization_resource_spec( + source_capture, &mut resource_spec, &resource_spec_pointers, &collection_name, diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index cc77e65c1f..4b2e658466 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -462,6 +462,7 @@ mod test { use std::collections::{BTreeSet, VecDeque}; use chrono::TimeZone; + use models::{SourceCaptureDef, Capture}; use super::*; use crate::controllers::materialization::SourceCaptureStatus; @@ -508,6 +509,10 @@ mod test { source_capture: Some(SourceCaptureStatus { up_to_date: false, add_bindings, + source_capture: SourceCaptureDef { + capture: Capture::new("snails/capture"), + ..Default::default() + } }), publications: PublicationStatus { target_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]), diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap index e420588046..95eddfd412 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap @@ -7,6 +7,13 @@ StatusSnapshot { MaterializationStatus { source_capture: Some( SourceCaptureStatus { + source_capture: SourceCaptureDef { + capture: Capture( + "snails/capture", + ), + schema_mode: LeaveEmpty, + delta_updates: false, + }, up_to_date: false, add_bindings: { Collection( @@ -69,11 +76,18 @@ StatusSnapshot { }, }, ), - json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"target_pub_id\": \"0102030405060708\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ]\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}", + json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"source_capture\": {\n \"capture\": \"snails/capture\",\n \"schemaMode\": \"leaveEmpty\"\n },\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"target_pub_id\": \"0102030405060708\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ]\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}", parsed: Materialization( MaterializationStatus { source_capture: Some( SourceCaptureStatus { + source_capture: SourceCaptureDef { + capture: Capture( + "snails/capture", + ), + schema_mode: LeaveEmpty, + delta_updates: false, + }, up_to_date: false, add_bindings: { Collection( diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap index 66e07ec02b..e260897dd3 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap @@ -33,6 +33,14 @@ expression: schema ], "type": "object" }, + "Capture": { + "description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", + "examples": [ + "acmeCo/capture" + ], + "pattern": "^[\\p{Letter}\\p{Number}\\-_\\.]+(/[\\p{Letter}\\p{Number}\\-_\\.]+)*$", + "type": "string" + }, "Collection": { "description": "Collection names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", "examples": [ @@ -364,6 +372,47 @@ expression: schema ], "type": "object" }, + "SourceCaptureDef": { + "additionalProperties": false, + "description": "SourceCaptureDef specifies configuration for source captures", + "properties": { + "capture": { + "$ref": "#/definitions/Capture", + "description": "Capture name" + }, + "deltaUpdates": { + "description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates", + "type": "boolean" + }, + "schemaMode": { + "$ref": "#/definitions/SourceCaptureSchemaMode", + "default": "leaveEmpty", + "description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set" + } + }, + "required": [ + "capture" + ], + "type": "object" + }, + "SourceCaptureSchemaMode": { + "oneOf": [ + { + "description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization", + "enum": [ + "leaveEmpty" + ], + "type": "string" + }, + { + "description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding", + "enum": [ + "collectionSchema" + ], + "type": "string" + } + ] + }, "SourceCaptureStatus": { "description": "Status information about the `sourceCapture`", "properties": { @@ -375,11 +424,17 @@ expression: schema "type": "array", "uniqueItems": true }, + "source_capture": { + "$ref": "#/definitions/SourceCaptureDef" + }, "up_to_date": { "description": "Whether the materialization bindings are up-to-date with respect to the `sourceCapture` bindings. In normal operation, this should always be `true`. Otherwise, there will be a controller `error` and the publication status will contain details of why the update failed.", "type": "boolean" } }, + "required": [ + "source_capture" + ], "type": "object" } }, diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index 1a9e72dfc5..9e52f728d8 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -165,7 +165,7 @@ impl TestHarness { 'materialization', 'http://test.test/', '{"type": "object"}', - '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}}}', + '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}, "delta": {"type": "boolean", "x-delta-updates": true}}}', '{/id}', '{"type": "success"}' ) on conflict do nothing diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs index 1487c374b8..eec72d46fb 100644 --- a/crates/agent/src/integration_tests/source_captures.rs +++ b/crates/agent/src/integration_tests/source_captures.rs @@ -9,7 +9,7 @@ async fn test_source_captures() { let draft = draft_catalog(serde_json::json!({ "collections": { - "ducks/quacks": { + "ducks/pond/quacks": { "schema": { "type": "object", "properties": { @@ -77,9 +77,23 @@ async fn test_source_captures() { .unwrap(); assert_eq!(1, a_model.bindings.len()); assert_eq!( - "ducks/quacks", + "ducks/pond/quacks", a_model.bindings[0].source.collection().as_str() ); + // Schema mode not set, so we expect schema to be left empty + assert_eq!( + None, + a_model.bindings[0].resource.to_value().pointer("/schema") + ); + // Delta updates not set, so we expect delta to be left empty + assert_eq!( + None, + a_model.bindings[0].resource.to_value().pointer("/delta") + ); + assert_eq!( + "quacks", + a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap() + ); let a_status = a_state.current_status.unwrap_materialization(); assert!(a_status.source_capture.as_ref().unwrap().up_to_date); assert!(a_status @@ -152,7 +166,11 @@ async fn test_source_captures_collection_name() { }, "materializations": { "ducks/materializeA": { - "sourceCapture": "ducks/capture", + "sourceCapture": { + "capture": "ducks/capture", + "schemaMode": "collectionSchema", + "deltaUpdates": true, + }, "endpoint": { "connector": { "image": "materialize/test:test", @@ -196,6 +214,10 @@ async fn test_source_captures_collection_name() { "pond", a_model.bindings[0].resource.to_value().pointer("/schema").unwrap().as_str().unwrap() ); + assert_eq!( + true, + a_model.bindings[0].resource.to_value().pointer("/delta").unwrap().as_bool().unwrap() + ); assert_eq!( "quacks", a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap() diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index d1d80856ad..761e4ca447 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -111,7 +111,7 @@ async fn update_live_spec_flows( catalog_type, Some(reads_from.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), Some(writes_to.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), - source_capture.as_ref().map(|c| c.as_str()), + source_capture.as_ref().map(|c| c.capture.as_str()), txn, ) .await?; diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 5e63795fba..214ca2fb75 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -1,3 +1,4 @@ +use models::{SourceCaptureDef, SourceCaptureSchemaMode}; use serde_json::Value; /// @@ -5,6 +6,7 @@ use serde_json::Value; /// If the `full_collection_name` doesn't contain any `/` characters, which should never /// be the case since we should have already validated the collection name. pub fn update_materialization_resource_spec( + source_capture: &SourceCaptureDef, resource_spec: &mut Value, resource_spec_pointers: &ResourceSpecPointers, full_collection_name: &str, @@ -31,9 +33,19 @@ pub fn update_materialization_resource_spec( let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into()); - if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { - if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { - let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema { + if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { + if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { + let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + } + } + } + + if source_capture.delta_updates { + if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates { + if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) { + let _ = std::mem::replace(x_delta_updates_prev, true.into()); + } } } @@ -43,12 +55,14 @@ pub fn update_materialization_resource_spec( pub struct ResourceSpecPointers { x_collection_name: doc::Pointer, x_schema_name: Option, + x_delta_updates: Option, } /// Runs inference on the given schema and searches for a location within the resource spec -/// that bears the `x-collection-name` and `x-schema-name` annotations. Returns the pointer to those location, or an -/// error if no such location exists. Errors from parsing the schema are returned directly. -/// The schema must be fully self-contained (a.k.a. bundled), or an error will be returned. +/// that bears the `x-collection-name`, `x-schema-name` or `x-delta-updates` annotations. +/// Returns the pointer to those location, or an error if no `x-collection-name` exists. +/// Errors from parsing the schema are returned directly. The schema must be fully self-contained (a.k.a. bundled), +/// or an error will be returned. pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result { // While all known connector resource spec schemas are self-contained, we don't // actually do anything to guarantee that they are. This function may fail in that case. @@ -60,18 +74,22 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result = None; let mut x_schema_name: Option = None; + let mut x_delta_updates: Option = None; for (ptr, _, prop_shape, _) in shape.locations() { if prop_shape.annotations.contains_key("x-collection-name") { x_collection_name = Some(ptr) } else if prop_shape.annotations.contains_key("x-schema-name") { x_schema_name = Some(ptr) + } else if prop_shape.annotations.contains_key("x-delta-updates") { + x_delta_updates = Some(ptr) } } if let Some(x_collection_name_ptr) = x_collection_name { Ok(ResourceSpecPointers { x_collection_name: x_collection_name_ptr, - x_schema_name + x_schema_name, + x_delta_updates, }) } else { Err(anyhow::anyhow!( diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 77df69757e..1ee9111346 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -10,6 +10,7 @@ mod derive_typescript; mod id; mod journals; mod labels; +mod source_capture; mod materializations; mod raw_value; mod references; @@ -25,6 +26,7 @@ pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint}; pub use catalogs::Catalog; pub use collections::{CollectionDef, Projection}; pub use connector::{split_image_tag, ConnectorConfig, LocalConfig}; +pub use source_capture::{SourceCaptureDef, SourceCaptureSchemaMode}; pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef}; pub use derive_sqlite::DeriveUsingSqlite; pub use derive_typescript::DeriveUsingTypescript; @@ -81,7 +83,7 @@ pub trait ModelDef: /// If this spec is a materialization, returns the value of `source_capture`. /// This function is admittedly a little smelly, but it's included in the trait /// so that we can generically get all the dependencies of each spec. - fn materialization_source_capture(&self) -> Option { + fn materialization_source_capture(&self) -> Option { None } @@ -92,7 +94,7 @@ pub trait ModelDef: deps.extend( self.materialization_source_capture() .into_iter() - .map(|c| c.into()), + .map(|c| c.capture.into()), ); deps } @@ -251,7 +253,7 @@ impl ModelDef for AnySpec { } } - fn materialization_source_capture(&self) -> Option { + fn materialization_source_capture(&self) -> Option { match self { AnySpec::Materialization(m) => m.materialization_source_capture(), _ => None, diff --git a/crates/models/src/materializations.rs b/crates/models/src/materializations.rs index 19c9a7b116..3d51b66c57 100644 --- a/crates/models/src/materializations.rs +++ b/crates/models/src/materializations.rs @@ -1,7 +1,9 @@ use crate::{source::OnIncompatibleSchemaChange, Collection, Id}; +use crate::source_capture::{SourceCapture, SourceCaptureDef}; + use super::{ - Capture, ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source, + ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -15,7 +17,7 @@ use std::collections::BTreeMap; pub struct MaterializationDef { /// # Automatically materialize new bindings from a named capture #[serde(skip_serializing_if = "Option::is_none")] - pub source_capture: Option, + pub source_capture: Option, /// # Default handling of schema changes that are incompatible with the target resource. /// This can be overridden on a per-binding basis. #[serde( @@ -201,8 +203,15 @@ impl super::ModelDef for MaterializationDef { !self.shards.disable } - fn materialization_source_capture(&self) -> Option { - self.source_capture.clone() + fn materialization_source_capture(&self) -> Option { + match &self.source_capture { + Some(SourceCapture::Simple(capture_name)) => Some(SourceCaptureDef { + capture: capture_name.clone(), + ..Default::default() + }), + Some(SourceCapture::Configured(sc)) => Some(sc.clone()), + None => None + } } fn connector_image(&self) -> Option<&str> { diff --git a/crates/models/src/source_capture.rs b/crates/models/src/source_capture.rs new file mode 100644 index 0000000000..97ab630c67 --- /dev/null +++ b/crates/models/src/source_capture.rs @@ -0,0 +1,64 @@ +use super::Capture; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "camelCase")] +pub enum SourceCaptureSchemaMode { + /// Leave the materialization binding's schema field empty, therefore falling back to the + /// default schema of the materialization + LeaveEmpty, + /// Use the 2nd-to-last component of the collection name as the schema of the materialization + /// binding + CollectionSchema, +} + +impl Default for SourceCaptureSchemaMode { + fn default() -> Self { + SourceCaptureSchemaMode::LeaveEmpty + } +} + +/// SourceCaptureDef specifies configuration for source captures +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq, Default)] +#[serde(deny_unknown_fields, rename_all = "camelCase")] +pub struct SourceCaptureDef { + /// Capture name + pub capture: Capture, + + /// When adding new bindings from a source capture to a materialization, how should the schema + /// of the materialization binding be set + #[serde(default)] + pub schema_mode: SourceCaptureSchemaMode, + + /// When adding new bindings from a source capture to a materialization, should the new + /// bindings be marked as delta updates + #[serde(default, skip_serializing_if = "super::is_false")] + pub delta_updates: bool, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "camelCase", untagged)] +pub enum SourceCapture { + Simple(Capture), + Configured(SourceCaptureDef), +} + +impl SourceCapture { + pub fn capture_name(&self) -> Capture { + match self { + SourceCapture::Simple(capture) => capture.clone(), + SourceCapture::Configured(sc) => sc.capture.clone(), + } + } + + pub fn def(&self) -> SourceCaptureDef { + match self { + SourceCapture::Simple(capture) => SourceCaptureDef { + capture: capture.clone(), + ..Default::default() + }, + SourceCapture::Configured(sc) => sc.clone(), + } + } +}