Merge pull request #134 from jamalex/frikn_fsic_fix
Add asymmetry to FSIC calculation to ensure all matching data is synced
jamalex authored Sep 20, 2021
2 parents 0541457 + d8a1461 commit 5f609d9
Showing 5 changed files with 147 additions and 60 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

List of the most important changes for each release.

## 0.6.6
- Adds an asymmetry to FSIC calculation to ensure all matching data is synced.

## 0.6.5
- Sets queuing limit of 100k combined FSICs between client and server
- Fixes SQL expression tree error when there are many FSICs, up to 100k limit
2 changes: 1 addition & 1 deletion morango/__init__.py
@@ -3,4 +3,4 @@
from __future__ import unicode_literals

default_app_config = "morango.apps.MorangoConfig"
__version__ = "0.6.5"
__version__ = "0.6.6a0"
131 changes: 100 additions & 31 deletions morango/models/core.py
@@ -11,14 +11,18 @@
from django.db import transaction
from django.db.models import F
from django.db.models import Func
from django.db.models import Max
from django.db.models import TextField
from django.db.models import Value
from django.db.models.deletion import Collector
from django.db.models.expressions import CombinedExpression
from django.db.models.fields.related import ForeignKey
from django.db.models.functions import Cast
from django.utils import six
from django.utils import timezone

from functools import reduce

from morango import proquint
from morango.registry import syncable_models
from morango.models.certificates import Certificate
@@ -309,8 +313,7 @@ def delete_buffers(self):
"""
with connection.cursor() as cursor:
cursor.execute(
"DELETE FROM morango_buffer WHERE transfer_session_id = %s",
(self.id,),
"DELETE FROM morango_buffer WHERE transfer_session_id = %s", (self.id,)
)
cursor.execute(
"DELETE FROM morango_recordmaxcounterbuffer WHERE transfer_session_id = %s",
@@ -512,6 +515,30 @@ class Meta:
abstract = True


class ValueStartsWithField(CombinedExpression):
"""
Django expression that performs a reversed `startswith` comparison: it checks that a parameter
value starts with the value of a table field. Building the comparison directly also prevents
Django from adding unnecessary SQL around the expression. (See the usage sketch after this class.)
"""

def __init__(self, value, field):
"""
{value} LIKE {field} || '%'
:param value: A str value to test against the field
:param field: A str name of the field whose value should be a prefix of ``value``
"""
# we don't use `Concat` for appending the `%` because it also adds unnecessary SQL
super(ValueStartsWithField, self).__init__(
Value(value, output_field=models.CharField()),
"LIKE",
CombinedExpression(
F(field), "||", Value("%", output_field=models.CharField())
),
output_field=models.BooleanField(),
)
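
For illustration (not part of this commit), the expression above compiles to SQL along the lines of `'<value>' LIKE "partition" || '%'`. A minimal, hypothetical usage sketch, mirroring the annotate/filter pattern added further down in this diff, with an invented filter string:

```python
# Hypothetical sketch: keep only DatabaseMaxCounter rows whose `partition`
# value is a prefix of the given filter string. The filter string below is
# made up for illustration.
matching = DatabaseMaxCounter.objects.annotate(
    filter_matches=ValueStartsWithField("ab12:user:summary", "partition")
).filter(filter_matches=True)
# Generated WHERE clause, roughly: 'ab12:user:summary' LIKE "partition" || '%'
```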


class DatabaseMaxCounter(AbstractCounter):
"""
``DatabaseMaxCounter`` is used to keep track of what data this database already has across all
@@ -527,7 +554,9 @@ class Meta:
@classmethod
@transaction.atomic
def update_fsics(cls, fsics, sync_filter):
internal_fsic = DatabaseMaxCounter.calculate_filter_max_counters(sync_filter)
internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
sync_filter
)
updated_fsic = {}
for key, value in six.iteritems(fsics):
if key in internal_fsic:
@@ -546,32 +575,75 @@ def update_fsics(cls, fsics, sync_filter):
)

@classmethod
def calculate_filter_max_counters(cls, filters):
def calculate_filter_specific_instance_counters(cls, filters, is_producer=False):
"""
Returns a dict that maps instance_ids to their respective "high-water level" counters with
respect to the provided list of filter partitions, based on what the local database contains.
First, for each partition in the filter, it calculates the maximum values the database has
received through any filters containing that partition.
Then, it combines these dicts into a single dict, collapsing across the filter partitions.
In Morango 0.6.5 and below, this was always calculated based on the "minimum" values for
each instance_id, and with instance_ids that didn't exist in *each* of the partitions being
excluded entirely. When the producing side had records needing to be sent for an instance
under one of the filter partitions, but not under another, it would not be included in the
FSIC and thereby lead to the data not being sent, as showed up in:
https://github.com/learningequality/kolibri/issues/8439
The solution was to add an asymmetry in how FSICs are calculated, with the sending side
using a "max" instead of a "min" to ensure everything is included, and then the receiving
side still using a "min" (though after it has completed a sync, it updates its counters
such that the min and max should then be equivalent).
One potential issue remains, but it is an edge case that can be worked around:
- We now take the maxes across the filter partitions and use those as the producer FSICs.
- When the receiver finishes integrating the received data, it updates its counters to match.
- If the sender had actually done a sync with just a subset of those filters in the past, it
might not actually have received everything available for the other filters, and hence the
receiver may not be correct in assuming it now has everything up to the levels of the
producer's FSICs (as it does by taking the "max" across the filter partition FSICs).
There are two ways to avoid this:
- Don't sync with differing subsets of the same partitions across multiple syncs. For
example, if you do syncs with filters "AB" and "AC", don't also do syncs with filters
"AC" and "AD". This is the approach that makes this work in Kolibri, for now.
- OR: Don't do syncs with more than one filter partition at a time. Do each one in sequence.
For example, rather than pushing "AB" and "AC" in a single transfer session, do one pull
for AB and then another one for AC. This has the disadvantage of a bit of extra overhead,
but would likely be the most robust option, and the easiest to enforce and reason about.
"""

# create string of prefixes to place into sql statement
condition = " UNION ".join(
["SELECT CAST('{}' as TEXT) AS a".format(prefix) for prefix in filters]
)
queryset = cls.objects.all()

filter_max_calculation = """
SELECT PMC.instance, MIN(PMC.counter)
FROM
(
SELECT dmc.instance_id as instance, MAX(dmc.counter) as counter, filter as filter_partition
FROM {dmc_table} as dmc, (SELECT T.a as filter FROM ({filter_list}) as T) as foo
WHERE filter LIKE dmc.partition || '%'
GROUP BY instance, filter_partition
) as PMC
GROUP BY PMC.instance
HAVING {count} = COUNT(PMC.filter_partition)
""".format(
dmc_table=cls._meta.db_table, filter_list=condition, count=len(filters)
)
per_filter_max = []

with connection.cursor() as cursor:
cursor.execute(filter_max_calculation)
# try to get hex value because postgres returns values as uuid
return {getattr(tup[0], "hex", tup[0]): tup[1] for tup in cursor.fetchall()}
for filt in filters:
# {filt} LIKE partition || '%'
qs = queryset.annotate(
filter_matches=ValueStartsWithField(filt, "partition")
)
qs = qs.filter(filter_matches=True)
filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter"))
per_filter_max.append({dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes})

instance_id_lists = [maxes.keys() for maxes in per_filter_max]
all_instance_ids = reduce(set.union, instance_id_lists, set())
if is_producer:
# when we're sending, we want to make sure we include everything
result = {
instance_id: max([d.get(instance_id, 0) for d in per_filter_max])
for instance_id in all_instance_ids
}
else:
# when we're receiving, we don't want to overpromise on what we have
result = {
instance_id: min([d.get(instance_id, 0) for d in per_filter_max])
for instance_id in reduce(
set.intersection, instance_id_lists, all_instance_ids
)
}

return result
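
To make the asymmetry concrete, here is a small self-contained sketch with invented counter values, reproducing the reduction logic above:

```python
from functools import reduce

# Invented per-partition maxima, as the loop above would produce for a
# two-partition filter: instance_b has data under only the second partition.
per_filter_max = [
    {"instance_a": 20},                    # partition 1
    {"instance_a": 17, "instance_b": 12},  # partition 2
]
instance_id_lists = [d.keys() for d in per_filter_max]
all_ids = reduce(set.union, instance_id_lists, set())

# Producer (sending) side: take the max, so instance_b's data is included.
producer = {i: max(d.get(i, 0) for d in per_filter_max) for i in all_ids}
assert producer == {"instance_a": 20, "instance_b": 12}

# Receiver side: min over instances present in *every* partition, so the
# receiver never overstates what it already has.
common = reduce(set.intersection, instance_id_lists, all_ids)
receiver = {i: min(d.get(i, 0) for d in per_filter_max) for i in common}
assert receiver == {"instance_a": 17}
```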


class RecordMaxCounter(AbstractCounter):
@@ -590,7 +662,7 @@ class Meta:
class RecordMaxCounterBuffer(AbstractCounter):
"""
``RecordMaxCounterBuffer`` is where combinations of instance ID and counters (from ``RecordMaxCounter``) are stored temporarily,
until they are sent or recieved by another morango instance.
until they are sent or received by another morango instance.
"""

transfer_session = models.ForeignKey(TransferSession)
@@ -649,10 +721,7 @@ def delete(
_assert(
self._get_pk_val() is not None,
"%s object can't be deleted because its %s attribute is set to None."
% (
self._meta.object_name,
self._meta.pk.attname,
),
% (self._meta.object_name, self._meta.pk.attname),
)
collector = Collector(using=using)
collector.collect([self], keep_parents=keep_parents)
29 changes: 9 additions & 20 deletions morango/sync/operations.py
@@ -445,8 +445,7 @@ def _queue_into_buffer(transfersession):
if fsics_len >= fsics_limit:
raise MorangoLimitExceeded(
"Limit of {limit} instance counters exceeded with {actual}".format(
limit=fsics_limit,
actual=fsics_len,
limit=fsics_limit, actual=fsics_len
)
)

@@ -472,8 +471,7 @@ def _queue_into_buffer(transfersession):

# combine conditions and filter by profile
where_condition = _join_with_logical_operator(
profile_condition + last_saved_by_conditions + partition_conditions,
"AND",
profile_condition + last_saved_by_conditions + partition_conditions, "AND"
)

# execute raw sql to take all records that match condition, to be put into buffer for transfer
@@ -635,9 +633,7 @@ def handle(self, context):
# attributes that we'll use to identify existing sessions. we really only want there to
# be one of these at a time
data = dict(
push=context.is_push,
sync_session_id=context.sync_session.id,
active=True,
push=context.is_push, sync_session_id=context.sync_session.id, active=True
)

# get the most recent transfer session
@@ -694,13 +690,12 @@ def handle(self, context):
self._assert(context.filter is not None)

if context.is_producer and SETTINGS.MORANGO_SERIALIZE_BEFORE_QUEUING:
_serialize_into_store(
context.sync_session.profile,
filter=context.filter,
)
_serialize_into_store(context.sync_session.profile, filter=context.filter)

fsic = json.dumps(
DatabaseMaxCounter.calculate_filter_max_counters(context.filter)
DatabaseMaxCounter.calculate_filter_specific_instance_counters(
context.filter, is_producer=context.is_producer
)
)
if context.is_server:
context.transfer_session.server_fsic = fsic
@@ -868,14 +863,8 @@ def handle(self, context):
records_transferred = context.transfer_session.records_transferred or 0
if SETTINGS.MORANGO_DESERIALIZE_AFTER_DEQUEUING and records_transferred > 0:
# we first serialize to avoid deserialization merge conflicts
_serialize_into_store(
context.sync_session.profile,
filter=context.filter,
)
_deserialize_from_store(
context.sync_session.profile,
filter=context.filter,
)
_serialize_into_store(context.sync_session.profile, filter=context.filter)
_deserialize_from_store(context.sync_session.profile, filter=context.filter)

# update database max counters but use latest fsics from client/server
if context.is_receiver:
42 changes: 34 additions & 8 deletions tests/testapp/tests/models/test_core.py
@@ -58,36 +58,50 @@ def setUp(self):
)

def test_filter_not_in_dmc(self):
fmcs = DatabaseMaxCounter.calculate_filter_max_counters(Filter("ZZZ"))
fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter("ZZZ")
)
self.assertEqual(fmcs, {})

def test_instances_for_one_partition_but_not_other(self):
fmcs = DatabaseMaxCounter.calculate_filter_max_counters(
fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.user_prefix_a + "\n" + self.user_prefix_b)
)
self.assertEqual(fmcs[self.instance_b], 10)

def test_insufficient_instances_for_all_partitions(self):
user_with_prefix = self.prefix_b + "user_id:richard"
fmcs = DatabaseMaxCounter.calculate_filter_max_counters(
fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.prefix_a + "\n" + user_with_prefix)
)
self.assertFalse(fmcs)

def test_single_partition_with_all_instances(self):
fmcs = DatabaseMaxCounter.calculate_filter_max_counters(
fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.user_prefix_a)
)
self.assertEqual(fmcs[self.instance_a], 20)
self.assertEqual(fmcs[self.instance_b], 10)

def test_all_partitions_have_all_instances(self):
fmcs = DatabaseMaxCounter.calculate_filter_max_counters(
fmcs = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.user_prefix_a + "\n" + self.user2_prefix_b)
)
self.assertEqual(fmcs[self.instance_a], 17)
self.assertEqual(fmcs[self.instance_b], 10)

def test_producer_vs_receiver_fsics(self):
fsic_producer = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.user_prefix_a + "\n" + self.prefix_b), is_producer=True
)
self.assertEqual(fsic_producer.get(self.instance_a, 0), 20)
self.assertEqual(fsic_producer.get(self.instance_b, 0), 12)
fsic_receiver = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
Filter(self.user_prefix_a + "\n" + self.prefix_b), is_producer=False
)
self.assertEqual(fsic_receiver.get(self.instance_a, 0), 0)
self.assertEqual(fsic_receiver.get(self.instance_b, 0), 10)


class DatabaseMaxCounterUpdateCalculation(TestCase):
def setUp(self):
@@ -233,10 +247,22 @@ def setUp(self):
stores.update(last_transfer_session_id=self.instance.id)

def test_get_touched_record_ids_for_model__instance(self):
self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(self.user)))
self.assertEqual(
[self.user.id],
list(self.instance.get_touched_record_ids_for_model(self.user)),
)

def test_get_touched_record_ids_for_model__class(self):
self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser)))
self.assertEqual(
[self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser))
)

def test_get_touched_record_ids_for_model__string(self):
self.assertEqual([self.user.id], list(self.instance.get_touched_record_ids_for_model(MyUser.morango_model_name)))
self.assertEqual(
[self.user.id],
list(
self.instance.get_touched_record_ids_for_model(
MyUser.morango_model_name
)
),
)
