Skip to content

Commit

Permalink
agent: refactoring SourceCapture and SourceCaptureDef
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 17, 2024
1 parent e3d6191 commit 1c40d8b
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 38 deletions.
12 changes: 4 additions & 8 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use anyhow::Context;
use itertools::Itertools;
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCaptureDef};
use models::{ModelDef, OnIncompatibleSchemaChange, SourceCapture};
use proto_flow::materialize::response::validated::constraint::Type as ConstraintType;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -66,10 +66,7 @@ impl MaterializationStatus {
dependencies.live.captures.get_by_key(&source_capture.capture_name())
{
if self.source_capture.is_none() {
self.source_capture = Some(SourceCaptureStatus {
source_capture: source_capture.def(),
..Default::default()
});
self.source_capture = Some(SourceCaptureStatus::default())
}
let source_capture_status = self.source_capture.as_mut().unwrap();
// Source capture errors are terminal
Expand Down Expand Up @@ -292,7 +289,6 @@ fn is_false(b: &bool) -> bool {
/// Status information about the `sourceCapture`
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, JsonSchema)]
pub struct SourceCaptureStatus {
pub source_capture: SourceCaptureDef,
/// Whether the materialization bindings are up-to-date with respect to
/// the `sourceCapture` bindings. In normal operation, this should always
/// be `true`. Otherwise, there will be a controller `error` and the
Expand Down Expand Up @@ -373,7 +369,7 @@ impl SourceCaptureStatus {

// Failures here are terminal
update_linked_materialization(
&self.source_capture,
model.source_capture.as_ref().unwrap(),
resource_spec_pointers,
&self.add_bindings,
draft_row.model.as_mut().unwrap(),
Expand Down Expand Up @@ -402,7 +398,7 @@ fn get_bindings_to_add(
}

fn update_linked_materialization(
source_capture: &SourceCaptureDef,
source_capture: &SourceCapture,
resource_spec_pointers: ResourceSpecPointers,
bindings_to_add: &BTreeSet<models::Collection>,
materialization: &mut models::MaterializationDef,
Expand Down
6 changes: 1 addition & 5 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ mod test {
use std::collections::{BTreeSet, VecDeque};

use chrono::TimeZone;
use models::{SourceCaptureDef, Capture};
use models::Capture;

use super::*;
use crate::controllers::materialization::SourceCaptureStatus;
Expand Down Expand Up @@ -506,10 +506,6 @@ mod test {
source_capture: Some(SourceCaptureStatus {
up_to_date: false,
add_bindings,
source_capture: SourceCaptureDef {
capture: Capture::new("snails/capture"),
..Default::default()
}
}),
publications: PublicationStatus {
max_observed_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
---
source: crates/agent/src/controllers/mod.rs
expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped }"
expression: "StatusSnapshot { starting: status, json: as_json, parsed: round_tripped, }"
---
StatusSnapshot {
starting: Materialization(
MaterializationStatus {
source_capture: Some(
SourceCaptureStatus {
source_capture: SourceCaptureDef {
capture: Capture(
"snails/capture",
),
schema_mode: LeaveEmpty,
delta_updates: false,
},
up_to_date: false,
add_bindings: {
Collection(
Expand Down Expand Up @@ -85,13 +78,6 @@ StatusSnapshot {
MaterializationStatus {
source_capture: Some(
SourceCaptureStatus {
source_capture: SourceCaptureDef {
capture: Capture(
"snails/capture",
),
schema_mode: LeaveEmpty,
delta_updates: false,
},
up_to_date: false,
add_bindings: {
Collection(
Expand Down
4 changes: 2 additions & 2 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ async fn update_live_spec_flows<B: tables::BuiltRow>(

let reads_from = model.reads_from();
let writes_to = model.writes_to();
let source_capture = model.materialization_source_capture();
let source_capture = model.materialization_source_capture_name();

agent_sql::publications::insert_live_spec_flows(
built.control_id().into(),
catalog_type,
Some(reads_from.iter().map(|c| c.as_str()).collect::<Vec<_>>()).filter(|a| !a.is_empty()),
Some(writes_to.iter().map(|c| c.as_str()).collect::<Vec<_>>()).filter(|a| !a.is_empty()),
source_capture.as_ref().map(|c| c.capture.as_str()),
source_capture.as_ref().map(|c| c.as_str()),
txn,
)
.await?;
Expand Down
18 changes: 14 additions & 4 deletions crates/agent/src/resource_configs.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use models::{SourceCaptureDef, SourceCaptureSchemaMode};
use models::{SourceCaptureSchemaMode, SourceCapture, SourceCaptureDef};
use serde_json::Value;

///
/// # Panics
/// If the `full_collection_name` doesn't contain any `/` characters, which should never
/// be the case since we should have already validated the collection name.
pub fn update_materialization_resource_spec(
source_capture: &SourceCaptureDef,
source_capture: &SourceCapture,
resource_spec: &mut Value,
resource_spec_pointers: &ResourceSpecPointers,
full_collection_name: &str,
Expand All @@ -33,18 +33,28 @@ pub fn update_materialization_resource_spec(

let _ = std::mem::replace(x_collection_name_prev, x_collection_name.into());

if source_capture.schema_mode == SourceCaptureSchemaMode::CollectionSchema {
let source_capture_def = source_capture.to_normalized_def();

if source_capture_def.schema_mode == SourceCaptureSchemaMode::CollectionSchema {
if let Some(x_schema_name_ptr) = &resource_spec_pointers.x_schema_name {
if let Some(x_schema_name_prev) = x_schema_name_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_schema_name_prev, x_schema_name.into());
} else {
anyhow::bail!(
"cannot create location '{x_schema_name_ptr}' in resource spec '{resource_spec}'"
);
}
}
}

if source_capture.delta_updates {
if source_capture_def.delta_updates {
if let Some(x_delta_updates_ptr) = &resource_spec_pointers.x_delta_updates {
if let Some(x_delta_updates_prev) = x_delta_updates_ptr.create_value(resource_spec) {
let _ = std::mem::replace(x_delta_updates_prev, true.into());
} else {
anyhow::bail!(
"cannot create location '{x_delta_updates_ptr}' in resource spec '{resource_spec}'"
);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint};
pub use catalogs::{Capability, Catalog, CatalogType};
pub use collections::{CollectionDef, Projection};
pub use connector::{split_image_tag, ConnectorConfig, LocalConfig};
pub use source_capture::{SourceCaptureDef, SourceCaptureSchemaMode};
pub use source_capture::{SourceCaptureDef, SourceCapture, SourceCaptureSchemaMode};
pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef};
pub use derive_sqlite::DeriveUsingSqlite;
pub use derive_typescript::DeriveUsingTypescript;
Expand Down Expand Up @@ -290,6 +290,10 @@ fn is_false(b: &bool) -> bool {
!*b
}

fn is_default<D: Default + PartialEq>(b: &D) -> bool {
D::default() == *b
}

fn is_u32_zero(u: &u32) -> bool {
*u == 0
}
2 changes: 1 addition & 1 deletion crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Capture;
use crate::{source::OnIncompatibleSchemaChange, Collection, Id};

use crate::source_capture::{SourceCapture, SourceCaptureDef};
use crate::source_capture::SourceCapture;

use super::{
ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source,
Expand Down
5 changes: 3 additions & 2 deletions crates/models/src/source_capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct SourceCaptureDef {

/// When adding new bindings from a source capture to a materialization, how should the schema
/// of the materialization binding be set
#[serde(default)]
#[serde(default, skip_serializing_if = "super::is_default")]
pub schema_mode: SourceCaptureSchemaMode,

/// When adding new bindings from a source capture to a materialization, should the new
Expand All @@ -52,7 +52,8 @@ impl SourceCapture {
}
}

pub fn def(&self) -> SourceCaptureDef {
/// Convert the enum to a normalized SourceCaptureDef by normalizing the Simple case
pub fn to_normalized_def(&self) -> SourceCaptureDef {
match self {
SourceCapture::Simple(capture) => SourceCaptureDef {
capture: capture.clone(),
Expand Down

0 comments on commit 1c40d8b

Please sign in to comment.