Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow: x-schema-name annotation for 2nd-to-last resource path #1572

Merged
merged 11 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ impl TagHandler {
tracing::warn!(image = %image_composed, "capture connector spec omits resource_path_pointers");
}

// Validate that there is an x-collection-name annotation in the resource config schema
// of materialization connectors
if proto_type == RuntimeProtocol::Materialize {
if let Err(err) = crate::resource_configs::pointer_for_schema(resource_config_schema.get()) {
tracing::warn!(image = %image_composed, error = %err, "resource schema does not have x-collection-name annotation");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
}

// The tag fields may not be updated if the resource_path_pointers have
// changed. If that happens, then we bail without making any changes
// other than to job_status.
Expand Down
22 changes: 13 additions & 9 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use super::{
use crate::{
controllers::publication_status::PublicationStatus,
publications::{PublicationResult, RejectedField},
resource_configs::ResourceSpecPointers,
};
use anyhow::Context;
use itertools::Itertools;
use models::{ModelDef, OnIncompatibleSchemaChange};
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCapture};
use proto_flow::materialize::response::validated::constraint::Type as ConstraintType;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -58,11 +59,11 @@ impl MaterializationStatus {
}
}

if let Some(source_capture_name) = &model.source_capture {
if let Some(source_capture) = &model.source_capture {
// If the source capture has been deleted, we will have already handled that as a
// part of `handle_deleted_dependencies`.
if let Some(source_capture_model) =
dependencies.live.captures.get_by_key(source_capture_name)
dependencies.live.captures.get_by_key(&source_capture.capture_name())
{
if self.source_capture.is_none() {
self.source_capture = Some(SourceCaptureStatus::default());
Expand Down Expand Up @@ -264,10 +265,10 @@ fn handle_deleted_dependencies(
if drafted
.source_capture
.as_ref()
.map(|sc| deleted.contains(sc.as_str()))
.map(|sc| deleted.contains(sc.capture_name().as_str()))
.unwrap_or(false)
{
let capture_name = drafted.source_capture.take().unwrap();
let capture_name = drafted.source_capture.take().unwrap().capture_name();
source_capture.take();
descriptions.push(format!(
r#"removed sourceCapture: "{capture_name}" because the capture was deleted"#
Expand Down Expand Up @@ -335,7 +336,7 @@ impl SourceCaptureStatus {
.get_connector_spec(config.image.clone())
.await
.context("failed to fetch connector spec")?;
let collection_name_pointer = crate::resource_configs::pointer_for_schema(
let resource_spec_pointers = crate::resource_configs::pointer_for_schema(
connector_spec.resource_config_schema.get(),
)?;

Expand Down Expand Up @@ -370,7 +371,8 @@ impl SourceCaptureStatus {
draft_row.is_touch = false;

update_linked_materialization(
collection_name_pointer,
model.source_capture.as_ref().unwrap(),
resource_spec_pointers,
&self.add_bindings,
draft_row.model.as_mut().unwrap(),
)?;
Expand Down Expand Up @@ -398,15 +400,17 @@ fn get_bindings_to_add(
}

fn update_linked_materialization(
resource_collection_name_ptr: doc::Pointer,
source_capture: &SourceCapture,
resource_spec_pointers: ResourceSpecPointers,
bindings_to_add: &BTreeSet<models::Collection>,
materialization: &mut models::MaterializationDef,
) -> anyhow::Result<()> {
for collection_name in bindings_to_add {
let mut resource_spec = serde_json::json!({});
crate::resource_configs::update_materialization_resource_spec(
source_capture,
&mut resource_spec,
&resource_collection_name_ptr,
&resource_spec_pointers,
&collection_name,
)?;

Expand Down
1 change: 1 addition & 0 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ mod test {
use std::collections::{BTreeSet, VecDeque};

use chrono::TimeZone;
use models::Capture;

use super::*;
use crate::controllers::materialization::SourceCaptureStatus;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: crates/agent/src/controllers/mod.rs
expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped }"
expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped, }"
---
StatusSnapshot {
starting: Materialization(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_dependencies_and_controllers() {
"sourceCapture": "owls/capture",
"endpoint": {
"connector": {
"image": "ghcr.io/estuary/materialize-postgres:dev",
"image": "materialize/test:test",
"config": {}
}
},
Expand Down
23 changes: 22 additions & 1 deletion crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,28 @@ impl TestHarness {
'materialization',
'http://test.test/',
'{"type": "object"}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}}}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}, "delta": {"type": "boolean", "x-delta-updates": true}}}',
'{/id}',
'{"type": "success"}'
) on conflict do nothing
),
materialize_tag_no_annotations as (
mdibaiee marked this conversation as resolved.
Show resolved Hide resolved
insert into connector_tags (
connector_id,
image_tag,
protocol,
documentation_url,
endpoint_spec_schema,
resource_spec_schema,
resource_path_pointers,
job_status
) values (
(select id from materialize_image),
':test-no-annotation',
'materialization',
'http://test.test/',
'{"type": "object"}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string"}, "delta": {"type": "boolean"}}}',
'{/id}',
'{"type": "success"}'
) on conflict do nothing
Expand Down
Loading
Loading