Skip to content

Commit

Permalink
agent: refactoring SourceCapture and SourceCaptureDef
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 17, 2024
1 parent ca6988e commit 70c9d98
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 20 deletions.
12 changes: 4 additions & 8 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use anyhow::Context;
use itertools::Itertools;
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef};
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCapture};
use proto_flow::materialize::response::validated::constraint::Type as ConstraintType;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -64,10 +64,7 @@ impl MaterializationStatus {
dependencies.live.captures.get_by_key(&source_capture.capture_name())
{
if self.source_capture.is_none() {
self.source_capture = Some(SourceCaptureStatus {
source_capture: source_capture.def(),
..Default::default()
});
self.source_capture = Some(SourceCaptureStatus::default())
}
let source_capture_status = self.source_capture.as_mut().unwrap();
// Source capture errors are terminal
Expand Down Expand Up @@ -287,7 +284,6 @@ fn is_false(b: &bool) -> bool {
/// Status information about the `sourceCapture`
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)]
pub struct SourceCaptureStatus {
pub source_capture: SourceCaptureDef,
/// Whether the materialization bindings are up-to-date with respect to
/// the `sourceCapture` bindings. In normal operation, this should always
/// be `true`. Otherwise, there will be a controller `error` and the
Expand Down Expand Up @@ -367,7 +363,7 @@ impl SourceCaptureStatus {

// Failures here are terminal
update_linked_materialization(
&self.source_capture,
model.source_capture.as_ref().unwrap(),
resource_spec_pointers,
&self.add_bindings,
draft_row.model.as_mut().unwrap(),
Expand Down Expand Up @@ -396,7 +392,7 @@ fn get_bindings_to_add(
}

fn update_linked_materialization(
source_capture: &SourceCaptureDef,
source_capture: &SourceCapture,
resource_spec_pointers: ResourceSpecPointers,
bindings_to_add: &BTreeSet<models::Collection>,
materialization: &mut models::MaterializationDef,
Expand Down
6 changes: 1 addition & 5 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ mod test {
use std::collections::{BTreeSet, VecDeque};

use chrono::TimeZone;
use models::{SourceCaptureDef, Capture};
use models::Capture;

use super::*;
use crate::controllers::materialization::SourceCaptureStatus;
Expand Down Expand Up @@ -509,10 +509,6 @@ mod test {
source_capture: Some(SourceCaptureStatus {
up_to_date: false,
add_bindings,
source_capture: SourceCaptureDef {
capture: Capture::new("snails/capture"),
..Default::default()
}
}),
publications: PublicationStatus {
target_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]),
Expand Down
18 changes: 14 additions & 4 deletions crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use models::{SourceCaptureDef, SourceCaptureSchemaMode};
use models::{SourceCaptureSchemaMode, SourceCapture, SourceCaptureDef};
use serde_json::Value;

///
/// # Panics
/// If the `full_collection_name` doesn't contain any `/` characters, which should never
/// be the case since we should have already validated the collection name.
pub fn update_materialization_resource_spec(
source_capture: &SourceCaptureDef,
source_capture: &SourceCapture,
resource_spec: &mut Value,
resource_spec_pointers: &ResourceSpecPointers,
full_collection_name: &str,
Expand All @@ -33,18 +33,28 @@ pub fn update_materialization_resource_spec(

let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into());

if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema {
let source_capture_def = source_capture.to_normalized_def();

if source_capture_def.schema_mode == SourceCaptureSchemaMode::CollectionSchema {
if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name {
if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into());
} else {
anyhow::bail!(
"cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'"
);
}
}
}

if source_capture.delta_updates {
if source_capture_def.delta_updates {
if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates {
if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_delta_updates_prev, true.into());
} else {
anyhow::bail!(
"cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'"
);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint};
pub use catalogs::Catalog;
pub use collections::{CollectionDef, Projection};
pub use connector::{split_image_tag, ConnectorConfig, LocalConfig};
pub use source_capture::{SourceCaptureDef, SourceCaptureSchemaMode};
pub use source_capture::{SourceCaptureDef, SourceCapture, SourceCaptureSchemaMode};
pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef};
pub use derive_sqlite::DeriveUsingSqlite;
pub use derive_typescript::DeriveUsingTypescript;
Expand Down Expand Up @@ -316,6 +316,10 @@ fn is_false(b: &bool) -> bool {
!*b
}

fn is_default<D: Default + PartialEq>(b: &D) -> bool {
D::default() == *b
}

fn is_u32_zero(u: &u32) -> bool {
*u == 0
}
5 changes: 3 additions & 2 deletions crates/models/src/source_capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct SourceCaptureDef {

/// When adding new bindings from a source capture to a materialization, how should the schema
/// of the materialization binding be set
#[serde(default)]
#[serde(default, skip_serializing_if = "super::is_default")]
pub schema_mode: SourceCaptureSchemaMode,

/// When adding new bindings from a source capture to a materialization, should the new
Expand All @@ -52,7 +52,8 @@ impl SourceCapture {
}
}

pub fn def(&self) -> SourceCaptureDef {
/// Convert the enum to a normalized SourceCaptureDef by normalizing the Simple case
pub fn to_normalized_def(&self) -> SourceCaptureDef {
match self {
SourceCapture::Simple(capture) => SourceCaptureDef {
capture: capture.clone(),
Expand Down

0 comments on commit 70c9d98

Please sign in to comment.