From 45643e3e6d34aafe4a96464ee73f847054ddba30 Mon Sep 17 00:00:00 2001
From: Mahdi Dibaiee
Date: Wed, 25 Sep 2024 19:50:49 +0100
Subject: [PATCH] agent: validate sourceCapture.{targetSchema,deltaUpdates} on publish

---
 crates/agent/src/integration_tests/harness.rs | 21 +++++
 .../src/integration_tests/source_captures.rs  | 87 +++++++++++++++++++
 crates/agent/src/publications.rs              |  6 +-
 crates/agent/src/publications/specs.rs        | 44 ++++++++-
 crates/agent/src/resource_configs.rs          |  6 +-
 go.sh                                         |  2 +-
 6 files changed, 160 insertions(+), 6 deletions(-)

diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs
index c6a46b82a5..20ab7b34a8 100644
--- a/crates/agent/src/integration_tests/harness.rs
+++ b/crates/agent/src/integration_tests/harness.rs
@@ -173,6 +173,27 @@ impl TestHarness {
                     '{"type": "success"}'
                 ) on conflict do nothing
             ),
+            materialize_tag_no_annotations as (
+                insert into connector_tags (
+                    connector_id,
+                    image_tag,
+                    protocol,
+                    documentation_url,
+                    endpoint_spec_schema,
+                    resource_spec_schema,
+                    resource_path_pointers,
+                    job_status
+                ) values (
+                    (select id from materialize_image),
+                    ':test-no-annotation',
+                    'materialization',
+                    'http://test.test/',
+                    '{"type": "object"}',
+                    '{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string"}, "delta": {"type": "boolean"}}}',
+                    '{/id}',
+                    '{"type": "success"}'
+                ) on conflict do nothing
+            ),
             default_data_plane as (
                 insert into data_planes (
                     data_plane_name,
diff --git a/crates/agent/src/integration_tests/source_captures.rs b/crates/agent/src/integration_tests/source_captures.rs
index 7b72b0633d..27121d3228 100644
--- a/crates/agent/src/integration_tests/source_captures.rs
+++ b/crates/agent/src/integration_tests/source_captures.rs
@@ -1,4 +1,6 @@
 use super::harness::{draft_catalog, TestHarness};
+use models::Id;
+use uuid::Uuid;
 
 #[tokio::test]
 #[serial_test::serial]
@@ -253,3 +255,88 @@ async fn test_source_captures_collection_name() {
     );
     assert!(no_source_status.source_capture.is_none());
 }
+
+#[tokio::test]
+#[serial_test::serial]
+async fn test_source_capture_no_annotations() {
+    let mut harness = TestHarness::init("test_source_capture_no_annotations").await;
+    let user_id = harness.setup_tenant("sheep").await;
+
+    let draft = draft_catalog(serde_json::json!({
+        "collections": {
+            "ducks/pond/quacks": {
+                "schema": {
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string" }
+                    }
+                },
+                "key": ["/id"]
+            }
+        },
+        "captures": {
+            "ducks/capture": {
+                "endpoint": {
+                    "connector": {
+                        "image": "source/test:test",
+                        "config": {}
+                    }
+                },
+                "bindings": [
+                    {
+                        "resource": {
+                            "name": "greetings",
+                            "prefix": "Hello {}!"
+                        },
+                        "target": "ducks/pond/quacks"
+                    }
+                ]
+            }
+        },
+        "materializations": {
+            "ducks/materializeA": {
+                "sourceCapture": {
+                    "capture": "ducks/capture",
+                    "targetSchema": "fromSourceName",
+                    "deltaUpdates": true,
+                },
+                "endpoint": {
+                    "connector": {
+                        "image": "materialize/test:test-no-annotation",
+                        "config": {}
+                    }
+                },
+                "bindings": [ ]
+            }
+        }
+    }));
+    let pub_id = Id::new([0, 0, 0, 0, 0, 0, 0, 9]);
+    let built = harness
+        .publisher
+        .build(
+            user_id,
+            pub_id,
+            None,
+            draft,
+            Uuid::new_v4(),
+            "ops/dp/public/test",
+        )
+        .await
+        .expect("build failed");
+    assert!(built.has_errors());
+
+    let errors = built.errors().collect::<Vec<_>>();
+
+    insta::assert_debug_snapshot!(errors, @r###"
+    [
+        Error {
+            scope: flow://materialization/ducks/materializeA,
+            error: sourceCapture.deltaUpdates set but the connector 'materialize/test' does not support delta updates,
+        },
+        Error {
+            scope: flow://materialization/ducks/materializeA,
+            error: sourceCapture.targetSchema set but the connector 'materialize/test' does not support resource schemas,
+        },
+    ]
+    "###);
+}
diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs
index 06fdb059c4..18f0ef1049 100644
--- a/crates/agent/src/publications.rs
+++ b/crates/agent/src/publications.rs
@@ -219,9 +219,13 @@ impl Publisher {
         let forbidden_images = specs::check_connector_images(&draft, &self.db)
             .await
             .context("checking connector images")?;
-        if !forbidden_images.is_empty() {
+        let forbidden_source_capture = specs::check_source_capture_annotations(&draft, &self.db)
+            .await
+            .context("checking source capture")?;
+        if !forbidden_images.is_empty() || !forbidden_source_capture.is_empty() {
             let mut built = tables::Validations::default();
             built.errors = forbidden_images;
+            built.errors.extend(forbidden_source_capture.into_iter());
             let output = build::Output {
                 draft,
                 built,
diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index 7a6258985d..0ca584097a 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -2,7 +2,7 @@ use super::{LockFailure, UncommittedBuild};
 use agent_sql::publications::{LiveRevision, LiveSpecUpdate};
 use agent_sql::Capability;
 use anyhow::Context;
-use models::{split_image_tag, Id, ModelDef};
+use models::{split_image_tag, Id, ModelDef, SourceCapture, SourceCaptureSchemaMode};
 use serde_json::value::RawValue;
 use sqlx::types::Uuid;
 use std::collections::{BTreeMap, BTreeSet, HashSet};
@@ -300,6 +300,48 @@ async fn update_live_specs(
     Ok(lock_failures)
 }
 
+/// Checks each drafted materialization that has a configured `sourceCapture` against
+/// the resource spec schema of its connector.
+///
+/// An error is recorded for the materialization when `deltaUpdates` is set, or when
+/// `targetSchema` is `fromSourceName`, but the resource spec schema yields no pointer
+/// for that setting. Failures of the connector spec lookup or of the schema inspection
+/// are returned as `Err`.
+pub async fn check_source_capture_annotations(
+    draft: &tables::DraftCatalog,
+    pool: &sqlx::PgPool,
+) -> anyhow::Result<tables::Errors> {
+    let mut errors = tables::Errors::default();
+
+    for materialization in draft.materializations.iter() {
+        // Skip only this materialization so that the remaining ones are still checked.
+        let Some(model) = materialization.model() else { continue };
+        // Nothing to validate without a configured sourceCapture: avoid the spec lookup.
+        let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture else { continue };
+        let Some(image) = model.connector_image() else { continue };
+        let (image_name, image_tag) = split_image_tag(image);
+        let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { continue };
+        let resource_config_schema = connector_spec.resource_config_schema;
+
+        let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?;
+
+        if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() {
+            errors.insert(tables::Error {
+                scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
+                error: anyhow::anyhow!("sourceCapture.deltaUpdates set but the connector '{image_name}' does not support delta updates"),
+            });
+        }
+
+        if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName && resource_spec_pointers.x_schema_name.is_none() {
+            errors.insert(tables::Error {
+                scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
+                error: anyhow::anyhow!("sourceCapture.targetSchema set but the connector '{image_name}' does not support resource schemas"),
+            });
+        }
+    }
+    Ok(errors)
+}
+
 pub async fn check_connector_images(
     draft: &tables::DraftCatalog,
     pool: &sqlx::PgPool,
diff --git a/crates/agent/src/resource_configs.rs b/crates/agent/src/resource_configs.rs
index 2a337ef417..79bfe639df 100644
--- a/crates/agent/src/resource_configs.rs
+++ b/crates/agent/src/resource_configs.rs
@@ -71,9 +71,9 @@ pub fn update_materialization_resource_spec(
 }
 
 pub struct ResourceSpecPointers {
-    x_collection_name: doc::Pointer,
-    x_schema_name: Option<doc::Pointer>,
-    x_delta_updates: Option<doc::Pointer>,
+    pub x_collection_name: doc::Pointer,
+    pub x_schema_name: Option<doc::Pointer>,
+    pub x_delta_updates: Option<doc::Pointer>,
 }
 
 /// Runs inference on the given schema and searches for a location within the resource spec
diff --git a/go.sh b/go.sh
index 01c1a97c2a..b3c9e12cf8 100755
--- a/go.sh
+++ b/go.sh
@@ -2,7 +2,7 @@
 
 PROFILE="${PROFILE:-release}"
 
-export CGO_LDFLAGS="-L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE} -L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp -lbindings -lrocksdb -lsnappy -lstdc++ -lssl -lcrypto -ldl -lm"
+export CGO_LDFLAGS="-L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE} -L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp -lbindings -lrocksdb -lsnappy -lstdc++ -ldl -lm"
 if [ "$(uname)" == "Darwin" ]; then
   export CGO_CFLAGS="-I $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp/include -I $(brew --prefix)/include -I $(brew --prefix)/opt/sqlite3/include"
   export CC="$(brew --prefix)/opt/llvm/bin/clang"