WIP (trying to fix import speed issues)
niconoe committed Jan 17, 2025
1 parent a62b3e8 commit 96d163a
Showing 4 changed files with 217 additions and 46 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -46,3 +46,12 @@ See [INSTALL.md](INSTALL.md) for more information.



# TODO next
- talk to Damiano
- check I still receive mails from dev-alert
- install django-debug-toolbar to monitor queries on the alert list page (for example)
- check that the nightly download still works and that data appears consistent afterwards
- test rq processes (mark as seen in batch)


Import speed: it's the "migrating comments" step that takes too long, probably because of the unseen migrations.
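
The query-count check could also be done directly from a shell rather than through the toolbar. A minimal sketch using Django's `CaptureQueriesContext` (not part of this commit; `new_observation` is a hypothetical variable pointing at a replacing observation):

```python
# Hedged sketch: count the queries issued by the suspected slow step.
# CaptureQueriesContext temporarily enables query capture on the connection.
from django.db import connection
from django.test.utils import CaptureQueriesContext

with CaptureQueriesContext(connection) as ctx:
    new_observation.migrate_linked_entities()  # the "migrating comments" step

print(f"{len(ctx.captured_queries)} queries issued")
```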
47 changes: 32 additions & 15 deletions dashboard/management/commands/import_observations.py
@@ -18,7 +18,14 @@
from maintenance_mode.core import set_maintenance_mode # type: ignore

from dashboard.management.commands.helpers import get_dataset_name_from_gbif_api
from dashboard.models import Species, Observation, DataImport, Dataset
from dashboard.models import (
Species,
Observation,
DataImport,
Dataset,
create_unseen_observations,
migrate_unseen_observations,
)
from dashboard.views.helpers import (
create_or_refresh_materialized_views,
)
@@ -233,26 +240,30 @@ def _import_all_observations_from_dwca(
skipped_observations_counter = skipped_observations_counter + 1
self.stdout.write("x", ending="")

if index % BULK_CREATE_CHUNK_SIZE == 0:
if index > 0 and index % BULK_CREATE_CHUNK_SIZE == 0:
self.log_with_time("Bulk size reached...")
self.batch_insert_observations(observations_to_insert)
observations_to_insert = []

# Insert the last chunk
self.batch_insert_observations(observations_to_insert)
if observations_to_insert:
self.batch_insert_observations(observations_to_insert)

return skipped_observations_counter

def batch_insert_observations(self, observations_to_insert: list[Observation]):
self.log_with_time("Bulk creation")
inserted_observations = Observation.objects.bulk_create(observations_to_insert)
self.log_with_time("Migrating linked entities")
self.log_with_time("Migrating comments")
new_obs_ids = []
for obs in inserted_observations:
self.stdout.write("/", ending="")
replaced = obs.migrate_linked_entities()
if not replaced:
# That's a new observation in the system: it should be marked as unseen
# for every user (if more recent than the user's notification delay and matching an alert)
obs.mark_as_unseen_for_all_users_if_needed()
new_obs_ids.append(obs.id)

self.log_with_time("Creating unseen observations for new observations")
create_unseen_observations(Observation.objects.filter(id__in=new_obs_ids))

def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
@@ -335,12 +346,13 @@ def handle(self, *args, **options) -> None:
datasets_referenced_in_dwca[gbif_dataset_key] = dataset_name

# 3.2 Fix the empty names (see GBIF bug)
self.log_with_time("3.2 Fixing empty dataset names")
for dataset_key, dataset_name in datasets_referenced_in_dwca.items():
if dataset_name == "":
datasets_referenced_in_dwca[
dataset_key
] = get_dataset_name_from_gbif_api(dataset_key)
# self.log_with_time("3.2 Fixing empty dataset names")
# TODO: uncomment this after GBIF outage
# for dataset_key, dataset_name in datasets_referenced_in_dwca.items():
# if dataset_name == "":
# datasets_referenced_in_dwca[
# dataset_key
# ] = get_dataset_name_from_gbif_api(dataset_key)

# 3.3 Create/update the Dataset objects
self.log_with_time("3.3 Creating/updating the Dataset objects")
@@ -376,10 +388,15 @@ def handle(self, *args, **options) -> None:
)
)

self.log_with_time("All observations imported")

# Migrate the unseen objects, or delete them if they are not relevant anymore
self.log_with_time("Migrating unseen observations")
migrate_unseen_observations()

self.log_with_time(
"All observations imported, now deleting observations linked to previous data imports..."
"now deleting observations linked to previous data imports..."
)

# 6. Remove previous observations
Observation.objects.exclude(data_import=current_data_import).delete()
self.log_with_time("Previous observations deleted")
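For context, the chunk-flush pattern the command now uses reduces to the sketch below (not the verbatim command code; `rows` and `build_observation` are hypothetical stand-ins). The new `index > 0` guard avoids a spurious empty flush on the very first row, since `0 % BULK_CREATE_CHUNK_SIZE == 0` is always true, and the trailing `if` flushes the last, possibly partial, chunk:

```python
# Hedged sketch of the chunked bulk-insert pattern used above.
BULK_CREATE_CHUNK_SIZE = 10_000  # assumption: the actual value lives in the command

observations_to_insert = []
for index, row in enumerate(rows):  # `rows`: hypothetical iterable of DwC-A rows
    observations_to_insert.append(build_observation(row))  # hypothetical helper
    if index > 0 and index % BULK_CREATE_CHUNK_SIZE == 0:
        Observation.objects.bulk_create(observations_to_insert)
        observations_to_insert = []

if observations_to_insert:  # flush the remainder
    Observation.objects.bulk_create(observations_to_insert)
```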
200 changes: 171 additions & 29 deletions dashboard/models.py
@@ -223,6 +223,50 @@ def as_dict(self) -> dict[str, Any]:
}


def create_unseen_observations(observation_queryset: QuerySet["Observation"]) -> None:
"""
Create ObservationUnseen entries for all users that have alerts matching the
provided observations and that are not older than the user's notification delay
!!! Only applicable to new observations !!!
"""
# Get the current date
today = timezone.now().date()

# Iterate over all users
for user in User.objects.all():
# Calculate the threshold date based on the user's notification delay
threshold_date = today - datetime.timedelta(days=user.notification_delay_days)

# Filter the provided observation queryset to get recent observations
recent_observations = observation_queryset.filter(date__gt=threshold_date)

# If no observations are more recent than the user's threshold, we can take a shortcut
if not recent_observations:
continue

user_alerts = user.alert_set.all()

observation_ids_in_alerts = set()
for alert in user_alerts:
for observation in alert.observations():
observation_ids_in_alerts.add(observation.id)

# Iterate over all recent observations
for observation in recent_observations:
# Check if the observation is included in at least one of the user's alerts
if observation.id in observation_ids_in_alerts:
# Create a new entry in ObservationUnseen
ObservationUnseen.objects.create(observation=observation, user=user)
# print(
# f"Created ObservationUnseen for observation {observation.id} and user {user.id}"
# )
# else:
# print(
# f"Skipping observation {observation.id} for user {user.id} (not in any alert)"
# )


class ObservationManager(models.Manager["Observation"]):
def filtered_from_my_params(
self,
@@ -270,6 +314,73 @@ def filtered_from_my_params(
return qs


# def migrate_unseen_observations() -> None:
# # TODO: this one is bugged and sometimes deletes freshly created unseen observations
# """Migrate unseen observations to new observations or delete them if they are no longer relevant."""
# unseen_observations = ObservationUnseen.objects.select_related(
# "observation", "user"
# ).all()
# to_delete = []
# to_update = []
#
# for unseen in unseen_observations:
# new_observation = (
# Observation.objects.filter(
# stable_id=unseen.observation.stable_id,
# data_import__gt=unseen.observation.data_import,
# )
# .order_by("data_import")
# .first()
# )
#
# if new_observation:
# if unseen.relevant_for_user(date_new_observation=new_observation.date):
# unseen.observation = new_observation
# to_update.append(unseen)
# else:
# to_delete.append(unseen.pk)
# else:
# to_delete.append(unseen.pk)
#
# if to_delete:
# ObservationUnseen.objects.filter(pk__in=to_delete).delete()
# if to_update:
# ObservationUnseen.objects.bulk_update(to_update, ["observation"])


def migrate_unseen_observations() -> None:
"""Migrate unseen observations to new observations or delete them if they are no longer relevant."""
unseen_observations = ObservationUnseen.objects.select_related(
"observation", "user"
).all()
to_delete = []
to_update = []
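
# Unlike the commented-out version above (which deleted an unseen entry
# whenever no newer observation existed, and could therefore drop entries
# freshly created for the current import), this version only deletes an
# entry once it is no longer relevant for the user.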

for unseen in unseen_observations:
new_observation = (
Observation.objects.filter(
stable_id=unseen.observation.stable_id,
data_import__gt=unseen.observation.data_import,
)
.order_by("data_import")
.first()
)

if new_observation:
if unseen.relevant_for_user(date_new_observation=new_observation.date):
unseen.observation = new_observation
to_update.append(unseen)
else:
# Only mark for deletion if the observation is not relevant anymore
if not unseen.relevant_for_user(date_new_observation=timezone.now().date()):
to_delete.append(unseen.pk)

if to_delete:
ObservationUnseen.objects.filter(pk__in=to_delete).delete()
if to_update:
ObservationUnseen.objects.bulk_update(to_update, ["observation"])


class Observation(models.Model):
# Pay attention to the fact that this model actually has 4(!) different "identifiers" which serve different
# purposes. gbif_id, occurrence_id and stable_id are documented below, Django also adds the usual and implicit "pk"
@@ -396,23 +507,55 @@ def date_older_than_user_delay(user: User, the_date) -> bool:
today - datetime.timedelta(days=user.notification_delay_days)
)

def mark_as_unseen_for_all_users_if_needed(self) -> None:
"""Mark the observation as unseen for all users if:
- it's more recent than the user's notification delay
- it matches at least one alert of the user
!! It doesn't look into the status (to check if the user has already seen
it), so it is only suitable for new observations, not replaced ones !!
"""

# TODO: test this logic !!
for user in get_user_model().objects.all():
if not self.date_older_than_user_delay(
user, the_date=self.date
) and user.obs_match_alerts(self):
self.mark_as_unseen_by(user)

# def mark_as_unseen_for_all_users_if_needed(self) -> None:
# """Mark the observation as unseen for all users if:
# - it's more recent than the user's notification delay
# - it matches at least one alert of the user
#
# !! It doesn't look into the status (to check if the user has already seen
# it), so it is only suitable for new observations, not replaced ones !!
#
# """
#
# # TODO: test this logic !!
# for user in get_user_model().objects.all():
# if not self.date_older_than_user_delay(
# user, the_date=self.date
# ) and user.obs_match_alerts(self):
# self.mark_as_unseen_by(user)

# def migrate_linked_entities(self) -> bool:
# """Migrate existing entities (comments, ...) linked to a previous observation that share the stable ID
#
# Does nothing if there's no replaced observation.
#
# Returns True if it migrated an existing observation, False otherwise
#
# Note: in case an Unseen object isn't relevant anymore (because the observation
# is too old, or it does not belong to an alert), it will be deleted rather than migrated
# """
# replaced_observation = self.replaced_observation
# if replaced_observation is not None:
# # 1. Migrating comments
# for comment in replaced_observation.observationcomment_set.all():
# comment.observation = self
# comment.save()
# # 2. Migrating seen/unseen status
# for observation_unseen in replaced_observation.observationunseen_set.all():
# # TODO: extensively test this
# if not observation_unseen.relevant_for_user(
# date_new_observation=self.date
# ):
# observation_unseen.delete()
# else:
# observation_unseen.observation = self
# observation_unseen.save()
#
# return True
# else:
# return False
# TODO: rename this to migrate_linked_comments since the scope changed
# TODO: make sure all calling code also calls migrate_unseen_observations
def migrate_linked_entities(self) -> bool:
"""Migrate existing entities (comments, ...) linked to a previous observation that share the stable ID
@@ -426,19 +569,18 @@ def migrate_linked_entities(self) -> bool:
replaced_observation = self.replaced_observation
if replaced_observation is not None:
# 1. Migrating comments
for comment in replaced_observation.observationcomment_set.all():
comment.observation = self
comment.save()
replaced_observation.observationcomment_set.update(observation=self)

# 2. Migrating seen/unseen status
for observation_unseen in replaced_observation.observationunseen_set.all():
# TODO: extensively test this
if not observation_unseen.relevant_for_user(
date_new_observation=self.date
):
observation_unseen.delete()
else:
observation_unseen.observation = self
observation_unseen.save()
# for observation_unseen in replaced_observation.observationunseen_set.all():
# # TODO: extensively test this
# if not observation_unseen.relevant_for_user(
# date_new_observation=self.date
# ):
# observation_unseen.delete()
# else:
# observation_unseen.observation = self
# observation_unseen.save()

return True
else:
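Given that the commit message targets import speed, an obvious follow-up (not part of this commit) would be to make `create_unseen_observations` write in bulk instead of issuing one `INSERT` per (observation, user) pair. A sketch under the same model assumptions as `dashboard/models.py` above:

```python
import datetime

from django.utils import timezone


def create_unseen_observations_bulk(observation_queryset) -> None:
    """Hypothetical bulk variant of create_unseen_observations()."""
    today = timezone.now().date()
    to_create = []

    for user in User.objects.all():
        threshold_date = today - datetime.timedelta(days=user.notification_delay_days)
        recent_observations = observation_queryset.filter(date__gt=threshold_date)
        if not recent_observations.exists():
            continue

        # Same alert-matching logic as the per-row version above
        observation_ids_in_alerts = {
            obs.id for alert in user.alert_set.all() for obs in alert.observations()
        }
        to_create.extend(
            ObservationUnseen(observation=obs, user=user)
            for obs in recent_observations
            if obs.id in observation_ids_in_alerts
        )

    # One (chunked) INSERT instead of one query per pair
    ObservationUnseen.objects.bulk_create(to_create, batch_size=10_000)
```

`batch_size` keeps statement sizes bounded; whether this is where the import actually spends its time should first be confirmed with the profiling sketch above.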
7 changes: 5 additions & 2 deletions dashboard/tests/models/test_observation.py
@@ -14,6 +14,7 @@
ObservationComment,
ObservationUnseen,
Alert,
migrate_unseen_observations,
)

SAMPLE_DATASET_KEY = "940821c0-3269-11df-855a-b8a03c50a862"
@@ -230,7 +231,7 @@ def test_replace_observation(self):
"""High-level test: after creating a new observation with the same stable_id, make sure we can migrate the
linked entities then and then delete the initial observation"""

# 2. Create a new one
# 2. Create a new observation that replaces one from the fixtures
new_di = DataImport.objects.create(start=timezone.now())
new_observation = Observation.objects.create(
gbif_id=1,
Expand All @@ -247,8 +248,9 @@ def test_replace_observation(self):

# Migrate entities
new_observation.migrate_linked_entities()
migrate_unseen_observations()

# Make sure the counts are correct
# Make sure the counts for comments are correct
self.assertEqual(new_observation.observationcomment_set.count(), 2)
self.assertEqual(old_observation.observationcomment_set.count(), 0)

@@ -293,6 +295,7 @@ def test_replace_observation_unseen(self):
)

replacement_second_obs.migrate_linked_entities()
migrate_unseen_observations()
self.obs2_unseen_obj.refresh_from_db()

self.assertEqual(self.obs2_unseen_obj.observation, replacement_second_obs)
