diff --git a/source-oracle-flashback/source_oracle_flashback/__init__.py b/source-oracle-flashback/source_oracle_flashback/__init__.py index c70ef4f21..0f1f6602b 100644 --- a/source-oracle-flashback/source_oracle_flashback/__init__.py +++ b/source-oracle-flashback/source_oracle_flashback/__init__.py @@ -25,6 +25,7 @@ from .resources import ( all_resources, + enabled_resources, validate_flashback ) from .models import ( @@ -98,7 +99,7 @@ async def validate( ) self.pool = create_pool(log, validate.config) await validate_flashback(log, validate.config, self.pool, is_rds) - resources = await all_resources(log, self, validate.config, self.pool, is_rds) + resources = await enabled_resources(log, self, validate.config, self.pool, is_rds, validate.bindings) resolved = common.resolve_bindings(validate.bindings, resources) return common.validated(resolved) @@ -125,7 +126,7 @@ async def open( ) self.pool = create_pool(log, open.capture.config) await validate_flashback(log, open.capture.config, self.pool, is_rds) - resources = await all_resources(log, self, open.capture.config, self.pool, is_rds) + resources = await enabled_resources(log, self, open.capture.config, self.pool, is_rds, open.capture.bindings) resolved = common.resolve_bindings(open.capture.bindings, resources) return common.open(open, resolved) diff --git a/source-oracle-flashback/source_oracle_flashback/resources.py b/source-oracle-flashback/source_oracle_flashback/resources.py index a41afe594..9d8283664 100644 --- a/source-oracle-flashback/source_oracle_flashback/resources.py +++ b/source-oracle-flashback/source_oracle_flashback/resources.py @@ -96,14 +96,11 @@ async def validate_flashback( log.warn("We recommend guaranteeing retention of the undo tablespace. See go.estuary.dev/source-oracle for more information.") -async def all_resources( - log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool, +async def tables_to_resources( + log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool, tables: list[Table], ) -> list[common.Resource]: resources_list = [] - - oracle_tables = await fetch_tables(log, pool, config.advanced.schemas) - owners = set([t.owner for t in oracle_tables]) - oracle_columns = await fetch_columns(log, pool, owners) + max_rowids = [] current_scn = None async with pool.acquire() as conn: @@ -113,14 +110,6 @@ async def all_resources( log.debug("current scn", current_scn) - tables = [] - for ot in oracle_tables: - columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner] - t = build_table(log, config, ot.owner, ot.table_name, columns) - tables.append(t) - - max_rowids = [] - async with pool.acquire() as conn: with conn.cursor() as c: table_list = "" @@ -210,3 +199,42 @@ def open( )) return resources_list + + +async def enabled_resources( + log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool, + bindings: list[common._ResolvableBinding], +) -> list[common.Resource]: + oracle_tables = await fetch_tables(log, pool, config.advanced.schemas) + owners = set([t.owner for t in oracle_tables]) + oracle_columns = await fetch_columns(log, pool, owners) + + tables = [] + for binding in bindings: + path = binding.resourceConfig.path() + + # Find a resource which matches this binding. + for ot in oracle_tables: + if path == [ot.owner, ot.table_name]: + columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner] + t = build_table(log, config, ot.owner, ot.table_name, columns) + tables.append(t) + break + + return await tables_to_resources(log, http, config, pool, is_rds, tables) + + +async def all_resources( + log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool, +) -> list[common.Resource]: + oracle_tables = await fetch_tables(log, pool, config.advanced.schemas) + owners = set([t.owner for t in oracle_tables]) + oracle_columns = await fetch_columns(log, pool, owners) + + tables = [] + for ot in oracle_tables: + columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner] + t = build_table(log, config, ot.owner, ot.table_name, columns) + tables.append(t) + + return await tables_to_resources(log, http, config, pool, is_rds, tables)