Skip to content

Commit

Permalink
source-oracle-flashback: order first, then limit row count
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Dec 23, 2024
1 parent 61669d0 commit 9a7645c
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions source-oracle-flashback/source_oracle_flashback/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ async def fetch_page(
c.arraysize = backfill_chunk_size
c.prefetchrows = backfill_chunk_size + 1
c.outputtypehandler = number_to_decimal
query = template_env.get_template("backfill").render(table=table, rowid=last_rowid or page, max_rowid=cutoff[0])
query = template_env.get_template("backfill").render(table=table, rowid=last_rowid or page, max_rowid=cutoff[0], chunk_size=backfill_chunk_size)
log.debug("fetch_page", query, "page", last_rowid or page, "cutoff", cutoff)
await c.execute(query, rownum_end=backfill_chunk_size)
await c.execute(query)
cols = [col[0] for col in c.description]
c.rowfactory = functools.partial(backfill_rowfactory, table.table_name, cols)

Expand All @@ -212,15 +212,12 @@ async def fetch_page(

i += 1
if i % CHECKPOINT_EVERY == 0:
log.debug("emitting checkpoint at CHECKPOINT_EVERY", i, "last_rowid", last_rowid)
yield last_rowid

if c.rowcount < backfill_chunk_size:
log.debug("had less than backfill_chunk_size documents", c.rowcount, "last_rowid", last_rowid)
break

if last_rowid is not None and (i % CHECKPOINT_EVERY) != 0:
log.debug("emitting final checkpoint of backfill", last_rowid, "total documents", i)
yield last_rowid

op_mapping = {
Expand Down Expand Up @@ -366,15 +363,19 @@ def cast_column(c: OracleColumn) -> str:
# an all-uppercase ROWID is a reserved keyword that cannot be used
# as a column identifier, however other casings of the same word
# can be used as a column name.
# NOTE: we use FETCH FIRST here which is supported from 12c onwards
# if we want to support older versions we can use a subquery and apply
# a rownum filter on the wrapper query. This approach was not implemented
# due to potential performance implications.
'backfill': """
SELECT ROWID, {% for c in table.columns -%}
{%- if not loop.first %}, {% endif -%}
{{ c | cast }}
{%- endfor %} FROM {{ table.quoted_owner }}.{{ table.quoted_table_name }}
WHERE ROWID > '{{ rowid }}'
AND ROWID <= '{{ max_rowid }}'
AND ROWNUM <= :rownum_end
ORDER BY ROWID ASC
FETCH FIRST {{ chunk_size }} ROWS ONLY
""",
'inc': """
SELECT /*+parallel */ VERSIONS_STARTSCN, VERSIONS_OPERATION, ROWID, {% for c in table.columns -%}
Expand Down

0 comments on commit 9a7645c

Please sign in to comment.