Skip to content

Commit

Permalink
flow: x-schema-name annotation for 2nd-to-last resource path
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Aug 27, 2024
1 parent 1331f93 commit 5afcb73
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{
use crate::{
controllers::publication_status::PublicationStatus,
publications::{PublicationResult, RejectedField},
resource_configs::ResourceSpecPointers,
};
use anyhow::Context;
use itertools::Itertools;
Expand Down Expand Up @@ -329,7 +330,7 @@ impl SourceCaptureStatus {
.get_connector_spec(config.image.clone())
.await
.context("failed to fetch connector spec")?;
let collection_name_pointer = crate::resource_configs::pointer_for_schema(
let resource_spec_pointers = crate::resource_configs::pointer_for_schema(
connector_spec.resource_config_schema.get(),
)?;

Expand Down Expand Up @@ -362,7 +363,7 @@ impl SourceCaptureStatus {

// Failures here are terminal
update_linked_materialization(
collection_name_pointer,
resource_spec_pointers,
&self.add_bindings,
draft_row.model.as_mut().unwrap(),
)?;
Expand Down Expand Up @@ -390,15 +391,15 @@ fn get_bindings_to_add(
}

fn update_linked_materialization(
resource_collection_name_ptr: doc::Pointer,
resource_spec_pointers: ResourceSpecPointers,
bindings_to_add: &BTreeSet<models::Collection>,
materialization: &mut models::MaterializationDef,
) -> anyhow::Result<()> {
for collection_name in bindings_to_add {
let mut resource_spec = serde_json::json!({});
crate::resource_configs::update_materialization_resource_spec(
&mut resource_spec,
&resource_collection_name_ptr,
&resource_spec_pointers,
&collection_name,
)?;

Expand Down
8 changes: 7 additions & 1 deletion crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl TestHarness {
'materialization',
'http://test.test/',
'{"type": "object"}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}}}',
'{"type": "object", "properties": {"id": {"type": "string", "x-collection-name": true}, "schema": {"type": "string", "x-schema-name": true}}}',
'{/id}',
'{"type": "success"}'
) on conflict do nothing
Expand Down Expand Up @@ -236,6 +236,12 @@ impl TestHarness {
),
del_daily_stats as (
delete from catalog_stats_daily
),
del_connectors as (
delete from connectors
),
del_connector_tags as (
delete from connector_tags
)
delete from catalog_stats_monthly;"#,
system_user_id
Expand Down
120 changes: 120 additions & 0 deletions crates/agent/src/integration_tests/source_captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,123 @@ async fn test_source_captures() {
);
assert!(no_source_status.source_capture.is_none());
}

#[tokio::test]
#[serial_test::serial]
async fn test_source_captures_collection_name() {
let mut harness = TestHarness::init("test_source_captures_collection_name").await;

let user_id = harness.setup_tenant("ducks").await;

let draft = draft_catalog(serde_json::json!({
"collections": {
"ducks/pond/quacks": {
"schema": {
"type": "object",
"properties": {
"id": { "type": "string" }
}
},
"key": ["/id"]
}
},
"captures": {
"ducks/capture": {
"endpoint": {
"connector": {
"image": "source/test:test",
"config": {}
}
},
"bindings": [
{
"resource": {
"name": "greetings",
"prefix": "Hello {}!"
},
"target": "ducks/pond/quacks"
}
]
}
},
"materializations": {
"ducks/materializeA": {
"sourceCapture": "ducks/capture",
"endpoint": {
"connector": {
"image": "materialize/test:test",
"config": {}
}
},
"bindings": [ ]
},
"ducks/materializeNoSource": {
"sourceCapture": "ducks/notARealCapture",
"endpoint": {
"connector": {
"image": "materialize/test:test",
"config": {}
}
},
"bindings": [ ]
}
}
}));

let result = harness
.user_publication(user_id, "test sourceCapture", draft)
.await;
assert!(result.status.is_success());

harness.run_pending_controllers(None).await;
let a_state = harness.get_controller_state("ducks/materializeA").await;
let a_model = a_state
.live_spec
.as_ref()
.unwrap()
.as_materialization()
.unwrap();
assert_eq!(1, a_model.bindings.len());
assert_eq!(
"ducks/pond/quacks",
a_model.bindings[0].source.collection().as_str()
);
assert_eq!(
"pond",
a_model.bindings[0].resource.to_value().pointer("/schema").unwrap().as_str().unwrap()
);
assert_eq!(
"quacks",
a_model.bindings[0].resource.to_value().pointer("/id").unwrap().as_str().unwrap()
);
let a_status = a_state.current_status.unwrap_materialization();
assert!(a_status.source_capture.as_ref().unwrap().up_to_date);
assert!(a_status
.source_capture
.as_ref()
.unwrap()
.add_bindings
.is_empty());
assert_eq!(
Some("adding binding(s) to match the sourceCapture: [ducks/pond/quacks]"),
a_status.publications.history[0].detail.as_deref()
);

let no_source_state = harness
.get_controller_state("ducks/materializeNoSource")
.await;
let no_source_model = no_source_state
.live_spec
.as_ref()
.unwrap()
.as_materialization()
.unwrap();
assert!(no_source_model.bindings.is_empty());
assert!(no_source_model.source_capture.is_none());
let no_source_status = no_source_state.current_status.unwrap_materialization();
assert_eq!(
Some("in response to publication of one or more depencencies, removed sourceCapture: \"ducks/notARealCapture\" because the capture was deleted"),
no_source_status.publications.history[0].detail.as_deref()
);
assert!(no_source_status.source_capture.is_none());
}
65 changes: 49 additions & 16 deletions crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,50 @@ use serde_json::Value;
/// be the case since we should have already validated the collection name.
pub fn update_materialization_resource_spec(
resource_spec: &mut Value,
collection_name_ptr: &doc::Pointer,
resource_spec_pointers: &ResourceSpecPointers,
full_collection_name: &str,
) -> anyhow::Result<Value> {
let resource_name = full_collection_name
.rsplit_once('/')
.expect("collection name is invalid (does not contain '/')")
.1
.to_owned();

let Some(prev) = collection_name_ptr.create_value(resource_spec) else {
) -> anyhow::Result<()> {
let split: Vec<&str> = full_collection_name
.rsplit('/')
.take(2)
.collect();

if split.len() < 2 {
return Err(anyhow::anyhow!("collection name is invalid (does not contain '/')"))
}

let collection_name = split[0];
let task_name = split[1];

let collection_name_ptr = &resource_spec_pointers.collection_name;

let Some(collection_name_prev) = collection_name_ptr.create_value(resource_spec) else {
anyhow::bail!(
"cannot create location '{collection_name_ptr}' in resource spec '{resource_spec}'"
);
};

Ok(std::mem::replace(prev, resource_name.into()))
let _ = std::mem::replace(collection_name_prev, collection_name.into());

if let Some(task_name_ptr) = &resource_spec_pointers.task_name {
if let Some(task_name_prev) = task_name_ptr.create_value(resource_spec) {
let _ = std::mem::replace(task_name_prev, task_name.into());
}
}

Ok(())
}

pub struct ResourceSpecPointers {
collection_name: doc::Pointer,
task_name: Option<doc::Pointer>,
}

/// Runs inference on the given schema and searches for a location within the resource spec
/// that bears the `x-collection-name` annotation. Returns the pointer to that location, or an
/// that bears the `x-collection-name` and `x-schema-name` annotations. Returns the pointer to those location, or an
/// error if no such location exists. Errors from parsing the schema are returned directly.
/// The schema must be fully self-contained (a.k.a. bundled), or an error will be returned.
pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<doc::Pointer> {
pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<ResourceSpecPointers> {
// While all known connector resource spec schemas are self-contained, we don't
// actually do anything to guarantee that they are. This function may fail in that case.
let schema = doc::validation::build_bundle(schema_json)?;
Expand All @@ -37,12 +58,24 @@ pub fn pointer_for_schema(schema_json: &str) -> anyhow::Result<doc::Pointer> {
let index = builder.into_index();
let shape = doc::Shape::infer(&schema, &index);

let mut collection_name: Option<doc::Pointer> = None;
let mut task_name: Option<doc::Pointer> = None;
for (ptr, _, prop_shape, _) in shape.locations() {
if prop_shape.annotations.contains_key("x-collection-name") {
return Ok(ptr);
collection_name = Some(ptr)
} else if prop_shape.annotations.contains_key("x-schema-name") {
task_name = Some(ptr)
}
}
Err(anyhow::anyhow!(
"resource spec schema does not contain any location annotated with x-collection-name"
))

if let Some(collection_name_ptr) = collection_name {
Ok(ResourceSpecPointers {
collection_name: collection_name_ptr,
task_name
})
} else {
Err(anyhow::anyhow!(
"resource spec schema does not contain any location annotated with x-collection-name"
))
}
}
1 change: 1 addition & 0 deletions crates/flowctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ warp = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }
tempfile = { workspace = true }
insta = { workspace = true }
57 changes: 56 additions & 1 deletion crates/flowctl/src/generate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s
let mut properties = serde_json::Map::new();

for p in &shape.object.properties {
if p.is_required {
if p.is_required || p.shape.annotations.get("x-schema-name").is_some() {
properties.insert(p.name.to_string(), stub_config(&p.shape, collection));
}
}
Expand Down Expand Up @@ -424,6 +424,17 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s
.rsplit("/")
.next()
.expect("collection names always have a slash"))
} else if shape
.annotations
.get("x-schema-name")
.is_some_and(|v| matches!(v, serde_json::Value::Bool(true)))
&& collection.is_some()
{
json!(collection
.unwrap()
.rsplit("/")
.nth(1)
.expect("collection names always have a slash"))
} else if shape.type_.overlaps(types::STRING) {
json!("")
} else if shape.type_.overlaps(types::INTEGER) {
Expand All @@ -436,3 +447,47 @@ fn stub_config(shape: &doc::Shape, collection: Option<&models::Collection>) -> s
json!(null)
}
}

#[cfg(test)]
mod test {
use super::*;
use doc::Shape;

// Map a JSON schema, in YAML form, into a Shape.
fn shape_from(schema_yaml: &str) -> Shape {

let url = url::Url::parse("http://example/schema").unwrap();
let schema: serde_json::Value = serde_yaml::from_str(schema_yaml).unwrap();
let schema =
json::schema::build::build_schema::<doc::Annotation>(url.clone(), &schema).unwrap();

let mut index = json::schema::index::IndexBuilder::new();
index.add(&schema).unwrap();
index.verify_references().unwrap();
let index = index.into_index();

Shape::infer(index.must_fetch(&url).unwrap(), &index)
}

#[test]
fn test_stub_config_resource_spec_pointers() {
let obj = shape_from(
r#"
type: object
properties:
stream:
type: string
x-collection-name: true
schema:
type: string
x-schema-name: true
required:
- stream
"#,
);

let cfg = stub_config(&obj, Some(&models::Collection::new("my-tenant/my-task/my-collection")));

insta::assert_json_snapshot!(cfg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
source: crates/flowctl/src/generate/mod.rs
expression: cfg
---
{
"schema": "my-task",
"stream": "my-collection"
}

0 comments on commit 5afcb73

Please sign in to comment.