Skip to content

Commit

Permalink
Merge pull request #125 from bjester/last-activity-timestamp
Browse files Browse the repository at this point in the history
Trigger update to `last_activity_timestamp`
  • Loading branch information
bjester authored Jul 20, 2021
2 parents 2e18c26 + 06e9a47 commit 3ba119b
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 48 deletions.
21 changes: 17 additions & 4 deletions morango/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,22 @@ def update_state(self, stage=None, stage_status=None):
:type stage_status: morango.constants.transfer_statuses.*|None
"""
if stage is not None:
if self.transfer_stage and transfer_stages.stage(self.transfer_stage) > transfer_stages.stage(stage):
raise ValueError("Update stage is behind current stage | current={}, new={}".format(self.transfer_stage, stage))
if self.transfer_stage and transfer_stages.stage(
self.transfer_stage
) > transfer_stages.stage(stage):
raise ValueError(
"Update stage is behind current stage | current={}, new={}".format(
self.transfer_stage, stage
)
)
self.transfer_stage = stage
if stage_status is not None:
self.transfer_stage_status = stage_status
if stage is not None or stage_status is not None:
self.last_activity_timestamp = timezone.now()
self.save()
self.sync_session.last_activity_timestamp = timezone.now()
self.sync_session.save()

def delete_buffers(self):
"""
Expand All @@ -311,7 +320,9 @@ def get_touched_record_ids_for_model(self, model):
if isinstance(model, SyncableModel):
model = model.morango_model_name
assert isinstance(model, six.string_types)
return Store.objects.filter(model_name=model, last_transfer_session_id=self.id).values_list("id", flat=True)
return Store.objects.filter(
model_name=model, last_transfer_session_id=self.id
).values_list("id", flat=True)


class DeletedModels(models.Model):
Expand Down Expand Up @@ -395,7 +406,9 @@ class Store(AbstractStore):
dirty_bit = models.BooleanField(default=False)
deserialization_error = models.TextField(blank=True)

last_transfer_session_id = UUIDField(blank=True, null=True, default=None, db_index=True)
last_transfer_session_id = UUIDField(
blank=True, null=True, default=None, db_index=True
)

objects = StoreManager()

Expand Down
92 changes: 73 additions & 19 deletions tests/testapp/tests/integration/test_syncsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,31 @@ def second_environment():
assert instance1.id != instance2.id


@pytest.mark.skipif(getattr(settings, "MORANGO_TEST_POSTGRESQL", False), reason="Not supported")
@pytest.mark.skipif(
getattr(settings, "MORANGO_TEST_POSTGRESQL", False), reason="Not supported"
)
class PushPullClientTestCase(LiveServerTestCase):
multi_db = True
profile = "facilitydata"

def setUp(self):
super(PushPullClientTestCase, self).setUp()
self.profile_controller = MorangoProfileController(self.profile)
self.conn = self.profile_controller.create_network_connection(self.live_server_url)
self.conn = self.profile_controller.create_network_connection(
self.live_server_url
)
self.conn.chunk_size = 3

self.remote_user = self._setUpServer()
self.filter = Filter("{}:user".format(self.remote_user.id))
self.client = self._setUpClient(self.remote_user.id)
self.session = self.client.sync_session
self.last_session_activity = self.session.last_activity_timestamp
self.last_transfer_activity = None

self.local_user = MyUser.objects.create(id=self.remote_user.id, username="bob", is_superuser=True)
self.local_user = MyUser.objects.create(
id=self.remote_user.id, username="bob", is_superuser=True
)

def _setUpCertScopes(self):
root_scope = ScopeDefinition.objects.create(
Expand Down Expand Up @@ -85,7 +93,9 @@ def _setUpServer(self):
root_scope, subset_scope = self._setUpCertScopes()
root_cert = Certificate.generate_root_certificate(root_scope.id)

remote_user = MyUser.objects.create(id=root_cert.id, username="bob", is_superuser=True)
remote_user = MyUser.objects.create(
id=root_cert.id, username="bob", is_superuser=True
)
remote_user.set_password("password")
remote_user.save()

Expand All @@ -94,9 +104,7 @@ def _setUpServer(self):
profile=self.profile,
scope_definition=subset_scope,
scope_version=subset_scope.version,
scope_params=json.dumps(
{"user": remote_user.id, "sub": "user"}
),
scope_params=json.dumps({"user": remote_user.id, "sub": "user"}),
private_key=Key(),
)
root_cert.sign_certificate(subset_cert)
Expand All @@ -106,19 +114,41 @@ def _setUpServer(self):
def _setUpClient(self, primary_partition):
root_scope, subset_scope = self._setUpCertScopes()

server_certs = self.conn.get_remote_certificates(primary_partition, root_scope.id)
server_certs = self.conn.get_remote_certificates(
primary_partition, root_scope.id
)
server_cert = server_certs[0]
client_cert = self.conn.certificate_signing_request(
server_cert, subset_scope.id, {"user": primary_partition, "sub": "user"},
userargs="bob", password="password"
server_cert,
subset_scope.id,
{"user": primary_partition, "sub": "user"},
userargs="bob",
password="password",
)
return self.conn.create_sync_session(client_cert, server_cert)

@classmethod
def _create_server_thread(cls, connections_override):
# override default to point to second environment database
connections_override["default"] = connections["default2"]
return super(PushPullClientTestCase, cls)._create_server_thread(connections_override)
return super(PushPullClientTestCase, cls)._create_server_thread(
connections_override
)

def assertLastActivityUpdate(self, transfer_session=None):
"""A signal callable that asserts `last_activity_timestamp`s are updated"""
if self.last_transfer_activity is not None:
self.assertLess(
self.last_transfer_activity, transfer_session.last_activity_timestamp
)
self.assertLess(
self.last_session_activity,
transfer_session.sync_session.last_activity_timestamp,
)
self.last_transfer_activity = transfer_session.last_activity_timestamp
self.last_session_activity = (
transfer_session.sync_session.last_activity_timestamp
)

def test_push(self):
for _ in range(5):
Expand All @@ -127,26 +157,42 @@ def test_push(self):
self.profile_controller.serialize_into_store(self.filter)

with second_environment():
self.assertEqual(0, SummaryLog.objects.filter(user=self.remote_user).count())
self.assertEqual(0, InteractionLog.objects.filter(user=self.remote_user).count())
self.assertEqual(
0, SummaryLog.objects.filter(user=self.remote_user).count()
)
self.assertEqual(
0, InteractionLog.objects.filter(user=self.remote_user).count()
)

client = self.client.get_push_client()
client.signals.queuing.completed.connect(self.assertLastActivityUpdate)
client.signals.transferring.in_progress.connect(self.assertLastActivityUpdate)
client.signals.dequeuing.completed.connect(self.assertLastActivityUpdate)

self.assertEqual(0, TransferSession.objects.filter(active=True).count())
client.initialize(self.filter)
self.assertEqual(1, TransferSession.objects.filter(active=True).count())
transfer_session = client.local_context.transfer_session
self.assertNotEqual(0, transfer_session.records_total)
self.assertEqual(0, transfer_session.records_transferred)
self.assertLessEqual(1, Buffer.objects.filter(transfer_session=transfer_session).count())
self.assertLessEqual(
1, Buffer.objects.filter(transfer_session=transfer_session).count()
)
client.run()
self.assertNotEqual(0, transfer_session.records_transferred)
client.finalize()
self.assertEqual(0, Buffer.objects.filter(transfer_session=transfer_session).count())
self.assertEqual(
0, Buffer.objects.filter(transfer_session=transfer_session).count()
)
self.assertEqual(0, TransferSession.objects.filter(active=True).count())

with second_environment():
self.assertEqual(5, SummaryLog.objects.filter(user=self.remote_user).count())
self.assertEqual(5, InteractionLog.objects.filter(user=self.remote_user).count())
self.assertEqual(
5, SummaryLog.objects.filter(user=self.remote_user).count()
)
self.assertEqual(
5, InteractionLog.objects.filter(user=self.remote_user).count()
)

def test_pull(self):
with second_environment():
Expand All @@ -159,6 +205,10 @@ def test_pull(self):
self.assertEqual(0, InteractionLog.objects.filter(user=self.local_user).count())

client = self.client.get_pull_client()
client.signals.queuing.completed.connect(self.assertLastActivityUpdate)
client.signals.transferring.in_progress.connect(self.assertLastActivityUpdate)
client.signals.dequeuing.completed.connect(self.assertLastActivityUpdate)

self.assertEqual(0, TransferSession.objects.filter(active=True).count())
client.initialize(self.filter)
self.assertEqual(1, TransferSession.objects.filter(active=True).count())
Expand All @@ -167,9 +217,13 @@ def test_pull(self):
self.assertEqual(0, transfer_session.records_transferred)
client.run()
self.assertNotEqual(0, transfer_session.records_transferred)
self.assertLessEqual(1, Buffer.objects.filter(transfer_session=transfer_session).count())
self.assertLessEqual(
1, Buffer.objects.filter(transfer_session=transfer_session).count()
)
client.finalize()
self.assertEqual(0, Buffer.objects.filter(transfer_session=transfer_session).count())
self.assertEqual(
0, Buffer.objects.filter(transfer_session=transfer_session).count()
)
self.assertEqual(0, TransferSession.objects.filter(active=True).count())

self.assertEqual(5, SummaryLog.objects.filter(user=self.local_user).count())
Expand Down
Loading

0 comments on commit 3ba119b

Please sign in to comment.