Skip to content

Commit

Permalink
agent: add built specs to draft (bugfix)
Browse files Browse the repository at this point in the history
Fixes a bug that was introduced during the publciations refactor, where
the `built` and `validated` columns of `draft_specs` were no longer
populated after a `dry_run` publication. This adds that back in, so that
the "Field Selection" UI will work again.
  • Loading branch information
psFried committed Oct 8, 2024
1 parent 077a57b commit 4670440
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
5 changes: 5 additions & 0 deletions crates/agent/src/publications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ impl Handler for Publisher {

let (status, draft_errors, final_pub_id) = match self.process(row).await {
Ok(result) => {
if dry_run {
specs::add_built_specs_to_draft_specs(draft_id, &result.built, &self.db)
.await
.context("adding built specs to draft")?;
}
let errors = result.draft_errors();
let final_id = if result.status.is_success() {
Some(result.pub_id)
Expand Down
38 changes: 25 additions & 13 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,34 +307,46 @@ pub async fn check_source_capture_annotations(
let mut errors = tables::Errors::default();

for materialization in draft.materializations.iter() {
let Some(model) = materialization.model() else { continue };
let Some(image) = model.connector_image() else { continue };
let Some(model) = materialization.model() else {
continue;
};
let Some(image) = model.connector_image() else {
continue;
};
let (image_name, image_tag) = split_image_tag(image);

let Some(source_capture) = &model.source_capture else { continue };
let Some(source_capture) = &model.source_capture else {
continue;
};

// SourceCaptures require a connector_tags row in any case. To avoid an error down the line
// in the controller we validate that here. This should only happen for test connector
// tags, hence the technical error message
let Some(connector_spec) = agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await? else {
let Some(connector_spec) =
agent_sql::connector_tags::fetch_connector_spec(&image_name, &image_tag, pool).await?
else {
errors.insert(tables::Error {
scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
error: anyhow::anyhow!("materializations with a sourceCapture only work for known connector tags. {image} is not known to the control plane"),
});
continue
continue;
};
if let SourceCapture::Configured(source_capture_def) = source_capture {
let resource_config_schema = connector_spec.resource_config_schema;
let resource_spec_pointers = crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?;
let resource_spec_pointers =
crate::resource_configs::pointer_for_schema(resource_config_schema.0.get())?;

if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none() {
if source_capture_def.delta_updates && resource_spec_pointers.x_delta_updates.is_none()
{
errors.insert(tables::Error {
scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
error: anyhow::anyhow!("sourceCapture.deltaUpdates set but the connector '{image_name}' does not support delta updates"),
});
}

if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName && resource_spec_pointers.x_schema_name.is_none() {
if source_capture_def.target_schema == SourceCaptureSchemaMode::FromSourceName
&& resource_spec_pointers.x_schema_name.is_none()
{
errors.insert(tables::Error {
scope: tables::synthetic_scope(model.catalog_type(), materialization.catalog_name()),
error: anyhow::anyhow!("sourceCapture.targetSchema set but the connector '{image_name}' does not support resource schemas"),
Expand Down Expand Up @@ -957,12 +969,12 @@ pub async fn load_draft(
// changing in this publication per the list of spec_rows.
pub async fn add_built_specs_to_draft_specs(
draft_id: agent_sql::Id,
build_output: &build::Output,
build_output: &tables::Validations,
db: &sqlx::PgPool,
) -> Result<(), sqlx::Error> {
// Possible optimization, which I'm not doing right now: collect vecs of all the
// prepared statement parameters and update all draft specs in a single query.
for collection in build_output.built.built_collections.iter() {
for collection in build_output.built_collections.iter() {
if !collection.is_delete() {
agent_sql::drafts::add_built_spec(
draft_id,
Expand All @@ -975,7 +987,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for capture in build_output.built.built_captures.iter() {
for capture in build_output.built_captures.iter() {
if !capture.is_delete() {
agent_sql::drafts::add_built_spec(
draft_id,
Expand All @@ -988,7 +1000,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for materialization in build_output.built.built_materializations.iter() {
for materialization in build_output.built_materializations.iter() {
if !materialization.is_delete() {
agent_sql::drafts::add_built_spec(
draft_id,
Expand All @@ -1001,7 +1013,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for test in build_output.built.built_tests.iter() {
for test in build_output.built_tests.iter() {
if !test.is_delete() {
agent_sql::drafts::add_built_spec(
draft_id,
Expand Down

0 comments on commit 4670440

Please sign in to comment.