Skip to content

Commit

Permalink
storage: Export primary ingestion collection (#30991)
Browse files Browse the repository at this point in the history
Previously, when the `force_source_table_syntax` flag was enabled, the
primary source ingestion collection was not included in the source
exports. This would cause the primary source ingestion collection's
upper and since to be stuck at 0, and it would break some existing code.

This commit always includes the primary ingestion collection in the
source exports. However, when the `force_source_table_syntax` flag is
enabled, then the source export details are set to
`SourceExportDetails::None`. The result is that, with regard to the
primary ingestion collection, all source types with the flag enabled
behave similarly to how multi-output sources behave with the flag
disabled. Specifically, their uppers and sinces move forward in time,
and querying them returns an empty result.

A downside of this commit is that, when the flag is enabled, a source
ingestion is always scheduled, even if there are no table exports.
Previously, it would only be scheduled if there were table exports.

Works towards resolving #MaterializeInc/database-issues/issues/8620
  • Loading branch information
jkosh44 authored Jan 22, 2025
1 parent a077232 commit f3ee8dd
Show file tree
Hide file tree
Showing 10 changed files with 615 additions and 47 deletions.
4 changes: 1 addition & 3 deletions src/adapter/src/optimize/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ impl<'a> DataflowBuilder<'a> {
ingestion_desc
.desc
.primary_export
.as_ref()
.map(|e| e.monotonic(&ingestion_desc.desc.connection))
.unwrap_or(false)
.monotonic(&ingestion_desc.desc.connection)
}
DataSourceDesc::Webhook { .. } => true,
DataSourceDesc::IngestionExport {
Expand Down
19 changes: 7 additions & 12 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,13 +972,10 @@ impl DataSourceDesc {
pub fn formats(&self) -> (Option<&str>, Option<&str>) {
match &self {
DataSourceDesc::Ingestion { ingestion_desc, .. } => {
match &ingestion_desc.desc.primary_export {
Some(export) => match export.encoding.as_ref() {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
match &ingestion_desc.desc.primary_export.encoding.as_ref() {
Some(encoding) => match &encoding.key {
Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
None => (None, Some(encoding.value.type_())),
},
None => (None, None),
}
Expand Down Expand Up @@ -1030,11 +1027,9 @@ impl DataSourceDesc {
// `SourceEnvelope` itself, but that one feels more like an internal
// thing and adapter should own how we represent envelopes as a
// string? It would not be hard to convince me otherwise, though.
DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
.desc
.primary_export
.as_ref()
.map(|export| envelope_string(&export.envelope)),
DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
&ingestion_desc.desc.primary_export.envelope,
)),
DataSourceDesc::IngestionExport { data_config, .. } => {
Some(envelope_string(&data_config.envelope))
}
Expand Down
29 changes: 22 additions & 7 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,18 +1104,33 @@ pub fn plan_create_source(
None => scx.catalog.config().timestamp_interval,
};

let (primary_export, primary_export_details) = if force_source_table_syntax {
(
SourceExportDataConfig {
encoding: None,
envelope: SourceEnvelope::None(NoneEnvelope {
key_envelope: KeyEnvelope::None,
key_arity: 0,
}),
},
SourceExportDetails::None,
)
} else {
(
SourceExportDataConfig {
encoding,
envelope: envelope.clone(),
},
external_connection.primary_export_details(),
)
};
let source_desc = SourceDesc::<ReferencedConnection> {
connection: external_connection,
// We only define primary-export details for this source if we are still supporting
// the legacy source syntax. Otherwise, we will not output to the primary collection.
// TODO(database-issues#8620): Remove this field once the new syntax is enabled everywhere
primary_export: match force_source_table_syntax {
false => Some(SourceExportDataConfig {
encoding,
envelope: envelope.clone(),
}),
true => None,
},
primary_export,
primary_export_details,
timestamp_interval,
};

Expand Down
5 changes: 2 additions & 3 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1870,9 +1870,8 @@ where
// TODO(database-issues#8620): This will be removed once sources no longer export
// to primary collections and only export to explicit SourceExports (tables).
if let DataSource::Ingestion(ingestion) = &mut description.data_source {
if let Some(export) = ingestion.desc.primary_source_export() {
ingestion.source_exports.insert(id, export);
}
let export = ingestion.desc.primary_source_export();
ingestion.source_exports.insert(id, export);
}

let write_frontier = write_handle.upper();
Expand Down
25 changes: 15 additions & 10 deletions src/storage-controller/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ mod tests {
LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::{
GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
SourceExportDetails,
};
use timely::progress::Antichain;

Expand Down Expand Up @@ -301,18 +302,22 @@ mod tests {
})
.collect();

let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
load_generator: LoadGenerator::Auction,
tick_micros: Default::default(),
as_of: Default::default(),
up_to: Default::default(),
});
let primary_export_details = connection.primary_export_details();

IngestionDescription {
desc: SourceDesc {
connection: GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
load_generator: LoadGenerator::Auction,
tick_micros: Default::default(),
as_of: Default::default(),
up_to: Default::default(),
}),
primary_export: Some(SourceExportDataConfig {
connection,
primary_export: SourceExportDataConfig {
encoding: Default::default(),
envelope: SourceEnvelope::CdcV2,
}),
},
primary_export_details,
timestamp_interval: Default::default(),
},
ingestion_metadata: CollectionMetadata {
Expand Down
5 changes: 2 additions & 3 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,9 +746,8 @@ where
// TODO(database-issues#8620): This will be removed once sources no longer export
// to primary collections and only export to explicit SourceExports (tables).
if let DataSource::Ingestion(ingestion) = &mut data_source {
if let Some(export) = ingestion.desc.primary_source_export() {
ingestion.source_exports.insert(id, export);
}
let export = ingestion.desc.primary_source_export();
ingestion.source_exports.insert(id, export);
}

let write_frontier = write.upper();
Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message ProtoSourceDesc {
ProtoSourceConnection connection = 1;
mz_proto.ProtoDuration timestamp_interval = 5;
ProtoSourceExportDataConfig primary_export = 6;
ProtoSourceExportDetails primary_export_details = 7;
}

message ProtoSourceConnection {
Expand Down
32 changes: 23 additions & 9 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,8 @@ pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
/// primary collection for this source.
/// TODO(database-issues#8620): This will be removed once sources no longer export
/// to primary collections and only export to explicit SourceExports (tables).
pub primary_export: Option<SourceExportDataConfig<C>>,
pub primary_export: SourceExportDataConfig<C>,
pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
Expand All @@ -799,12 +800,14 @@ impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
let SourceDesc {
connection,
primary_export,
primary_export_details,
timestamp_interval,
} = self;

SourceDesc {
connection: connection.into_inline_connection(&r),
primary_export: primary_export.map(|e| e.into_inline_connection(r)),
primary_export: primary_export.into_inline_connection(r),
primary_export_details,
timestamp_interval,
}
}
Expand All @@ -814,7 +817,8 @@ impl RustType<ProtoSourceDesc> for SourceDesc {
fn into_proto(&self) -> ProtoSourceDesc {
ProtoSourceDesc {
connection: Some(self.connection.into_proto()),
primary_export: self.primary_export.as_ref().map(|e| e.into_proto()),
primary_export: Some(self.primary_export.into_proto()),
primary_export_details: Some(self.primary_export_details.into_proto()),
timestamp_interval: Some(self.timestamp_interval.into_proto()),
}
}
Expand All @@ -824,7 +828,12 @@ impl RustType<ProtoSourceDesc> for SourceDesc {
connection: proto
.connection
.into_rust_if_some("ProtoSourceDesc::connection")?,
primary_export: proto.primary_export.into_rust()?,
primary_export: proto
.primary_export
.into_rust_if_some("ProtoSourceDesc::primary_export")?,
primary_export_details: proto
.primary_export_details
.into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
timestamp_interval: proto
.timestamp_interval
.into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
Expand All @@ -843,6 +852,7 @@ impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
let Self {
connection,
primary_export,
primary_export_details,
timestamp_interval,
} = &self;

Expand All @@ -852,6 +862,10 @@ impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
"connection",
),
(primary_export == &other.primary_export, "primary_export"),
(
primary_export_details == &other.primary_export_details,
"primary_export_details",
),
(
timestamp_interval == &other.timestamp_interval,
"timestamp_interval",
Expand All @@ -878,12 +892,12 @@ impl SourceDesc<InlinedConnection> {
/// Returns the SourceExport details for the primary export.
/// TODO(database-issues#8620): This will be removed once sources no longer export
/// to primary collections and only export to explicit SourceExports (tables).
pub fn primary_source_export(&self) -> Option<SourceExport<(), InlinedConnection>> {
self.primary_export.clone().map(|data_config| SourceExport {
pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
SourceExport {
storage_metadata: (),
details: self.connection.primary_export_details(),
data_config,
})
details: self.primary_export_details.clone(),
data_config: self.primary_export.clone(),
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/render/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
connection: _,
timestamp_interval: _,
primary_export: _,
primary_export_details: _,
} = description.desc;

let (decoded_stream, decode_health) = match encoding {
Expand Down
Loading

0 comments on commit f3ee8dd

Please sign in to comment.