feat: automatic conflict resolution for upsert #3270

Draft: wants to merge 10 commits into main
170 changes: 89 additions & 81 deletions python/python/lance/dataset.py
@@ -11,7 +11,7 @@
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from abc import ABC
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
@@ -26,6 +26,7 @@
Optional,
Sequence,
Set,
Tuple,
TypedDict,
Union,
)
@@ -49,9 +50,6 @@
CleanupStats,
_Dataset,
_MergeInsertBuilder,
_Operation,
_RewriteGroup,
_RewrittenIndex,
_Scanner,
_write_dataset,
)
@@ -105,9 +103,35 @@ def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):

return super(MergeInsertBuilder, self).execute(reader)

def execute_uncommitted(
self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None
) -> Tuple[Transaction, Dict[str, Any]]:
"""Executes the merge insert operation without committing

This function updates the original dataset and returns a dictionary with
information about merge statistics - i.e. the number of inserted, updated,
and deleted rows.

Parameters
----------

data_obj: ReaderLike
The new data to use as the source table for the operation. This parameter
can be any source of data (e.g. table / dataset) that
:func:`~lance.write_dataset` accepts.
schema: Optional[pa.Schema]
The schema of the data. This only needs to be supplied whenever the data
source is some kind of generator.
"""
reader = _coerce_reader(data_obj, schema)

return super(MergeInsertBuilder, self).execute_uncommitted(reader)
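
A minimal usage sketch of the new API for reviewers (the dataset path, column names, and the shape of the stats dict are illustrative assumptions, not part of this diff):

```python
import lance
import pyarrow as pa

ds = lance.dataset("./example.lance")  # hypothetical existing dataset
new_data = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Stage the upsert without committing: returns the uncommitted
# transaction plus a dict of merge statistics.
txn, stats = (
    ds.merge_insert("id")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute_uncommitted(new_data)
)

# Commit later; see the LanceDataset.commit changes further down,
# which now accept a Transaction directly.
ds = lance.LanceDataset.commit("./example.lance", txn)
```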

# These next three overrides exist only to document the methods

def when_matched_update_all(self, condition: Optional[str] = None):
def when_matched_update_all(
self, condition: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to update matched rows

@@ -128,7 +152,7 @@ def when_matched_update_all(self, condition: Optional[str] = None):
"""
return super(MergeInsertBuilder, self).when_matched_update_all(condition)

def when_not_matched_insert_all(self):
def when_not_matched_insert_all(self) -> "MergeInsertBuilder":
"""
Configure the operation to insert not matched rows

@@ -138,7 +162,9 @@ def when_not_matched_insert_all(self):
"""
return super(MergeInsertBuilder, self).when_not_matched_insert_all()

def when_not_matched_by_source_delete(self, expr: Optional[str] = None):
def when_not_matched_by_source_delete(
self, expr: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to delete source rows that do not match

@@ -2101,7 +2127,7 @@ def _commit(
@staticmethod
def commit(
base_uri: Union[str, Path, LanceDataset],
operation: LanceOperation.BaseOperation,
operation: Union[LanceOperation.BaseOperation, Transaction],
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
@@ -2206,24 +2232,44 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

if read_version is None and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
if (
isinstance(operation, LanceOperation.BaseOperation)
and read_version is None
and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
)
):
raise ValueError(
"read_version is required for all operations except "
"Overwrite and Restore"
)

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
if isinstance(operation, Transaction):
new_ds = _Dataset.commit_transaction(
base_uri,
operation,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
elif isinstance(operation, LanceOperation.BaseOperation):
new_ds = _Dataset.commit(
base_uri,
operation,
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
else:
raise TypeError(
"operation must be a LanceOperation.BaseOperation or Transaction, "
f"got {type(operation)}"
)
ds = LanceDataset.__new__(LanceDataset)
ds._storage_options = storage_options
ds._ds = new_ds
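
A sketch contrasting the two accepted operation types (paths and data are illustrative; `LanceFragment.create` is the existing fragment API, not part of this diff):

```python
import lance
import pyarrow as pa
from lance.fragment import LanceFragment

uri = "./commit_example.lance"
ds = lance.write_dataset(pa.table({"id": [1], "v": ["a"]}), uri)

# BaseOperation path: read_version is still required (except for
# Overwrite and Restore), per the check above.
frag = LanceFragment.create(uri, pa.table({"id": [2], "v": ["b"]}))
op = lance.LanceOperation.Append([frag])
ds = lance.LanceDataset.commit(uri, op, read_version=ds.version)

# Transaction path: no read_version argument is needed, since the
# transaction already records the version it was built against, which
# is what lets the commit retry with automatic conflict resolution.
# ds = lance.LanceDataset.commit(uri, txn)  # txn from execute_uncommitted
```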
@@ -2315,19 +2361,6 @@ def commit_batch(
detached=detached,
max_retries=max_retries,
)
merged = Transaction(**merged)
# This logic is specific to append, which is all that should
# be returned here.
# TODO: generalize this to all other transaction types.
merged.operation["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.operation["fragments"]
]
merged.operation = LanceOperation.Append(**merged.operation)
if merged.blobs_op:
merged.blobs_op["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.blobs_op["fragments"]
]
merged.blobs_op = LanceOperation.Append(**merged.blobs_op)
ds = LanceDataset.__new__(LanceDataset)
ds._ds = new_ds
ds._uri = new_ds.uri
@@ -2413,10 +2446,6 @@ class BaseOperation(ABC):
See available operations under :class:`LanceOperation`.
"""

@abstractmethod
def _to_inner(self):
raise NotImplementedError()

@dataclass
class Overwrite(BaseOperation):
"""
@@ -2460,7 +2489,7 @@ class Overwrite(BaseOperation):
3 4 d
"""

new_schema: pa.Schema
new_schema: LanceSchema | pa.Schema
fragments: Iterable[FragmentMetadata]

def __post_init__(self):
@@ -2470,10 +2499,6 @@ def __post_init__(self):
)
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.overwrite(self.new_schema, raw_fragments)

@dataclass
class Append(BaseOperation):
"""
@@ -2520,10 +2545,6 @@ class Append(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.append(raw_fragments)

@dataclass
class Delete(BaseOperation):
"""
@@ -2592,11 +2613,27 @@ class Delete(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)

def _to_inner(self):
raw_updated_fragments = [f._metadata for f in self.updated_fragments]
return _Operation.delete(
raw_updated_fragments, self.deleted_fragment_ids, self.predicate
)
@dataclass
class Update(BaseOperation):
"""
Operation that updates rows in the dataset.

Attributes
----------
removed_fragment_ids: list[int]
The ids of the fragments that have been removed entirely.
updated_fragments: list[FragmentMetadata]
The fragments that have been updated with new deletion vectors.
new_fragments: list[FragmentMetadata]
The fragments that contain the new rows.
"""

removed_fragment_ids: List[int]
updated_fragments: List[FragmentMetadata]
new_fragments: List[FragmentMetadata]

def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)
LanceOperation._validate_fragments(self.new_fragments)

@dataclass
class Merge(BaseOperation):
@@ -2658,18 +2695,14 @@ class Merge(BaseOperation):
schema: LanceSchema | pa.Schema

def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
if isinstance(self.schema, pa.Schema):
warnings.warn(
"Passing a pyarrow.Schema to Merge is deprecated. "
"Please use a LanceSchema instead.",
DeprecationWarning,
)
self.schema = LanceSchema.from_pyarrow(self.schema)
return _Operation.merge(raw_fragments, self.schema)
LanceOperation._validate_fragments(self.fragments)

@dataclass
class Restore(BaseOperation):
@@ -2679,9 +2712,6 @@

version: int

def _to_inner(self):
return _Operation.restore(self.version)

@dataclass
class RewriteGroup:
"""
@@ -2691,11 +2721,6 @@
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def _to_inner(self):
old_fragments = [f._metadata for f in self.old_fragments]
new_fragments = [f._metadata for f in self.new_fragments]
return _RewriteGroup(old_fragments, new_fragments)

@dataclass
class RewrittenIndex:
"""
@@ -2705,9 +2730,6 @@
old_id: str
new_id: str

def _to_inner(self):
return _RewrittenIndex(self.old_id, self.new_id)

@dataclass
class Rewrite(BaseOperation):
"""
@@ -2734,11 +2756,6 @@ def __post_init__(self):
all_frags += [new for group in self.groups for new in group.new_fragments]
LanceOperation._validate_fragments(all_frags)

def _to_inner(self):
groups = [group._to_inner() for group in self.groups]
rewritten_indices = [index._to_inner() for index in self.rewritten_indices]
return _Operation.rewrite(groups, rewritten_indices)

@dataclass
class CreateIndex(BaseOperation):
"""
@@ -2751,15 +2768,6 @@ class CreateIndex(BaseOperation):
dataset_version: int
fragment_ids: Set[int]

def _to_inner(self):
return _Operation.create_index(
self.uuid,
self.name,
self.fields,
self.dataset_version,
self.fragment_ids,
)


class ScannerBuilder:
def __init__(self, ds: LanceDataset):