Skip to content

Commit

Permalink
agent: sourceCapture configuration for schema and delta updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 5, 2024
1 parent d11ea66 commit c00ffa6
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 26 deletions.
19 changes: 13 additions & 6 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use anyhow::Context;
use itertools::Itertools;
use models::{ModelDef, OnIncompatibleSchemaChange};
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef};
use proto_flow::materialize::response::validated::constraint::Type as ConstraintType;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,14 +57,17 @@ impl MaterializationStatus {
}
}

if let Some(source_capture_name) = &model.source_capture {
if let Some(source_capture) = &model.source_capture {
// If the source capture has been deleted, we will have already handled that as a
// part of `handle_deleted_dependencies`.
if let Some(source_capture_model) =
dependencies.live.captures.get_by_key(source_capture_name)
dependencies.live.captures.get_by_key(&source_capture.capture_name())
{
if self.source_capture.is_none() {
self.source_capture = Some(SourceCaptureStatus::default());
self.source_capture = Some(SourceCaptureStatus {
source_capture: source_capture.def(),
..Default::default()
});
}
let source_capture_status = self.source_capture.as_mut().unwrap();
// Source capture errors are terminal
Expand Down Expand Up @@ -259,10 +262,10 @@ fn handle_deleted_dependencies(
if drafted
.source_capture
.as_ref()
.map(|sc| deleted.contains(sc.as_str()))
.map(|sc| deleted.contains(sc.capture_name().as_str()))
.unwrap_or(false)
{
let capture_name = drafted.source_capture.take().unwrap();
let capture_name = drafted.source_capture.take().unwrap().capture_name();
source_capture.take();
descriptions.push(format!(
r#"removed sourceCapture: "{capture_name}" because the capture was deleted"#
Expand All @@ -284,6 +287,7 @@ fn is_false(b: &bool) -> bool {
/// Status information about the `sourceCapture`
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)]
pub struct SourceCaptureStatus {
pub source_capture: SourceCaptureDef,
/// Whether the materialization bindings are up-to-date with respect to
/// the `sourceCapture` bindings. In normal operation, this should always
/// be `true`. Otherwise, there will be a controller `error` and the
Expand Down Expand Up @@ -363,6 +367,7 @@ impl SourceCaptureStatus {

// Failures here are terminal
update_linked_materialization(
&self.source_capture,
resource_spec_pointers,
&self.add_bindings,
draft_row.model.as_mut().unwrap(),
Expand Down Expand Up @@ -391,13 +396,15 @@ fn get_bindings_to_add(
}

fn update_linked_materialization(
source_capture: &SourceCaptureDef,
resource_spec_pointers: ResourceSpecPointers,
bindings_to_add: &BTreeSet<models::Collection>,
materialization: &mut models::MaterializationDef,
) -> anyhow::Result<()> {
for collection_name in bindings_to_add {
let mut resource_spec = serde_json::json!({});
crate::resource_configs::update_materialization_resource_spec(
source_capture,
&mut resource_spec,
&resource_spec_pointers,
&collection_name,
Expand Down
5 changes: 5 additions & 0 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ mod test {
use std::collections::{BTreeSet, VecDeque};

use chrono::TimeZone;
use models::{SourceCaptureDef, Capture};

use super::*;
use crate::controllers::materialization::SourceCaptureStatus;
Expand Down Expand Up @@ -508,6 +509,10 @@ mod test {
source_capture: Some(SourceCaptureStatus {
up_to_date: false,
add_bindings,
source_capture: SourceCaptureDef {
capture: Capture::new("snails/capture"),
..Default::default()
}
}),
publications: PublicationStatus {
target_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ StatusSnapshot {
MaterializationStatus {
source_capture: Some(
SourceCaptureStatus {
source_capture: SourceCaptureDef {
capture: Capture(
"snails/capture",
),
schema_mode: LeaveEmpty,
delta_updates: false,
},
up_to_date: false,
add_bindings: {
Collection(
Expand Down Expand Up @@ -69,11 +76,18 @@ StatusSnapshot {
},
},
),
json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"target_pub_id\": \"0102030405060708\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ]\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}",
json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"source_capture\": {\n \"capture\": \"snails/capture\",\n \"schemaMode\": \"leaveEmpty\"\n },\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"target_pub_id\": \"0102030405060708\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ]\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}",
parsed: Materialization(
MaterializationStatus {
source_capture: Some(
SourceCaptureStatus {
source_capture: SourceCaptureDef {
capture: Capture(
"snails/capture",
),
schema_mode: LeaveEmpty,
delta_updates: false,
},
up_to_date: false,
add_bindings: {
Collection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ expression: schema
],
"type": "object"
},
"Capture": {
"description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
"examples": [
"acmeCo/capture"
],
"pattern": "^[\\p{Letter}\\p{Number}\\-_\\.]+(/[\\p{Letter}\\p{Number}\\-_\\.]+)*$",
"type": "string"
},
"Collection": {
"description": "Collection names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
"examples": [
Expand Down Expand Up @@ -364,6 +372,47 @@ expression: schema
],
"type": "object"
},
"SourceCaptureDef": {
"additionalProperties": false,
"description": "SourceCaptureDef specifies configuration for source captures",
"properties": {
"capture": {
"$ref": "#/definitions/Capture",
"description": "Capture name"
},
"deltaUpdates": {
"description": "When adding new bindings from a source capture to a materialization, should the new bindings be marked as delta updates",
"type": "boolean"
},
"schemaMode": {
"$ref": "#/definitions/SourceCaptureSchemaMode",
"default": "leaveEmpty",
"description": "When adding new bindings from a source capture to a materialization, how should the schema of the materialization binding be set"
}
},
"required": [
"capture"
],
"type": "object"
},
"SourceCaptureSchemaMode": {
"oneOf": [
{
"description": "Leave the materialization binding's schema field empty, therefore falling back to the default schema of the materialization",
"enum": [
"leaveEmpty"
],
"type": "string"
},
{
"description": "Use the 2nd-to-last component of the collection name as the schema of the materialization binding",
"enum": [
"collectionSchema"
],
"type": "string"
}
]
},
"SourceCaptureStatus": {
"description": "Status information about the `sourceCapture`",
"properties": {
Expand All @@ -375,11 +424,17 @@ expression: schema
"type": "array",
"uniqueItems": true
},
"source_capture": {
"$ref": "#/definitions/SourceCaptureDef"
},
"up_to_date": {
"description": "Whether the materialization bindings are up-to-date with respect to the `sourceCapture` bindings. In normal operation, this should always be `true`. Otherwise, there will be a controller `error` and the publication status will contain details of why the update failed.",
"type": "boolean"
}
},
"required": [
"source_capture"
],
"type": "object"
}
},
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl TestHarness {
'materialization',
'http://test.test/',
'{"type": "object"}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}}}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}, "delta": {"type": "boolean", "x-delta-updates": true}}}',
'{/id}',
'{"type": "success"}'
) on conflict do nothing
Expand Down
28 changes: 25 additions & 3 deletions crates/agent/src/integration_tests/source_captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn test_source_captures() {

let draft = draft_catalog(serde_json::json!({
"collections": {
"ducks/quacks": {
"ducks/pond/quacks": {
"schema": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -77,9 +77,23 @@ async fn test_source_captures() {
.unwrap();
assert_eq!(1, a_model.bindings.len());
assert_eq!(
"ducks/quacks",
"ducks/pond/quacks",
a_model.bindings[0].source.collection().as_str()
);
// Schema mode not set, so we expect schema to be left empty
assert_eq!(
None,
a_model.bindings[0].resource.to_value().pointer("/schema")
);
// Delta updates not set, so we expect delta to be left empty
assert_eq!(
None,
a_model.bindings[0].resource.to_value().pointer("/delta")
);
assert_eq!(
"quacks",
a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap()
);
let a_status = a_state.current_status.unwrap_materialization();
assert!(a_status.source_capture.as_ref().unwrap().up_to_date);
assert!(a_status
Expand Down Expand Up @@ -152,7 +166,11 @@ async fn test_source_captures_collection_name() {
},
"materializations": {
"ducks/materializeA": {
"sourceCapture": "ducks/capture",
"sourceCapture": {
"capture": "ducks/capture",
"schemaMode": "collectionSchema",
"deltaUpdates": true,
},
"endpoint": {
"connector": {
"image": "materialize/test:test",
Expand Down Expand Up @@ -196,6 +214,10 @@ async fn test_source_captures_collection_name() {
"pond",
a_model.bindings[0].resource.to_value().pointer("/schema").unwrap().as_str().unwrap()
);
assert_eq!(
true,
a_model.bindings[0].resource.to_value().pointer("/delta").unwrap().as_bool().unwrap()
);
assert_eq!(
"quacks",
a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn update_live_spec_flows<M: ModelDef>(
catalog_type,
Some(reads_from.iter().map(|c| c.as_str()).collect::<Vec<_>>()).filter(|a| !a.is_empty()),
Some(writes_to.iter().map(|c| c.as_str()).collect::<Vec<_>>()).filter(|a| !a.is_empty()),
source_capture.as_ref().map(|c| c.as_str()),
source_capture.as_ref().map(|c| c.capture.as_str()),
txn,
)
.await?;
Expand Down
32 changes: 25 additions & 7 deletions crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use models::{SourceCaptureDef, SourceCaptureSchemaMode};
use serde_json::Value;

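/// Updates `resource_spec` in place for a binding added from `source_capture`: always sets the
/// `x-collection-name` location, sets the `x-schema-name` location only when the schema mode is
/// `CollectionSchema`, and sets the `x-delta-updates` location to `true` only when `delta_updates` is enabled.
///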
/// # Panics
/// If the `full_collection_name` doesn't contain any `/` characters, which should never
/// be the case since we should have already validated the collection name.
pub fn update_materialization_resource_spec(
source_capture: &SourceCaptureDef,
resource_spec: &mut Value,
resource_spec_pointers: &ResourceSpecPointers,
full_collection_name: &str,
Expand All @@ -31,9 +33,19 @@ pub fn update_materialization_resource_spec(

let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into());

if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name {
if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into());
if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema {
if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name {
if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into());
}
}
}

if source_capture.delta_updates {
if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates {
if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_delta_updates_prev, true.into());
}
}
}

Expand All @@ -43,12 +55,14 @@ pub fn update_materialization_resource_spec(
/// Pointers into a connector's resource spec, as located by `pointer_for_schema`
/// from the annotations present in the resource spec schema.
pub struct ResourceSpecPointers {
/// Location annotated with `x-collection-name`. Required: `pointer_for_schema`
/// returns an error when no such location is found.
x_collection_name: doc::Pointer,
/// Location annotated with `x-schema-name`, or `None` if the schema has no such annotation.
x_schema_name: Option<doc::Pointer>,
/// Location annotated with `x-delta-updates`, or `None` if the schema has no such annotation.
x_delta_updates: Option<doc::Pointer>,
}

/// Runs inference on the given schema and searches for a location within the resource spec
/// that bears the `x-collection-name` and `x-schema-name` annotations. Returns the pointer to those location, or an
/// error if no such location exists. Errors from parsing the schema are returned directly.
/// The schema must be fully self-contained (a.k.a. bundled), or an error will be returned.
/// that bears the `x-collection-name`, `x-schema-name` or `x-delta-updates` annotations.
/// Returns the pointers to those locations, or an error if no `x-collection-name` location exists.
/// Errors from parsing the schema are returned directly. The schema must be fully self-contained (a.k.a. bundled),
/// or an error will be returned.
pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<ResourceSpecPointers> {
// While all known connector resource spec schemas are self-contained, we don't
// actually do anything to guarantee that they are. This function may fail in that case.
Expand All @@ -60,18 +74,22 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<ResourceSpecPoint

let mut x_collection_name: Option<doc::Pointer> = None;
let mut x_schema_name: Option<doc::Pointer> = None;
let mut x_delta_updates: Option<doc::Pointer> = None;
for (ptr, _, prop_shape, _) in shape.locations() {
if prop_shape.annotations.contains_key("x-collection-name") {
x_collection_name = Some(ptr)
} else if prop_shape.annotations.contains_key("x-schema-name") {
x_schema_name = Some(ptr)
} else if prop_shape.annotations.contains_key("x-delta-updates") {
x_delta_updates = Some(ptr)
}
}

if let Some(x_collection_name_ptr) = x_collection_name {
Ok(ResourceSpecPointers {
x_collection_name: x_collection_name_ptr,
x_schema_name
x_schema_name,
x_delta_updates,
})
} else {
Err(anyhow::anyhow!(
Expand Down
Loading

0 comments on commit c00ffa6

Please sign in to comment.