diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e6dad9..a03593a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ List of the most important changes for each release. +## 0.6.6 +- Adds an asymmetry to FSIC calculation to ensure all matching data is synced. + ## 0.6.5 - Sets queuing limit of 100k combined FSICs between client and server - Fixes SQL expression tree error when there are many FSICs, up to 100k limit diff --git a/morango/__init__.py b/morango/__init__.py index f8dfb72e..c9e1e81f 100644 --- a/morango/__init__.py +++ b/morango/__init__.py @@ -3,4 +3,4 @@ from __future__ import unicode_literals default_app_config = "morango.apps.MorangoConfig" -__version__ = "0.6.5" +__version__ = "0.6.6a0" diff --git a/morango/models/core.py b/morango/models/core.py index a52a9dfe..6bc9a13b 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -11,14 +11,18 @@ from django.db import transaction from django.db.models import F from django.db.models import Func +from django.db.models import Max from django.db.models import TextField from django.db.models import Value from django.db.models.deletion import Collector +from django.db.models.expressions import CombinedExpression from django.db.models.fields.related import ForeignKey from django.db.models.functions import Cast from django.utils import six from django.utils import timezone +from functools import reduce + from morango import proquint from morango.registry import syncable_models from morango.models.certificates import Certificate @@ -309,8 +313,7 @@ def delete_buffers(self): """ with connection.cursor() as cursor: cursor.execute( - "DELETE FROM morango_buffer WHERE transfer_session_id = %s", - (self.id,), + "DELETE FROM morango_buffer WHERE transfer_session_id = %s", (self.id,) ) cursor.execute( "DELETE FROM morango_recordmaxcounterbuffer WHERE transfer_session_id = %s", @@ -512,6 +515,30 @@ class Meta: abstract = True +class ValueStartsWithField(CombinedExpression): + """ + Django 
expression that's essentially a `startswith` comparison but comparing that a parameter value starts with a + table field. This also prevents Django from adding unnecessary SQL for the expression + """ + + def __init__(self, value, field): + """ + {value} LIKE {field} || '%' + + :param value: A str of the value LIKE field + :param field: A str of the field name comparing with + """ + # we don't use `Concat` for appending the `%` because it also adds unnecessary SQL + super(ValueStartsWithField, self).__init__( + Value(value, output_field=models.CharField()), + "LIKE", + CombinedExpression( + F(field), "||", Value("%", output_field=models.CharField()) + ), + output_field=models.BooleanField(), + ) + + class DatabaseMaxCounter(AbstractCounter): """ ``DatabaseMaxCounter`` is used to keep track of what data this database already has across all @@ -527,7 +554,9 @@ class Meta: @classmethod @transaction.atomic def update_fsics(cls, fsics, sync_filter): - internal_fsic = DatabaseMaxCounter.calculate_filter_max_counters(sync_filter) + internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters( + sync_filter + ) updated_fsic = {} for key, value in six.iteritems(fsics): if key in internal_fsic: @@ -546,32 +575,75 @@ def update_fsics(cls, fsics, sync_filter): ) @classmethod - def calculate_filter_max_counters(cls, filters): + def calculate_filter_specific_instance_counters(cls, filters, is_producer=False): + """ + Returns a dict that maps instance_ids to their respective "high-water level" counters with + respect to the provided list of filter partitions, based on what the local database contains. + + First, for each partition in the filter, it calculates the maximum values the database has + received through any filters containing that partition. + + Then, it combines these dicts into a single dict, collapsing across the filter partitions. 
+ In Morango 0.6.5 and below, this was always calculated based on the "minimum" values for
+ each instance_id, and with instance_ids that didn't exist in *each* of the partitions being
+ excluded entirely. When the producing side had records needing to be sent for an instance
+ under one of the filter partitions, but not under another, it would not be included in the
+ FSIC and thereby lead to the data not being sent, as shown in:
+ https://github.com/learningequality/kolibri/issues/8439
+
+ The solution was to add an asymmetry in how FSICs are calculated, with the sending side
+ using a "max" instead of a "min" to ensure everything is included, and then the receiving
+ side still using a "min" (though after it has completed a sync, it updates its counters
+ such that the min and max should then be equivalent).
+
+ One potential issue remains, but it is an edge case that can be worked around:
+ - We now take the maxes across the filter partitions and use those as the producer FSICs.
+ - When the receiver finishes integrating the received data, it updates its counters to match.
+ - If the sender had actually done a sync with just a subset of those filters in the past, it
+ might not actually have received everything available for the other filters, and hence the
+ receiver may not be correct in assuming it now has everything up to the levels of the
+ producer's FSICs (as it does by taking the "max" across the filter partition FSICs).
+ There are two ways to avoid this:
+ - Don't sync with differing subsets of the same partitions across multiple syncs. For
+ example, if you do syncs with filters "AB" and "AC", don't also do syncs with filters
+ "AC" and "AD". This is the approach that makes this work in Kolibri, for now.
+ - OR: Don't do syncs with more than one filter partition at a time. Do each one in sequence.
+ For example, rather than pushing "AB" and "AC" in a single transfer session, do one pull
+ for AB and then another one for AC. 
This has the disadvantage of a bit of extra overhead, + but would likely be the most robust option, and the easiest to enforce and reason about. + """ - # create string of prefixes to place into sql statement - condition = " UNION ".join( - ["SELECT CAST('{}' as TEXT) AS a".format(prefix) for prefix in filters] - ) + queryset = cls.objects.all() - filter_max_calculation = """ - SELECT PMC.instance, MIN(PMC.counter) - FROM - ( - SELECT dmc.instance_id as instance, MAX(dmc.counter) as counter, filter as filter_partition - FROM {dmc_table} as dmc, (SELECT T.a as filter FROM ({filter_list}) as T) as foo - WHERE filter LIKE dmc.partition || '%' - GROUP BY instance, filter_partition - ) as PMC - GROUP BY PMC.instance - HAVING {count} = COUNT(PMC.filter_partition) - """.format( - dmc_table=cls._meta.db_table, filter_list=condition, count=len(filters) - ) + per_filter_max = [] - with connection.cursor() as cursor: - cursor.execute(filter_max_calculation) - # try to get hex value because postgres returns values as uuid - return {getattr(tup[0], "hex", tup[0]): tup[1] for tup in cursor.fetchall()} + for filt in filters: + # {filt} LIKE partition || '%' + qs = queryset.annotate( + filter_matches=ValueStartsWithField(filt, "partition") + ) + qs = qs.filter(filter_matches=True) + filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter")) + per_filter_max.append({dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes}) + + instance_id_lists = [maxes.keys() for maxes in per_filter_max] + all_instance_ids = reduce(set.union, instance_id_lists, set()) + if is_producer: + # when we're sending, we want to make sure we include everything + result = { + instance_id: max([d.get(instance_id, 0) for d in per_filter_max]) + for instance_id in all_instance_ids + } + else: + # when we're receiving, we don't want to overpromise on what we have + result = { + instance_id: min([d.get(instance_id, 0) for d in per_filter_max]) + for instance_id in reduce( + set.intersection, 
instance_id_lists, all_instance_ids + ) + } + + return result class RecordMaxCounter(AbstractCounter): @@ -590,7 +662,7 @@ class Meta: class RecordMaxCounterBuffer(AbstractCounter): """ ``RecordMaxCounterBuffer`` is where combinations of instance ID and counters (from ``RecordMaxCounter``) are stored temporarily, - until they are sent or recieved by another morango instance. + until they are sent or received by another morango instance. """ transfer_session = models.ForeignKey(TransferSession) @@ -649,10 +721,7 @@ def delete( _assert( self._get_pk_val() is not None, "%s object can't be deleted because its %s attribute is set to None." - % ( - self._meta.object_name, - self._meta.pk.attname, - ), + % (self._meta.object_name, self._meta.pk.attname), ) collector = Collector(using=using) collector.collect([self], keep_parents=keep_parents) diff --git a/morango/sync/operations.py b/morango/sync/operations.py index c6a832ae..c0d08ba6 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -445,8 +445,7 @@ def _queue_into_buffer(transfersession): if fsics_len >= fsics_limit: raise MorangoLimitExceeded( "Limit of {limit} instance counters exceeded with {actual}".format( - limit=fsics_limit, - actual=fsics_len, + limit=fsics_limit, actual=fsics_len ) ) @@ -472,8 +471,7 @@ def _queue_into_buffer(transfersession): # combine conditions and filter by profile where_condition = _join_with_logical_operator( - profile_condition + last_saved_by_conditions + partition_conditions, - "AND", + profile_condition + last_saved_by_conditions + partition_conditions, "AND" ) # execute raw sql to take all records that match condition, to be put into buffer for transfer @@ -635,9 +633,7 @@ def handle(self, context): # attributes that we'll use to identify existing sessions. 
we really only want there to # be one of these at a time data = dict( - push=context.is_push, - sync_session_id=context.sync_session.id, - active=True, + push=context.is_push, sync_session_id=context.sync_session.id, active=True ) # get the most recent transfer session @@ -694,13 +690,12 @@ def handle(self, context): self._assert(context.filter is not None) if context.is_producer and SETTINGS.MORANGO_SERIALIZE_BEFORE_QUEUING: - _serialize_into_store( - context.sync_session.profile, - filter=context.filter, - ) + _serialize_into_store(context.sync_session.profile, filter=context.filter) fsic = json.dumps( - DatabaseMaxCounter.calculate_filter_max_counters(context.filter) + DatabaseMaxCounter.calculate_filter_specific_instance_counters( + context.filter, is_producer=context.is_producer + ) ) if context.is_server: context.transfer_session.server_fsic = fsic @@ -868,14 +863,8 @@ def handle(self, context): records_transferred = context.transfer_session.records_transferred or 0 if SETTINGS.MORANGO_DESERIALIZE_AFTER_DEQUEUING and records_transferred > 0: # we first serialize to avoid deserialization merge conflicts - _serialize_into_store( - context.sync_session.profile, - filter=context.filter, - ) - _deserialize_from_store( - context.sync_session.profile, - filter=context.filter, - ) + _serialize_into_store(context.sync_session.profile, filter=context.filter) + _deserialize_from_store(context.sync_session.profile, filter=context.filter) # update database max counters but use latest fsics from client/server if context.is_receiver: diff --git a/tests/testapp/tests/models/test_core.py b/tests/testapp/tests/models/test_core.py index b2fc5e07..a8988750 100644 --- a/tests/testapp/tests/models/test_core.py +++ b/tests/testapp/tests/models/test_core.py @@ -58,36 +58,50 @@ def setUp(self): ) def test_filter_not_in_dmc(self): - fmcs = DatabaseMaxCounter.calculate_filter_max_counters(Filter("ZZZ")) + fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters( + 
Filter("ZZZ") + ) self.assertEqual(fmcs, {}) def test_instances_for_one_partition_but_not_other(self): - fmcs = DatabaseMaxCounter.calculate_filter_max_counters( + fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters( Filter(self.user_prefix_a + "\n" + self.user_prefix_b) ) self.assertEqual(fmcs[self.instance_b], 10) def test_insufficient_instances_for_all_partitions(self): user_with_prefix = self.prefix_b + "user_id:richard" - fmcs = DatabaseMaxCounter.calculate_filter_max_counters( + fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters( Filter(self.prefix_a + "\n" + user_with_prefix) ) self.assertFalse(fmcs) def test_single_partition_with_all_instances(self): - fmcs = DatabaseMaxCounter.calculate_filter_max_counters( + fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters( Filter(self.user_prefix_a) ) self.assertEqual(fmcs[self.instance_a], 20) self.assertEqual(fmcs[self.instance_b], 10) def test_all_partitions_have_all_instances(self): - fmcs = DatabaseMaxCounter.calculate_filter_max_counters( + fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters( Filter(self.user_prefix_a + "\n" + self.user2_prefix_b) ) self.assertEqual(fmcs[self.instance_a], 17) self.assertEqual(fmcs[self.instance_b], 10) + def test_producer_vs_receiver_fsics(self): + fsic_producer = DatabaseMaxCounter.calculate_filter_specific_instance_counters( + Filter(self.user_prefix_a + "\n" + self.prefix_b), is_producer=True + ) + self.assertEqual(fsic_producer.get(self.instance_a, 0), 20) + self.assertEqual(fsic_producer.get(self.instance_b, 0), 12) + fsic_receiver = DatabaseMaxCounter.calculate_filter_specific_instance_counters( + Filter(self.user_prefix_a + "\n" + self.prefix_b), is_producer=False + ) + self.assertEqual(fsic_receiver.get(self.instance_a, 0), 0) + self.assertEqual(fsic_receiver.get(self.instance_b, 0), 10) + class DatabaseMaxCounterUpdateCalculation(TestCase): def setUp(self): @@ -233,10 +247,22 @@ def setUp(self): 
stores.update(last_transfer_session_id=self.instance.id) def test_get_touched_record_ids_for_model__instance(self): - self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(self.user))) + self.assertEqual( + [self.user.id], + list(self.instance.get_touched_record_ids_for_model(self.user)), + ) def test_get_touched_record_ids_for_model__class(self): - self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser))) + self.assertEqual( + [self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser)) + ) def test_get_touched_record_ids_for_model__string(self): - self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser.morango_model_name))) + self.assertEqual( + [self.user.id], + list( + self.instance.get_touched_record_ids_for_model( + MyUser.morango_model_name + ) + ), + )