From 98412d251051e0dee7de452e461ddff6075a767a Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 22 Aug 2024 12:46:45 +0100 Subject: [PATCH 01/11] flow: x-schema-name annotation for 2nd-to-last resource path --- Cargo.lock | 1 + .../agent/src/controllers/materialization.rs | 9 +- crates/agent/src/integration_tests/harness.rs | 8 +- .../src/integration_tests/source_captures.rs | 120 ++++++++++++++++++ crates/agent/src/resource_configs.rs | 67 +++++++--- crates/flowctl/Cargo.toml | 1 + crates/flowctl/src/generate/mod.rs | 57 ++++++++- ...t__stub_config_resource_spec_pointers.snap | 8 ++ 8 files changed, 248 insertions(+), 23 deletions(-) create mode 100644 crates/flowctl/src/generate/snapshots/flowctl__generate__test__stub_config_resource_spec_pointers.snap diff --git a/Cargo.lock b/Cargo.lock index 3664200f89..9e19cd205f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2281,6 +2281,7 @@ dependencies = [ "futures", "gazette", "humantime", + "insta", "itertools 0.10.5", "json", "labels", diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index 4c38a2acdb..4a0184ffeb 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -5,6 +5,7 @@ use super::{ use crate::{ controllers::publication_status::PublicationStatus, publications::{PublicationResult, RejectedField}, + resource_configs::ResourceSpecPointers, }; use anyhow::Context; use itertools::Itertools; @@ -335,7 +336,7 @@ impl SourceCaptureStatus { .get_connector_spec(config.image.clone()) .await .context("failed to fetch connector spec")?; - let collection_name_pointer = crate::resource_configs::pointer_for_schema( + let resource_spec_pointers = crate::resource_configs::pointer_for_schema( connector_spec.resource_config_schema.get(), )?; @@ -370,7 +371,7 @@ impl SourceCaptureStatus { draft_row.is_touch = false; update_linked_materialization( - collection_name_pointer, + resource_spec_pointers, &self.add_bindings, draft_row.model.as_mut().unwrap(), )?; @@ -398,7 +399,7 @@ fn get_bindings_to_add( } fn update_linked_materialization( - resource_collection_name_ptr: doc::Pointer, + resource_spec_pointers: ResourceSpecPointers, bindings_to_add: &BTreeSet, materialization: &mut models::MaterializationDef, ) -> anyhow::Result<()> { @@ -406,7 +407,7 @@ fn update_linked_materialization( let mut resource_spec = serde_json::json!({}); crate::resource_configs::update_materialization_resource_spec( &mut resource_spec, - &resource_collection_name_ptr, + &resource_spec_pointers, &collection_name, )?; diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index fafb9f2014..9693ffc890 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -168,7 +168,7 @@ impl TestHarness { 'materialization', 'http://test.test/', '{"type": "object"}', - '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}}}', + '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}}}', '{/id}', '{"type": "success"}' ) on conflict do nothing @@ -269,6 +269,12 @@ impl TestHarness { ), del_daily_stats as ( delete from catalog_stats_daily + ), + del_connectors as ( + delete from connectors + ), + del_connector_tags as ( + delete from connector_tags ) delete from catalog_stats_monthly;"#, system_user_id diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs index 83d410148b..fc16457f8d 100644 --- a/crates/agent/src/integration_tests/source_captures.rs +++ b/crates/agent/src/integration_tests/source_captures.rs @@ -181,3 +181,123 @@ async fn test_source_captures() { let last_detail = a_status.publications.history[0].detail.as_deref().unwrap(); assert!(last_detail.contains("adding binding(s) to match the sourceCapture: [ducks/ponds]")); } + +#[tokio::test] +#[serial_test::serial] +async fn test_source_captures_collection_name() { + let mut harness = TestHarness::init("test_source_captures_collection_name").await; + + let user_id = harness.setup_tenant("ducks").await; + + let draft = draft_catalog(serde_json::json!({ + "collections": { + "ducks/pond/quacks": { + "schema": { + "type": "object", + "properties": { + "id": { "type": "string" } + } + }, + "key": ["/id"] + } + }, + "captures": { + "ducks/capture": { + "endpoint": { + "connector": { + "image": "source/test:test", + "config": {} + } + }, + "bindings": [ + { + "resource": { + "name": "greetings", + "prefix": "Hello {}!" + }, + "target": "ducks/pond/quacks" + } + ] + } + }, + "materializations": { + "ducks/materializeA": { + "sourceCapture": "ducks/capture", + "endpoint": { + "connector": { + "image": "materialize/test:test", + "config": {} + } + }, + "bindings": [ ] + }, + "ducks/materializeNoSource": { + "sourceCapture": "ducks/notARealCapture", + "endpoint": { + "connector": { + "image": "materialize/test:test", + "config": {} + } + }, + "bindings": [ ] + } + } + })); + + let result = harness + .user_publication(user_id, "test sourceCapture", draft) + .await; + assert!(result.status.is_success()); + + harness.run_pending_controllers(None).await; + let a_state = harness.get_controller_state("ducks/materializeA").await; + let a_model = a_state + .live_spec + .as_ref() + .unwrap() + .as_materialization() + .unwrap(); + assert_eq!(1, a_model.bindings.len()); + assert_eq!( + "ducks/pond/quacks", + a_model.bindings[0].source.collection().as_str() + ); + assert_eq!( + "pond", + a_model.bindings[0].resource.to_value().pointer("/schema").unwrap().as_str().unwrap() + ); + assert_eq!( + "quacks", + a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap() + ); + let a_status = a_state.current_status.unwrap_materialization(); + assert!(a_status.source_capture.as_ref().unwrap().up_to_date); + assert!(a_status + .source_capture + .as_ref() + .unwrap() + .add_bindings + .is_empty()); + assert_eq!( + Some("adding binding(s) to match the sourceCapture: [ducks/pond/quacks]"), + a_status.publications.history[0].detail.as_deref() + ); + + let no_source_state = harness + .get_controller_state("ducks/materializeNoSource") + .await; + let no_source_model = no_source_state + .live_spec + .as_ref() + .unwrap() + .as_materialization() + .unwrap(); + assert!(no_source_model.bindings.is_empty()); + assert!(no_source_model.source_capture.is_none()); + let no_source_status = no_source_state.current_status.unwrap_materialization(); + assert_eq!( + Some("in response to publication of one or more depencencies, removed sourceCapture: \"ducks/notARealCapture\" because the capture was deleted"), + no_source_status.publications.history[0].detail.as_deref() + ); + assert!(no_source_status.source_capture.is_none()); +} diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 3da60cf994..5e63795fba 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -6,29 +6,50 @@ use serde_json::Value; /// be the case since we should have already validated the collection name. pub fn update_materialization_resource_spec( resource_spec: &mut Value, - collection_name_ptr: &doc::Pointer, + resource_spec_pointers: &ResourceSpecPointers, full_collection_name: &str, -) -> anyhow::Result { - let resource_name = full_collection_name - .rsplit_once('/') - .expect("collection name is invalid (does not contain '/')") - .1 - .to_owned(); - - let Some(prev) = collection_name_ptr.create_value(resource_spec) else { +) -> anyhow::Result<()> { + let split: Vec<&str> = full_collection_name + .rsplit('/') + .take(2) + .collect(); + + if split.len() < 2 { + return Err(anyhow::anyhow!("collection name is invalid (does not contain '/')")) + } + + let x_collection_name = split[0]; + let x_schema_name = split[1]; + + let x_collection_name_ptr = &resource_spec_pointers.x_collection_name; + + let Some(x_collection_name_prev) = x_collection_name_ptr.create_value(resource_spec) else { anyhow::bail!( - "cannot create location '{collection_name_ptr}' in resource spec '{resource_spec}'" + "cannot create location '{x_collection_name_ptr}' in resource spec '{resource_spec}'" ); }; - Ok(std::mem::replace(prev, resource_name.into())) + let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into()); + + if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { + if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { + let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + } + } + + Ok(()) +} + +pub struct ResourceSpecPointers { + x_collection_name: doc::Pointer, + x_schema_name: Option, } /// Runs inference on the given schema and searches for a location within the resource spec -/// that bears the `x-collection-name` annotation. Returns the pointer to that location, or an +/// that bears the `x-collection-name` and `x-schema-name` annotations. Returns the pointer to those location, or an /// error if no such location exists. Errors from parsing the schema are returned directly. /// The schema must be fully self-contained (a.k.a. bundled), or an error will be returned. -pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result { +pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result { // While all known connector resource spec schemas are self-contained, we don't // actually do anything to guarantee that they are. This function may fail in that case. let schema = doc::validation::build_bundle(schema_json)?; @@ -37,12 +58,24 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result { let index = builder.into_index(); let shape = doc::Shape::infer(&schema, &index); + let mut x_collection_name: Option = None; + let mut x_schema_name: Option = None; for (ptr, _, prop_shape, _) in shape.locations() { if prop_shape.annotations.contains_key("x-collection-name") { - return Ok(ptr); + x_collection_name = Some(ptr) + } else if prop_shape.annotations.contains_key("x-schema-name") { + x_schema_name = Some(ptr) } } - Err(anyhow::anyhow!( - "resource spec schema does not contain any location annotated with x-collection-name" - )) + + if let Some(x_collection_name_ptr) = x_collection_name { + Ok(ResourceSpecPointers { + x_collection_name: x_collection_name_ptr, + x_schema_name + }) + } else { + Err(anyhow::anyhow!( + "resource spec schema does not contain any location annotated with x-collection-name" + )) + } } diff --git a/crates/flowctl/Cargo.toml b/crates/flowctl/Cargo.toml index 1a97c0a8ca..7e70e0cbf5 100644 --- a/crates/flowctl/Cargo.toml +++ b/crates/flowctl/Cargo.toml @@ -72,3 +72,4 @@ warp = { workspace = true } [dev-dependencies] assert_cmd = { workspace = true } tempfile = { workspace = true } +insta = { workspace = true } diff --git a/crates/flowctl/src/generate/mod.rs b/crates/flowctl/src/generate/mod.rs index f30142ce3e..1476ed4182 100644 --- a/crates/flowctl/src/generate/mod.rs +++ b/crates/flowctl/src/generate/mod.rs @@ -383,7 +383,7 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s let mut properties = serde_json::Map::new(); for p in &shape.object.properties { - if p.is_required { + if p.is_required || p.shape.annotations.get("x-schema-name").is_some() { properties.insert(p.name.to_string(), stub_config(&p.shape, collection)); } } @@ -414,6 +414,17 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s .rsplit("/") .next() .expect("collection names always have a slash")) + } else if shape + .annotations + .get("x-schema-name") + .is_some_and(|v| matches!(v, serde_json::Value::Bool(true))) + && collection.is_some() + { + json!(collection + .unwrap() + .rsplit("/") + .nth(1) + .expect("collection names always have a slash")) } else if shape.type_.overlaps(types::STRING) { json!("") } else if shape.type_.overlaps(types::INTEGER) { @@ -426,3 +437,47 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s json!(null) } } + +#[cfg(test)] +mod test { + use super::*; + use doc::Shape; + + // Map a JSON schema, in YAML form, into a Shape. + fn shape_from(schema_yaml: &str) -> Shape { + + let url = url::Url::parse("http://example/schema").unwrap(); + let schema: serde_json::Value = serde_yaml::from_str(schema_yaml).unwrap(); + let schema = + json::schema::build::build_schema::(url.clone(), &schema).unwrap(); + + let mut index = json::schema::index::IndexBuilder::new(); + index.add(&schema).unwrap(); + index.verify_references().unwrap(); + let index = index.into_index(); + + Shape::infer(index.must_fetch(&url).unwrap(), &index) + } + + #[test] + fn test_stub_config_resource_spec_pointers() { + let obj = shape_from( + r#" + type: object + properties: + stream: + type: string + x-collection-name: true + schema: + type: string + x-schema-name: true + required: + - stream + "#, + ); + + let cfg = stub_config(&obj, Some(&models::Collection::new("my-tenant/my-task/my-collection"))); + + insta::assert_json_snapshot!(cfg); + } +} diff --git a/crates/flowctl/src/generate/snapshots/flowctl__generate__test__stub_config_resource_spec_pointers.snap b/crates/flowctl/src/generate/snapshots/flowctl__generate__test__stub_config_resource_spec_pointers.snap new file mode 100644 index 0000000000..110f1b4904 --- /dev/null +++ b/crates/flowctl/src/generate/snapshots/flowctl__generate__test__stub_config_resource_spec_pointers.snap @@ -0,0 +1,8 @@ +--- +source: crates/flowctl/src/generate/mod.rs +expression: cfg +--- +{ + "schema": "my-task", + "stream": "my-collection" +} From b82ea868585fd1abcc3cc8859834aebe12dd2cd3 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Tue, 3 Sep 2024 00:14:53 +0100 Subject: [PATCH 02/11] harness: don't delete connectors & connector_tags --- crates/agent/src/integration_tests/harness.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index 9693ffc890..7063c88f84 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -269,12 +269,6 @@ impl TestHarness { ), del_daily_stats as ( delete from catalog_stats_daily - ), - del_connectors as ( - delete from connectors - ), - del_connector_tags as ( - delete from connector_tags ) delete from catalog_stats_monthly;"#, system_user_id From 71c29e82d6c1bb5ae8572df1a12763a7a2d3b7ce Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 5 Sep 2024 10:52:24 +0100 Subject: [PATCH 03/11] agent: sourceCapture configuration for schema and delta updates --- .../agent/src/controllers/materialization.rs | 19 ++-- crates/agent/src/controllers/mod.rs | 5 + ...st__materialization-status-round-trip.snap | 14 +++ ...controllers__test__status_json_schema.snap | 55 ++++++++++ crates/agent/src/integration_tests/harness.rs | 2 +- .../src/integration_tests/source_captures.rs | 102 +++++------------- crates/agent/src/publications/specs.rs | 2 +- crates/agent/src/resource_configs.rs | 32 ++++-- crates/models/src/lib.rs | 10 +- crates/models/src/materializations.rs | 15 ++- crates/models/src/source_capture.rs | 64 +++++++++++ crates/tables/src/dependencies.rs | 2 +- crates/tables/src/draft.rs | 2 +- 13 files changed, 224 insertions(+), 100 deletions(-) create mode 100644 crates/models/src/source_capture.rs diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index 4a0184ffeb..a296cf700b 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -9,7 +9,7 @@ use crate::{ }; use anyhow::Context; use itertools::Itertools; -use models::{ModelDef, OnIncompatibleSchemaChange}; +use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef}; use proto_flow::materialize::response::validated::constraint::Type as ConstraintType; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -59,14 +59,17 @@ impl MaterializationStatus { } } - if let Some(source_capture_name) = &model.source_capture { + if let Some(source_capture) = &model.source_capture { // If the source capture has been deleted, we will have already handled that as a // part of `handle_deleted_dependencies`. if let Some(source_capture_model) = - dependencies.live.captures.get_by_key(source_capture_name) + dependencies.live.captures.get_by_key(&source_capture.capture_name()) { if self.source_capture.is_none() { - self.source_capture = Some(SourceCaptureStatus::default()); + self.source_capture = Some(SourceCaptureStatus { + source_capture: source_capture.def(), + ..Default::default() + }); } let source_capture_status = self.source_capture.as_mut().unwrap(); // Source capture errors are terminal @@ -265,10 +268,10 @@ fn handle_deleted_dependencies( if drafted .source_capture .as_ref() - .map(|sc| deleted.contains(sc.as_str())) + .map(|sc| deleted.contains(sc.capture_name().as_str())) .unwrap_or(false) { - let capture_name = drafted.source_capture.take().unwrap(); + let capture_name = drafted.source_capture.take().unwrap().capture_name(); source_capture.take(); descriptions.push(format!( r#"removed sourceCapture: "{capture_name}" because the capture was deleted"# @@ -290,6 +293,7 @@ fn is_false(b: &bool) -> bool { /// Status information about the `sourceCapture` #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)] pub struct SourceCaptureStatus { + pub source_capture: SourceCaptureDef, /// Whether the materialization bindings are up-to-date with respect to /// the `sourceCapture` bindings. In normal operation, this should always /// be `true`. Otherwise, there will be a controller `error` and the @@ -371,6 +375,7 @@ impl SourceCaptureStatus { draft_row.is_touch = false; update_linked_materialization( + &self.source_capture, resource_spec_pointers, &self.add_bindings, draft_row.model.as_mut().unwrap(), @@ -399,6 +404,7 @@ fn get_bindings_to_add( } fn update_linked_materialization( + source_capture: &SourceCaptureDef, resource_spec_pointers: ResourceSpecPointers, bindings_to_add: &BTreeSet, materialization: &mut models::MaterializationDef, @@ -406,6 +412,7 @@ fn update_linked_materialization( for collection_name in bindings_to_add { let mut resource_spec = serde_json::json!({}); crate::resource_configs::update_materialization_resource_spec( + source_capture, &mut resource_spec, &resource_spec_pointers, &collection_name, diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index a86b19e4a4..6254592d29 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -457,6 +457,7 @@ mod test { use std::collections::{BTreeSet, VecDeque}; use chrono::TimeZone; + use models::{SourceCaptureDef, Capture}; use super::*; use crate::controllers::materialization::SourceCaptureStatus; @@ -505,6 +506,10 @@ mod test { source_capture: Some(SourceCaptureStatus { up_to_date: false, add_bindings, + source_capture: SourceCaptureDef { + capture: Capture::new("snails/capture"), + ..Default::default() + } }), publications: PublicationStatus { max_observed_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]), diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap index 2c33905ae8..2dca6f9423 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap @@ -7,6 +7,13 @@ StatusSnapshot { MaterializationStatus { source_capture: Some( SourceCaptureStatus { + source_capture: SourceCaptureDef { + capture: Capture( + "snails/capture", + ), + schema_mode: LeaveEmpty, + delta_updates: false, + }, up_to_date: false, add_bindings: { Collection( @@ -78,6 +85,13 @@ StatusSnapshot { MaterializationStatus { source_capture: Some( SourceCaptureStatus { + source_capture: SourceCaptureDef { + capture: Capture( + "snails/capture", + ), + schema_mode: LeaveEmpty, + delta_updates: false, + }, up_to_date: false, add_bindings: { Collection( diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap index 26fd7b65f4..73f14f2baa 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap @@ -33,6 +33,14 @@ expression: schema ], "type": "object" }, + "Capture": { + "description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", + "examples": [ + "acmeCo/capture" + ], + "pattern": "^[\\p{Letter}\\p{Number}\\-_\\.]+(/[\\p{Letter}\\p{Number}\\-_\\.]+)*$", + "type": "string" + }, "Collection": { "description": "Collection names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", "examples": [ @@ -394,6 +402,47 @@ expression: schema ], "type": "object" }, + "SourceCaptureDef": { + "additionalProperties": false, + "description": "SourceCaptureDef specifies configuration for source captures", + "properties": { + "capture": { + "$ref": "#/definitions/Capture", + "description": "Capture name" + }, + "deltaUpdates": { + "description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates", + "type": "boolean" + }, + "schemaMode": { + "$ref": "#/definitions/SourceCaptureSchemaMode", + "default": "leaveEmpty", + "description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set" + } + }, + "required": [ + "capture" + ], + "type": "object" + }, + "SourceCaptureSchemaMode": { + "oneOf": [ + { + "description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization", + "enum": [ + "leaveEmpty" + ], + "type": "string" + }, + { + "description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding", + "enum": [ + "collectionSchema" + ], + "type": "string" + } + ] + }, "SourceCaptureStatus": { "description": "Status information about the `sourceCapture`", "properties": { @@ -405,11 +454,17 @@ expression: schema "type": "array", "uniqueItems": true }, + "source_capture": { + "$ref": "#/definitions/SourceCaptureDef" + }, "up_to_date": { "description": "Whether the materialization bindings are up-to-date with respect to the `sourceCapture` bindings. In normal operation, this should always be `true`. Otherwise, there will be a controller `error` and the publication status will contain details of why the update failed.", "type": "boolean" } }, + "required": [ + "source_capture" + ], "type": "object" } }, diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index 7063c88f84..c6a46b82a5 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -168,7 +168,7 @@ impl TestHarness { 'materialization', 'http://test.test/', '{"type": "object"}', - '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}}}', + '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}, "delta": {"type": "boolean", "x-delta-updates": true}}}', '{/id}', '{"type": "success"}' ) on conflict do nothing diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs index fc16457f8d..03826136a3 100644 --- a/crates/agent/src/integration_tests/source_captures.rs +++ b/crates/agent/src/integration_tests/source_captures.rs @@ -9,7 +9,7 @@ async fn test_source_captures() { let draft = draft_catalog(serde_json::json!({ "collections": { - "ducks/quacks": { + "ducks/pond/quacks": { "schema": { "type": "object", "properties": { @@ -33,7 +33,7 @@ async fn test_source_captures() { "name": "greetings", "prefix": "Hello {}!" }, - "target": "ducks/quacks" + "target": "ducks/pond/quacks" } ] } @@ -77,9 +77,23 @@ async fn test_source_captures() { .unwrap(); assert_eq!(1, a_model.bindings.len()); assert_eq!( - "ducks/quacks", + "ducks/pond/quacks", a_model.bindings[0].source.collection().as_str() ); + // Schema mode not set, so we expect schema to be left empty + assert_eq!( + None, + a_model.bindings[0].resource.to_value().pointer("/schema") + ); + // Delta updates not set, so we expect delta to be left empty + assert_eq!( + None, + a_model.bindings[0].resource.to_value().pointer("/delta") + ); + assert_eq!( + "quacks", + a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap() + ); let a_status = a_state.current_status.unwrap_materialization(); assert!(a_status.source_capture.as_ref().unwrap().up_to_date); assert!(a_status @@ -89,7 +103,7 @@ async fn test_source_captures() { .add_bindings .is_empty()); assert_eq!( - Some("adding binding(s) to match the sourceCapture: [ducks/quacks]"), + Some("adding binding(s) to match the sourceCapture: [ducks/pond/quacks]"), a_status.publications.history[0].detail.as_deref() ); @@ -110,76 +124,6 @@ async fn test_source_captures() { no_source_status.publications.history[0].detail.as_deref() ); assert!(no_source_status.source_capture.is_none()); - - // Now add another binding to the source capture and assert that it gets added to the materialization - let draft2 = draft_catalog(serde_json::json!({ - "collections": { - "ducks/ponds": { - "schema": { - "type": "object", - "properties": { - "id": { "type": "string" } - } - }, - "key": ["/id"] - } - }, - "captures": { - "ducks/capture": { - "endpoint": { - "connector": { - "image": "source/test:test", - "config": {} - } - }, - "bindings": [ - { - "resource": { - "name": "greetings", - "prefix": "Hello {}!" - }, - "target": "ducks/quacks" - }, - { - "resource": { - "name": "something else", - }, - "target": "ducks/ponds" - } - ] - } - }, - })); - - let result = harness - .user_publication(user_id, "test sourceCapture update", draft2) - .await; - assert!(result.status.is_success()); - - harness.run_pending_controllers(None).await; - - let a_state = harness.get_controller_state("ducks/materializeA").await; - let a_model = a_state - .live_spec - .as_ref() - .unwrap() - .as_materialization() - .unwrap(); - assert_eq!(2, a_model.bindings.len()); - assert_eq!( - "ducks/ponds", - a_model.bindings[1].source.collection().as_str() - ); - let a_status = a_state.current_status.unwrap_materialization(); - assert!(a_status.source_capture.as_ref().unwrap().up_to_date); - assert!(a_status - .source_capture - .as_ref() - .unwrap() - .add_bindings - .is_empty()); - let last_detail = a_status.publications.history[0].detail.as_deref().unwrap(); - assert!(last_detail.contains("adding binding(s) to match the sourceCapture: [ducks/ponds]")); } #[tokio::test] @@ -222,7 +166,11 @@ async fn test_source_captures_collection_name() { }, "materializations": { "ducks/materializeA": { - "sourceCapture": "ducks/capture", + "sourceCapture": { + "capture": "ducks/capture", + "schemaMode": "collectionSchema", + "deltaUpdates": true, + }, "endpoint": { "connector": { "image": "materialize/test:test", @@ -266,6 +214,10 @@ async fn test_source_captures_collection_name() { "pond", a_model.bindings[0].resource.to_value().pointer("/schema").unwrap().as_str().unwrap() ); + assert_eq!( + true, + a_model.bindings[0].resource.to_value().pointer("/delta").unwrap().as_bool().unwrap() + ); assert_eq!( "quacks", a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap() diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 85545018f0..1fdb1a0fff 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -70,7 +70,7 @@ async fn update_live_spec_flows( catalog_type, Some(reads_from.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), Some(writes_to.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), - source_capture.as_ref().map(|c| c.as_str()), + source_capture.as_ref().map(|c| c.capture.as_str()), txn, ) .await?; diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 5e63795fba..214ca2fb75 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -1,3 +1,4 @@ +use models::{SourceCaptureDef, SourceCaptureSchemaMode}; use serde_json::Value; /// @@ -5,6 +6,7 @@ use serde_json::Value; /// If the `full_collection_name` doesn't contain any `/` characters, which should never /// be the case since we should have already validated the collection name. pub fn update_materialization_resource_spec( + source_capture: &SourceCaptureDef, resource_spec: &mut Value, resource_spec_pointers: &ResourceSpecPointers, full_collection_name: &str, @@ -31,9 +33,19 @@ pub fn update_materialization_resource_spec( let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into()); - if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { - if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { - let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema { + if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { + if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { + let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + } + } + } + + if source_capture.delta_updates { + if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates { + if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) { + let _ = std::mem::replace(x_delta_updates_prev, true.into()); + } } } @@ -43,12 +55,14 @@ pub fn update_materialization_resource_spec( pub struct ResourceSpecPointers { x_collection_name: doc::Pointer, x_schema_name: Option, + x_delta_updates: Option, } /// Runs inference on the given schema and searches for a location within the resource spec -/// that bears the `x-collection-name` and `x-schema-name` annotations. Returns the pointer to those location, or an -/// error if no such location exists. Errors from parsing the schema are returned directly. -/// The schema must be fully self-contained (a.k.a. bundled), or an error will be returned. +/// that bears the `x-collection-name`, `x-schema-name` or `x-delta-updates` annotations. +/// Returns the pointer to those location, or an error if no `x-collection-name` exists. +/// Errors from parsing the schema are returned directly. The schema must be fully self-contained (a.k.a. bundled), +/// or an error will be returned. pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result { // While all known connector resource spec schemas are self-contained, we don't // actually do anything to guarantee that they are. This function may fail in that case. @@ -60,18 +74,22 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result = None; let mut x_schema_name: Option = None; + let mut x_delta_updates: Option = None; for (ptr, _, prop_shape, _) in shape.locations() { if prop_shape.annotations.contains_key("x-collection-name") { x_collection_name = Some(ptr) } else if prop_shape.annotations.contains_key("x-schema-name") { x_schema_name = Some(ptr) + } else if prop_shape.annotations.contains_key("x-delta-updates") { + x_delta_updates = Some(ptr) } } if let Some(x_collection_name_ptr) = x_collection_name { Ok(ResourceSpecPointers { x_collection_name: x_collection_name_ptr, - x_schema_name + x_schema_name, + x_delta_updates, }) } else { Err(anyhow::anyhow!( diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 715970dfd5..e98e0c1b70 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -12,6 +12,7 @@ mod derive_typescript; mod id; mod journals; mod labels; +mod source_capture; mod materializations; mod raw_value; mod references; @@ -25,6 +26,7 @@ pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint}; pub use catalogs::{Capability, Catalog, CatalogType}; pub use collections::{CollectionDef, Projection}; pub use connector::{split_image_tag, ConnectorConfig, LocalConfig}; +pub use source_capture::{SourceCaptureDef, SourceCaptureSchemaMode}; pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef}; pub use derive_sqlite::DeriveUsingSqlite; pub use derive_typescript::DeriveUsingTypescript; @@ -80,7 +82,7 @@ pub trait ModelDef: /// If this spec is a materialization, returns the value of `source_capture`. /// This function is admittedly a little smelly, but it's included in the trait /// so that we can generically get all the dependencies of each spec. - fn materialization_source_capture(&self) -> Option<&Capture> { + fn materialization_source_capture_name(&self) -> Option<&Capture> { None } @@ -89,7 +91,7 @@ pub trait ModelDef: let mut deps: BTreeSet = self.reads_from().into_iter().map(|c| c.into()).collect(); deps.extend(self.writes_to().into_iter().map(|c| c.into())); deps.extend( - self.materialization_source_capture() + self.materialization_source_capture_name() .into_iter() .map(|c| c.to_string()), ); @@ -226,9 +228,9 @@ impl ModelDef for AnySpec { } } - fn materialization_source_capture(&self) -> Option<&Capture> { + fn materialization_source_capture_name(&self) -> Option<&Capture> { match self { - AnySpec::Materialization(m) => m.materialization_source_capture(), + AnySpec::Materialization(m) => m.materialization_source_capture_name(), _ => None, } } diff --git a/crates/models/src/materializations.rs b/crates/models/src/materializations.rs index 15d51fb980..196d23f797 100644 --- a/crates/models/src/materializations.rs +++ b/crates/models/src/materializations.rs @@ -1,7 +1,10 @@ +use crate::Capture; use crate::{source::OnIncompatibleSchemaChange, Collection, Id}; +use crate::source_capture::{SourceCapture, SourceCaptureDef}; + use super::{ - Capture, ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source, + ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -15,7 +18,7 @@ use std::collections::BTreeMap; pub struct MaterializationDef { /// # Automatically materialize new bindings from a named capture #[serde(skip_serializing_if = "Option::is_none")] - pub source_capture: Option, + pub source_capture: Option, /// # Default handling of schema changes that are incompatible with the target resource. /// This can be overridden on a per-binding basis. #[serde( @@ -201,8 +204,12 @@ impl super::ModelDef for MaterializationDef { !self.shards.disable } - fn materialization_source_capture(&self) -> Option<&crate::Capture> { - self.source_capture.as_ref() + fn materialization_source_capture_name(&self) -> Option<&Capture> { + match &self.source_capture { + Some(SourceCapture::Simple(capture_name)) => Some(capture_name), + Some(SourceCapture::Configured(sc)) => Some(&sc.capture), + None => None + } } fn connector_image(&self) -> Option<&str> { diff --git a/crates/models/src/source_capture.rs b/crates/models/src/source_capture.rs new file mode 100644 index 0000000000..97ab630c67 --- /dev/null +++ b/crates/models/src/source_capture.rs @@ -0,0 +1,64 @@ +use super::Capture; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "camelCase")] +pub enum SourceCaptureSchemaMode { + /// Leave the materialization binding's schema field empty, therefore falling back to the + /// default schema of the materialization + LeaveEmpty, + /// Use the 2nd-to-last component of the collection name as the schema of the materialization + /// binding + CollectionSchema, +} + +impl Default for SourceCaptureSchemaMode { + fn default() -> Self { + SourceCaptureSchemaMode::LeaveEmpty + } +} + +/// SourceCaptureDef specifies configuration for source captures +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq, Default)] +#[serde(deny_unknown_fields, rename_all = "camelCase")] +pub struct SourceCaptureDef { + /// Capture name + pub capture: Capture, + + /// When adding new bindings from a source capture to a materialization, how should the schema + /// of the materialization binding be set + #[serde(default)] + pub schema_mode: SourceCaptureSchemaMode, + + /// When adding new bindings from a source capture to a materialization, should the new + /// bindings be marked as delta updates + #[serde(default, skip_serializing_if = "super::is_false")] + pub delta_updates: bool, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "camelCase", untagged)] +pub enum SourceCapture { + Simple(Capture), + Configured(SourceCaptureDef), +} + +impl SourceCapture { + pub fn capture_name(&self) -> Capture { + match self { + SourceCapture::Simple(capture) => capture.clone(), + SourceCapture::Configured(sc) => sc.capture.clone(), + } + } + + pub fn def(&self) -> SourceCaptureDef { + match self { + SourceCapture::Simple(capture) => SourceCaptureDef { + capture: capture.clone(), + ..Default::default() + }, + SourceCapture::Configured(sc) => sc.clone(), + } + } +} diff --git a/crates/tables/src/dependencies.rs b/crates/tables/src/dependencies.rs index 69ae912ec6..12024b7b71 100644 --- a/crates/tables/src/dependencies.rs +++ b/crates/tables/src/dependencies.rs @@ -81,7 +81,7 @@ impl<'a> Dependencies<'a> { deps.push(target.as_str()); } - let maybe_source_cap = model.materialization_source_capture(); + let maybe_source_cap = model.materialization_source_capture_name(); if let Some(source_cap) = maybe_source_cap.as_ref() { deps.push(source_cap.as_str()); } diff --git a/crates/tables/src/draft.rs b/crates/tables/src/draft.rs index 138c49108a..a36809ee37 100644 --- a/crates/tables/src/draft.rs +++ b/crates/tables/src/draft.rs @@ -59,7 +59,7 @@ impl DraftCatalog { for target in model.targets() { out.push(target); } - if let Some(cap) = model.materialization_source_capture() { + if let Some(cap) = model.materialization_source_capture_name() { out.push(cap.as_str()); } } From 4b882be759ca9213c52a8615de6ab311b87cc75a Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Tue, 17 Sep 2024 19:29:52 +0100 Subject: [PATCH 04/11] agent: refactoring SourceCapture and SourceCaptureDef --- .../agent/src/controllers/materialization.rs | 12 ++++-------- crates/agent/src/controllers/mod.rs | 6 +----- ...est__materialization-status-round-trip.snap | 16 +--------------- crates/agent/src/publications/specs.rs | 4 ++-- crates/agent/src/resource_configs.rs | 18 ++++++++++++++---- crates/models/src/lib.rs | 6 +++++- crates/models/src/materializations.rs | 2 +- crates/models/src/source_capture.rs | 5 +++-- 8 files changed, 31 insertions(+), 38 deletions(-) diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index a296cf700b..e13d4f0e02 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -9,7 +9,7 @@ use crate::{ }; use anyhow::Context; use itertools::Itertools; -use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef}; +use models::{ModelDef, OnIncompatibleSchemaChange, SourceCapture}; use proto_flow::materialize::response::validated::constraint::Type as ConstraintType; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -66,10 +66,7 @@ impl MaterializationStatus { dependencies.live.captures.get_by_key(&source_capture.capture_name()) { if self.source_capture.is_none() { - self.source_capture = Some(SourceCaptureStatus { - source_capture: source_capture.def(), - ..Default::default() - }); + self.source_capture = Some(SourceCaptureStatus::default()) } let source_capture_status = self.source_capture.as_mut().unwrap(); // Source capture errors are terminal @@ -293,7 +290,6 @@ fn is_false(b: &bool) -> bool { /// Status information about the `sourceCapture` #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)] pub struct SourceCaptureStatus { - pub source_capture: SourceCaptureDef, /// Whether the materialization bindings are up-to-date with respect to /// the `sourceCapture` bindings. In normal operation, this should always /// be `true`. Otherwise, there will be a controller `error` and the @@ -375,7 +371,7 @@ impl SourceCaptureStatus { draft_row.is_touch = false; update_linked_materialization( - &self.source_capture, + model.source_capture.as_ref().unwrap(), resource_spec_pointers, &self.add_bindings, draft_row.model.as_mut().unwrap(), @@ -404,7 +400,7 @@ fn get_bindings_to_add( } fn update_linked_materialization( - source_capture: &SourceCaptureDef, + source_capture: &SourceCapture, resource_spec_pointers: ResourceSpecPointers, bindings_to_add: &BTreeSet, materialization: &mut models::MaterializationDef, diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index 6254592d29..23f51937b9 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -457,7 +457,7 @@ mod test { use std::collections::{BTreeSet, VecDeque}; use chrono::TimeZone; - use models::{SourceCaptureDef, Capture}; + use models::Capture; use super::*; use crate::controllers::materialization::SourceCaptureStatus; @@ -506,10 +506,6 @@ mod test { source_capture: Some(SourceCaptureStatus { up_to_date: false, add_bindings, - source_capture: SourceCaptureDef { - capture: Capture::new("snails/capture"), - ..Default::default() - } }), publications: PublicationStatus { max_observed_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]), diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap index 2dca6f9423..25a66cadc6 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap @@ -1,19 +1,12 @@ --- source: crates/agent/src/controllers/mod.rs -expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped }" +expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped, }" --- StatusSnapshot { starting: Materialization( MaterializationStatus { source_capture: Some( SourceCaptureStatus { - source_capture: SourceCaptureDef { - capture: Capture( - "snails/capture", - ), - schema_mode: LeaveEmpty, - delta_updates: false, - }, up_to_date: false, add_bindings: { Collection( @@ -85,13 +78,6 @@ StatusSnapshot { MaterializationStatus { source_capture: Some( SourceCaptureStatus { - source_capture: SourceCaptureDef { - capture: Capture( - "snails/capture", - ), - schema_mode: LeaveEmpty, - delta_updates: false, - }, up_to_date: false, add_bindings: { Collection( diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 1fdb1a0fff..7a6258985d 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -63,14 +63,14 @@ async fn update_live_spec_flows( let reads_from = model.reads_from(); let writes_to = model.writes_to(); - let source_capture = model.materialization_source_capture(); + let source_capture = model.materialization_source_capture_name(); agent_sql::publications::insert_live_spec_flows( built.control_id().into(), catalog_type, Some(reads_from.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), Some(writes_to.iter().map(|c| c.as_str()).collect::>()).filter(|a| !a.is_empty()), - source_capture.as_ref().map(|c| c.capture.as_str()), + source_capture.as_ref().map(|c| c.as_str()), txn, ) .await?; diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 214ca2fb75..2c435d3467 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -1,4 +1,4 @@ -use models::{SourceCaptureDef, SourceCaptureSchemaMode}; +use models::{SourceCaptureSchemaMode, SourceCapture, SourceCaptureDef}; use serde_json::Value; /// @@ -6,7 +6,7 @@ use serde_json::Value; /// If the `full_collection_name` doesn't contain any `/` characters, which should never /// be the case since we should have already validated the collection name. pub fn update_materialization_resource_spec( - source_capture: &SourceCaptureDef, + source_capture: &SourceCapture, resource_spec: &mut Value, resource_spec_pointers: &ResourceSpecPointers, full_collection_name: &str, @@ -33,18 +33,28 @@ pub fn update_materialization_resource_spec( let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into()); - if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema { + let source_capture_def = source_capture.to_normalized_def(); + + if source_capture_def.schema_mode == SourceCaptureSchemaMode::CollectionSchema { if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); + } else { + anyhow::bail!( + "cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'" + ); } } } - if source_capture.delta_updates { + if source_capture_def.delta_updates { if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates { if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) { let _ = std::mem::replace(x_delta_updates_prev, true.into()); + } else { + anyhow::bail!( + "cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'" + ); } } } diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index e98e0c1b70..5cbae6c18d 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -26,7 +26,7 @@ pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint}; pub use catalogs::{Capability, Catalog, CatalogType}; pub use collections::{CollectionDef, Projection}; pub use connector::{split_image_tag, ConnectorConfig, LocalConfig}; -pub use source_capture::{SourceCaptureDef, SourceCaptureSchemaMode}; +pub use source_capture::{SourceCaptureDef, SourceCapture, SourceCaptureSchemaMode}; pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef}; pub use derive_sqlite::DeriveUsingSqlite; pub use derive_typescript::DeriveUsingTypescript; @@ -291,6 +291,10 @@ fn is_false(b: &bool) -> bool { !*b } +fn is_default(b: &D) -> bool { + D::default() == *b +} + fn is_u32_zero(u: &u32) -> bool { *u == 0 } diff --git a/crates/models/src/materializations.rs b/crates/models/src/materializations.rs index 196d23f797..d4e5f7ca8f 100644 --- a/crates/models/src/materializations.rs +++ b/crates/models/src/materializations.rs @@ -1,7 +1,7 @@ use crate::Capture; use crate::{source::OnIncompatibleSchemaChange, Collection, Id}; -use crate::source_capture::{SourceCapture, SourceCaptureDef}; +use crate::source_capture::SourceCapture; use super::{ ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source, diff --git a/crates/models/src/source_capture.rs b/crates/models/src/source_capture.rs index 97ab630c67..3a60528d25 100644 --- a/crates/models/src/source_capture.rs +++ b/crates/models/src/source_capture.rs @@ -28,7 +28,7 @@ pub struct SourceCaptureDef { /// When adding new bindings from a source capture to a materialization, how should the schema /// of the materialization binding be set - #[serde(default)] + #[serde(default, skip_serializing_if = "super::is_default")] pub schema_mode: SourceCaptureSchemaMode, /// When adding new bindings from a source capture to a materialization, should the new @@ -52,7 +52,8 @@ impl SourceCapture { } } - pub fn def(&self) -> SourceCaptureDef { + /// Convert the enum to a normalized SourceCaptureDef by normalizing the Simple case + pub fn to_normalized_def(&self) -> SourceCaptureDef { match self { SourceCapture::Simple(capture) => SourceCaptureDef { capture: capture.clone(), From cee8ef47c373b2d16992b8c35b99129b52cc22ec Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Wed, 18 Sep 2024 17:36:21 +0100 Subject: [PATCH 05/11] agent: rename SourceCapture's schemaMode to targetSchema --- ...controllers__test__status_json_schema.snap | 55 ------------------- .../src/integration_tests/source_captures.rs | 2 +- crates/agent/src/resource_configs.rs | 4 +- crates/models/src/source_capture.rs | 4 +- ...a_generation__catalog_schema_snapshot.snap | 52 +++++++++++++++++- flow.schema.json | 52 +++++++++++++++++- 6 files changed, 107 insertions(+), 62 deletions(-) diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap index 73f14f2baa..26fd7b65f4 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap @@ -33,14 +33,6 @@ expression: schema ], "type": "object" }, - "Capture": { - "description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", - "examples": [ - "acmeCo/capture" - ], - "pattern": "^[\\p{Letter}\\p{Number}\\-_\\.]+(/[\\p{Letter}\\p{Number}\\-_\\.]+)*$", - "type": "string" - }, "Collection": { "description": "Collection names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", "examples": [ @@ -402,47 +394,6 @@ expression: schema ], "type": "object" }, - "SourceCaptureDef": { - "additionalProperties": false, - "description": "SourceCaptureDef specifies configuration for source captures", - "properties": { - "capture": { - "$ref": "#/definitions/Capture", - "description": "Capture name" - }, - "deltaUpdates": { - "description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates", - "type": "boolean" - }, - "schemaMode": { - "$ref": "#/definitions/SourceCaptureSchemaMode", - "default": "leaveEmpty", - "description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set" - } - }, - "required": [ - "capture" - ], - "type": "object" - }, - "SourceCaptureSchemaMode": { - "oneOf": [ - { - "description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization", - "enum": [ - "leaveEmpty" - ], - "type": "string" - }, - { - "description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding", - "enum": [ - "collectionSchema" - ], - "type": "string" - } - ] - }, "SourceCaptureStatus": { "description": "Status information about the `sourceCapture`", "properties": { @@ -454,17 +405,11 @@ expression: schema "type": "array", "uniqueItems": true }, - "source_capture": { - "$ref": "#/definitions/SourceCaptureDef" - }, "up_to_date": { "description": "Whether the materialization bindings are up-to-date with respect to the `sourceCapture` bindings. In normal operation, this should always be `true`. Otherwise, there will be a controller `error` and the publication status will contain details of why the update failed.", "type": "boolean" } }, - "required": [ - "source_capture" - ], "type": "object" } }, diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs index 03826136a3..7b72b0633d 100644 --- a/crates/agent/src/integration_tests/source_captures.rs +++ b/crates/agent/src/integration_tests/source_captures.rs @@ -168,7 +168,7 @@ async fn test_source_captures_collection_name() { "ducks/materializeA": { "sourceCapture": { "capture": "ducks/capture", - "schemaMode": "collectionSchema", + "targetSchema": "fromSourceName", "deltaUpdates": true, }, "endpoint": { diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 2c435d3467..b3f7b0c09e 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -1,4 +1,4 @@ -use models::{SourceCaptureSchemaMode, SourceCapture, SourceCaptureDef}; +use models::{SourceCaptureSchemaMode, SourceCapture}; use serde_json::Value; /// @@ -35,7 +35,7 @@ pub fn update_materialization_resource_spec( let source_capture_def = source_capture.to_normalized_def(); - if source_capture_def.schema_mode == SourceCaptureSchemaMode::CollectionSchema { + if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName { if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); diff --git a/crates/models/src/source_capture.rs b/crates/models/src/source_capture.rs index 3a60528d25..b606a3d7a2 100644 --- a/crates/models/src/source_capture.rs +++ b/crates/models/src/source_capture.rs @@ -10,7 +10,7 @@ pub enum SourceCaptureSchemaMode { LeaveEmpty, /// Use the 2nd-to-last component of the collection name as the schema of the materialization /// binding - CollectionSchema, + FromSourceName, } impl Default for SourceCaptureSchemaMode { @@ -29,7 +29,7 @@ pub struct SourceCaptureDef { /// When adding new bindings from a source capture to a materialization, how should the schema /// of the materialization binding be set #[serde(default, skip_serializing_if = "super::is_default")] - pub schema_mode: SourceCaptureSchemaMode, + pub target_schema: SourceCaptureSchemaMode, /// When adding new bindings from a source capture to a materialization, should the new /// bindings be marked as delta updates diff --git a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap index 2e1f3d771f..5d201e7603 100644 --- a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap +++ b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap @@ -884,7 +884,7 @@ expression: "&schema" }, "sourceCapture": { "title": "Automatically materialize new bindings from a named capture", - "$ref": "#/definitions/Capture" + "$ref": "#/definitions/SourceCapture" } }, "additionalProperties": false @@ -1217,6 +1217,56 @@ expression: "&schema" } ] }, + "SourceCapture": { + "anyOf": [ + { + "$ref": "#/definitions/Capture" + }, + { + "$ref": "#/definitions/SourceCaptureDef" + } + ] + }, + "SourceCaptureDef": { + "description": "SourceCaptureDef specifies configuration for source captures", + "type": "object", + "required": [ + "capture" + ], + "properties": { + "capture": { + "description": "Capture name", + "$ref": "#/definitions/Capture" + }, + "deltaUpdates": { + "description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates", + "type": "boolean" + }, + "targetSchema": { + "description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set", + "$ref": "#/definitions/SourceCaptureSchemaMode" + } + }, + "additionalProperties": false + }, + "SourceCaptureSchemaMode": { + "oneOf": [ + { + "description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization", + "type": "string", + "enum": [ + "leaveEmpty" + ] + }, + { + "description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding", + "type": "string", + "enum": [ + "fromSourceName" + ] + } + ] + }, "Test": { "description": "Test names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", "examples": [ diff --git a/flow.schema.json b/flow.schema.json index 1c17046035..16c83fbe35 100644 --- a/flow.schema.json +++ b/flow.schema.json @@ -880,7 +880,7 @@ }, "sourceCapture": { "title": "Automatically materialize new bindings from a named capture", - "$ref": "#/definitions/Capture" + "$ref": "#/definitions/SourceCapture" } }, "additionalProperties": false @@ -1213,6 +1213,56 @@ } ] }, + "SourceCapture": { + "anyOf": [ + { + "$ref": "#/definitions/Capture" + }, + { + "$ref": "#/definitions/SourceCaptureDef" + } + ] + }, + "SourceCaptureDef": { + "description": "SourceCaptureDef specifies configuration for source captures", + "type": "object", + "required": [ + "capture" + ], + "properties": { + "capture": { + "description": "Capture name", + "$ref": "#/definitions/Capture" + }, + "deltaUpdates": { + "description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates", + "type": "boolean" + }, + "targetSchema": { + "description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set", + "$ref": "#/definitions/SourceCaptureSchemaMode" + } + }, + "additionalProperties": false + }, + "SourceCaptureSchemaMode": { + "oneOf": [ + { + "description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization", + "type": "string", + "enum": [ + "leaveEmpty" + ] + }, + { + "description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding", + "type": "string", + "enum": [ + "fromSourceName" + ] + } + ] + }, "Test": { "description": "Test names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", "examples": [ From 9b298a3841fafb3059a7510b00982e165d79a21f Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Tue, 24 Sep 2024 19:58:56 +0100 Subject: [PATCH 06/11] sourceCapture: error when setting targetSchema without x-schema-name --- crates/agent/src/controllers/materialization.rs | 2 +- crates/agent/src/resource_configs.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index e13d4f0e02..dfd6147922 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -66,7 +66,7 @@ impl MaterializationStatus { dependencies.live.captures.get_by_key(&source_capture.capture_name()) { if self.source_capture.is_none() { - self.source_capture = Some(SourceCaptureStatus::default()) + self.source_capture = Some(SourceCaptureStatus::default()); } let source_capture_status = self.source_capture.as_mut().unwrap(); // Source capture errors are terminal diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index b3f7b0c09e..2a337ef417 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -44,6 +44,10 @@ pub fn update_materialization_resource_spec( "cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'" ); } + } else { + anyhow::bail!( + "sourceCapture.targetSchema set on a materialization which does not have x-schema-name annotation" + ); } } @@ -56,6 +60,10 @@ pub fn update_materialization_resource_spec( "cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'" ); } + } else { + anyhow::bail!( + "sourceCapture.deltaUpdates set on a materialization which does not have x-delta-updates annotation" + ); } } From ea8ddda22b2d7be0a298be9ac18d9f41f6ed7671 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Wed, 25 Sep 2024 19:50:49 +0100 Subject: [PATCH 07/11] agent: validate sourceCapture.{targetSchema,deltaUpdates} on publish --- crates/agent/src/integration_tests/harness.rs | 21 +++++ .../src/integration_tests/source_captures.rs | 87 +++++++++++++++++++ crates/agent/src/publications.rs | 6 +- crates/agent/src/publications/specs.rs | 37 +++++++- crates/agent/src/resource_configs.rs | 6 +- 5 files changed, 152 insertions(+), 5 deletions(-) diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index c6a46b82a5..20ab7b34a8 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -173,6 +173,27 @@ impl TestHarness { '{"type": "success"}' ) on conflict do nothing ), + materialize_tag_no_annotations as ( + insert into connector_tags ( + connector_id, + image_tag, + protocol, + documentation_url, + endpoint_spec_schema, + resource_spec_schema, + resource_path_pointers, + job_status + ) values ( + (select id from materialize_image), + ':test-no-annotation', + 'materialization', + 'http://test.test/', + '{"type": "object"}', + '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string"}, "delta": {"type": "boolean"}}}', + '{/id}', + '{"type": "success"}' + ) on conflict do nothing + ), default_data_plane as ( insert into data_planes ( data_plane_name, diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs index 7b72b0633d..27121d3228 100644 --- a/crates/agent/src/integration_tests/source_captures.rs +++ b/crates/agent/src/integration_tests/source_captures.rs @@ -1,4 +1,6 @@ use super::harness::{draft_catalog, TestHarness}; +use models::Id; +use uuid::Uuid; #[tokio::test] #[serial_test::serial] @@ -253,3 +255,88 @@ async fn test_source_captures_collection_name() { ); assert!(no_source_status.source_capture.is_none()); } + +#[tokio::test] +#[serial_test::serial] +async fn test_source_capture_no_annotations() { + let mut harness = TestHarness::init("test_source_capture_no_annotations").await; + let user_id = harness.setup_tenant("sheep").await; + + let draft = draft_catalog(serde_json::json!({ + "collections": { + "ducks/pond/quacks": { + "schema": { + "type": "object", + "properties": { + "id": { "type": "string" } + } + }, + "key": ["/id"] + } + }, + "captures": { + "ducks/capture": { + "endpoint": { + "connector": { + "image": "source/test:test", + "config": {} + } + }, + "bindings": [ + { + "resource": { + "name": "greetings", + "prefix": "Hello {}!" + }, + "target": "ducks/pond/quacks" + } + ] + } + }, + "materializations": { + "ducks/materializeA": { + "sourceCapture": { + "capture": "ducks/capture", + "targetSchema": "fromSourceName", + "deltaUpdates": true, + }, + "endpoint": { + "connector": { + "image": "materialize/test:test-no-annotation", + "config": {} + } + }, + "bindings": [ ] + } + } + })); + let pub_id = Id::new([0, 0, 0, 0, 0, 0, 0, 9]); + let built = harness + .publisher + .build( + user_id, + pub_id, + None, + draft, + Uuid::new_v4(), + "ops/dp/public/test", + ) + .await + .expect("build failed"); + assert!(built.has_errors()); + + let errors = built.errors().collect::>(); + + insta::assert_debug_snapshot!(errors, @r###" + [ + Error { + scope: flow://materialization/ducks/materializeA, + error: sourceCapture.deltaUpdates set but the connector 'materialize/test' does not support delta updates, + }, + Error { + scope: flow://materialization/ducks/materializeA, + error: sourceCapture.targetSchema set but the connector 'materialize/test' does not support resource schemas, + }, + ] + "###); +} diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs index 06fdb059c4..18f0ef1049 100644 --- a/crates/agent/src/publications.rs +++ b/crates/agent/src/publications.rs @@ -219,9 +219,13 @@ impl Publisher { let forbidden_images = specs::check_connector_images(&draft, &self.db) .await .context("checking connector images")?; - if !forbidden_images.is_empty() { + let forbidden_source_capture = specs::check_source_capture_annotations(&draft, &self.db) + .await + .context("checking source capture")?; + if !forbidden_images.is_empty() || !forbidden_source_capture.is_empty() { let mut built = tables::Validations::default(); built.errors = forbidden_images; + built.errors.extend(forbidden_source_capture.into_iter()); let output = build::Output { draft, built, diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 7a6258985d..0ca584097a 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -2,7 +2,7 @@ use super::{LockFailure, UncommittedBuild}; use agent_sql::publications::{LiveRevision, LiveSpecUpdate}; use agent_sql::Capability; use anyhow::Context; -use models::{split_image_tag, Id, ModelDef}; +use models::{split_image_tag, Id, ModelDef, SourceCapture, SourceCaptureSchemaMode}; use serde_json::value::RawValue; use sqlx::types::Uuid; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -300,6 +300,41 @@ async fn update_live_specs( Ok(lock_failures) } +pub async fn check_source_capture_annotations( + draft: &tables::DraftCatalog, + pool: &sqlx::PgPool, +) -> anyhow::Result { + let mut by_image: BTreeMap = BTreeMap::new(); + let mut errors = tables::Errors::default(); + + for materialization in draft.materializations.iter() { + let Some(model) = materialization.model() else { return Ok(errors) }; + let Some(image) = model.connector_image() else { return Ok(errors) }; + let (image_name, image_tag) = split_image_tag(image); + let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { return Ok(errors) }; + let resource_config_schema = connector_spec.resource_config_schema; + + let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?; + + if let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture { + if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() { + errors.insert(tables::Error { + scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()), + error: anyhow::anyhow!("sourceCapture.deltaUpdates set but the connector '{image_name}' does not support delta updates"), + }); + } + + if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName && resource_spec_pointers.x_schema_name.is_none() { + errors.insert(tables::Error { + scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()), + error: anyhow::anyhow!("sourceCapture.targetSchema set but the connector '{image_name}' does not support resource schemas"), + }); + } + } + } + Ok(errors) +} + pub async fn check_connector_images( draft: &tables::DraftCatalog, pool: &sqlx::PgPool, diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 2a337ef417..79bfe639df 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -71,9 +71,9 @@ pub fn update_materialization_resource_spec( } pub struct ResourceSpecPointers { - x_collection_name: doc::Pointer, - x_schema_name: Option, - x_delta_updates: Option, + pub x_collection_name: doc::Pointer, + pub x_schema_name: Option, + pub x_delta_updates: Option, } /// Runs inference on the given schema and searches for a location within the resource spec From 410d4064c052a074c95816a68266a401d74d1ee1 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 26 Sep 2024 15:23:18 +0100 Subject: [PATCH 08/11] agent: validate materialization x-collection-name --- crates/agent/src/connector_tags.rs | 9 +++++++++ crates/agent/src/publications/specs.rs | 13 ++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index cdb13b37b4..9993ed2a29 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -163,6 +163,15 @@ impl TagHandler { tracing::warn!(image = %image_composed, "capture connector spec omits resource_path_pointers"); } + // Validate that there is an x-collection-name annotation in the resource config schema + // of materialization connectors + if proto_type == RuntimeProtocol::Materialize { + if let Err(err) = crate::resource_configs::pointer_for_schema(resource_config_schema.get()) { + tracing::warn!(image = %image_composed, error = %err, "resource schema does not have x-collection-name annotation"); + return Ok((row.tag_id, JobStatus::SpecFailed)); + } + } + // The tag fields may not be updated if the resource_path_pointers have // changed. If that happens, then we bail without making any changes // other than to job_status. diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 0ca584097a..13ff4b82c4 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -304,19 +304,18 @@ pub async fn check_source_capture_annotations( draft: &tables::DraftCatalog, pool: &sqlx::PgPool, ) -> anyhow::Result { - let mut by_image: BTreeMap = BTreeMap::new(); let mut errors = tables::Errors::default(); for materialization in draft.materializations.iter() { let Some(model) = materialization.model() else { return Ok(errors) }; - let Some(image) = model.connector_image() else { return Ok(errors) }; - let (image_name, image_tag) = split_image_tag(image); - let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { return Ok(errors) }; - let resource_config_schema = connector_spec.resource_config_schema; + if let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture { + let Some(image) = model.connector_image() else { return Ok(errors) }; + let (image_name, image_tag) = split_image_tag(image); + let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { return Ok(errors) }; + let resource_config_schema = connector_spec.resource_config_schema; - let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?; + let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?; - if let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture { if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() { errors.insert(tables::Error { scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()), From 4a2b7a2f36255847a149a5dfeaad5d5fab5f40ad Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 26 Sep 2024 15:32:54 +0100 Subject: [PATCH 09/11] agent: refactor resource_configs.update_materialization_resource_spec --- crates/agent/src/resource_configs.rs | 36 +++++++++++++--------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs index 79bfe639df..739c279dab 100644 --- a/crates/agent/src/resource_configs.rs +++ b/crates/agent/src/resource_configs.rs @@ -36,35 +36,31 @@ pub fn update_materialization_resource_spec( let source_capture_def = source_capture.to_normalized_def(); if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName { - if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name { - if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) { - let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); - } else { - anyhow::bail!( - "cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'" - ); - } - } else { + let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name else { anyhow::bail!( "sourceCapture.targetSchema set on a materialization which does not have x-schema-name annotation" ); - } + }; + let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) else { + anyhow::bail!( + "cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'" + ); + }; + let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into()); } if source_capture_def.delta_updates { - if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates { - if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) { - let _ = std::mem::replace(x_delta_updates_prev, true.into()); - } else { - anyhow::bail!( - "cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'" - ); - } - } else { + let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates else { anyhow::bail!( "sourceCapture.deltaUpdates set on a materialization which does not have x-delta-updates annotation" ); - } + }; + let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) else { + anyhow::bail!( + "cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'" + ); + }; + let _ = std::mem::replace(x_delta_updates_prev, true.into()); } Ok(()) From ea35544d9363207ff33c730e5013a0493f297320 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Fri, 27 Sep 2024 15:13:24 +0100 Subject: [PATCH 10/11] agent: validate existence of connector_tags for sourceCapture --- crates/agent/src/publications/specs.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 13ff4b82c4..621b2913da 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -308,12 +308,23 @@ pub async fn check_source_capture_annotations( for materialization in draft.materializations.iter() { let Some(model) = materialization.model() else { return Ok(errors) }; - if let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture { - let Some(image) = model.connector_image() else { return Ok(errors) }; - let (image_name, image_tag) = split_image_tag(image); - let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { return Ok(errors) }; + let Some(image) = model.connector_image() else { return Ok(errors) }; + let (image_name, image_tag) = split_image_tag(image); + + let Some(source_capture) = &model.source_capture else { return Ok(errors) }; + + // SourceCaptures require a connector_tags row in any case. To avoid an error down the line + // in the controller we validate that here. This should only happen for test connector + // tags, hence the technical error message + let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { + errors.insert(tables::Error { + scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()), + error: anyhow::anyhow!("materializations with a sourceCapture only work for known connector tags. {image} is not known to the control plane"), + }); + return Ok(errors); + }; + if let SourceCapture::Configured(source_capture_def) = source_capture { let resource_config_schema = connector_spec.resource_config_schema; - let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?; if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() { From cbac742a6c64c9b1620232dfa4919ba4e203778c Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Fri, 27 Sep 2024 15:18:36 +0100 Subject: [PATCH 11/11] agent: fix check_source_capture_annotations multi-materialization --- .../src/integration_tests/dependencies_and_activations.rs | 2 +- crates/agent/src/integration_tests/user_publications.rs | 2 +- crates/agent/src/publications/specs.rs | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/agent/src/integration_tests/dependencies_and_activations.rs b/crates/agent/src/integration_tests/dependencies_and_activations.rs index 01653d962d..08cc9a6e72 100644 --- a/crates/agent/src/integration_tests/dependencies_and_activations.rs +++ b/crates/agent/src/integration_tests/dependencies_and_activations.rs @@ -75,7 +75,7 @@ async fn test_dependencies_and_controllers() { "sourceCapture": "owls/capture", "endpoint": { "connector": { - "image": "ghcr.io/estuary/materialize-postgres:dev", + "image": "materialize/test:test", "config": {} } }, diff --git a/crates/agent/src/integration_tests/user_publications.rs b/crates/agent/src/integration_tests/user_publications.rs index eab61714b5..d3bfe45619 100644 --- a/crates/agent/src/integration_tests/user_publications.rs +++ b/crates/agent/src/integration_tests/user_publications.rs @@ -45,7 +45,7 @@ async fn test_user_publications() { "sourceCapture": "cats/capture", "endpoint": { "connector": { - "image": "ghcr.io/estuary/materialize-postgres:dev", + "image": "materialize/test:test", "config": {} } }, diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 621b2913da..7e0f1b2998 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -307,11 +307,11 @@ pub async fn check_source_capture_annotations( let mut errors = tables::Errors::default(); for materialization in draft.materializations.iter() { - let Some(model) = materialization.model() else { return Ok(errors) }; - let Some(image) = model.connector_image() else { return Ok(errors) }; + let Some(model) = materialization.model() else { continue }; + let Some(image) = model.connector_image() else { continue }; let (image_name, image_tag) = split_image_tag(image); - let Some(source_capture) = &model.source_capture else { return Ok(errors) }; + let Some(source_capture) = &model.source_capture else { continue }; // SourceCaptures require a connector_tags row in any case. To avoid an error down the line // in the controller we validate that here. This should only happen for test connector @@ -321,7 +321,7 @@ pub async fn check_source_capture_annotations( scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()), error: anyhow::anyhow!("materializations with a sourceCapture only work for known connector tags. {image} is not known to the control plane"), }); - return Ok(errors); + continue }; if let SourceCapture::Configured(source_capture_def) = source_capture { let resource_config_schema = connector_spec.resource_config_schema;