Skip to content

Commit

Permalink
airbyte-to-flow: set resource_path_pointers in spec response
Browse files Browse the repository at this point in the history
This allows these connectors to take advantage of the new discover merge behavior.
  • Loading branch information
psFried committed Dec 4, 2023
1 parent ea8d41e commit c20b4a9
Showing 1 changed file with 2 additions and 31 deletions.
33 changes: 2 additions & 31 deletions airbyte-to-flow/src/interceptors/airbyte_source_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl AirbyteSourceInterceptor {
.map(|spec| serde_json::from_value(spec))
.transpose()?,
documentation_url: documentation_url.unwrap_or_default(),
resource_path_pointers: vec!["/namespace".to_string(), "/stream".to_string()],
}),
..Default::default()
})?;
Expand Down Expand Up @@ -200,7 +201,6 @@ impl AirbyteSourceInterceptor {
}
}
let recommended_name = stream_to_recommended_name(&stream.name);
let resource_path = stream_to_resource_path(&stream);

let has_incremental = stream
.supported_sync_modes
Expand Down Expand Up @@ -306,7 +306,7 @@ impl AirbyteSourceInterceptor {
key: key.clone(),
document_schema_json: fix_document_schema_keys(doc_schema, key)?.to_string(),
disable,
resource_path,
resource_path: Vec::new(), // this is deprecated and unused
})
}

Expand Down Expand Up @@ -676,19 +676,6 @@ fn resource_spec_to_resource_path(res: &ResourceSpec) -> Vec<String> {
path
}

/// Returns a resource path for the given stream. This is really just the
/// `name` of the stream, which corresponds to the `stream` of a resource spec.
/// This must be consistent with `resource_spec_to_resource_path`, so that the
/// resource paths returned by `Discover` and `Validate` are the same.
fn stream_to_resource_path(stream: &airbyte_catalog::Stream) -> Vec<String> {
let mut path = Vec::new();
if let Some(ns) = &stream.namespace {
path.push(ns.clone());
}
path.push(stream.name.clone());
path
}

// stream names have no constraints.
// Strip and sanitize them to be valid collection names.
fn stream_to_recommended_name(stream: &str) -> String {
Expand Down Expand Up @@ -727,30 +714,14 @@ mod test {
cursor_field: None,
};

let stream_a = airbyte_catalog::Stream {
name: "foo".to_string(),
json_schema: Default::default(),
supported_sync_modes: None,
source_defined_cursor: None,
default_cursor_field: None,
source_defined_primary_key: None,
namespace: None,
};

let expected = vec!["foo".to_string()];
assert_eq!(&expected, &resource_spec_to_resource_path(&spec_a));
assert_eq!(&expected, &stream_to_resource_path(&stream_a));

let spec_b = ResourceSpec {
namespace: Some("toe".to_string()),
..spec_a
};
let stream_b = airbyte_catalog::Stream {
namespace: Some("toe".to_string()),
..stream_a
};
let expected = vec!["toe".to_string(), "foo".to_string()];
assert_eq!(&expected, &resource_spec_to_resource_path(&spec_b));
assert_eq!(&expected, &stream_to_resource_path(&stream_b));
}
}

0 comments on commit c20b4a9

Please sign in to comment.