Skip to content

Commit

Permalink
source-oracle-flashback: only validate and operate on enabled bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Oct 3, 2024
1 parent 0e42de3 commit 4fdf86d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 11 deletions.
5 changes: 3 additions & 2 deletions source-oracle-flashback/source_oracle_flashback/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from .resources import (
all_resources,
enabled_resources,
validate_flashback
)
from .models import (
Expand Down Expand Up @@ -98,7 +99,7 @@ async def validate(
)
self.pool = create_pool(log, validate.config)
await validate_flashback(log, validate.config, self.pool, is_rds)
resources = await all_resources(log, self, validate.config, self.pool, is_rds)
resources = await enabled_resources(log, self, validate.config, self.pool, is_rds, validate.bindings)
resolved = common.resolve_bindings(validate.bindings, resources)
return common.validated(resolved)

Expand All @@ -125,7 +126,7 @@ async def open(
)
self.pool = create_pool(log, open.capture.config)
await validate_flashback(log, open.capture.config, self.pool, is_rds)
resources = await all_resources(log, self, open.capture.config, self.pool, is_rds)
resources = await enabled_resources(log, self, open.capture.config, self.pool, is_rds, open.capture.bindings)
resolved = common.resolve_bindings(open.capture.bindings, resources)
return common.open(open, resolved)

Expand Down
82 changes: 73 additions & 9 deletions source-oracle-flashback/source_oracle_flashback/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,9 @@ async def validate_flashback(
log.warn("We recommend guaranteeing retention of the undo tablespace. See go.estuary.dev/source-oracle for more information.")


async def all_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool,
) -> list[common.Resource]:
resources_list = []

async def get_tables(
log: Logger, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, bindings: list[common._ResolvableBinding] | None,
) -> list[Table]:
oracle_tables = await fetch_tables(log, pool, config.advanced.schemas)
owners = set([t.owner for t in oracle_tables])
oracle_columns = await fetch_columns(log, pool, owners)
Expand All @@ -114,13 +112,40 @@ async def all_resources(
log.debug("current scn", current_scn)

tables = []
for ot in oracle_tables:
columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner]
t = build_table(log, config, ot.owner, ot.table_name, columns)
tables.append(t)
if bindings is None:
for ot in oracle_tables:
columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner]
t = build_table(log, config, ot.owner, ot.table_name, columns)
tables.append(t)
else:
for binding in bindings:
path = binding.resourceConfig.path()

# Find a resource which matches this binding.
for ot in oracle_tables:
if path == [ot.owner, ot.table_name]:
columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner]
t = build_table(log, config, ot.owner, ot.table_name, columns)
tables.append(t)
break

return tables


async def tables_to_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool, tables: list[Table],
) -> list[common.Resource]:
resources_list = []
max_rowids = []

current_scn = None
async with pool.acquire() as conn:
with conn.cursor() as c:
await c.execute("SELECT current_scn FROM V$DATABASE")
current_scn = (await c.fetchone())[0]

log.debug("current scn", current_scn)

async with pool.acquire() as conn:
with conn.cursor() as c:
table_list = ""
Expand Down Expand Up @@ -210,3 +235,42 @@ def open(
))

return resources_list


async def enabled_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool,
bindings: list[common._ResolvableBinding],
) -> list[common.Resource]:
oracle_tables = await fetch_tables(log, pool, config.advanced.schemas)
owners = set([t.owner for t in oracle_tables])
oracle_columns = await fetch_columns(log, pool, owners)

tables = []
for binding in bindings:
path = binding.resourceConfig.path()

# Find a resource which matches this binding.
for ot in oracle_tables:
if path == [ot.owner, ot.table_name]:
columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner]
t = build_table(log, config, ot.owner, ot.table_name, columns)
tables.append(t)
break

return await tables_to_resources(log, http, config, pool, is_rds, tables)


async def all_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig, pool: oracledb.AsyncConnectionPool, is_rds: bool,
) -> list[common.Resource]:
oracle_tables = await fetch_tables(log, pool, config.advanced.schemas)
owners = set([t.owner for t in oracle_tables])
oracle_columns = await fetch_columns(log, pool, owners)

tables = []
for ot in oracle_tables:
columns = [col for col in oracle_columns if col.table_name == ot.table_name and col.owner == ot.owner]
t = build_table(log, config, ot.owner, ot.table_name, columns)
tables.append(t)

return await tables_to_resources(log, http, config, pool, is_rds, tables)

0 comments on commit 4fdf86d

Please sign in to comment.