Skip to content

Commit

Permalink
Merge pull request #130 from bjester/no-mor-fixes-ango-pls
Browse files Browse the repository at this point in the history
Fix counter handling causing repeated syncs of same data
  • Loading branch information
rtibbles authored Jul 28, 2021
2 parents 33b9546 + b28e72e commit fe1c4a9
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 51 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

List of the most important changes for each release.

## 0.6.3

- Fixes issue handling database counters which caused repeat syncing of unchanged data

## 0.6.2

- Fixes slow performance due to excessive use of `sleep`
Expand Down
2 changes: 1 addition & 1 deletion morango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from __future__ import unicode_literals

default_app_config = "morango.apps.MorangoConfig"
__version__ = "0.6.2"
__version__ = "0.6.3"
3 changes: 1 addition & 2 deletions morango/constants/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
"morango.sync.operations:NetworkInitializeOperation",
)
MORANGO_SERIALIZE_OPERATIONS = (
"morango.sync.operations:ProducerSerializeOperation",
"morango.sync.operations:ReceiverSerializeOperation",
"morango.sync.operations:SerializeOperation",
"morango.sync.operations:LegacyNetworkSerializeOperation",
"morango.sync.operations:NetworkSerializeOperation",
)
Expand Down
61 changes: 23 additions & 38 deletions morango/sync/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def __call__(self, context):
debug_msg = "[morango:{}] {} -> {}".format(
"pull" if context.is_pull else "push",
context.__class__.__name__,
self.__class__.__name__
self.__class__.__name__,
)
result = False
try:
Expand Down Expand Up @@ -628,21 +628,20 @@ def handle(self, context):
return transfer_statuses.COMPLETED


class ProducerSerializeOperation(LocalOperation):
class SerializeOperation(LocalOperation):
"""
Performs serialization if enabled through configuration, for contexts that are producing
transfer data
Performs serialization related steps which affect the counters involved in the sync, including
serialization of data if enabled through configuration
"""

def handle(self, context):
"""
:type context: LocalSessionContext
"""
assert context.is_producer
assert context.sync_session is not None
assert context.filter is not None

if SETTINGS.MORANGO_SERIALIZE_BEFORE_QUEUING:
if context.is_producer and SETTINGS.MORANGO_SERIALIZE_BEFORE_QUEUING:
_serialize_into_store(
context.sync_session.profile,
filter=context.filter,
Expand All @@ -653,26 +652,15 @@ def handle(self, context):
)
if context.is_server:
context.transfer_session.server_fsic = fsic
context.transfer_session.client_fsic = context.request.data.get(
"client_fsic", "{}"
)
else:
context.transfer_session.client_fsic = fsic
context.transfer_session.save()
return transfer_statuses.COMPLETED


class ReceiverSerializeOperation(LocalOperation):
"""
Receivers of transfer data do not need serialize any data
"""

def handle(self, context):
"""
:type context: LocalSessionContext
"""
assert context.is_receiver
# TODO: move updating fsic from request to here instead of viewset serializer
return transfer_statuses.COMPLETED


class ProducerQueueOperation(LocalOperation):
"""
Performs queuing of data for as local instance
Expand Down Expand Up @@ -837,15 +825,14 @@ def handle(self, context):
filter=context.filter,
)

# update database max counters but use latest fsics on client
fsic = (
context.transfer_session.client_fsic
if context.is_server
else context.transfer_session.server_fsic
)

# update database max counters but use latest fsics on client
DatabaseMaxCounter.update_fsics(json.loads(fsic), context.filter)
# update database max counters but use latest fsics from client/server
if context.is_receiver:
fsic = (
context.transfer_session.client_fsic
if context.is_server
else context.transfer_session.server_fsic
)
DatabaseMaxCounter.update_fsics(json.loads(fsic), context.filter)

return transfer_statuses.COMPLETED

Expand Down Expand Up @@ -1039,9 +1026,9 @@ def handle(self, context):
assert ASYNC_OPERATIONS in context.capabilities

# if local stage is transferring or beyond, we definitely don't need to initialize
if context.stage is not None and transfer_stages.stage(context.stage) < transfer_stages.stage(
transfer_stages.TRANSFERRING
):
if context.stage is not None and transfer_stages.stage(
context.stage
) < transfer_stages.stage(transfer_stages.TRANSFERRING):
self.create_transfer_session(context)

return transfer_statuses.COMPLETED
Expand Down Expand Up @@ -1081,15 +1068,13 @@ def handle(self, context):
assert context.transfer_session is not None
assert ASYNC_OPERATIONS in context.capabilities

update_kwargs = {}
if context.is_push:
update_kwargs.update(client_fsic=context.transfer_session.client_fsic)

remote_status, data = self.remote_proceed_to(
context, transfer_stages.SERIALIZING, **update_kwargs
context,
transfer_stages.SERIALIZING,
client_fsic=context.transfer_session.client_fsic,
)

if context.is_pull and remote_status == transfer_statuses.COMPLETED:
if remote_status == transfer_statuses.COMPLETED:
context.transfer_session.server_fsic = data.get("server_fsic")
context.transfer_session.save()

Expand Down
10 changes: 4 additions & 6 deletions morango/sync/syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,14 +600,12 @@ def __init__(self, sync_connection, sync_session, controller):
)

def proceed_to_and_wait_for(self, stage):
contexts = (self.remote_context, self.local_context)
if self.local_context.is_push:
# reverse contexts if a push to operate on local first
contexts = reversed(contexts)

contexts = (self.local_context, self.remote_context)
for context in contexts:
max_interval = 1 if context is self.local_context else 5
result = self.controller.proceed_to_and_wait_for(stage, context=context, max_interval=max_interval)
result = self.controller.proceed_to_and_wait_for(
stage, context=context, max_interval=max_interval
)
if result == transfer_statuses.ERRORED:
raise_from(
MorangoError("Stage `{}` failed".format(stage)),
Expand Down
39 changes: 39 additions & 0 deletions tests/testapp/tests/integration/test_syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,45 @@ def test_pull(self):
self.assertEqual(5, SummaryLog.objects.filter(user=self.local_user).count())
self.assertEqual(5, InteractionLog.objects.filter(user=self.local_user).count())

def test_full_flow_and_repeat(self):
with second_environment():
for _ in range(5):
SummaryLog.objects.create(user=self.remote_user)
InteractionLog.objects.create(user=self.remote_user)

self.assertEqual(0, SummaryLog.objects.filter(user=self.local_user).count())
self.assertEqual(0, InteractionLog.objects.filter(user=self.local_user).count())

# first pull
pull_client = self.client.get_pull_client()
pull_client.initialize(self.filter)
transfer_session = pull_client.local_context.transfer_session
self.assertNotEqual(0, transfer_session.records_total)
self.assertEqual(0, transfer_session.records_transferred)
pull_client.run()
self.assertNotEqual(0, transfer_session.records_transferred)
pull_client.finalize()

# sanity check pull worked
self.assertEqual(5, SummaryLog.objects.filter(user=self.local_user).count())
self.assertEqual(5, InteractionLog.objects.filter(user=self.local_user).count())

# now do a push after pull, but nothing to actually transfer
push_client = self.client.get_push_client()
push_client.initialize(self.filter)
transfer_session = push_client.local_context.transfer_session
self.assertEqual(0, transfer_session.records_total)
self.assertEqual(0, transfer_session.records_transferred)
push_client.run()
self.assertEqual(0, transfer_session.records_transferred)
push_client.finalize()

# second pass for pull, only do initialize to make sure nothing gets queued for sync
second_pull_client = self.client.get_pull_client()
second_pull_client.initialize(self.filter)
transfer_session = second_pull_client.local_context.transfer_session
self.assertEqual(0, transfer_session.records_total)

def test_resume(self):
# create data
for _ in range(5):
Expand Down
6 changes: 3 additions & 3 deletions tests/testapp/tests/sync/test_syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,9 @@ def test_proceed_to_and_wait_for__pull(self):
mock_proceed_calls = mock_proceed.call_args_list
self.assertEqual(2, len(mock_proceed_calls))
self.assertEqual(transfer_stages.QUEUING, mock_proceed_calls[0][0][0])
self.assertEqual(self.client.remote_context, mock_proceed_calls[0][1].get("context"))
self.assertEqual(self.client.local_context, mock_proceed_calls[0][1].get("context"))
self.assertEqual(transfer_stages.QUEUING, mock_proceed_calls[1][0][0])
self.assertEqual(self.client.local_context, mock_proceed_calls[1][1].get("context"))
self.assertEqual(self.client.remote_context, mock_proceed_calls[1][1].get("context"))

def test_proceed_to_and_wait_for__error(self):
self.client.local_context.is_push = False
Expand All @@ -382,7 +382,7 @@ def test_proceed_to_and_wait_for__error(self):
with self.assertRaises(MorangoError):
self.client.proceed_to_and_wait_for(transfer_stages.QUEUING)
self.controller.proceed_to_and_wait_for.assert_called_once_with(
transfer_stages.QUEUING, context=self.client.remote_context, max_interval=5
transfer_stages.QUEUING, context=self.client.local_context, max_interval=1
)

@mock.patch("morango.sync.syncsession.TransferClient.proceed_to_and_wait_for")
Expand Down
2 changes: 1 addition & 1 deletion tests/testapp/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_defaults(self):
self.assertEqual(SETTINGS.MORANGO_DESERIALIZE_AFTER_DEQUEUING, True)
self.assertEqual(SETTINGS.MORANGO_DISALLOW_ASYNC_OPERATIONS, False)
self.assertLength(3, SETTINGS.MORANGO_INITIALIZE_OPERATIONS)
self.assertLength(4, SETTINGS.MORANGO_SERIALIZE_OPERATIONS)
self.assertLength(3, SETTINGS.MORANGO_SERIALIZE_OPERATIONS)
self.assertLength(4, SETTINGS.MORANGO_QUEUE_OPERATIONS)
self.assertLength(4, SETTINGS.MORANGO_DEQUEUE_OPERATIONS)
self.assertLength(4, SETTINGS.MORANGO_DESERIALIZE_OPERATIONS)
Expand Down

0 comments on commit fe1c4a9

Please sign in to comment.