Skip to content

Commit

Permalink
Merge pull request #129 from bjester/mor-fixes-ango
Browse files Browse the repository at this point in the history
Fix handling of sleep for `proceed_to_and_wait_for`
  • Loading branch information
bjester authored Jul 27, 2021
2 parents 5ecca47 + 4a956d2 commit 33b9546
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

List of the most important changes for each release.

## 0.6.2

- Fixes slow performance due to excessive use of `sleep`

## 0.6.1

- Fix to set counters on `TransferSession` *after* serialization
Expand Down
2 changes: 1 addition & 1 deletion morango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from __future__ import unicode_literals

default_app_config = "morango.apps.MorangoConfig"
__version__ = "0.6.1"
__version__ = "0.6.2"
8 changes: 5 additions & 3 deletions morango/api/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,9 @@ def create(self, request): # noqa: C901
if self.async_allowed()
else transfer_stages.QUEUING
)
result = session_controller.proceed_to_and_wait_for(to_stage, context=context)
result = session_controller.proceed_to_and_wait_for(
to_stage, context=context, max_interval=2
)

if result == transfer_statuses.ERRORED:
if context.error:
Expand Down Expand Up @@ -422,7 +424,7 @@ def update(self, request, *args, **kwargs):
session_controller.proceed_to(update_stage, context=context)
else:
session_controller.proceed_to_and_wait_for(
update_stage, context=context
update_stage, context=context, max_interval=2
)

return super(TransferSessionViewSet, self).update(request, *args, **kwargs)
Expand All @@ -436,7 +438,7 @@ def perform_destroy(self, transfer_session):
session_controller.proceed_to(transfer_stages.CLEANUP, context=context)
else:
result = session_controller.proceed_to_and_wait_for(
transfer_stages.CLEANUP, context=context
transfer_stages.CLEANUP, context=context, max_interval=2
)
# raise an error for synchronous, if status is false
if result == transfer_statuses.ERRORED:
Expand Down
10 changes: 7 additions & 3 deletions morango/sync/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def proceed_to(self, target_stage, context=None):
# should always be a non-False status
return result

def proceed_to_and_wait_for(self, target_stage, context=None, interval=5):
def proceed_to_and_wait_for(self, target_stage, context=None, max_interval=5):
"""
Same as `proceed_to` but waits for a finished status to be returned by sleeping between
calls to `proceed_to` if status is not complete
Expand All @@ -194,15 +194,19 @@ def proceed_to_and_wait_for(self, target_stage, context=None, interval=5):
:type target_stage: str
:param context: Override controller context, or provide it if missing
:type context: morango.sync.context.SessionContext|None
:param interval: The time, in seconds, between repeat calls to `.proceed_to`
:param max_interval: The max time, in seconds, between repeat calls to `.proceed_to`
:return: transfer_status.* - The status of proceeding to that stage,
which should be `ERRORED` or `COMPLETE`
:rtype: str
"""
result = transfer_statuses.PENDING
tries = 0
while result not in transfer_statuses.FINISHED_STATES:
if tries > 0:
# exponential backoff up to max_interval
sleep(min(0.3 * (2 ** tries - 1), max_interval))
result = self.proceed_to(target_stage, context=context)
sleep(interval)
tries += 1
return result

def _invoke_middleware(self, context, middleware):
Expand Down
5 changes: 3 additions & 2 deletions morango/sync/syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ def proceed_to_and_wait_for(self, stage):
contexts = reversed(contexts)

for context in contexts:
result = self.controller.proceed_to_and_wait_for(stage, context=context)
max_interval = 1 if context is self.local_context else 5
result = self.controller.proceed_to_and_wait_for(stage, context=context, max_interval=max_interval)
if result == transfer_statuses.ERRORED:
raise_from(
MorangoError("Stage `{}` failed".format(stage)),
Expand All @@ -623,7 +624,7 @@ def initialize(self, sync_filter):

# initialize the transfer session locally
status = self.controller.proceed_to_and_wait_for(
transfer_stages.INITIALIZING, context=self.local_context
transfer_stages.INITIALIZING, context=self.local_context, max_interval=1
)
if status == transfer_statuses.ERRORED:
raise_from(
Expand Down
4 changes: 2 additions & 2 deletions tests/testapp/tests/sync/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ def test_proceed_to_and_wait_for(self):
transfer_statuses.PENDING,
transfer_statuses.COMPLETED
]
result = self.controller.proceed_to_and_wait_for(transfer_stages.CLEANUP, interval=0.1)
result = self.controller.proceed_to_and_wait_for(transfer_stages.CLEANUP, max_interval=0.1)
self.assertEqual(result, transfer_statuses.COMPLETED)

def test_proceed_to_and_wait_for__errored(self):
Expand All @@ -722,7 +722,7 @@ def test_proceed_to_and_wait_for__errored(self):
transfer_statuses.PENDING,
transfer_statuses.ERRORED
]
result = self.controller.proceed_to_and_wait_for(transfer_stages.CLEANUP, interval=0.1)
result = self.controller.proceed_to_and_wait_for(transfer_stages.CLEANUP, max_interval=0.1)
self.assertEqual(result, transfer_statuses.ERRORED)

def test_invoke_middleware(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/testapp/tests/sync/test_syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def test_proceed_to_and_wait_for__error(self):
with self.assertRaises(MorangoError):
self.client.proceed_to_and_wait_for(transfer_stages.QUEUING)
self.controller.proceed_to_and_wait_for.assert_called_once_with(
transfer_stages.QUEUING, context=self.client.remote_context
transfer_stages.QUEUING, context=self.client.remote_context, max_interval=5
)

@mock.patch("morango.sync.syncsession.TransferClient.proceed_to_and_wait_for")
Expand All @@ -398,7 +398,7 @@ def test_initialize(self, mock_proceed):
self.assertEqual(sync_filter, self.client.local_context.filter)
self.assertEqual(sync_filter, self.client.remote_context.filter)
self.controller.proceed_to_and_wait_for.assert_any_call(
transfer_stages.INITIALIZING, context=self.client.local_context
transfer_stages.INITIALIZING, context=self.client.local_context, max_interval=1
)
self.assertEqual(self.transfer_session, self.client.remote_context.transfer_session)
session_started_handler.assert_called_once_with(transfer_session=self.transfer_session)
Expand Down

0 comments on commit 33b9546

Please sign in to comment.