Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing broken unit tests in cvat/app/events #8867

Merged
merged 20 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ __pycache__
.coverage
.husky/
.python-version
tmp*cvat/
temp*/

# Ignore generated test files
docker-compose.tests.yml

# Ignore npm logs file
npm-debug.log*
Expand Down
Empty file.
Empty file added cvat/apps/events/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions cvat/apps/events/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (C) 2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import datetime
archibald1418 marked this conversation as resolved.
Show resolved Hide resolved

MAX_EVENT_DURATION = datetime.timedelta(seconds=100)
WORKING_TIME_RESOLUTION = datetime.timedelta(milliseconds=1)
WORKING_TIME_SCOPE = 'send:working_time'
COLLAPSED_EVENT_SCOPES = frozenset(("change:frame",))
Copy link
Contributor

@zhiltsov-max zhiltsov-max Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, it makes sense to rename either this constant or the compressed_* tests to use the same term. I'd vote for the constant renaming, because the word "collapse" primarily means something different.

49 changes: 4 additions & 45 deletions cvat/apps/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#
# SPDX-License-Identifier: MIT

import datetime
import traceback
from typing import Any, Optional, Union

Expand Down Expand Up @@ -31,6 +30,8 @@

from .cache import get_cache
from .event import event_scope, record_server_event
from .const import WORKING_TIME_RESOLUTION, WORKING_TIME_SCOPE
from .utils import compute_working_time_per_ids


def project_id(instance):
Expand Down Expand Up @@ -619,53 +620,11 @@ def handle_viewset_exception(exc, context):

return response


def handle_client_events_push(request, data: dict):
TIME_THRESHOLD = datetime.timedelta(seconds=100)
WORKING_TIME_SCOPE = 'send:working_time'
WORKING_TIME_RESOLUTION = datetime.timedelta(milliseconds=1)
COLLAPSED_EVENT_SCOPES = frozenset(("change:frame",))
org = request.iam_context["organization"]

def read_ids(event: dict) -> tuple[int | None, int | None, int | None]:
return event.get("job_id"), event.get("task_id"), event.get("project_id")

def get_end_timestamp(event: dict) -> datetime.datetime:
if event["scope"] in COLLAPSED_EVENT_SCOPES:
return event["timestamp"] + datetime.timedelta(milliseconds=event["duration"])
return event["timestamp"]

if previous_event := data["previous_event"]:
previous_end_timestamp = get_end_timestamp(previous_event)
previous_ids = read_ids(previous_event)
elif data["events"]:
previous_end_timestamp = data["events"][0]["timestamp"]
previous_ids = read_ids(data["events"][0])

working_time_per_ids = {}
for event in data["events"]:
working_time = datetime.timedelta()
timestamp = event["timestamp"]

if timestamp > previous_end_timestamp:
t_diff = timestamp - previous_end_timestamp
if t_diff < TIME_THRESHOLD:
working_time += t_diff

previous_end_timestamp = timestamp

end_timestamp = get_end_timestamp(event)
if end_timestamp > previous_end_timestamp:
working_time += end_timestamp - previous_end_timestamp
previous_end_timestamp = end_timestamp

if previous_ids not in working_time_per_ids:
working_time_per_ids[previous_ids] = {
"value": datetime.timedelta(),
"timestamp": timestamp,
}

working_time_per_ids[previous_ids]["value"] += working_time
previous_ids = read_ids(event)
working_time_per_ids = compute_working_time_per_ids(data)

if data["events"]:
common = {
Expand Down
126 changes: 77 additions & 49 deletions cvat/apps/events/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#
# SPDX-License-Identifier: MIT

import json
import unittest
from datetime import datetime, timedelta, timezone
from typing import Optional
Expand All @@ -12,13 +11,15 @@

from cvat.apps.events.serializers import ClientEventsSerializer
from cvat.apps.organizations.models import Organization
from cvat.apps.events.const import MAX_EVENT_DURATION, WORKING_TIME_RESOLUTION
from cvat.apps.events.utils import compute_working_time_per_ids, is_contained

class WorkingTimeTestCase(unittest.TestCase):
_START_TIMESTAMP = datetime(2024, 1, 1, 12)
_SHORT_GAP = ClientEventsSerializer._TIME_THRESHOLD - timedelta(milliseconds=1)
_SHORT_GAP_INT = _SHORT_GAP / ClientEventsSerializer._WORKING_TIME_RESOLUTION
_LONG_GAP = ClientEventsSerializer._TIME_THRESHOLD
_LONG_GAP_INT = _LONG_GAP / ClientEventsSerializer._WORKING_TIME_RESOLUTION
_SHORT_GAP = MAX_EVENT_DURATION - timedelta(milliseconds=1)
_SHORT_GAP_INT = _SHORT_GAP / WORKING_TIME_RESOLUTION
_LONG_GAP = MAX_EVENT_DURATION
_LONG_GAP_INT = _LONG_GAP / WORKING_TIME_RESOLUTION

@staticmethod
def _instant_event(timestamp: datetime) -> dict:
Expand All @@ -33,16 +34,27 @@ def _compressed_event(timestamp: datetime, duration: timedelta) -> dict:
return {
"scope": "change:frame",
"timestamp": timestamp.isoformat(),
"duration": duration // ClientEventsSerializer._WORKING_TIME_RESOLUTION,
"duration": duration // WORKING_TIME_RESOLUTION,
}


@staticmethod
def _working_time(event: dict) -> int:
payload = json.loads(event["payload"])
return payload["working_time"]
def _get_actual_working_times(data: dict) -> list[int]:
data_copy = data.copy()
working_times = []
for event in data['events']:
data_copy['events'] = [event]
event_working_time = compute_working_time_per_ids(data_copy)
for working_time in event_working_time.values():
working_times.append((working_time['value'] // WORKING_TIME_RESOLUTION))
if data_copy['previous_event'] and is_contained(event, data_copy['previous_event']):
continue
data_copy['previous_event'] = event
return working_times


@staticmethod
def _deserialize(events: list[dict], previous_event: Optional[dict] = None) -> list[dict]:
def _deserialize(events: list[dict], previous_event: Optional[dict] = None) -> dict:
request = RequestFactory().post("/api/events")
request.user = get_user_model()(id=100, username="testuser", email="[email protected]")
request.iam_context = {
Expand All @@ -60,118 +72,134 @@ def _deserialize(events: list[dict], previous_event: Optional[dict] = None) -> l

s.is_valid(raise_exception=True)

return s.validated_data["events"]
return s.validated_data

def test_instant(self):
events = self._deserialize([
data = self._deserialize([
self._instant_event(self._START_TIMESTAMP),
])
self.assertEqual(self._working_time(events[0]), 0)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 0)

def test_compressed(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, self._LONG_GAP),
])
self.assertEqual(self._working_time(events[0]), self._LONG_GAP_INT)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], self._LONG_GAP_INT)

def test_instants_with_short_gap(self):
events = self._deserialize([
data = self._deserialize([
self._instant_event(self._START_TIMESTAMP),
self._instant_event(self._START_TIMESTAMP + self._SHORT_GAP),
])
self.assertEqual(self._working_time(events[0]), 0)
self.assertEqual(self._working_time(events[1]), self._SHORT_GAP_INT)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 0)
self.assertEqual(event_times[1], self._SHORT_GAP_INT)

def test_instants_with_long_gap(self):
events = self._deserialize([
data = self._deserialize([
self._instant_event(self._START_TIMESTAMP),
self._instant_event(self._START_TIMESTAMP + self._LONG_GAP),
])
self.assertEqual(self._working_time(events[0]), 0)
self.assertEqual(self._working_time(events[1]), 0)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 0)
self.assertEqual(event_times[1], 0)

def test_compressed_with_short_gap(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=1)),
self._compressed_event(
self._START_TIMESTAMP + timedelta(seconds=1) + self._SHORT_GAP,
timedelta(seconds=5)
),
])
self.assertEqual(self._working_time(events[0]), 1000)
self.assertEqual(self._working_time(events[1]), self._SHORT_GAP_INT + 5000)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 1000)
self.assertEqual(event_times[1], self._SHORT_GAP_INT + 5000)

def test_compressed_with_long_gap(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=1)),
self._compressed_event(
self._START_TIMESTAMP + timedelta(seconds=1) + self._LONG_GAP,
timedelta(seconds=5)
),
])
self.assertEqual(self._working_time(events[0]), 1000)
self.assertEqual(self._working_time(events[1]), 5000)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 1000)
self.assertEqual(event_times[1], 5000)

def test_compressed_contained(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=5)),
self._compressed_event(
self._START_TIMESTAMP + timedelta(seconds=3),
timedelta(seconds=1)
),
])
self.assertEqual(self._working_time(events[0]), 5000)
self.assertEqual(self._working_time(events[1]), 0)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 5000)
self.assertEqual(event_times[1], 0)

def test_compressed_overlapping(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=5)),
self._compressed_event(
self._START_TIMESTAMP + timedelta(seconds=3),
timedelta(seconds=6)
),
])
self.assertEqual(self._working_time(events[0]), 5000)
self.assertEqual(self._working_time(events[1]), 4000)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 5000)
self.assertEqual(event_times[1], 4000)

def test_instant_inside_compressed(self):
events = self._deserialize([
data = self._deserialize([
self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=5)),
self._instant_event(self._START_TIMESTAMP + timedelta(seconds=3)),
self._instant_event(self._START_TIMESTAMP + timedelta(seconds=6)),
])
self.assertEqual(self._working_time(events[0]), 5000)
self.assertEqual(self._working_time(events[1]), 0)
self.assertEqual(self._working_time(events[2]), 1000)

event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 5000)
self.assertEqual(event_times[1], 0)
self.assertEqual(event_times[2], 1000)


def test_previous_instant_short_gap(self):
events = self._deserialize(
data = self._deserialize(
[self._instant_event(self._START_TIMESTAMP + self._SHORT_GAP)],
previous_event=self._instant_event(self._START_TIMESTAMP),
)

self.assertEqual(self._working_time(events[0]), self._SHORT_GAP_INT)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], self._SHORT_GAP_INT)

def test_previous_instant_long_gap(self):
events = self._deserialize(
data = self._deserialize(
[self._instant_event(self._START_TIMESTAMP + self._LONG_GAP)],
previous_event=self._instant_event(self._START_TIMESTAMP),
)

self.assertEqual(self._working_time(events[0]), 0)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 0)

def test_previous_compressed_short_gap(self):
events = self._deserialize(
data = self._deserialize(
[self._instant_event(self._START_TIMESTAMP + timedelta(seconds=1) + self._SHORT_GAP)],
previous_event=self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=1)),
)

self.assertEqual(self._working_time(events[0]), self._SHORT_GAP_INT)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], self._SHORT_GAP_INT)

def test_previous_compressed_long_gap(self):
events = self._deserialize(
data = self._deserialize(
[self._instant_event(self._START_TIMESTAMP + timedelta(seconds=1) + self._LONG_GAP)],
previous_event=self._compressed_event(self._START_TIMESTAMP, timedelta(seconds=1)),
)

self.assertEqual(self._working_time(events[0]), 0)
event_times = self._get_actual_working_times(data)
self.assertEqual(event_times[0], 0)
53 changes: 53 additions & 0 deletions cvat/apps/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
#
# SPDX-License-Identifier: MIT

import datetime


from .const import MAX_EVENT_DURATION, COLLAPSED_EVENT_SCOPES
from .cache import clear_cache


def _prepare_objects_to_delete(object_to_delete):
from cvat.apps.engine.models import Project, Task, Segment, Job, Issue, Comment

Expand Down Expand Up @@ -63,3 +68,51 @@ def wrap(self, *args, **kwargs):
finally:
clear_cache()
return wrap


def get_end_timestamp(event: dict) -> datetime.datetime:
if event["scope"] in COLLAPSED_EVENT_SCOPES:
return event["timestamp"] + datetime.timedelta(milliseconds=event["duration"])
return event["timestamp"]

def is_contained(event1: dict, event2: dict) -> bool:
return event1['timestamp'] < get_end_timestamp(event2)

def compute_working_time_per_ids(data: dict) -> dict:
def read_ids(event: dict) -> tuple[int | None, int | None, int | None]:
return event.get("job_id"), event.get("task_id"), event.get("project_id")

if previous_event := data["previous_event"]:
previous_end_timestamp = get_end_timestamp(previous_event)
previous_ids = read_ids(previous_event)
elif data["events"]:
previous_end_timestamp = data["events"][0]["timestamp"]
previous_ids = read_ids(data["events"][0])

working_time_per_ids = {}
for event in data["events"]:
working_time = datetime.timedelta()
timestamp = event["timestamp"]

if timestamp > previous_end_timestamp:
t_diff = timestamp - previous_end_timestamp
if t_diff < MAX_EVENT_DURATION:
working_time += t_diff

previous_end_timestamp = timestamp

end_timestamp = get_end_timestamp(event)
if end_timestamp > previous_end_timestamp:
working_time += end_timestamp - previous_end_timestamp
previous_end_timestamp = end_timestamp

if previous_ids not in working_time_per_ids:
working_time_per_ids[previous_ids] = {
"value": datetime.timedelta(),
"timestamp": timestamp,
}

working_time_per_ids[previous_ids]["value"] += working_time
previous_ids = read_ids(event)

return working_time_per_ids
Loading