[r] Track table names in AnVIL bundles (#6639, PR #6640)
dsotirho-ucsc committed Oct 21, 2024
2 parents 3e1c851 + ef0637e commit 1b1b473
Showing 14 changed files with 247 additions and 241 deletions.
10 changes: 6 additions & 4 deletions src/azul/azulclient.py
@@ -269,10 +269,12 @@ def remote_reindex_partition(self, message: JSON) -> None:
         validate_uuid_prefix(prefix)
         source = self.repository_plugin(catalog).source_from_json(source)
         bundle_fqids = self.list_bundles(catalog, source, prefix)
-        bundle_fqids = self.filter_obsolete_bundle_versions(bundle_fqids)
-        log.info('After filtering obsolete versions, '
-                 '%i bundles remain in prefix %r of source %r in catalog %r',
-                 len(bundle_fqids), prefix, str(source.spec), catalog)
+        # All AnVIL bundles and entities use the same version
+        if not config.is_anvil_enabled(catalog):
+            bundle_fqids = self.filter_obsolete_bundle_versions(bundle_fqids)
+            log.info('After filtering obsolete versions, '
+                     '%i bundles remain in prefix %r of source %r in catalog %r',
+                     len(bundle_fqids), prefix, str(source.spec), catalog)
         messages = (
             self.bundle_message(catalog, bundle_fqid)
             for bundle_fqid in bundle_fqids
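Filtering obsolete bundle versions only matters when the same bundle UUID can occur with several versions; since all AnVIL bundles and entities share a single version, the step is now skipped for AnVIL catalogs. A minimal sketch of the filtering idea, assuming each FQID exposes `uuid` and `version` attributes (a simplification of Azul's actual bundle FQID classes):

from operator import attrgetter


def filter_obsolete_bundle_versions(bundle_fqids):
    # Keep only the highest version seen for each bundle UUID. If every
    # bundle carries the same version, nothing is filtered out, which is
    # why the call can be skipped for AnVIL catalogs.
    latest = {}
    for fqid in sorted(bundle_fqids, key=attrgetter('version')):
        latest[fqid.uuid] = fqid
    return list(latest.values())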
9 changes: 3 additions & 6 deletions src/azul/indexer/document.py
@@ -1528,13 +1528,10 @@ class Replica(Document[ReplicaCoordinates[E]]):
     """

     #: The type of replica, i.e., what sort of metadata document from the
-    #: underlying data repository we are storing a copy of. Conceptually related
-    #: to the entity type, but its value may be different from the entity type.
-    #: For example, AnVIL replicas use the name of the data table that contains
-    #: the entity, e.g. 'anvil_file', instead of just 'file'.
+    #: underlying data repository we are storing a copy of. In practice, this is
+    #: the same as `self.coordinates.entity.entity_type`, but this isn't
+    #: necessarily the case.
     #:
     #: We can't model replica types as entity types because we want to hold all
     #: replicas in a single index per catalog to facilitate quick retrieval.
     #: Typically, all replicas of the same type have similar shapes, just like
     #: contributions for entities of the same type. However, mixing replicas of
     #: different types results in an index containing documents of heterogeneous
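With table names tracked in AnVIL bundles, an AnVIL entity reference already carries the BigQuery table name as its entity type, so the replica type derived from it coincides with it. A self-contained sketch of that relationship; the class below is a stand-in, not Azul's actual `EntityReference`:

from dataclasses import dataclass


@dataclass(frozen=True)
class EntityReference:  # stand-in for azul.indexer.document.EntityReference
    entity_type: str
    entity_id: str


entity = EntityReference(entity_type='anvil_file',
                         entity_id='11111111-2222-3333-4444-555555555555')
# The replica type is simply the entity type; no 'anvil_' prefix is added.
replica_type = entity.entity_type
assert replica_type == 'anvil_file'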
4 changes: 4 additions & 0 deletions src/azul/plugins/metadata/anvil/bundle.py
@@ -116,6 +116,10 @@ def to_entity_link(self,

 @attrs.define(kw_only=True)
 class AnvilBundle(Bundle[BUNDLE_FQID], ABC):
+    # The `entity_type` attribute of these keys contains the entities' BigQuery
+    # table name (e.g. `anvil_sequencingactivity`), not the entity type used for
+    # the contributions (e.g. `activities`). The metadata plugin converts from
+    # the former to the latter during transformation.
     entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
     links: set[EntityLink] = attrs.field(factory=set)

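Hypothetical examples of the table-name-to-entity-type conversion described in the comment above; the authoritative mapping is `_convert_entity_type` in src/azul/plugins/metadata/anvil/indexer/transform.py, shown in the next file:

# BigQuery table name (key of AnvilBundle.entities) -> contribution entity type
examples = {
    'anvil_biosample': 'biosamples',
    'anvil_dataset': 'datasets',
    'anvil_diagnosis': 'diagnoses',            # irregular plural
    'anvil_sequencingactivity': 'activities',  # all *activity tables collapse
    'anvil_file': 'files',
}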
73 changes: 41 additions & 32 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -188,20 +188,26 @@ def _transform(self,
         raise NotImplementedError

     def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
-        return f'anvil_{entity.entity_type}', self.bundle.entities[entity]
+        return entity.entity_type, self.bundle.entities[entity]

-    def _pluralize(self, entity_type: str) -> str:
-        if entity_type == 'diagnosis':
+    def _convert_entity_type(self, entity_type: str) -> str:
+        assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
+        if entity_type == 'anvil_diagnosis':
+            # Irregular plural form
             return 'diagnoses'
         elif entity_type.endswith('activity'):
+            # Polymorphic. Could be `anvil_sequencingactivity`,
+            # `anvil_assayactivity`, `anvil_activity`, etc
             return 'activities'
         else:
-            return pluralize(entity_type)
+            return pluralize(entity_type.removeprefix('anvil_'))

     def _contains(self,
                   partition: BundlePartition,
                   entity: EntityReference
                   ) -> bool:
         return (
-            self._pluralize(entity.entity_type).endswith(self.entity_type())
+            self._convert_entity_type(entity.entity_type) == self.entity_type()
             and partition.contains(UUID(entity.entity_id))
         )
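A self-contained copy of the conversion above, runnable in isolation for illustration; the real `pluralize` helper is replaced here by a naive `+ 's'`, an assumption that happens to hold for the regular AnVIL table names:

def convert_entity_type(entity_type: str) -> str:
    # Mirrors _convert_entity_type: map a BigQuery table name such as
    # 'anvil_sequencingactivity' to the entity type used for contributions.
    assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
    if entity_type == 'anvil_diagnosis':
        return 'diagnoses'          # irregular plural form
    elif entity_type.endswith('activity'):
        return 'activities'         # polymorphic activity tables collapse
    else:
        return entity_type.removeprefix('anvil_') + 's'


assert convert_entity_type('anvil_sequencingactivity') == 'activities'
assert convert_entity_type('anvil_file') == 'files'
assert convert_entity_type('anvil_donor') == 'donors'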

@@ -358,7 +364,7 @@ def _activity(self, activity: EntityReference) -> MutableJSON:
         field_types = self._activity_types()
         common_fields = {
             'activity_table': activity.entity_type,
-            'activity_id': metadata[f'{activity.entity_type}_id']
+            'activity_id': metadata[f'{activity.entity_type.removeprefix("anvil_")}_id']
         }
         # Activities are unique in that they may not contain every field defined
         # in their field types due to polymorphism, so we need to pad the field
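A minimal sketch of the padding described in the comment above; the field names and the use of `None` as the fill value are illustrative assumptions:

field_types = {'sequencingactivity_id': str, 'assay_type': str, 'data_modality': str}
metadata = {'sequencingactivity_id': '123', 'assay_type': 'RNA-seq'}
# Pad fields that this particular activity type does not define.
padded = {field: metadata.get(field) for field in field_types}
assert padded['data_modality'] is None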
@@ -400,15 +406,18 @@ def _file(self, file: EntityReference) -> MutableJSON:
                             uuid=file.entity_id)

     def _only_dataset(self) -> EntityReference:
-        return one(self._entities_by_type['dataset'])
+        return one(self._entities_by_type['anvil_dataset'])

-    _activity_polymorphic_types = {
-        'activity',
-        'alignmentactivity',
-        'assayactivity',
-        'sequencingactivity',
-        'variantcallingactivity'
-    }
+    @cached_property
+    def _activity_polymorphic_types(self) -> AbstractSet[str]:
+        from azul.plugins.metadata.anvil import (
+            anvil_schema,
+        )
+        return {
+            table['name']
+            for table in anvil_schema['tables']
+            if table['name'].endswith('activity')
+        }

     @classmethod
     def inner_entity_id(cls, entity_type: EntityType, entity: JSON) -> EntityID:
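Roughly what the new cached property computes, assuming `anvil_schema` is a mapping with a 'tables' list whose entries each carry a 'name' key; the table names below are examples consistent with the old hard-coded set:

anvil_schema = {
    'tables': [
        {'name': 'anvil_dataset'},
        {'name': 'anvil_activity'},
        {'name': 'anvil_alignmentactivity'},
        {'name': 'anvil_assayactivity'},
        {'name': 'anvil_sequencingactivity'},
        {'name': 'anvil_variantcallingactivity'},
        {'name': 'anvil_file'},
    ]
}
activity_tables = {
    table['name']
    for table in anvil_schema['tables']
    if table['name'].endswith('activity')
}
# Unlike the old hard-coded set, the names now carry the 'anvil_' prefix and
# automatically track whatever activity tables the schema defines.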
@@ -462,11 +471,11 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
                 self._entities_by_type[activity_type]
                 for activity_type in self._activity_polymorphic_types
             )),
-            biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
+            biosamples=self._entities(self._biosample, self._entities_by_type['anvil_biosample']),
             datasets=[self._dataset(self._only_dataset())],
-            diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
-            donors=self._entities(self._donor, self._entities_by_type['donor']),
-            files=self._entities(self._file, self._entities_by_type['file'])
+            diagnoses=self._entities(self._diagnosis, self._entities_by_type['anvil_diagnosis']),
+            donors=self._entities(self._donor, self._entities_by_type['anvil_donor']),
+            files=self._entities(self._file, self._entities_by_type['anvil_file'])
         )
         yield self._contribution(contents, entity.entity_id)

@@ -514,11 +523,11 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
         linked = self._linked_entities(entity)
         contents = dict(
             activities=[self._activity(entity)],
-            biosamples=self._entities(self._biosample, linked['biosample']),
+            biosamples=self._entities(self._biosample, linked['anvil_biosample']),
             datasets=[self._dataset(self._only_dataset())],
-            diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
-            donors=self._entities(self._donor, linked['donor']),
-            files=self._entities(self._file, linked['file'])
+            diagnoses=self._entities(self._diagnosis, linked['anvil_diagnosis']),
+            donors=self._entities(self._donor, linked['anvil_donor']),
+            files=self._entities(self._file, linked['anvil_file'])
         )
         yield self._contribution(contents, entity.entity_id)

@@ -538,9 +547,9 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
             )),
             biosamples=[self._biosample(entity)],
             datasets=[self._dataset(self._only_dataset())],
-            diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
-            donors=self._entities(self._donor, linked['donor']),
-            files=self._entities(self._file, linked['file']),
+            diagnoses=self._entities(self._diagnosis, linked['anvil_diagnosis']),
+            donors=self._entities(self._donor, linked['anvil_donor']),
+            files=self._entities(self._file, linked['anvil_file']),
         )
         yield self._contribution(contents, entity.entity_id)

@@ -586,11 +595,11 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
                 linked[activity_type]
                 for activity_type in self._activity_polymorphic_types
             )),
-            biosamples=self._entities(self._biosample, linked['biosample']),
+            biosamples=self._entities(self._biosample, linked['anvil_biosample']),
             datasets=[self._dataset(self._only_dataset())],
-            diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
+            diagnoses=self._entities(self._diagnosis, linked['anvil_diagnosis']),
             donors=[self._donor(entity)],
-            files=self._entities(self._file, linked['file']),
+            files=self._entities(self._file, linked['anvil_file']),
         )
         yield self._contribution(contents, entity.entity_id)

@@ -610,10 +619,10 @@ def _transform(self,
                 linked[activity_type]
                 for activity_type in self._activity_polymorphic_types
             )),
-            biosamples=self._entities(self._biosample, linked['biosample']),
+            biosamples=self._entities(self._biosample, linked['anvil_biosample']),
             datasets=[self._dataset(self._only_dataset())],
-            diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
-            donors=self._entities(self._donor, linked['donor']),
+            diagnoses=self._entities(self._diagnosis, linked['anvil_diagnosis']),
+            donors=self._entities(self._donor, linked['anvil_donor']),
             files=[self._file(entity)],
         )
         yield self._contribution(contents, entity.entity_id)
@@ -627,5 +636,5 @@ def _transform(self,
                 # redundant and impractically large. Therefore, we leave the
                 # hub IDs field empty for datasets and rely on the tenet
                 # that every file is an implicit hub of its parent dataset.
-                file_hub=None if linked_entity.entity_type == 'dataset' else entity.entity_id,
+                file_hub=None if linked_entity.entity_type == 'anvil_dataset' else entity.entity_id,
             )
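A small sketch of the hub rule stated in the comment above; `file_hub_for` is a made-up helper standing in for the keyword argument passed when the transformer emits replicas:

def file_hub_for(linked_entity_type: str, file_entity_id: str):
    # Datasets act as implicit hubs of all their files, so they get no
    # explicit hub ID; every other linked entity records the file as a hub.
    return None if linked_entity_type == 'anvil_dataset' else file_entity_id


assert file_hub_for('anvil_dataset', 'f-001') is None
assert file_hub_for('anvil_biosample', 'f-001') == 'f-001'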