Skip to content

Commit

Permalink
agent: validate sourceCapture.{targetSchema,deltaUpdates} on publish
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 25, 2024
1 parent 9b298a3 commit 7d2e5c7
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 6 deletions.
21 changes: 21 additions & 0 deletions crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ impl TestHarness {
'{"type": "success"}'
) on conflict do nothing
),
materialize_tag_no_annotations as (
insert into connector_tags (
connector_id,
image_tag,
protocol,
documentation_url,
endpoint_spec_schema,
resource_spec_schema,
resource_path_pointers,
job_status
) values (
(select id from materialize_image),
':test-no-annotation',
'materialization',
'http://test.test/',
'{"type": "object"}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string"}, "delta": {"type": "boolean"}}}',
'{/id}',
'{"type": "success"}'
) on conflict do nothing
),
default_data_plane as (
insert into data_planes (
data_plane_name,
Expand Down
87 changes: 87 additions & 0 deletions crates/agent/src/integration_tests/source_captures.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::harness::{draft_catalog, TestHarness};
use models::Id;
use uuid::Uuid;

#[tokio::test]
#[serial_test::serial]
Expand Down Expand Up @@ -253,3 +255,88 @@ async fn test_source_captures_collection_name() {
);
assert!(no_source_status.source_capture.is_none());
}

#[tokio::test]
#[serial_test::serial]
async fn test_source_capture_no_annotations() {
let mut harness = TestHarness::init("test_source_capture_no_annotations").await;
let user_id = harness.setup_tenant("sheep").await;

let draft = draft_catalog(serde_json::json!({
"collections": {
"ducks/pond/quacks": {
"schema": {
"type": "object",
"properties": {
"id": { "type": "string" }
}
},
"key": ["/id"]
}
},
"captures": {
"ducks/capture": {
"endpoint": {
"connector": {
"image": "source/test:test",
"config": {}
}
},
"bindings": [
{
"resource": {
"name": "greetings",
"prefix": "Hello {}!"
},
"target": "ducks/pond/quacks"
}
]
}
},
"materializations": {
"ducks/materializeA": {
"sourceCapture": {
"capture": "ducks/capture",
"targetSchema": "fromSourceName",
"deltaUpdates": true,
},
"endpoint": {
"connector": {
"image": "materialize/test:test-no-annotation",
"config": {}
}
},
"bindings": [ ]
}
}
}));
let pub_id = Id::new([0, 0, 0, 0, 0, 0, 0, 9]);
let built = harness
.publisher
.build(
user_id,
pub_id,
None,
draft,
Uuid::new_v4(),
"ops/dp/public/test",
)
.await
.expect("build failed");
assert!(built.has_errors());

let errors = built.errors().collect::<Vec<_>>();

insta::assert_debug_snapshot!(errors, @r###"
[
Error {
scope: flow://materialization/ducks/materializeA,
error: sourceCapture.deltaUpdates set but the connector 'materialize/test' does not support delta updates,
},
Error {
scope: flow://materialization/ducks/materializeA,
error: sourceCapture.targetSchema set but the connector 'materialize/test' does not support resource schemas,
},
]
"###);
}
6 changes: 5 additions & 1 deletion crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ impl Publisher {
let forbidden_images = specs::check_connector_images(&draft, &self.db)
.await
.context("checking connector images")?;
if !forbidden_images.is_empty() {
let forbidden_source_capture = specs::check_source_capture_annotations(&draft, &self.db)
.await
.context("checking source capture")?;
if !forbidden_images.is_empty() || !forbidden_source_capture.is_empty() {
let mut built = tables::Validations::default();
built.errors = forbidden_images;
built.errors.extend(forbidden_source_capture.into_iter());
let output = build::Output {
draft,
built,
Expand Down
48 changes: 47 additions & 1 deletion crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{LockFailure, UncommittedBuild};
use agent_sql::publications::{LiveRevision, LiveSpecUpdate};
use agent_sql::Capability;
use anyhow::Context;
use models::{split_image_tag, Id, ModelDef};
use models::{split_image_tag, Id, ModelDef, SourceCapture, SourceCaptureSchemaMode};
use serde_json::value::RawValue;
use sqlx::types::Uuid;
use std::collections::{BTreeMap, BTreeSet, HashSet};
Expand Down Expand Up @@ -300,6 +300,52 @@ async fn update_live_specs(
Ok(lock_failures)
}

pub async fn check_source_capture_annotations(
draft: &tables::DraftCatalog,
pool: &sqlx::PgPool,
) -> anyhow::Result<tables::Errors> {
let mut by_image: BTreeMap<String, bool> = BTreeMap::new();
let mut errors = tables::Errors::default();

for materialization in draft.materializations.iter() {
let Some(model) = materialization.model() else { return Ok(errors) };
let Some(image) = model.connector_image() else { return Ok(errors) };
let (image_name, image_tag) = split_image_tag(image);
let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else { return Ok(errors) };
let resource_config_schema = connector_spec.resource_config_schema;

let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?;

if let Some(SourceCapture::Configured(source_capture_def)) = &model.source_capture {
if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() {
errors.insert(tables::Error {
scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
error: anyhow::anyhow!("sourceCapture.deltaUpdates set but the connector '{image_name}' does not support delta updates"),
});
}

if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName && resource_spec_pointers.x_schema_name.is_none() {
errors.insert(tables::Error {
scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
error: anyhow::anyhow!("sourceCapture.targetSchema set but the connector '{image_name}' does not support resource schemas"),
});
}
}
let Some(err) = check_connector_image(
materialization.materialization.as_str(),
materialization.model(),
&mut by_image,
pool,
)
.await?
else {
continue;
};
errors.insert(err);
}
Ok(errors)
}

pub async fn check_connector_images(
draft: &tables::DraftCatalog,
pool: &sqlx::PgPool,
Expand Down
6 changes: 3 additions & 3 deletions crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ pub fn update_materialization_resource_spec(
}

pub struct ResourceSpecPointers {
x_collection_name: doc::Pointer,
x_schema_name: Option<doc::Pointer>,
x_delta_updates: Option<doc::Pointer>,
pub x_collection_name: doc::Pointer,
pub x_schema_name: Option<doc::Pointer>,
pub x_delta_updates: Option<doc::Pointer>,
}

/// Runs inference on the given schema and searches for a location within the resource spec
Expand Down
2 changes: 1 addition & 1 deletion go.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

PROFILE="${PROFILE:-release}"

export CGO_LDFLAGS="-L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE} -L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp -lbindings -lrocksdb -lsnappy -lstdc++ -lssl -lcrypto -ldl -lm"
export CGO_LDFLAGS="-L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE} -L $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp -lbindings -lrocksdb -lsnappy -lstdc++ -ldl -lm"
if [ "$(uname)" == "Darwin" ]; then
export CGO_CFLAGS="-I $(pwd)/target/${CARGO_BUILD_TARGET}/${PROFILE}/librocksdb-exp/include -I $(brew --prefix)/include -I $(brew --prefix)/opt/sqlite3/include"
export CC="$(brew --prefix)/opt/llvm/bin/clang"
Expand Down

0 comments on commit 7d2e5c7

Please sign in to comment.