Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Export primary ingestion collection #30991

Merged
merged 7 commits into from
Jan 22, 2025

Conversation

jkosh44
Copy link
Contributor

@jkosh44 jkosh44 commented Jan 9, 2025

Previously, when the force_source_table_syntax flag was enabled, the primary source ingestion collection was not included in the source exports. This would cause the primary source ingestion collection's upper and since to be stuck at 0, and it would break some existing code.

This commit always includes the primary ingestion collection in the source exports. However, when the force_source_table_syntax flag is enabled, then the source export details are set to SourceExportDetails::None. The result is that all source types with the flag enabled behave similarly to how multi-output sources behave with the flag disabled in regard to the primary ingestion collection. Specifically, their upper's and since's move forward in time and querying them returns an empty result.

A downside of this commit is that a source ingestion is always scheduled with the flag enabled, even if there are no table exports. Previously, they would only be scheduled if there were table exports.

Works towards resolving #MaterializeInc/database-issues/issues/8620

Motivation

This PR adds a known-desirable feature.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

Previously, when the `force_source_table_syntax` flag was enabled,
the primary source ingestion collection was not included in the source
exports. This would cause the primary source ingestion collection's
upper and since to be stuck at 0, and it would break some existing
code.

This commit always includes the primary ingestion collection in the
source exports. However, when the `force_source_table_syntax` flag is
enabled, then the source export details are set to
`SourceExportDetails::None`. The result is that all source types with
the flag enabled behave similarly to how multi-output sources behave
with the flag disabled in regard to the primary ingestion collection.
Specifically, their upper's and since's move forward in time and
querying them returns an empty result.

A downside of this commit is that a source ingestion is always
scheduled with the flag enabled, even if there are no table exports.
Previously, they would only be scheduled if there were table exports.

Works towards resolving #MaterializeInc/database-issues/issues/8620
@jkosh44 jkosh44 marked this pull request as ready for review January 10, 2025 13:31
@jkosh44 jkosh44 requested a review from a team as a code owner January 10, 2025 13:31
@jkosh44 jkosh44 requested a review from petrosagg January 10, 2025 13:31
@@ -1549,6 +1550,19 @@ where
if let DataSource::Ingestion(ingestion) = &mut description.data_source {
if let Some(export) = ingestion.desc.primary_source_export() {
ingestion.source_exports.insert(id, export);
} else {
let export = SourceExport {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the right logic but I wasn't expecting to happen here. Can you move this in the planner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried my best to move this to the planner. Let me know if that's what you had in mind. It involves not removing the primary export though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It involves not removing the primary export though.

I wasn't really sure how else to do it in the planner.

@jkosh44 jkosh44 requested a review from a team as a code owner January 15, 2025 15:57
@jkosh44 jkosh44 requested a review from aljoscha January 15, 2025 15:57
@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 15, 2025

@def- what I'd really like for this PR is to take all of the tests that involve the enable_create_table_from_source flag and run them a second time with the force_source_table_syntax flag enabled. Is there an easy way to do that?

@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 21, 2025

Looks like key-value load gen sources specifically are broken with the force_source_table_syntax flag.

testdrive-materialized-1     | thread 'timely:work-0' panicked at src/storage/src/source/generator/key_value.rs:98:18:
testdrive-materialized-1     | default output
testdrive-materialized-1     |    8: rust_begin_unwind
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/std/src/panicking.rs:665:5
testdrive-materialized-1     |    9: core::panicking::panic_fmt
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/panicking.rs:74:14
testdrive-materialized-1     |   10: core::panicking::panic_display
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/panicking.rs:264:5
testdrive-materialized-1     |   11: core::option::expect_failed
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/option.rs:2021:5
testdrive-materialized-1     |   12: <core::option::Option<&alloc::vec::Vec<usize>>>::expect
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/option.rs:933:21
testdrive-materialized-1     |   13: mz_storage::source::generator::key_value::render::<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}
testdrive-materialized-1     |              at src/storage/src/source/generator/key_value.rs:96:34
testdrive-materialized-1     |   14: <mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}> as core::future::future::Future>::poll
testdrive-materialized-1     |              at src/storage/src/source/types.rs:233:19
testdrive-materialized-1     |   15: <core::pin::Pin<alloc::boxed::Box<mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>> as core::future::future::Future>::poll
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/future/future.rs:123:9
testdrive-materialized-1     |   16: <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build::<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}, mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>::{closure#0}::{closure#0}
testdrive-postgres-1   | 2025-01-21 16:04:58.143 UTC [1057] FATAL:  role "root" does not exist
testdrive-materialized-1     |              at src/timely-util/src/builder_async.rs:596:32
testdrive-materialized-1     |   17: <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_reschedule::<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}, mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}, mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>::{closure#0}::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/dataflow/operators/generic/builder_rc.rs:179:26
testdrive-materialized-1     |   18: <timely::dataflow::operators::generic::builder_raw::OperatorCore<mz_storage_types::sources::MzOffset, <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_reschedule<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}, mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}, mz_storage::source::types::SignaledFuture<mz_storage::source::generator::key_value::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection, tokio_stream::wrappers::watch::WatchStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::load_generator::LoadGeneratorSourceConnection>::{closure#0}>::{closure#1}::{closure#0}>>::{closure#0}::{closure#0}>::{closure#0}> as timely::scheduling::Schedule>::schedule
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/dataflow/operators/generic/builder_raw.rs:203:9
testdrive-materialized-1     |   19: <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/progress/subgraph.rs:591:28
testdrive-materialized-1     |   20: core::iter::traits::iterator::Iterator::for_each::call::<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/traits/iterator.rs:810:29
testdrive-materialized-1     |   21: core::iter::adapters::flatten::flatten_one::<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:892:23
testdrive-materialized-1     |   22: core::iter::adapters::map::map_fold::<&mut timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}, core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/map.rs:88:21
testdrive-materialized-1     |   23: <core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::map::map_fold<&mut timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}, core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/slice/iter/macros.rs:232:27
testdrive-materialized-1     |   24: <core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/map.rs:128:9
testdrive-materialized-1     |   25: <core::iter::adapters::fuse::Fuse<core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/fuse.rs:98:19
testdrive-materialized-1     |   26: <core::iter::adapters::flatten::FlattenCompat<core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}>, core::option::IntoIter<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:970:9
testdrive-materialized-1     |   27: <core::iter::adapters::flatten::FlatMap<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:87:9
testdrive-materialized-1     |   28: <core::iter::adapters::flatten::FlatMap<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<mz_storage_types::sources::MzOffset>>>, <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::for_each::<<timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/traits/iterator.rs:813:14
testdrive-materialized-1     |   29: <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::progress::operate::Operate<()>>::set_external_summary
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/progress/subgraph.rs:591:14
testdrive-materialized-1     |   30: <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/progress/subgraph.rs:591:28
testdrive-materialized-1     |   31: core::iter::traits::iterator::Iterator::for_each::call::<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/traits/iterator.rs:810:29
testdrive-materialized-1     |   32: core::iter::adapters::flatten::flatten_one::<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:892:23
testdrive-materialized-1     |   33: core::iter::adapters::map::map_fold::<&mut timely::progress::subgraph::PerOperatorState<()>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}, core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/map.rs:88:21
testdrive-materialized-1     |   34: <core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::map::map_fold<&mut timely::progress::subgraph::PerOperatorState<()>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}, core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/slice/iter/macros.rs:232:27
testdrive-materialized-1     |   35: <core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/map.rs:128:9
testdrive-materialized-1     |   36: <core::iter::adapters::fuse::Fuse<core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::adapters::flatten::flatten_one<core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, (), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/fuse.rs:98:19
testdrive-materialized-1     |   37: <core::iter::adapters::flatten::FlattenCompat<core::iter::adapters::map::Map<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}>, core::option::IntoIter<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:970:9
testdrive-materialized-1     |   38: <core::iter::adapters::flatten::FlatMap<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::fold::<(), core::iter::traits::iterator::Iterator::for_each::call<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>::{closure#0}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/adapters/flatten.rs:87:9
testdrive-materialized-1     |   39: <core::iter::adapters::flatten::FlatMap<core::slice::iter::IterMut<timely::progress::subgraph::PerOperatorState<()>>, core::option::Option<&mut alloc::boxed::Box<dyn timely::progress::operate::Operate<()>>>, <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#0}> as core::iter::traits::iterator::Iterator>::for_each::<<timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary::{closure#1}>
testdrive-materialized-1     |              at ./rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/iter/traits/iterator.rs:813:14
testdrive-materialized-1     |   40: <timely::progress::subgraph::Subgraph<(), ()> as timely::progress::operate::Operate<()>>::set_external_summary
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/progress/subgraph.rs:591:14
testdrive-materialized-1     |   41: <timely::worker::Worker<timely_communication::allocator::generic::Generic>>::dataflow_core::<(), (), mz_storage::render::build_ingestion_dataflow<timely_communication::allocator::generic::Generic>::{closure#0}, alloc::boxed::Box<()>>
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/worker.rs:657:9
testdrive-materialized-1     |   42: mz_storage::render::build_ingestion_dataflow::<timely_communication::allocator::generic::Generic>
testdrive-materialized-1     |              at src/storage/src/render.rs:243:5
testdrive-materialized-1     |   43: <mz_storage::storage_state::Worker<timely_communication::allocator::generic::Generic>>::handle_internal_storage_command
testdrive-materialized-1     |              at src/storage/src/storage_state.rs:692:17
testdrive-materialized-1     |   44: <mz_storage::storage_state::Worker<timely_communication::allocator::generic::Generic>>::run_client
testdrive-materialized-1     |              at src/storage/src/storage_state.rs:487:21
testdrive-materialized-1     |   45: <mz_storage::storage_state::Worker<timely_communication::allocator::generic::Generic>>::run
testdrive-materialized-1     |              at src/storage/src/storage_state.rs:395:21
testdrive-materialized-1     |   46: <mz_storage::server::Config as mz_cluster::types::AsRunnableWorker<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_and_run::<timely_communication::allocator::generic::Generic>
testdrive-materialized-1     |              at src/storage/src/server.rs:109:9
testdrive-materialized-1     |   47: <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}
testdrive-materialized-1     |              at src/cluster/src/server.rs:241:13
testdrive-materialized-1     |   48: timely::execute::execute_from::<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}>::{closure#0}
testdrive-materialized-1     |              at external/crates_io__timely-0.16.0/src/execute.rs:324:22
testdrive-materialized-1     |   49: timely_communication::initialize::initialize_from::<timely_communication::allocator::generic::GenericBuilder, (), timely::execute::execute_from<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}>::{closure#0}>::{closure#0}
testdrive-materialized-1     |              at external/crates_io__timely_communication-0.16.1/src/initialize.rs:363:33
testdrive-materialized-1     | note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 21, 2025

The problem originates form here:

// figure out which output types from the generator belong to which output indexes
let mut output_map = BTreeMap::new();
for (idx, (_, export)) in config.source_exports.iter().enumerate() {
let output_type = match &export.details {
SourceExportDetails::LoadGenerator(details) => details.output,
// This is an export that doesn't need any data output to it.
SourceExportDetails::None => continue,
_ => panic!("unexpected source export details: {:?}", export.details),
};
output_map
.entry(output_type)
.or_insert_with(Vec::new)
.push(idx);
}

We only populate the output map with source exports that have a non None output. When force_source_table_syntax is enabled, the source has a single source export (the primary export) and the output details are SourceExportDetails::None. So the output map is empty.

Then in the key-value generator code, we assume that the output map is always non-empty, which is not true. So we panic.

// The key-value load generator only has one 'output' stream, which is the default output
// that needs to be emitted to all output indexes.
let output_indexes = output_map
.get(&LoadGeneratorOutput::Default)
.expect("default output");

Hopefully we can just return early if the output map is empty.

@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 21, 2025

Hopefully we can just return early if the output map is empty.

That does not work. For whatever reason that causes the upper to advance to the empty set.

materialize=> CREATE SOURCE keyvalue
  FROM LOAD GENERATOR KEY VALUE (
    KEYS 16,
    PARTITIONS 4,
    SNAPSHOT ROUNDS 3,
    SEED 123,
    VALUE SIZE 10,
    BATCH SIZE 2,
    TICK INTERVAL '1s'
  );
CREATE SOURCE
materialize=> SELECT * FROM keyvalue;
 partition | value 
-----------+-------
(0 rows)

materialize=> EXPLAIN TIMESTAMP FOR SELECT * FROM keyvalue;
                                 Timestamp                                 
---------------------------------------------------------------------------
                 query timestamp: 1737481538962 (2025-01-21 17:45:38.962) +
           oracle read timestamp: 1737481538962 (2025-01-21 17:45:38.962) +
 largest not in advance of upper: 18446744073709551615                    +
                           upper:[]                                       +
                           since:[            0 (1970-01-01 00:00:00.000)]+
         can respond immediately: true                                    +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1737481539508 (2025-01-21 17:45:39.508) +
                                                                          +
 source materialize.public.keyvalue (u2, storage):                        +
                   read frontier:[            0 (1970-01-01 00:00:00.000)]+
                  write frontier:[]                                       +
 
(1 row)

If we then try and create a table on the source, it gets stuck at a since and upper of 0.

materialize=> CREATE TABLE kv_1 FROM SOURCE keyvalue;
CREATE TABLE
materialize=> \timing
Timing is on.
materialize=> SELECT * FROM kv_1;
^CCancel request sent
ERROR:  canceling statement due to user request
Time: 18959.149 ms (00:18.959)
materialize=> EXPLAIN TIMESTAMP FOR SELECT * FROM kv_1;
                                 Timestamp                                 
---------------------------------------------------------------------------
                 query timestamp: 1737481747903 (2025-01-21 17:49:07.903) +
           oracle read timestamp: 1737481747903 (2025-01-21 17:49:07.903) +
 largest not in advance of upper:             0 (1970-01-01 00:00:00.000) +
                           upper:[            0 (1970-01-01 00:00:00.000)]+
                           since:[            0 (1970-01-01 00:00:00.000)]+
         can respond immediately: false                                   +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1737481748852 (2025-01-21 17:49:08.852) +
                                                                          +
 source materialize.public.kv_1 (u3, storage):                            +
                   read frontier:[            0 (1970-01-01 00:00:00.000)]+
                  write frontier:[            0 (1970-01-01 00:00:00.000)]+
 
(1 row)


@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 21, 2025

I tried a couple of quick and dirty fixes for the key-value load generator and nothing worked. I think we should address the issues in a follow-up PR and not this one. I opened the following issue for it: https://github.com/MaterializeInc/database-issues/issues/8904

@jkosh44
Copy link
Contributor Author

jkosh44 commented Jan 21, 2025

I tried a couple of quick and dirty fixes for the key-value load generator and nothing worked. I think we should address the issues in a follow-up PR and not this one. I opened the following issue for it: MaterializeInc/database-issues#8904

Of course I immediately thought of something that worked: #31134. I still think it's best saved for it's own PR, so this PR should be ready for review and merge.

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good to merge this as is! The main goal is to unblock further work on the migrations/testing required around force-enabling table-from-source syntax, and we can iterate on the specifics of main source exports after that.

#

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET force_source_table_syntax = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It really is just a plain copy ... 😅

@jkosh44 jkosh44 merged commit f3ee8dd into MaterializeInc:main Jan 22, 2025
150 of 240 checks passed
@jkosh44 jkosh44 deleted the primary-export-details-none branch January 22, 2025 20:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants