Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Export primary ingestion collection #30991

Merged
merged 7 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/adapter/src/optimize/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ impl<'a> DataflowBuilder<'a> {
ingestion_desc
.desc
.primary_export
.as_ref()
.map(|e| e.monotonic(&ingestion_desc.desc.connection))
.unwrap_or(false)
.monotonic(&ingestion_desc.desc.connection)
}
DataSourceDesc::Webhook { .. } => true,
DataSourceDesc::IngestionExport {
Expand Down
19 changes: 7 additions & 12 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,13 +972,10 @@ impl DataSourceDesc {
pub fn formats(&self) -> (Option<&str>, Option<&str>) {
match &self {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.primary_export {
Some(export) => match export.encoding.as_ref() {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
match &ingestion_desc.desc.primary_export.encoding.as_ref() {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
}
Expand Down Expand Up @@ -1030,11 +1027,9 @@ impl DataSourceDesc {
// `SourceEnvelope` itself, but that one feels more like an internal
// thing and adapter should own how we represent envelopes as a
// string? It would not be hard to convince me otherwise, though.
DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
.desc
.primary_export
.as_ref()
.map(|export| envelope_string(&export.envelope)),
DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
&ingestion_desc.desc.primary_export.envelope,
)),
DataSourceDesc::IngestionExport { data_config, .. } => {
Some(envelope_string(&data_config.envelope))
}
Expand Down
29 changes: 22 additions & 7 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,18 +1104,33 @@ pub fn plan_create_source(
None => scx.catalog.config().timestamp_interval,
};

let (primary_export, primary_export_details) = if force_source_table_syntax {
(
SourceExportDataConfig {
encoding: None,
envelope: SourceEnvelope::None(NoneEnvelope {
key_envelope: KeyEnvelope::None,
key_arity: 0,
}),
},
SourceExportDetails::None,
)
} else {
(
SourceExportDataConfig {
encoding,
envelope: envelope.clone(),
},
external_connection.primary_export_details(),
)
};
let source_desc = SourceDesc::<ReferencedConnection> {
connection: external_connection,
// We only define primary-export details for this source if we are still supporting
// the legacy source syntax. Otherwise, we will not output to the primary collection.
// TODO(database-issues#8620): Remove this field once the new syntax is enabled everywhere
primary_export: match force_source_table_syntax {
false => Some(SourceExportDataConfig {
encoding,
envelope: envelope.clone(),
}),
true => None,
},
primary_export,
primary_export_details,
timestamp_interval,
};

Expand Down
5 changes: 2 additions & 3 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1870,9 +1870,8 @@ where
// TODO(database-issues#8620): This will be removed once sources no longer export
// to primary collections and only export to explicit SourceExports (tables).
if let DataSource::Ingestion(ingestion) = &mut description.data_source {
if let Some(export) = ingestion.desc.primary_source_export() {
ingestion.source_exports.insert(id, export);
}
let export = ingestion.desc.primary_source_export();
ingestion.source_exports.insert(id, export);
}

let write_frontier = write_handle.upper();
Expand Down
25 changes: 15 additions & 10 deletions src/storage-controller/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ mod tests {
LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::{
GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
SourceExportDetails,
};
use timely::progress::Antichain;

Expand Down Expand Up @@ -301,18 +302,22 @@ mod tests {
})
.collect();

let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
load_generator: LoadGenerator::Auction,
tick_micros: Default::default(),
as_of: Default::default(),
up_to: Default::default(),
});
let primary_export_details = connection.primary_export_details();

IngestionDescription {
desc: SourceDesc {
connection: GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
load_generator: LoadGenerator::Auction,
tick_micros: Default::default(),
as_of: Default::default(),
up_to: Default::default(),
}),
primary_export: Some(SourceExportDataConfig {
connection,
primary_export: SourceExportDataConfig {
encoding: Default::default(),
envelope: SourceEnvelope::CdcV2,
}),
},
primary_export_details,
timestamp_interval: Default::default(),
},
ingestion_metadata: CollectionMetadata {
Expand Down
5 changes: 2 additions & 3 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,9 +746,8 @@ where
// TODO(database-issues#8620): This will be removed once sources no longer export
// to primary collections and only export to explicit SourceExports (tables).
if let DataSource::Ingestion(ingestion) = &mut data_source {
if let Some(export) = ingestion.desc.primary_source_export() {
ingestion.source_exports.insert(id, export);
}
let export = ingestion.desc.primary_source_export();
ingestion.source_exports.insert(id, export);
}

let write_frontier = write.upper();
Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message ProtoSourceDesc {
ProtoSourceConnection connection = 1;
mz_proto.ProtoDuration timestamp_interval = 5;
ProtoSourceExportDataConfig primary_export = 6;
ProtoSourceExportDetails primary_export_details = 7;
}

message ProtoSourceConnection {
Expand Down
32 changes: 23 additions & 9 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,8 @@ pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
/// primary collection for this source.
/// TODO(database-issues#8620): This will be removed once sources no longer export
/// to primary collections and only export to explicit SourceExports (tables).
pub primary_export: Option<SourceExportDataConfig<C>>,
pub primary_export: SourceExportDataConfig<C>,
pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
Expand All @@ -799,12 +800,14 @@ impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
let SourceDesc {
connection,
primary_export,
primary_export_details,
timestamp_interval,
} = self;

SourceDesc {
connection: connection.into_inline_connection(&r),
primary_export: primary_export.map(|e| e.into_inline_connection(r)),
primary_export: primary_export.into_inline_connection(r),
primary_export_details,
timestamp_interval,
}
}
Expand All @@ -814,7 +817,8 @@ impl RustType<ProtoSourceDesc> for SourceDesc {
fn into_proto(&self) -> ProtoSourceDesc {
ProtoSourceDesc {
connection: Some(self.connection.into_proto()),
primary_export: self.primary_export.as_ref().map(|e| e.into_proto()),
primary_export: Some(self.primary_export.into_proto()),
primary_export_details: Some(self.primary_export_details.into_proto()),
timestamp_interval: Some(self.timestamp_interval.into_proto()),
}
}
Expand All @@ -824,7 +828,12 @@ impl RustType<ProtoSourceDesc> for SourceDesc {
connection: proto
.connection
.into_rust_if_some("ProtoSourceDesc::connection")?,
primary_export: proto.primary_export.into_rust()?,
primary_export: proto
.primary_export
.into_rust_if_some("ProtoSourceDesc::primary_export")?,
primary_export_details: proto
.primary_export_details
.into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
timestamp_interval: proto
.timestamp_interval
.into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
Expand All @@ -843,6 +852,7 @@ impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
let Self {
connection,
primary_export,
primary_export_details,
timestamp_interval,
} = &self;

Expand All @@ -852,6 +862,10 @@ impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
"connection",
),
(primary_export == &other.primary_export, "primary_export"),
(
primary_export_details == &other.primary_export_details,
"primary_export_details",
),
(
timestamp_interval == &other.timestamp_interval,
"timestamp_interval",
Expand All @@ -878,12 +892,12 @@ impl SourceDesc<InlinedConnection> {
/// Returns the SourceExport details for the primary export.
/// TODO(database-issues#8620): This will be removed once sources no longer export
/// to primary collections and only export to explicit SourceExports (tables).
pub fn primary_source_export(&self) -> Option<SourceExport<(), InlinedConnection>> {
self.primary_export.clone().map(|data_config| SourceExport {
pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
SourceExport {
storage_metadata: (),
details: self.connection.primary_export_details(),
data_config,
})
details: self.primary_export_details.clone(),
data_config: self.primary_export.clone(),
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/render/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
connection: _,
timestamp_interval: _,
primary_export: _,
primary_export_details: _,
} = description.desc;

let (decoded_stream, decode_health) = match encoding {
Expand Down
Loading
Loading