diff --git a/onadata/apps/api/permissions.py b/onadata/apps/api/permissions.py
index efe64ad4cc..07485f9903 100644
--- a/onadata/apps/api/permissions.py
+++ b/onadata/apps/api/permissions.py
@@ -370,7 +370,7 @@ def has_object_permission(self, request, view, obj):
model_cls = XForm
user = request.user
- return self._has_object_permission(request, model_cls, user, obj.instance.xform)
+ return self._has_object_permission(request, model_cls, user, obj.xform)
class ConnectViewsetPermissions(IsAuthenticated):
diff --git a/onadata/apps/api/tests/viewsets/test_attachment_viewset.py b/onadata/apps/api/tests/viewsets/test_attachment_viewset.py
index 4026d6a500..64d7a9562f 100644
--- a/onadata/apps/api/tests/viewsets/test_attachment_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_attachment_viewset.py
@@ -89,10 +89,11 @@ def test_attachment_pagination(self):
extension="JPG",
name=filename,
media_file=media_file,
+ xform=self.xform,
)
# not using pagination params
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.list_view(request)
self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
@@ -100,7 +101,9 @@ def test_attachment_pagination(self):
self.assertEqual(len(response.data), 2)
# valid page and page_size
- request = self.factory.get("/", data={"page": 1, "page_size": 1}, **self.extra)
+ request = self.factory.get(
+ "/", data={"xform": self.xform.pk, "page": 1, "page_size": 1}, **self.extra
+ )
response = self.list_view(request)
self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
@@ -108,12 +111,16 @@ def test_attachment_pagination(self):
self.assertEqual(len(response.data), 1)
# invalid page type
- request = self.factory.get("/", data={"page": "invalid"}, **self.extra)
+ request = self.factory.get(
+ "/", data={"xform": self.xform.pk, "page": "invalid"}, **self.extra
+ )
response = self.list_view(request)
self.assertEqual(response.status_code, 404)
# invalid page size type
- request = self.factory.get("/", data={"page_size": "invalid"}, **self.extra)
+ request = self.factory.get(
+ "/", data={"xform": self.xform.pk, "page_size": "invalid"}, **self.extra
+ )
response = self.list_view(request)
self.assertEqual(response.status_code, 200)
self.assertTrue(isinstance(response.data, list))
@@ -121,13 +128,17 @@ def test_attachment_pagination(self):
# invalid page and page_size types
request = self.factory.get(
- "/", data={"page": "invalid", "page_size": "invalid"}, **self.extra
+ "/",
+ data={"xform": self.xform.pk, "page": "invalid", "page_size": "invalid"},
+ **self.extra,
)
response = self.list_view(request)
self.assertEqual(response.status_code, 404)
# invalid page size
- request = self.factory.get("/", data={"page": 4, "page_size": 1}, **self.extra)
+ request = self.factory.get(
+ "/", data={"xform": self.xform.pk, "page": 4, "page_size": 1}, **self.extra
+ )
response = self.list_view(request)
self.assertEqual(response.status_code, 404)
@@ -170,7 +181,7 @@ def test_retrieve_and_list_views_with_anonymous_user(self):
def test_list_view(self):
self._submit_transport_instance_w_attachment()
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.list_view(request)
self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
@@ -181,7 +192,7 @@ def test_list_view(self):
self.attachment.instance.deleted_at = timezone.now()
self.attachment.instance.save()
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.list_view(request)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 0)
@@ -189,7 +200,7 @@ def test_list_view(self):
def test_data_list_with_xform_in_delete_async(self):
self._submit_transport_instance_w_attachment()
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.list_view(request)
self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
@@ -198,7 +209,7 @@ def test_data_list_with_xform_in_delete_async(self):
self.xform.deleted_at = timezone.now()
self.xform.save()
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.list_view(request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), initial_count - 1)
@@ -276,6 +287,7 @@ def test_list_view_filter_by_attachment_type(self):
extension="MP4",
name=filename,
media_file=media_file,
+ xform=self.xform,
)
Attachment.objects.create(
@@ -284,6 +296,7 @@ def test_list_view_filter_by_attachment_type(self):
extension="PDF",
name=filename,
media_file=media_file,
+ xform=self.xform,
)
Attachment.objects.create(
instance=self.xform.instances.first(),
@@ -291,6 +304,7 @@ def test_list_view_filter_by_attachment_type(self):
extension="TXT",
name=filename,
media_file=media_file,
+ xform=self.xform,
)
Attachment.objects.create(
instance=self.xform.instances.first(),
@@ -298,6 +312,7 @@ def test_list_view_filter_by_attachment_type(self):
extension="MP3",
name=filename,
media_file=media_file,
+ xform=self.xform,
)
Attachment.objects.create(
instance=self.xform.instances.first(),
@@ -305,12 +320,13 @@ def test_list_view_filter_by_attachment_type(self):
extension="GEOJSON",
name=geojson_filename,
media_file=geojson_media_file,
+ xform=self.xform,
)
- data = {}
+ data = {"xform": self.xform.pk}
request = self.factory.get("/", data, **self.extra)
response = self.list_view(request)
- self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
+ self.assertNotEqual(response.get("Cache-Control"), None)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 6)
@@ -318,8 +334,8 @@ def test_list_view_filter_by_attachment_type(self):
data["type"] = "image"
request = self.factory.get("/", data, **self.extra)
response = self.list_view(request)
- self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
+ self.assertNotEqual(response.get("Cache-Control"), None)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["mimetype"], "image/jpeg")
@@ -328,8 +344,8 @@ def test_list_view_filter_by_attachment_type(self):
data["type"] = "audio"
request = self.factory.get("/", data, **self.extra)
response = self.list_view(request)
- self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
+ self.assertNotEqual(response.get("Cache-Control"), None)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["mimetype"], "audio/mp3")
@@ -338,8 +354,8 @@ def test_list_view_filter_by_attachment_type(self):
data["type"] = "video"
request = self.factory.get("/", data, **self.extra)
response = self.list_view(request)
- self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
+ self.assertNotEqual(response.get("Cache-Control"), None)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["mimetype"], "video/mp4")
@@ -348,8 +364,8 @@ def test_list_view_filter_by_attachment_type(self):
data["type"] = "document"
request = self.factory.get("/", data, **self.extra)
response = self.list_view(request)
- self.assertNotEqual(response.get("Cache-Control"), None)
self.assertEqual(response.status_code, 200)
+ self.assertNotEqual(response.get("Cache-Control"), None)
self.assertTrue(isinstance(response.data, list))
self.assertEqual(len(response.data), 3)
self.assertEqual(response.data[0]["mimetype"], "application/pdf")
diff --git a/onadata/apps/api/tests/viewsets/test_export_viewset.py b/onadata/apps/api/tests/viewsets/test_export_viewset.py
index 24399b48cc..f9005f5fe6 100644
--- a/onadata/apps/api/tests/viewsets/test_export_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_export_viewset.py
@@ -18,8 +18,7 @@
from onadata.apps.main.models import MetaData, UserProfile
from onadata.apps.main.tests.test_base import TestBase
from onadata.apps.viewer.models.export import Export
-from onadata.libs.permissions import (
- DataEntryMinorRole, ReadOnlyRole, EditorMinorRole)
+from onadata.libs.permissions import DataEntryMinorRole, ReadOnlyRole, EditorMinorRole
from onadata.libs.utils.export_tools import generate_export
@@ -31,9 +30,8 @@ class TestExportViewSet(TestBase):
def setUp(self):
super(TestExportViewSet, self).setUp()
self.factory = APIRequestFactory()
- self.formats = ['csv', 'csvzip', 'kml', 'osm', 'savzip', 'xls',
- 'xlsx', 'zip']
- self.view = ExportViewSet.as_view({'get': 'retrieve'})
+ self.formats = ["csv", "csvzip", "kml", "osm", "savzip", "xls", "xlsx", "zip"]
+ self.view = ExportViewSet.as_view({"get": "retrieve"})
def test_export_response(self):
"""
@@ -42,17 +40,17 @@ def test_export_response(self):
self._create_user_and_login()
self._publish_transportation_form()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- request = self.factory.get('/export')
+ request = self.factory.get("/export")
force_authenticate(request, user=self.user)
response = self.view(request, pk=export.pk)
- self.assertIn(filename, response.get('Content-Disposition'))
+ self.assertIn(filename, response.get("Content-Disposition"))
def test_export_formats_present(self):
"""
@@ -70,7 +68,7 @@ def test_export_non_existent_file(self):
self._create_user_and_login()
non_existent_pk = 1525266252676
for ext in self.formats:
- request = self.factory.get('/export')
+ request = self.factory.get("/export")
force_authenticate(request, user=self.user)
response = self.view(request, pk=non_existent_pk, format=ext)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@@ -80,8 +78,8 @@ def test_export_list(self):
Test ExportViewSet list endpoint.
"""
self._create_user_and_login()
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export')
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export")
force_authenticate(request, user=self.user)
response = view(request)
self.assertFalse(bool(response.data))
@@ -96,38 +94,46 @@ def test_export_list_public(self):
self.xform.shared_data = True
self.xform.save()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export')
+ view = ExportViewSet.as_view({"get": "list"})
+
+ # Should be empty list when no xform filter is provided
+ request = self.factory.get("/export")
force_authenticate(request, user=self.user)
response = view(request)
- self.assertTrue(bool(response.data))
+ self.assertEqual(response.data, [])
+
+ # Should not be empty list when xform filter is provided
+ request = self.factory.get("/export", data={"xform": self.xform.pk})
+ force_authenticate(request, user=self.user)
+ response = view(request)
+ self.assertNotEqual(response.data, [])
self.assertEqual(status.HTTP_200_OK, response.status_code)
def test_export_list_public_form(self):
"""
Test ExportViewSet list endpoint for a single public form.
"""
- user_mosh = self._create_user('mosh', 'mosh')
+ user_mosh = self._create_user("mosh", "mosh")
self._publish_transportation_form()
self.xform.shared_data = True
self.xform.save()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export', {'xform': self.xform.pk})
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", {"xform": self.xform.pk})
force_authenticate(request, user=user_mosh)
response = view(request)
self.assertTrue(bool(response.data))
@@ -141,11 +147,10 @@ def test_export_public_project(self):
self._publish_transportation_form()
self.xform.shared_data = True
self.xform.save()
- export = generate_export(Export.CSV_EXPORT,
- self.xform,
- None,
- {"extension": "csv"})
- request = self.factory.get('/export')
+ export = generate_export(
+ Export.CSV_EXPORT, self.xform, None, {"extension": "csv"}
+ )
+ request = self.factory.get("/export")
response = self.view(request, pk=export.pk)
self.assertEqual(status.HTTP_200_OK, response.status_code)
@@ -158,11 +163,10 @@ def test_export_public_authenticated(self):
self._publish_transportation_form()
self.xform.shared_data = True
self.xform.save()
- export = generate_export(Export.CSV_EXPORT,
- self.xform,
- None,
- {"extension": "csv"})
- request = self.factory.get('/export')
+ export = generate_export(
+ Export.CSV_EXPORT, self.xform, None, {"extension": "csv"}
+ )
+ request = self.factory.get("/export")
force_authenticate(request, user=self.user)
response = self.view(request, pk=export.pk)
self.assertEqual(status.HTTP_200_OK, response.status_code)
@@ -177,23 +181,21 @@ def test_export_public_not_owner_authenticated(self):
self.xform.shared_data = True
self.xform.shared = True
self.xform.save()
- test_user = self._create_user('not_bob', 'pass')
- request = self.factory.get('/export')
+ test_user = self._create_user("not_bob", "pass")
+ request = self.factory.get("/export")
force_authenticate(request, user=test_user)
# csv export
- export = generate_export(Export.CSV_EXPORT,
- self.xform,
- None,
- {"extension": "csv"})
- export.options = {"query": {"_submitted_by": 'not_bob'}}
+ export = generate_export(
+ Export.CSV_EXPORT, self.xform, None, {"extension": "csv"}
+ )
+ export.options = {"query": {"_submitted_by": "not_bob"}}
export.save()
response = self.view(request, pk=export.pk)
self.assertEqual(status.HTTP_200_OK, response.status_code)
# sav export
- export = generate_export(Export.SAV_ZIP_EXPORT,
- self.xform,
- None,
- {"extension": "zip"})
+ export = generate_export(
+ Export.SAV_ZIP_EXPORT, self.xform, None, {"extension": "zip"}
+ )
response = self.view(request, pk=export.pk)
self.assertEqual(status.HTTP_200_OK, response.status_code)
@@ -206,11 +208,10 @@ def test_export_non_public_export(self):
self._publish_transportation_form()
self.xform.shared_data = False
self.xform.save()
- export = generate_export(Export.CSV_EXPORT,
- self.xform,
- None,
- {"extension": "csv"})
- request = self.factory.get('/export')
+ export = generate_export(
+ Export.CSV_EXPORT, self.xform, None, {"extension": "csv"}
+ )
+ request = self.factory.get("/export")
response = self.view(request, pk=export.pk)
self.assertEqual(status.HTTP_404_NOT_FOUND, response.status_code)
@@ -221,19 +222,19 @@ def test_export_list_on_user(self):
self._create_user_and_login()
self._publish_transportation_form()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- exports = [Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)]
+ exports = [
+ Export.objects.create(xform=self.xform, filename=filename, filedir=filedir)
+ ]
exports[0].save()
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export', data={'xform': self.xform.id})
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=self.user)
response = view(request)
self.assertEqual(len(exports), len(response.data))
- self.assertEqual(exports[0].id, response.data[0].get('id'))
+ self.assertEqual(exports[0].id, response.data[0].get("id"))
self.assertEqual(status.HTTP_200_OK, response.status_code)
def test_export_list_on_with_different_users(self):
@@ -243,16 +244,16 @@ def test_export_list_on_with_different_users(self):
self._create_user_and_login()
self._publish_transportation_form()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export', data={'xform': self.xform.id})
- self._create_user_and_login(username='mary', password='password1')
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
+ self._create_user_and_login(username="mary", password="password1")
force_authenticate(request, user=self.user)
response = view(request)
self.assertFalse(bool(response.data))
@@ -277,17 +278,17 @@ def test_export_delete(self):
bob = self.user
export = Export.objects.create(xform=xform)
export.save()
- view = ExportViewSet.as_view({'delete': 'destroy'})
+ view = ExportViewSet.as_view({"delete": "destroy"})
# mary has no access hence cannot delete
- self._create_user_and_login(username='mary', password='password1')
- request = self.factory.delete('/export')
+ self._create_user_and_login(username="mary", password="password1")
+ request = self.factory.delete("/export")
force_authenticate(request, user=self.user)
response = view(request, pk=export.pk)
self.assertEqual(status.HTTP_404_NOT_FOUND, response.status_code)
# bob has access hence can delete
- request = self.factory.delete('/export')
+ request = self.factory.delete("/export")
force_authenticate(request, user=bob)
response = view(request, pk=export.pk)
self.assertEqual(status.HTTP_204_NO_CONTENT, response.status_code)
@@ -302,16 +303,24 @@ def test_export_list_with_meta_perms(self):
for survey in self.surveys:
self._make_submission(
os.path.join(
- settings.PROJECT_ROOT, 'apps',
- 'main', 'tests', 'fixtures', 'transportation',
- 'instances', survey, survey + '.xml'),
- forced_submission_time=parse_datetime(
- '2013-02-18 15:54:01Z'))
-
- alice = self._create_user('alice', 'alice', True)
-
- MetaData.xform_meta_permission(self.xform,
- data_value="editor|dataentry-minor")
+ settings.PROJECT_ROOT,
+ "apps",
+ "main",
+ "tests",
+ "fixtures",
+ "transportation",
+ "instances",
+ survey,
+ survey + ".xml",
+ ),
+ forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"),
+ )
+
+ alice = self._create_user("alice", "alice", True)
+
+ MetaData.xform_meta_permission(
+ self.xform, data_value="editor|dataentry-minor"
+ )
DataEntryMinorRole.add(alice, self.xform)
@@ -319,36 +328,30 @@ def test_export_list_with_meta_perms(self):
i.user = alice
i.save()
- view = XFormViewSet.as_view({
- 'get': 'retrieve'
- })
+ view = XFormViewSet.as_view({"get": "retrieve"})
- alices_extra = {
- 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key
- }
+ alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key}
# Alice creates an export with her own submissions
- request = self.factory.get('/', **alices_extra)
- response = view(request, pk=self.xform.pk, format='csv')
+ request = self.factory.get("/", **alices_extra)
+ response = view(request, pk=self.xform.pk, format="csv")
self.assertEqual(response.status_code, 200)
exports = Export.objects.filter(xform=self.xform)
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=alice)
response = view(request)
self.assertEqual(len(exports), len(response.data))
# Mary should not have access to the export with Alice's
# submissions.
- self._create_user_and_login(username='mary', password='password1')
- self.assertEqual(self.user.username, 'mary')
+ self._create_user_and_login(username="mary", password="password1")
+ self.assertEqual(self.user.username, "mary")
# Mary should only view their own submissions.
DataEntryMinorRole.add(self.user, self.xform)
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=self.user)
response = view(request)
self.assertFalse(bool(response.data), response.data)
@@ -364,16 +367,24 @@ def test_export_async_with_meta_perms(self):
for survey in self.surveys:
self._make_submission(
os.path.join(
- settings.PROJECT_ROOT, 'apps',
- 'main', 'tests', 'fixtures', 'transportation',
- 'instances', survey, survey + '.xml'),
- forced_submission_time=parse_datetime(
- '2013-02-18 15:54:01Z'))
-
- alice = self._create_user('alice', 'alice', True)
-
- MetaData.xform_meta_permission(self.xform,
- data_value="editor|dataentry-minor")
+ settings.PROJECT_ROOT,
+ "apps",
+ "main",
+ "tests",
+ "fixtures",
+ "transportation",
+ "instances",
+ survey,
+ survey + ".xml",
+ ),
+ forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"),
+ )
+
+ alice = self._create_user("alice", "alice", True)
+
+ MetaData.xform_meta_permission(
+ self.xform, data_value="editor|dataentry-minor"
+ )
DataEntryMinorRole.add(alice, self.xform)
@@ -381,37 +392,34 @@ def test_export_async_with_meta_perms(self):
i.user = alice
i.save()
- view = XFormViewSet.as_view({
- 'get': 'export_async',
- })
+ view = XFormViewSet.as_view(
+ {
+ "get": "export_async",
+ }
+ )
- alices_extra = {
- 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key
- }
+ alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key}
# Alice creates an export with her own submissions
- request = self.factory.get('/', data={"format": 'csv'},
- **alices_extra)
+ request = self.factory.get("/", data={"format": "csv"}, **alices_extra)
response = view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 202)
exports = Export.objects.filter(xform=self.xform)
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=alice)
response = view(request)
self.assertEqual(len(exports), len(response.data))
# Mary should not have access to the export with Alice's
# submissions.
- self._create_user_and_login(username='mary', password='password1')
- self.assertEqual(self.user.username, 'mary')
+ self._create_user_and_login(username="mary", password="password1")
+ self.assertEqual(self.user.username, "mary")
# Mary should only view their own submissions.
DataEntryMinorRole.add(self.user, self.xform)
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=self.user)
response = view(request)
self.assertFalse(bool(response.data), response.data)
@@ -427,37 +435,43 @@ def test_export_readonly_with_meta_perms(self):
for survey in self.surveys:
self._make_submission(
os.path.join(
- settings.PROJECT_ROOT, 'apps',
- 'main', 'tests', 'fixtures', 'transportation',
- 'instances', survey, survey + '.xml'),
- forced_submission_time=parse_datetime(
- '2013-02-18 15:54:01Z'))
-
- alice = self._create_user('alice', 'alice', True)
-
- MetaData.xform_meta_permission(self.xform,
- data_value="editor|dataentry-minor")
+ settings.PROJECT_ROOT,
+ "apps",
+ "main",
+ "tests",
+ "fixtures",
+ "transportation",
+ "instances",
+ survey,
+ survey + ".xml",
+ ),
+ forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"),
+ )
+
+ alice = self._create_user("alice", "alice", True)
+
+ MetaData.xform_meta_permission(
+ self.xform, data_value="editor|dataentry-minor"
+ )
ReadOnlyRole.add(alice, self.xform)
- export_view = XFormViewSet.as_view({
- 'get': 'export_async',
- })
+ export_view = XFormViewSet.as_view(
+ {
+ "get": "export_async",
+ }
+ )
- alices_extra = {
- 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key
- }
+ alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key}
# Alice creates an export with her own submissions
- request = self.factory.get('/', data={"format": 'csv'},
- **alices_extra)
+ request = self.factory.get("/", data={"format": "csv"}, **alices_extra)
response = export_view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 202)
exports = Export.objects.filter(xform=self.xform)
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=alice)
response = view(request)
self.assertEqual(len(exports), len(response.data))
@@ -465,13 +479,12 @@ def test_export_readonly_with_meta_perms(self):
# Mary should not have access to the export with Alice's
# submissions.
- self._create_user_and_login(username='mary', password='password1')
- self.assertEqual(self.user.username, 'mary')
+ self._create_user_and_login(username="mary", password="password1")
+ self.assertEqual(self.user.username, "mary")
# Mary should only view their own submissions.
DataEntryMinorRole.add(self.user, self.xform)
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=self.user)
response = view(request)
self.assertFalse(bool(response.data), response.data)
@@ -483,24 +496,21 @@ def test_export_readonly_with_meta_perms(self):
i.save()
# Mary creates an export with her own submissions
- request = self.factory.get('/', data={"format": 'csv'})
+ request = self.factory.get("/", data={"format": "csv"})
force_authenticate(request, user=self.user)
response = export_view(request, pk=self.xform.pk)
self.assertEqual(response.status_code, 202)
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=self.user)
response = view(request)
self.assertTrue(bool(response.data), response.data)
self.assertEqual(status.HTTP_200_OK, response.status_code)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- Export.objects.filter(xform=self.xform).count(), 2)
+ self.assertEqual(Export.objects.filter(xform=self.xform).count(), 2)
# Alice does not have access to the submitter only export
- request = self.factory.get('/export',
- data={'xform': self.xform.id})
+ request = self.factory.get("/export", data={"xform": self.xform.id})
force_authenticate(request, user=alice)
response = view(request)
self.assertEqual(len(exports), len(response.data))
@@ -513,18 +523,16 @@ def test_export_retrieval_authentication(self):
self._create_user_and_login()
self._publish_transportation_form()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- extra = {
- 'HTTP_AUTHORIZATION': f'Token {self.user.auth_token.key}'
- }
+ extra = {"HTTP_AUTHORIZATION": f"Token {self.user.auth_token.key}"}
- request = self.factory.get('/export', **extra)
+ request = self.factory.get("/export", **extra)
response = self.view(request, pk=export.pk)
self.assertEqual(response.status_code, 200)
@@ -537,41 +545,39 @@ def test_export_failure_reason_returned(self):
Export.objects.create(
xform=self.xform,
internal_status=Export.FAILED,
- error_message="Something unexpected happened")
+ error_message="Something unexpected happened",
+ )
extra = {
- 'HTTP_AUTHORIZATION': f'Token {self.user.auth_token.key}',
+ "HTTP_AUTHORIZATION": f"Token {self.user.auth_token.key}",
}
- view = ExportViewSet.as_view({'get': 'list'})
- request = self.factory.get(
- '/export', {'xform': self.xform.pk}, **extra)
+ view = ExportViewSet.as_view({"get": "list"})
+ request = self.factory.get("/export", {"xform": self.xform.pk}, **extra)
force_authenticate(request)
response = view(request)
self.assertEqual(response.status_code, 200)
- self.assertIn('error_message', response.data[0].keys())
+ self.assertIn("error_message", response.data[0].keys())
self.assertEqual(
- response.data[0]['error_message'],
- 'Something unexpected happened')
+ response.data[0]["error_message"], "Something unexpected happened"
+ )
def test_export_are_downloadable_to_all_users_when_public_form(self):
self._create_user_and_login()
self._publish_transportation_form()
temp_dir = settings.MEDIA_ROOT
- dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir)
+ dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir)
filename = os.path.basename(dummy_export_file.name)
filedir = os.path.dirname(dummy_export_file.name)
- export = Export.objects.create(xform=self.xform,
- filename=filename,
- filedir=filedir)
+ export = Export.objects.create(
+ xform=self.xform, filename=filename, filedir=filedir
+ )
export.save()
- user_alice = self._create_user('alice', 'alice')
+ user_alice = self._create_user("alice", "alice")
# create user profile and set require_auth to false for tests
_ = UserProfile.objects.get_or_create(user=user_alice)
- alices_extra = {
- 'HTTP_AUTHORIZATION': 'Token %s' % user_alice.auth_token.key
- }
+ alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % user_alice.auth_token.key}
EditorMinorRole.add(user_alice, self.xform)
# Form permissions are ignored when downloading Export;
@@ -584,11 +590,11 @@ def test_export_are_downloadable_to_all_users_when_public_form(self):
self.xform.save()
# Anonymous user
- request = self.factory.get('/export')
+ request = self.factory.get("/export")
response = self.view(request, pk=export.pk)
self.assertEqual(response.status_code, 200)
# Alice user; With editor role
- request = self.factory.get('/export', **alices_extra)
+ request = self.factory.get("/export", **alices_extra)
response = self.view(request, pk=export.pk)
self.assertEqual(response.status_code, 200)
diff --git a/onadata/apps/api/tests/viewsets/test_media_viewset.py b/onadata/apps/api/tests/viewsets/test_media_viewset.py
index 37f3623e38..f36be593ab 100644
--- a/onadata/apps/api/tests/viewsets/test_media_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_media_viewset.py
@@ -34,7 +34,7 @@ class TestMediaViewSet(TestAbstractViewSet, TestBase):
"""
def setUp(self):
- super(TestMediaViewSet, self).setUp()
+ super().setUp()
self.retrieve_view = MediaViewSet.as_view({"get": "retrieve"})
self._publish_xls_form_to_project()
diff --git a/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py b/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py
index b5e2607405..05b2441714 100644
--- a/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py
@@ -13,8 +13,7 @@
from django.conf import settings
from django.core.files.base import File
-from onadata.apps.api.tests.viewsets.test_abstract_viewset import \
- TestAbstractViewSet
+from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet
from onadata.apps.api.viewsets.charts_viewset import ChartsViewSet
from onadata.apps.api.viewsets.attachment_viewset import AttachmentViewSet
from onadata.apps.api.viewsets.data_viewset import DataViewSet
@@ -27,8 +26,7 @@
from onadata.apps.logger.models.instance import FormIsMergedDatasetError
from onadata.apps.logger.models.open_data import get_or_create_opendata
from onadata.apps.restservice.models import RestService
-from onadata.apps.restservice.viewsets.restservices_viewset import \
- RestServicesViewSet
+from onadata.apps.restservice.viewsets.restservices_viewset import RestServicesViewSet
from onadata.libs.utils.export_tools import get_osm_data_kwargs
from onadata.libs.utils.user_auth import get_user_default_project
from onadata.libs.serializers.attachment_serializer import AttachmentSerializer
@@ -73,22 +71,20 @@ def streaming_data(response):
"""
Iterates through a streaming response to return a json list object
"""
- return json.loads(u''.join(
- [i.decode('utf-8') for i in response.streaming_content]))
+ return json.loads("".join([i.decode("utf-8") for i in response.streaming_content]))
def _add_attachments_to_instances(instance):
attachment_file_path = os.path.join(
- settings.PROJECT_ROOT,
- "libs",
- "tests",
- "utils",
- "fixtures",
- "test-image.png"
+ settings.PROJECT_ROOT, "libs", "tests", "utils", "fixtures", "test-image.png"
)
with open(attachment_file_path, "rb") as file:
- Attachment.objects.create(instance=instance, media_file=File(
- file, attachment_file_path))
+ Attachment.objects.create(
+ instance=instance,
+ media_file=File(file, attachment_file_path),
+ xform=instance.xform,
+ user=instance.user,
+ )
def _make_submissions_merged_datasets(merged_xform):
@@ -107,51 +103,51 @@ class TestMergedXFormViewSet(TestAbstractViewSet):
"""Test merged dataset functionality."""
def _create_merged_dataset(self, geo=False):
- view = MergedXFormViewSet.as_view({
- 'post': 'create',
- })
+ view = MergedXFormViewSet.as_view(
+ {
+ "post": "create",
+ }
+ )
# pylint: disable=attribute-defined-outside-init
self.project = get_user_default_project(self.user)
- xform1 = self._publish_markdown(MD, self.user, id_string='a')
- xform2 = self._publish_markdown(MD, self.user, id_string='b')
+ xform1 = self._publish_markdown(MD, self.user, id_string="a")
+ xform2 = self._publish_markdown(MD, self.user, id_string="b")
if geo:
xform2.instances_with_geopoints = True
- xform2.save(update_fields=['instances_with_geopoints'])
+ xform2.save(update_fields=["instances_with_geopoints"])
data = {
- 'xforms': [
+ "xforms": [
"http://testserver/api/v1/forms/%s" % xform1.pk,
"http://testserver/api/v1/forms/%s" % xform2.pk,
],
- 'name':
- 'Merged Dataset',
- 'project':
- f"http://testserver/api/v1/projects/{self.project.pk}",
+ "name": "Merged Dataset",
+ "project": f"http://testserver/api/v1/projects/{self.project.pk}",
}
# anonymous user
- request = self.factory.post('/', data=data)
+ request = self.factory.post("/", data=data)
response = view(request)
self.assertEqual(response.status_code, 401)
- request = self.factory.post('/', data=data, **self.extra)
+ request = self.factory.post("/", data=data, **self.extra)
response = view(request)
self.assertEqual(response.status_code, 201)
- self.assertIn('id', response.data)
- self.assertIn('title', response.data)
- self.assertIn('xforms', response.data)
+ self.assertIn("id", response.data)
+ self.assertIn("title", response.data)
+ self.assertIn("xforms", response.data)
expected_xforms_data = {
- 'id': xform1.pk,
- 'title': xform1.title,
- 'id_string': xform1.id_string,
- 'url': "http://testserver/api/v1/forms/%s" % xform1.pk,
- 'num_of_submissions': xform1.num_of_submissions,
- 'owner': xform1.user.username,
- 'project_id': self.project.pk,
- 'project_name': self.project.name
+ "id": xform1.pk,
+ "title": xform1.title,
+ "id_string": xform1.id_string,
+ "url": "http://testserver/api/v1/forms/%s" % xform1.pk,
+ "num_of_submissions": xform1.num_of_submissions,
+ "owner": xform1.user.username,
+ "project_id": self.project.pk,
+ "project_name": self.project.name,
}
- self.assertEqual(response.data['xforms'][0], expected_xforms_data)
- self.assertIsNotNone(response.data['uuid'])
- self.assertEqual(len(response.data['uuid']), 32)
+ self.assertEqual(response.data["xforms"][0], expected_xforms_data)
+ self.assertIsNotNone(response.data["uuid"])
+ self.assertEqual(len(response.data["uuid"]), 32)
return response.data
@@ -161,10 +157,12 @@ def test_create_merged_dataset(self):
def test_merged_datasets_list(self):
"""Test list endpoint of a merged dataset"""
- view = MergedXFormViewSet.as_view({
- 'get': 'list',
- })
- request = self.factory.get('/')
+ view = MergedXFormViewSet.as_view(
+ {
+ "get": "list",
+ }
+ )
+ request = self.factory.get("/")
# Empty list when there are no merged datasets
response = view(request)
@@ -182,31 +180,28 @@ def test_merged_datasets_list(self):
self.assertEqual([], response.data)
# A list containing the merged datasets for user bob
- request = self.factory.get('/', **self.extra)
+ request = self.factory.get("/", **self.extra)
response = view(request)
self.assertEqual(response.status_code, 200)
self.assertIsInstance(response.data, list)
self.assertIn(merged_dataset, response.data)
# merged dataset included in api/forms endpoint
- request = self.factory.get('/', **self.extra)
- view = XFormViewSet.as_view({'get': 'list'})
+ request = self.factory.get("/", **self.extra)
+ view = XFormViewSet.as_view({"get": "list"})
response = view(request)
self.assertEqual(response.status_code, 200)
self.assertIsInstance(response.data, list)
self.assertEqual(len(response.data), 3)
- self.assertIn(merged_dataset['id'],
- [d['formid'] for d in response.data])
- data = [
- _ for _ in response.data if _['formid'] == merged_dataset['id']
- ][0]
- self.assertIn('is_merged_dataset', data)
- self.assertTrue(data['is_merged_dataset'])
+ self.assertIn(merged_dataset["id"], [d["formid"] for d in response.data])
+ data = [_ for _ in response.data if _["formid"] == merged_dataset["id"]][0]
+ self.assertIn("is_merged_dataset", data)
+ self.assertTrue(data["is_merged_dataset"])
def test_merged_datasets_retrieve(self):
"""Test retrieving a specific merged dataset"""
merged_dataset = self._create_merged_dataset(geo=True)
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
# make submission to form b
form_b = merged_xform.xforms.all()[1]
@@ -216,59 +211,63 @@ def test_merged_datasets_retrieve(self):
form_b.refresh_from_db()
form_b.last_submission_time = instance.date_created
form_b.save()
- view = MergedXFormViewSet.as_view({'get': 'retrieve'})
- request = self.factory.get('/')
+ view = MergedXFormViewSet.as_view({"get": "retrieve"})
+ request = self.factory.get("/")
# status_code is 404 when the pk doesn't exist
- response = view(request, pk=(1000 * merged_dataset['id']))
+ response = view(request, pk=(1000 * merged_dataset["id"]))
self.assertEqual(response.status_code, 404)
# status_code is 404 when: pk exists, user is not authenticated
- response = view(request, pk=merged_dataset['id'])
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 404)
# status_code is 200 when: pk exists, user is authenticated
- request = self.factory.get('/', **self.extra)
- response = view(request, pk=merged_dataset['id'])
+ request = self.factory.get("/", **self.extra)
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
# data has expected fields
- self.assertIn('id', response.data)
- self.assertIn('title', response.data)
- self.assertIn('xforms', response.data)
- self.assertEqual(response.data['num_of_submissions'], 1)
- self.assertEqual(response.data['last_submission_time'],
- form_b.last_submission_time.isoformat())
+ self.assertIn("id", response.data)
+ self.assertIn("title", response.data)
+ self.assertIn("xforms", response.data)
+ self.assertEqual(response.data["num_of_submissions"], 1)
+ self.assertEqual(
+ response.data["last_submission_time"],
+ form_b.last_submission_time.isoformat(),
+ )
# merged dataset should be available at api/forms/[pk] endpoint
- request = self.factory.get('/', **self.extra)
- view = XFormViewSet.as_view({'get': 'retrieve'})
- response = view(request, pk=merged_dataset['id'])
- self.assertEqual(response.status_code, 200)
- self.assertEqual(merged_dataset['id'], response.data['formid'])
- self.assertIn('is_merged_dataset', response.data)
- self.assertTrue(response.data['is_merged_dataset'])
- self.assertTrue(response.data['instances_with_geopoints'])
- self.assertEqual(response.data['num_of_submissions'], 1)
- self.assertEqual(response.data['last_submission_time'],
- form_b.last_submission_time.isoformat())
+ request = self.factory.get("/", **self.extra)
+ view = XFormViewSet.as_view({"get": "retrieve"})
+ response = view(request, pk=merged_dataset["id"])
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(merged_dataset["id"], response.data["formid"])
+ self.assertIn("is_merged_dataset", response.data)
+ self.assertTrue(response.data["is_merged_dataset"])
+ self.assertTrue(response.data["instances_with_geopoints"])
+ self.assertEqual(response.data["num_of_submissions"], 1)
+ self.assertEqual(
+ response.data["last_submission_time"],
+ form_b.last_submission_time.isoformat(),
+ )
def test_merged_datasets_form_json(self):
"""Test retrieving the XLSForm JSON of a merged dataset"""
# create a merged dataset
merged_dataset = self._create_merged_dataset()
- view = MergedXFormViewSet.as_view({'get': 'form'})
- request = self.factory.get('/', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='json')
+ view = MergedXFormViewSet.as_view({"get": "form"})
+ request = self.factory.get("/", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="json")
self.assertEqual(response.status_code, 200)
response.render()
- self.assertEqual('application/json', response['Content-Type'])
+ self.assertEqual("application/json", response["Content-Type"])
data = json.loads(response.content)
self.assertIsInstance(data, dict)
- for key in ['children', 'id_string', 'name', 'default_language']:
+ for key in ["children", "id_string", "name", "default_language"]:
self.assertIn(key, data)
def test_merged_datasets_form_xml(self):
@@ -276,92 +275,102 @@ def test_merged_datasets_form_xml(self):
# create a merged dataset
merged_dataset = self._create_merged_dataset()
- view = MergedXFormViewSet.as_view({'get': 'form'})
- request = self.factory.get('/', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='xml')
+ view = MergedXFormViewSet.as_view({"get": "form"})
+ request = self.factory.get("/", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="xml")
self.assertEqual(response.status_code, 200)
response.render()
- self.assertEqual('text/xml; charset=utf-8', response['Content-Type'])
+ self.assertEqual("text/xml; charset=utf-8", response["Content-Type"])
def test_merged_datasets_data(self):
"""Test retrieving data of a merged dataset"""
merged_dataset = self._create_merged_dataset()
- request = self.factory.get('/', **self.extra)
- view = MergedXFormViewSet.as_view({'get': 'data'})
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
- detail_view = MergedXFormViewSet.as_view({
- 'get': 'retrieve',
- })
- xform_detail_view = XFormViewSet.as_view({
- 'get': 'retrieve',
- })
-
- response = view(request, pk=merged_dataset['id'])
+ request = self.factory.get("/", **self.extra)
+ view = MergedXFormViewSet.as_view({"get": "data"})
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
+ detail_view = MergedXFormViewSet.as_view(
+ {
+ "get": "retrieve",
+ }
+ )
+ xform_detail_view = XFormViewSet.as_view(
+ {
+ "get": "retrieve",
+ }
+ )
+
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
# check num_of_submissions
- response = detail_view(request, pk=merged_dataset['id'])
+ response = detail_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['num_of_submissions'], 0)
+ self.assertEqual(response.data["num_of_submissions"], 0)
# make submission to form a
form_a = merged_xform.xforms.all()[0]
xml = 'orange'
Instance(xform=form_a, xml=xml).save()
- response = view(request, pk=merged_dataset['id'])
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange"]
self.assertEqual(fruit, expected_fruit)
# check num_of_submissions
- response = detail_view(request, pk=merged_dataset['id'])
+ response = detail_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['num_of_submissions'], 1)
+ self.assertEqual(response.data["num_of_submissions"], 1)
# make submission to form b
form_b = merged_xform.xforms.all()[1]
xml = 'mango'
last_submission = Instance(xform=form_b, xml=xml)
last_submission.save()
- response = view(request, pk=merged_dataset['id'])
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange', 'mango']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange", "mango"]
self.assertEqual(fruit, expected_fruit)
# check num_of_submissions /merged-datasets/[pk]
- response = detail_view(request, pk=merged_dataset['id'])
+ response = detail_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['num_of_submissions'], 2)
+ self.assertEqual(response.data["num_of_submissions"], 2)
# check last_submission_time
- self.assertEqual(response.data['last_submission_time'],
- last_submission.date_created.isoformat())
+ self.assertEqual(
+ response.data["last_submission_time"],
+ last_submission.date_created.isoformat(),
+ )
# check num_of_submissions /forms/[pk]
- response = xform_detail_view(request, pk=merged_dataset['id'])
+ response = xform_detail_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['num_of_submissions'], 2)
+ self.assertEqual(response.data["num_of_submissions"], 2)
# check last_submission_time
- self.assertEqual(response.data['last_submission_time'],
- last_submission.date_created.isoformat())
+ self.assertEqual(
+ response.data["last_submission_time"],
+ last_submission.date_created.isoformat(),
+ )
def test_md_data_viewset(self):
"""Test retrieving data of a merged dataset at the /data endpoint"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
- request = self.factory.get('/', **self.extra)
- data_view = DataViewSet.as_view({
- 'get': 'list',
- })
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
+ request = self.factory.get("/", **self.extra)
+ data_view = DataViewSet.as_view(
+ {
+ "get": "list",
+ }
+ )
# make submission to form a
form_a = merged_xform.xforms.all()[0]
@@ -369,12 +378,12 @@ def test_md_data_viewset(self):
Instance(xform=form_a, xml=xml).save()
# DataViewSet /data/[pk] endpoint
- response = data_view(request, pk=merged_dataset['id'])
+ response = data_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange"]
self.assertEqual(fruit, expected_fruit)
# make submission to form b
@@ -383,56 +392,56 @@ def test_md_data_viewset(self):
Instance(xform=form_b, xml=xml).save()
# DataViewSet /data/[pk] endpoint
- response = data_view(request, pk=merged_dataset['id'])
+ response = data_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
- dataid = response.data[0]['_id']
+ dataid = response.data[0]["_id"]
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange', 'mango']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange", "mango"]
self.assertEqual(fruit, expected_fruit)
# DataViewSet /data/[pk]/[dataid] endpoint
- data_view = DataViewSet.as_view({
- 'get': 'retrieve',
- })
- response = data_view(request, pk=merged_dataset['id'], dataid=dataid)
+ data_view = DataViewSet.as_view(
+ {
+ "get": "retrieve",
+ }
+ )
+ response = data_view(request, pk=merged_dataset["id"], dataid=dataid)
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['fruit'], 'orange')
+ self.assertEqual(response.data["fruit"], "orange")
def test_deleted_forms(self):
"""Test retrieving data of a merged dataset with no forms linked."""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
merged_xform.xforms.all().delete()
request = self.factory.get(
- '/',
- data={
- 'sort': '{"_submission_time":1}',
- 'limit': '10'
- },
- **self.extra)
- data_view = DataViewSet.as_view({
- 'get': 'list',
- })
+ "/", data={"sort": '{"_submission_time":1}', "limit": "10"}, **self.extra
+ )
+ data_view = DataViewSet.as_view(
+ {
+ "get": "list",
+ }
+ )
# DataViewSet /data/[pk] endpoint
- response = data_view(request, pk=merged_dataset['id'])
+ response = data_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200, response.data)
self.assertEqual(response.data, [])
- data = {'field_name': 'fruit'}
- view = ChartsViewSet.as_view({'get': 'retrieve'})
+ data = {"field_name": "fruit"}
+ view = ChartsViewSet.as_view({"get": "retrieve"})
- request = self.factory.get('/charts', data, **self.extra)
- response = view(request, pk=merged_dataset['id'], format='html')
+ request = self.factory.get("/charts", data, **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="html")
self.assertEqual(response.status_code, 200)
- self.assertEqual(response.data['data'].__len__(), 0)
+ self.assertEqual(response.data["data"].__len__(), 0)
def test_md_geojson_response(self):
"""Test geojson response of a merged dataset"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(merged_xform)
@@ -444,125 +453,148 @@ def test_md_geojson_response(self):
instance.deleted_by = self.user
instance.save()
- view = MergedXFormViewSet.as_view({'get': 'data'})
+ view = MergedXFormViewSet.as_view({"get": "data"})
- request = self.factory.get('/', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='geojson')
+ request = self.factory.get("/", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="geojson")
self.assertEqual(response.status_code, 200)
# we get correct content type
headers = dict(response.items())
self.assertEqual(headers["Content-Type"], "application/geo+json")
- del response.data['features'][0]['properties']['xform']
- del response.data['features'][1]['properties']['xform']
- del response.data['features'][0]['properties']['id']
- del response.data['features'][1]['properties']['id']
+ del response.data["features"][0]["properties"]["xform"]
+ del response.data["features"][1]["properties"]["xform"]
+ del response.data["features"][0]["properties"]["id"]
+ del response.data["features"][1]["properties"]["id"]
self.assertEqual(
- {'type': 'FeatureCollection',
- 'features':
- [{'type': 'Feature', 'geometry': None, 'properties': {}},
- {'type': 'Feature', 'geometry': None, 'properties': {}}]},
- response.data
+ {
+ "type": "FeatureCollection",
+ "features": [
+ {"type": "Feature", "geometry": None, "properties": {}},
+ {"type": "Feature", "geometry": None, "properties": {}},
+ ],
+ },
+ response.data,
)
# pagination works ok!
- request = self.factory.get('/?page=1&page_size=1', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='geojson')
+ request = self.factory.get("/?page=1&page_size=1", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="geojson")
self.assertEqual(response.status_code, 200)
- del response.data['features'][0]['properties']['xform']
- del response.data['features'][0]['properties']['id']
+ del response.data["features"][0]["properties"]["xform"]
+ del response.data["features"][0]["properties"]["id"]
self.assertEqual(
- {'type': 'FeatureCollection',
- 'features':
- [{'type': 'Feature', 'geometry': None, 'properties': {}}]},
- response.data
+ {
+ "type": "FeatureCollection",
+ "features": [{"type": "Feature", "geometry": None, "properties": {}}],
+ },
+ response.data,
)
- request = self.factory.get('/?page=2&page_size=1', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='geojson')
+ request = self.factory.get("/?page=2&page_size=1", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="geojson")
self.assertEqual(response.status_code, 200)
- del response.data['features'][0]['properties']['xform']
- del response.data['features'][0]['properties']['id']
+ del response.data["features"][0]["properties"]["xform"]
+ del response.data["features"][0]["properties"]["id"]
self.assertEqual(
- {'type': 'FeatureCollection',
- 'features':
- [{'type': 'Feature', 'geometry': None, 'properties': {}}]},
- response.data
+ {
+ "type": "FeatureCollection",
+ "features": [{"type": "Feature", "geometry": None, "properties": {}}],
+ },
+ response.data,
)
# fields argument is applied correctly
- request = self.factory.get('/?page=1&page_size=1&fields=fruit', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='geojson')
+ request = self.factory.get("/?page=1&page_size=1&fields=fruit", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="geojson")
self.assertEqual(response.status_code, 200)
- del response.data['features'][0]['properties']['xform']
- del response.data['features'][0]['properties']['id']
+ del response.data["features"][0]["properties"]["xform"]
+ del response.data["features"][0]["properties"]["id"]
self.assertEqual(
- {'type': 'FeatureCollection',
- 'features':
- [{'type': 'Feature', 'geometry': None,
- 'properties': {'fruit': 'orange'}}]},
- response.data
+ {
+ "type": "FeatureCollection",
+ "features": [
+ {
+ "type": "Feature",
+ "geometry": None,
+ "properties": {"fruit": "orange"},
+ }
+ ],
+ },
+ response.data,
)
# Invalid page error when we reqeust for a non-existent page
- request = self.factory.get('/?page=10&page_size=1&fields=fruit', **self.extra)
- response = view(request, pk=merged_dataset['id'], format='geojson')
+ request = self.factory.get("/?page=10&page_size=1&fields=fruit", **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="geojson")
self.assertEqual(response.status_code, 404)
- self.assertEqual(
- {'detail': 'Invalid page.'},
- response.data
- )
+ self.assertEqual({"detail": "Invalid page."}, response.data)
def test_md_csv_export(self):
"""Test CSV export of a merged dataset"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(merged_xform)
# merged dataset should be available at api/forms/[pk] endpoint
- request = self.factory.get('/', **self.extra)
- view = XFormViewSet.as_view({'get': 'retrieve'})
- response = view(request, pk=merged_dataset['id'], format='csv')
+ request = self.factory.get("/", **self.extra)
+ view = XFormViewSet.as_view({"get": "retrieve"})
+ response = view(request, pk=merged_dataset["id"], format="csv")
self.assertEqual(response.status_code, 200)
- csv_file_obj = StringIO(''.join(
- [c.decode('utf-8') for c in response.streaming_content]))
+ csv_file_obj = StringIO(
+ "".join([c.decode("utf-8") for c in response.streaming_content])
+ )
csv_reader = csv.reader(csv_file_obj)
# jump over headers first
headers = next(csv_reader)
- self.assertEqual(headers, [
- 'fruit', 'meta/instanceID', '_id', '_uuid', '_submission_time',
- '_date_modified', '_tags', '_notes', '_version', '_duration',
- '_submitted_by', '_total_media', '_media_count',
- '_media_all_received'])
+ self.assertEqual(
+ headers,
+ [
+ "fruit",
+ "meta/instanceID",
+ "_id",
+ "_uuid",
+ "_submission_time",
+ "_date_modified",
+ "_tags",
+ "_notes",
+ "_version",
+ "_duration",
+ "_submitted_by",
+ "_total_media",
+ "_media_count",
+ "_media_all_received",
+ ],
+ )
row1 = next(csv_reader)
- self.assertEqual(row1[0], 'orange')
+ self.assertEqual(row1[0], "orange")
row2 = next(csv_reader)
- self.assertEqual(row2[0], 'mango')
+ self.assertEqual(row2[0], "mango")
def test_get_osm_data_kwargs(self):
"""
Test get_osm_data_kwargs returns correct kwargs for a merged dataset.
"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
- pks = [_ for _ in merged_xform.xforms.values_list('id', flat=True)]
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
+ pks = [_ for _ in merged_xform.xforms.values_list("id", flat=True)]
kwargs = get_osm_data_kwargs(merged_xform)
- self.assertEqual(kwargs, {
- 'instance__deleted_at__isnull': True,
- 'instance__xform_id__in': pks
- })
+ self.assertEqual(
+ kwargs,
+ {"instance__deleted_at__isnull": True, "instance__xform_id__in": pks},
+ )
xform = merged_xform.xforms.all()[0]
kwargs = get_osm_data_kwargs(xform)
- self.assertEqual(kwargs, {
- 'instance__deleted_at__isnull': True,
- 'instance__xform_id': xform.pk
- })
+ self.assertEqual(
+ kwargs,
+ {"instance__deleted_at__isnull": True, "instance__xform_id": xform.pk},
+ )
# pylint: disable=invalid-name
def test_merged_with_attachment_endpoint(self):
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(merged_xform)
# Attachment viewset works ok for filtered datasets
@@ -571,41 +603,40 @@ def test_merged_with_attachment_endpoint(self):
for instance in all_instances:
_add_attachments_to_instances(instance)
request = self.factory.get(
- "/?merged_xform=" + str(merged_xform.pk),
- **self.extra)
+ "/?merged_xform=" + str(merged_xform.pk), **self.extra
+ )
response = attachment_list_view(request)
serialized_attachments = AttachmentSerializer(
- Attachment.objects.filter(
- instance__xform__in=merged_xform.xforms.all()),
- many=True, context={'request': request}).data
- self.assertEqual(
- response.data,
- serialized_attachments)
+ Attachment.objects.filter(instance__xform__in=merged_xform.xforms.all()),
+ many=True,
+ context={"request": request},
+ ).data
+ self.assertEqual(response.data, serialized_attachments)
def test_merged_dataset_charts(self):
"""Test /charts endpoint for a merged dataset works"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(merged_xform)
- data = {'field_name': 'fruit'}
- view = ChartsViewSet.as_view({'get': 'retrieve'})
+ data = {"field_name": "fruit"}
+ view = ChartsViewSet.as_view({"get": "retrieve"})
- request = self.factory.get('/charts', data, **self.extra)
- response = view(request, pk=merged_dataset['id'], format='html')
+ request = self.factory.get("/charts", data, **self.extra)
+ response = view(request, pk=merged_dataset["id"], format="html")
self.assertEqual(response.status_code, 200)
- self.assertNotEqual(response.get('Cache-Control'), None)
- self.assertEqual(response.data['field_type'], 'select one')
- self.assertEqual(response.data['field_name'], 'fruit')
- self.assertEqual(response.data['data_type'], 'categorized')
- self.assertEqual(response.data['data'][0]['fruit'], 'Mango')
- self.assertEqual(response.data['data'][1]['fruit'], 'Orange')
+ self.assertNotEqual(response.get("Cache-Control"), None)
+ self.assertEqual(response.data["field_type"], "select one")
+ self.assertEqual(response.data["field_name"], "fruit")
+ self.assertEqual(response.data["data_type"], "categorized")
+ self.assertEqual(response.data["data"][0]["fruit"], "Mango")
+ self.assertEqual(response.data["data"][1]["fruit"], "Orange")
def test_submissions_not_allowed(self):
"""Test submissions to a merged form is not allowed"""
merged_dataset = self._create_merged_dataset()
- merged_xform = XForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = XForm.objects.get(pk=merged_dataset["id"])
# make submission to form a
xml = 'orange'
@@ -615,24 +646,23 @@ def test_submissions_not_allowed(self):
def test_openrosa_form_list(self):
"""Test merged dataset form is not included in /formList"""
merged_dataset = self._create_merged_dataset()
- merged_xform = XForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = XForm.objects.get(pk=merged_dataset["id"])
view = XFormListViewSet.as_view({"get": "list"})
- request = self.factory.get('/')
+ request = self.factory.get("/")
response = view(request, username=self.user.username)
self.assertEqual(response.status_code, 200)
- self.assertNotIn(merged_xform.id_string,
- [_['formID'] for _ in response.data])
+ self.assertNotIn(merged_xform.id_string, [_["formID"] for _ in response.data])
def test_open_data(self):
"""Test OpenDataViewSet data endpoint"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(merged_xform)
- xform = XForm.objects.get(pk=merged_dataset['id'])
- view = OpenDataViewSet.as_view({'get': 'data'})
+ xform = XForm.objects.get(pk=merged_dataset["id"])
+ view = OpenDataViewSet.as_view({"get": "data"})
_open_data = get_or_create_opendata(xform)[0]
uuid = _open_data.uuid
- request = self.factory.get('/', **self.extra)
+ request = self.factory.get("/", **self.extra)
response = view(request, uuid=uuid)
self.assertEqual(response.status_code, 200)
# cast generator response to list so that we can get the response count
@@ -644,21 +674,20 @@ def test_filtered_dataset(self):
the linked forms.
"""
merged_dataset = self._create_merged_dataset()
- xform = XForm.objects.get(pk=merged_dataset['id'])
+ xform = XForm.objects.get(pk=merged_dataset["id"])
_make_submissions_merged_datasets(xform.mergedxform)
self.assertTrue(xform.is_merged_dataset)
data = {
- 'name': "My DataView",
- 'xform': 'http://testserver/api/v1/forms/%s' % xform.pk,
- 'project':
- 'http://testserver/api/v1/projects/%s' % xform.project.pk,
+ "name": "My DataView",
+ "xform": "http://testserver/api/v1/forms/%s" % xform.pk,
+ "project": "http://testserver/api/v1/projects/%s" % xform.project.pk,
# ensure there's an attachment column(photo) in you dataview
- 'columns': '["fruit"]'
+ "columns": '["fruit"]',
}
- view = DataViewViewSet.as_view({'get': 'data'})
+ view = DataViewViewSet.as_view({"get": "data"})
self._create_dataview(data=data, project=xform.project, xform=xform)
- request = self.factory.get('/', **self.extra)
+ request = self.factory.get("/", **self.extra)
response = view(request, pk=self.data_view.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
@@ -669,15 +698,15 @@ def test_rest_service(self):
"""
count = RestService.objects.count()
merged_dataset = self._create_merged_dataset()
- xform = XForm.objects.get(pk=merged_dataset['id'])
- view = RestServicesViewSet.as_view({'post': 'create'})
+ xform = XForm.objects.get(pk=merged_dataset["id"])
+ view = RestServicesViewSet.as_view({"post": "create"})
post_data = {
"name": "generic_json",
"service_url": "http://crunch.goodbot.ai",
- "xform": xform.pk
+ "xform": xform.pk,
}
- request = self.factory.post('/', data=post_data, **self.extra)
+ request = self.factory.post("/", data=post_data, **self.extra)
response = view(request)
self.assertEqual(response.status_code, 201)
@@ -693,70 +722,71 @@ def test_md_has_deleted_xforms(self):
"""
Test creating a merged dataset that includes a soft deleted form.
"""
- view = MergedXFormViewSet.as_view({
- 'post': 'create',
- })
+ view = MergedXFormViewSet.as_view(
+ {
+ "post": "create",
+ }
+ )
# pylint: disable=attribute-defined-outside-init
self.project = get_user_default_project(self.user)
- xform1 = self._publish_markdown(MD, self.user, id_string='a')
- xform2 = self._publish_markdown(MD, self.user, id_string='b')
+ xform1 = self._publish_markdown(MD, self.user, id_string="a")
+ xform2 = self._publish_markdown(MD, self.user, id_string="b")
xform2.soft_delete()
data = {
- 'xforms': [
+ "xforms": [
"http://testserver/api/v1/forms/%s" % xform1.pk,
"http://testserver/api/v1/forms/%s" % xform2.pk,
],
- 'name':
- 'Merged Dataset',
- 'project':
- f"http://testserver/api/v1/projects/{self.project.pk}",
+ "name": "Merged Dataset",
+ "project": f"http://testserver/api/v1/projects/{self.project.pk}",
}
- request = self.factory.post('/', data=data, **self.extra)
+ request = self.factory.post("/", data=data, **self.extra)
response = view(request)
self.assertEqual(response.status_code, 400)
self.assertEqual(
- response.data,
- {'xforms': [u'Invalid hyperlink - Object does not exist.']})
+ response.data, {"xforms": ["Invalid hyperlink - Object does not exist."]}
+ )
def test_md_has_no_matching_fields(self):
"""
Test creating a merged dataset that has no matching fields.
"""
- view = MergedXFormViewSet.as_view({
- 'post': 'create',
- })
+ view = MergedXFormViewSet.as_view(
+ {
+ "post": "create",
+ }
+ )
# pylint: disable=attribute-defined-outside-init
self.project = get_user_default_project(self.user)
- xform1 = self._publish_markdown(MD, self.user, id_string='a')
- xform2 = self._publish_markdown(NOT_MATCHING, self.user, id_string='b')
+ xform1 = self._publish_markdown(MD, self.user, id_string="a")
+ xform2 = self._publish_markdown(NOT_MATCHING, self.user, id_string="b")
data = {
- 'xforms': [
+ "xforms": [
"http://testserver/api/v1/forms/%s" % xform1.pk,
"http://testserver/api/v1/forms/%s" % xform2.pk,
],
- 'name':
- 'Merged Dataset',
- 'project':
- f"http://testserver/api/v1/projects/{self.project.pk}",
+ "name": "Merged Dataset",
+ "project": f"http://testserver/api/v1/projects/{self.project.pk}",
}
- request = self.factory.post('/', data=data, **self.extra)
+ request = self.factory.post("/", data=data, **self.extra)
response = view(request)
self.assertEqual(response.status_code, 400)
- self.assertEqual(response.data,
- {'xforms': [u'No matching fields in xforms.']})
+ self.assertEqual(response.data, {"xforms": ["No matching fields in xforms."]})
def test_md_data_viewset_deleted_form(self):
"""Test retrieving data of a merged dataset with one form deleted"""
merged_dataset = self._create_merged_dataset()
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
- request = self.factory.get('/', **self.extra)
- data_view = DataViewSet.as_view({
- 'get': 'list',
- })
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
+ request = self.factory.get("/", **self.extra)
+ data_view = DataViewSet.as_view(
+ {
+ "get": "list",
+ }
+ )
# make submission to form a
form_a = merged_xform.xforms.all()[0]
@@ -764,12 +794,12 @@ def test_md_data_viewset_deleted_form(self):
Instance(xform=form_a, xml=xml).save()
# DataViewSet /data/[pk] endpoint
- response = data_view(request, pk=merged_dataset['id'])
+ response = data_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange"]
self.assertEqual(fruit, expected_fruit)
# make submission to form b
@@ -778,18 +808,18 @@ def test_md_data_viewset_deleted_form(self):
Instance(xform=form_b, xml=xml).save()
# DataViewSet /data/[pk] endpoint
- response = data_view(request, pk=merged_dataset['id'])
+ response = data_view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
- dataid = response.data[0]['_id']
+ dataid = response.data[0]["_id"]
- fruit = [d['fruit'] for d in response.data]
- expected_fruit = ['orange', 'mango']
+ fruit = [d["fruit"] for d in response.data]
+ expected_fruit = ["orange", "mango"]
self.assertEqual(fruit, expected_fruit)
# DataViewSet /data/[pk] endpoint, form_a deleted
form_a.soft_delete()
- response = data_view(request, pk=merged_dataset['id'], dataid=dataid)
+ response = data_view(request, pk=merged_dataset["id"], dataid=dataid)
self.assertEqual(response.status_code, 404)
def test_xform_has_uncommon_reference(self):
@@ -797,41 +827,40 @@ def test_xform_has_uncommon_reference(self):
Test creating a merged dataset that has matching fields but with
uncommon reference variable.
"""
- view = MergedXFormViewSet.as_view({
- 'post': 'create',
- })
+ view = MergedXFormViewSet.as_view(
+ {
+ "post": "create",
+ }
+ )
# pylint: disable=attribute-defined-outside-init
self.project = get_user_default_project(self.user)
- xform1 = self._publish_markdown(MD, self.user, id_string='a')
- xform2 = self._publish_markdown(
- REFERENCE_ISSUE, self.user, id_string='b')
+ xform1 = self._publish_markdown(MD, self.user, id_string="a")
+ xform2 = self._publish_markdown(REFERENCE_ISSUE, self.user, id_string="b")
data = {
- 'xforms': [
+ "xforms": [
"http://testserver/api/v1/forms/%s" % xform2.pk,
"http://testserver/api/v1/forms/%s" % xform1.pk,
],
- 'name':
- 'Merged Dataset',
- 'project':
- f"http://testserver/api/v1/projects/{self.project.pk}",
+ "name": "Merged Dataset",
+ "project": f"http://testserver/api/v1/projects/{self.project.pk}",
}
- request = self.factory.post('/', data=data, **self.extra)
+ request = self.factory.post("/", data=data, **self.extra)
response = view(request)
self.assertEqual(response.status_code, 400)
error_message = (
"There has been a problem trying to replace ${tunda} with the "
"XPath to the survey element named 'tunda'. There is no survey "
- "element with this name.")
- self.assertIn('xforms', response.data)
- self.assertIn(error_message, response.data['xforms'])
+ "element with this name."
+ )
+ self.assertIn("xforms", response.data)
+ self.assertIn(error_message, response.data["xforms"])
def test_merged_datasets_deleted_parent_retrieve(self):
- """Test retrieving a specific merged dataset when the parent is deleted
- """
+ """Test retrieving a specific merged dataset when the parent is deleted"""
merged_dataset = self._create_merged_dataset(geo=True)
- merged_xform = MergedXForm.objects.get(pk=merged_dataset['id'])
+ merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"])
# make submission to form b
form_b = merged_xform.xforms.all()[1]
@@ -841,12 +870,12 @@ def test_merged_datasets_deleted_parent_retrieve(self):
form_b.refresh_from_db()
form_b.last_submission_time = instance.date_created
form_b.save()
- view = MergedXFormViewSet.as_view({'get': 'retrieve'})
+ view = MergedXFormViewSet.as_view({"get": "retrieve"})
# status_code is 200 when: pk exists, user is authenticated
- request = self.factory.get('/', **self.extra)
- response = view(request, pk=merged_dataset['id'])
+ request = self.factory.get("/", **self.extra)
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
# delete parents
@@ -854,12 +883,12 @@ def test_merged_datasets_deleted_parent_retrieve(self):
merged_xform.refresh_from_db()
# merged dataset should be available at api/forms/[pk] endpoint
- request = self.factory.get('/', **self.extra)
- view = XFormViewSet.as_view({'get': 'retrieve'})
- response = view(request, pk=merged_dataset['id'])
+ request = self.factory.get("/", **self.extra)
+ view = XFormViewSet.as_view({"get": "retrieve"})
+ response = view(request, pk=merged_dataset["id"])
self.assertEqual(response.status_code, 200)
- self.assertEqual(merged_dataset['id'], response.data['formid'])
- self.assertTrue(response.data['is_merged_dataset'])
- self.assertTrue(response.data['instances_with_geopoints'])
+ self.assertEqual(merged_dataset["id"], response.data["formid"])
+ self.assertTrue(response.data["is_merged_dataset"])
+ self.assertTrue(response.data["instances_with_geopoints"])
# deleted parents, 0 submissions
- self.assertEqual(response.data['num_of_submissions'], 0)
+ self.assertEqual(response.data["num_of_submissions"], 0)
diff --git a/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py b/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py
index 1dac6fba1b..98a99fd5e8 100644
--- a/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py
@@ -1,8 +1,9 @@
"""
Module containing test for the MessagingStatsViewset (api/v1/stats/messaging)
"""
+
import json
-from datetime import date
+from datetime import datetime, timezone
from django.test import RequestFactory
from onadata.apps.api.viewsets.messaging_stats_viewset import MessagingStatsViewSet
@@ -34,7 +35,7 @@ def test_filters(self):
"target_type": "xform",
"target_id": self.xform.id,
"group_by": "day",
- "timestamp__day": date.today().day,
+ "timestamp__day": datetime.now().day, # .astimezone(timezone.utc).day,
},
**self.extra,
)
@@ -49,7 +50,7 @@ def test_filters(self):
returned_data,
[
{
- "group": str(date.today()),
+ "group": str(datetime.now().astimezone(timezone.utc).date()),
"submission_created": self.xform.instances.count(),
}
],
@@ -61,7 +62,7 @@ def test_filters(self):
"target_type": "xform",
"target_id": self.xform.id,
"group_by": "day",
- "timestamp__day": date.today().day + 1,
+ "timestamp__day": datetime.now().astimezone(timezone.utc).day + 1,
},
**self.extra,
)
@@ -98,7 +99,7 @@ def test_filters(self):
returned_data,
[
{
- "group": str(date.today()),
+ "group": str(datetime.now().astimezone(timezone.utc).date()),
"submission_created": self.xform.instances.count(),
}
],
@@ -153,7 +154,7 @@ def test_expected_responses(self):
returned_data,
[
{
- "group": str(date.today()),
+ "group": str(datetime.now().astimezone(timezone.utc).date()),
"submission_created": self.xform.instances.count(),
}
],
diff --git a/onadata/apps/api/tests/viewsets/test_widget_viewset.py b/onadata/apps/api/tests/viewsets/test_widget_viewset.py
index 93ec7cf33a..0ed0605530 100644
--- a/onadata/apps/api/tests/viewsets/test_widget_viewset.py
+++ b/onadata/apps/api/tests/viewsets/test_widget_viewset.py
@@ -217,11 +217,17 @@ def test_list_widgets(self):
}
)
+ # empty - no xform filter
request = self.factory.get("/", **self.extra)
response = view(request)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(len(response.data), 0)
+ # not empty - xform filter
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
+ response = view(request)
self.assertEqual(response.status_code, 200)
- self.assertEqual(len(response.data), 2)
+ self.assertEqual(len(response.data), 1)
def test_widget_permission_create(self):
@@ -313,7 +319,7 @@ def test_widget_permission_list(self):
)
request = self.factory.get("/", **self.extra)
- response = view(request)
+ response = view(request, formid=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
@@ -322,8 +328,7 @@ def test_widget_permission_list(self):
ReadOnlyRole.add(self.user, self.xform)
request = self.factory.get("/", **self.extra)
- response = view(request)
-
+ response = view(request, formid=self.xform.pk)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
@@ -498,7 +503,6 @@ def test_widget_data_public_form(self):
request = self.factory.get("/", **self.extra)
response = view(request, formid=self.xform.pk)
-
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
diff --git a/onadata/apps/api/viewsets/media_viewset.py b/onadata/apps/api/viewsets/media_viewset.py
index 11827bbb51..87f77b6050 100644
--- a/onadata/apps/api/viewsets/media_viewset.py
+++ b/onadata/apps/api/viewsets/media_viewset.py
@@ -34,9 +34,7 @@ class MediaViewSet(
):
"""A view to redirect to actual attachments url"""
- queryset = Attachment.objects.filter(
- instance__deleted_at__isnull=True, deleted_at__isnull=True
- )
+ queryset = Attachment.objects.filter(deleted_at__isnull=True)
filter_backends = (filters.AttachmentFilter, filters.AttachmentTypeFilter)
lookup_field = "pk"
permission_classes = (AttachmentObjectPermissions,)
diff --git a/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py b/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py
new file mode 100644
index 0000000000..19131cce2c
--- /dev/null
+++ b/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py
@@ -0,0 +1,36 @@
+# Generated by Django 4.1 on 2024-04-15 14:00
+
+from django.conf import settings
+from django.db import migrations, models
+import django.db.models.deletion
+import onadata.apps.logger.models.instance
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("logger", "0012_add_instance_history_uuid_and_checksum_idx"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="attachment",
+ name="xform",
+ field=models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="xform_attachments",
+ to="logger.xform",
+ ),
+ ),
+ migrations.AddField(
+ model_name="attachment",
+ name="user",
+ field=models.ForeignKey(
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ ]
diff --git a/onadata/apps/logger/migrations/0014_populate_attachment_xform.py b/onadata/apps/logger/migrations/0014_populate_attachment_xform.py
new file mode 100644
index 0000000000..563b5f8c96
--- /dev/null
+++ b/onadata/apps/logger/migrations/0014_populate_attachment_xform.py
@@ -0,0 +1,37 @@
+# Generated by Django 4.2.11 on 2024-04-22 06:42
+
+from django.db import migrations
+
+
+def populate_attachment_xform(apps, schema_editor):
+ """Populate xform field for Attachments"""
+ Attachment = apps.get_model("logger", "Attachment")
+ queryset = Attachment.objects.filter(xform__isnull=True).values(
+ "pk", "instance__xform", "instance__user"
+ )
+ count = queryset.count()
+ print("Start populating attachment xform...")
+ print(f"Found {count} records")
+
+ for attachment in queryset.iterator(chunk_size=100):
+ # We do not want to trigger Model.save or any signal
+ # Queryset.update is a workaround to achieve this.
+ # Model.save and the post/pre signals may contain
+ # some side-effects which we are not interested in
+ Attachment.objects.filter(pk=attachment["pk"]).update(
+ xform=attachment["instance__xform"],
+ user=attachment["instance__user"],
+ )
+ count -= 1
+ print(f"{count} remaining")
+
+ print("Done populating attachment xform!")
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("logger", "0013_add_xform_to_logger_attachment"),
+ ]
+
+ operations = [migrations.RunPython(populate_attachment_xform)]
diff --git a/onadata/apps/logger/models/attachment.py b/onadata/apps/logger/models/attachment.py
index 5a58a186b3..e44f7d18a7 100644
--- a/onadata/apps/logger/models/attachment.py
+++ b/onadata/apps/logger/models/attachment.py
@@ -50,6 +50,13 @@ class Attachment(models.Model):
OSM = "osm"
+ xform = models.ForeignKey(
+ "logger.XForm",
+ related_name="xform_attachments",
+ on_delete=models.CASCADE,
+ null=True,
+ blank=True,
+ )
instance = models.ForeignKey(
"logger.Instance", related_name="attachments", on_delete=models.CASCADE
)
@@ -69,6 +76,12 @@ class Attachment(models.Model):
null=True,
on_delete=models.SET_NULL,
)
+ # submitted_by user
+ user = models.ForeignKey(
+ get_user_model(),
+ null=True,
+ on_delete=models.SET_NULL,
+ )
class Meta:
app_label = "logger"
diff --git a/onadata/apps/logger/models/instance.py b/onadata/apps/logger/models/instance.py
index 5b87600ebf..e84aafe019 100644
--- a/onadata/apps/logger/models/instance.py
+++ b/onadata/apps/logger/models/instance.py
@@ -823,6 +823,10 @@ def post_save_submission(sender, instance=None, created=False, **kwargs):
"""
if instance.deleted_at is not None:
_update_xform_submission_count_delete(instance)
+ # mark attachments also as deleted.
+ instance.attachments.filter(deleted_at__isnull=True).update(
+ deleted_at=instance.deleted_at, deleted_by=instance.deleted_by
+ )
if (
hasattr(settings, "ASYNC_POST_SUBMISSION_PROCESSING_ENABLED")
diff --git a/onadata/apps/messaging/tests/test_messaging_viewset.py b/onadata/apps/messaging/tests/test_messaging_viewset.py
index f0ffd719c9..3b807b6444 100644
--- a/onadata/apps/messaging/tests/test_messaging_viewset.py
+++ b/onadata/apps/messaging/tests/test_messaging_viewset.py
@@ -31,21 +31,20 @@ def _create_message(self, user=None):
"""
if not user:
user = _create_user()
- assign_perm('auth.change_user', user, user)
- view = MessagingViewSet.as_view({'post': 'create'})
+ assign_perm("auth.change_user", user, user)
+ view = MessagingViewSet.as_view({"post": "create"})
data = {
"message": "Hello World!",
"target_id": user.pk,
- "target_type": 'user',
+ "target_type": "user",
} # yapf: disable
- request = self.factory.post('/messaging', data)
+ request = self.factory.post("/messaging", data)
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 201, response.data)
self.assertDictContainsSubset(data, response.data)
# ensure that id and timestamp are returned
- self.assertTrue('id' and 'timestamp' in
- [text(x) for x in list(response.data)])
+ self.assertTrue("id" and "timestamp" in [text(x) for x in list(response.data)])
return response.data
def test_create_message(self):
@@ -60,17 +59,17 @@ def test_target_does_not_exist(self):
target that does not exist.
"""
user = _create_user()
- view = MessagingViewSet.as_view({'post': 'create'})
+ view = MessagingViewSet.as_view({"post": "create"})
data = {
"message": "Hello World!",
"target_id": 1000000000,
- "target_type": 'user',
+ "target_type": "user",
} # yapf: disable
- request = self.factory.post('/messaging', data)
+ request = self.factory.post("/messaging", data)
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 400, response.data)
- self.assertEqual(response.data['target_id'], 'target_id not found')
+ self.assertEqual(response.data["target_id"], "target_id not found")
def test_delete_message(self):
"""
@@ -78,12 +77,12 @@ def test_delete_message(self):
"""
user = _create_user()
message_data = self._create_message(user)
- view = MessagingViewSet.as_view({'delete': 'destroy'})
- request = self.factory.delete('/messaging/%s' % message_data['id'])
+ view = MessagingViewSet.as_view({"delete": "destroy"})
+ request = self.factory.delete("/messaging/%s" % message_data["id"])
force_authenticate(request, user=user)
- response = view(request=request, pk=message_data['id'])
+ response = view(request=request, pk=message_data["id"])
self.assertEqual(response.status_code, 204)
- self.assertFalse(Action.objects.filter(pk=message_data['id']).exists())
+ self.assertFalse(Action.objects.filter(pk=message_data["id"]).exists())
def test_list_messages(self):
"""
@@ -91,61 +90,60 @@ def test_list_messages(self):
"""
user = _create_user()
message_data = self._create_message(user)
- target_id = message_data['target_id']
- view = MessagingViewSet.as_view({'get': 'list'})
+ target_id = message_data["target_id"]
+ view = MessagingViewSet.as_view({"get": "list"})
# return data only when a target_type is provided
request = self.factory.get(
- '/messaging', {'target_type': 'user',
- 'target_id': target_id})
+ "/messaging", {"target_type": "user", "target_id": target_id}
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
- message_data.pop('target_id')
- message_data.pop('target_type')
+ message_data.pop("target_id")
+ message_data.pop("target_type")
self.assertEqual(len(response.data), 1)
self.assertEqual(dict(response.data[0]), message_data)
# returns empty list when a target type does not have any records
request = self.factory.get(
- '/messaging', {'target_type': 'xform',
- 'target_id': target_id})
+ "/messaging", {"target_type": "xform", "target_id": target_id}
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, [])
# return status 400 if both target_type and target_id are misssing
- request = self.factory.get('/messaging')
+ request = self.factory.get("/messaging")
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 400)
# returns 400 status when a target_id is missing
- request = self.factory.get('/messaging', {'target_type': 'user'})
+ request = self.factory.get("/messaging", {"target_type": "user"})
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 400)
- self.assertEqual(response.data,
- {u'detail': u"Parameter 'target_id' is missing."})
+ self.assertEqual(response.data, {"detail": "Parameter 'target_id' is missing."})
# returns 400 status when a target_type is missing
- request = self.factory.get('/messaging', {'target_id': target_id})
+ request = self.factory.get("/messaging", {"target_id": target_id})
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 400)
- self.assertEqual(response.data,
- {u'detail': u"Parameter 'target_type' is missing."})
+ self.assertEqual(
+ response.data, {"detail": "Parameter 'target_type' is missing."}
+ )
# returns 400 status when a target type is not known
request = self.factory.get(
- '/messaging', {'target_type': 'xyz',
- 'target_id': target_id})
+ "/messaging", {"target_type": "xyz", "target_id": target_id}
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 400)
- self.assertEqual(response.data,
- {u'detail': u'Unknown target_type xyz'})
+ self.assertEqual(response.data, {"detail": "Unknown target_type xyz"})
def test_retrieve_message(self):
"""
@@ -153,13 +151,13 @@ def test_retrieve_message(self):
"""
user = _create_user()
message_data = self._create_message(user)
- view = MessagingViewSet.as_view({'get': 'retrieve'})
- request = self.factory.get('/messaging/{}'.format(message_data['id']))
+ view = MessagingViewSet.as_view({"get": "retrieve"})
+ request = self.factory.get("/messaging/{}".format(message_data["id"]))
force_authenticate(request, user=user)
- response = view(request=request, pk=message_data['id'])
+ response = view(request=request, pk=message_data["id"])
self.assertEqual(response.status_code, 200)
- message_data.pop('target_id')
- message_data.pop('target_type')
+ message_data.pop("target_id")
+ message_data.pop("target_type")
self.assertDictEqual(response.data, message_data)
def test_authentication_required(self):
@@ -167,51 +165,47 @@ def test_authentication_required(self):
Test that authentication is required at all endpoints.
"""
# Test that the list endpoint requires authentication
- view1 = MessagingViewSet.as_view({'get': 'list'})
- request1 = self.factory.get('/messaging',
- {'target_type': 'xform',
- 'target_id': 1})
+ view1 = MessagingViewSet.as_view({"get": "list"})
+ request1 = self.factory.get(
+ "/messaging", {"target_type": "xform", "target_id": 1}
+ )
response1 = view1(request=request1)
self.assertEqual(response1.status_code, 401)
- self.assertEqual(response1.data, {
- u'detail':
- u"Authentication credentials were not provided."
- })
+ self.assertEqual(
+ response1.data, {"detail": "Authentication credentials were not provided."}
+ )
# Test that retrieve requires authentication
- view2 = MessagingViewSet.as_view({'get': 'retrieve'})
- request2 = self.factory.get('/messaging/1')
+ view2 = MessagingViewSet.as_view({"get": "retrieve"})
+ request2 = self.factory.get("/messaging/1")
response2 = view2(request=request2, pk=1)
self.assertEqual(response2.status_code, 401)
- self.assertEqual(response2.data, {
- u'detail':
- u"Authentication credentials were not provided."
- })
+ self.assertEqual(
+ response2.data, {"detail": "Authentication credentials were not provided."}
+ )
# Test that delete requires authentication
- view3 = MessagingViewSet.as_view({'delete': 'destroy'})
- request3 = self.factory.delete('/messaging/5')
+ view3 = MessagingViewSet.as_view({"delete": "destroy"})
+ request3 = self.factory.delete("/messaging/5")
response3 = view3(request=request3, pk=5)
self.assertEqual(response3.status_code, 401)
- self.assertEqual(response3.data, {
- u'detail':
- u"Authentication credentials were not provided."
- })
+ self.assertEqual(
+ response3.data, {"detail": "Authentication credentials were not provided."}
+ )
# Test that create requires authentication
- view4 = MessagingViewSet.as_view({'post': 'create'})
+ view4 = MessagingViewSet.as_view({"post": "create"})
data = {
"message": "Hello World!",
"target_id": 1,
- "target_type": 'user',
+ "target_type": "user",
} # yapf: disable
- request4 = self.factory.post('/messaging', data)
+ request4 = self.factory.post("/messaging", data)
response4 = view4(request=request4)
self.assertEqual(response4.status_code, 401)
- self.assertEqual(response4.data, {
- u'detail':
- u"Authentication credentials were not provided."
- })
+ self.assertEqual(
+ response4.data, {"detail": "Authentication credentials were not provided."}
+ )
def test_create_permissions(self):
"""
@@ -221,19 +215,19 @@ def test_create_permissions(self):
data = {
"message": "Hello World!",
"target_id": user.pk,
- "target_type": 'user',
+ "target_type": "user",
} # yapf: disable
- view = MessagingViewSet.as_view({'post': 'create'})
+ view = MessagingViewSet.as_view({"post": "create"})
- request = self.factory.post('/messaging', data)
+ request = self.factory.post("/messaging", data)
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 403)
- self.assertIn(u'You do not have permission', response.data['detail'])
+ self.assertIn("You do not have permission", response.data["detail"])
# assign add_user permissions
- assign_perm('auth.change_user', user, user)
- request = self.factory.post('/messaging', data)
+ assign_perm("auth.change_user", user, user)
+ request = self.factory.post("/messaging", data)
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 201)
@@ -243,17 +237,17 @@ def test_retrieve_permissions(self):
Test that correct permissions are required when retrieving a message
"""
user = _create_user()
- other_user = _create_user('anotheruser')
+ other_user = _create_user("anotheruser")
message_data = self._create_message(user)
- view = MessagingViewSet.as_view({'get': 'retrieve'})
- request = self.factory.get('/messaging/{}'.format(message_data['id']))
+ view = MessagingViewSet.as_view({"get": "retrieve"})
+ request = self.factory.get("/messaging/{}".format(message_data["id"]))
force_authenticate(request, user=other_user)
- response = view(request=request, pk=message_data['id'])
+ response = view(request=request, pk=message_data["id"])
self.assertEqual(response.status_code, 403)
- request = self.factory.get('/messaging/{}'.format(message_data['id']))
+ request = self.factory.get("/messaging/{}".format(message_data["id"]))
force_authenticate(request, user=user)
- response = view(request=request, pk=message_data['id'])
+ response = view(request=request, pk=message_data["id"])
self.assertEqual(response.status_code, 200)
def test_retrieve_pagination(self):
@@ -263,10 +257,10 @@ def test_retrieve_pagination(self):
self._create_message(user)
count += 1
- view = MessagingViewSet.as_view({'get': 'list'})
+ view = MessagingViewSet.as_view({"get": "list"})
request = self.factory.get(
- '/messaging', data={
- "target_type": "user", "target_id": user.pk})
+ "/messaging", data={"target_type": "user", "target_id": user.pk}
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200, response.data)
@@ -274,39 +268,43 @@ def test_retrieve_pagination(self):
# Test that the pagination query params paginate the responses
request = self.factory.get(
- '/messaging', data={
- "target_type": "user",
- "target_id": user.pk, "page_size": 2})
+ "/messaging",
+ data={"target_type": "user", "target_id": user.pk, "page_size": 2},
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200, response.data)
self.assertEqual(len(response.data), 2)
- self.assertIn('Link', response)
+ self.assertIn("Link", response)
self.assertEqual(
- response['Link'],
- (f'; rel="next",'
- ' ; rel="last"'))
+ response["Link"],
+ (
+ f"; rel="next",'
+ " ; rel="last"'
+ ),
+ )
# Test the retrieval threshold is respected
with override_settings(MESSAGE_RETRIEVAL_THRESHOLD=2):
request = self.factory.get(
- '/messaging', data={
- "target_type": "user", "target_id": user.pk
- }
+ "/messaging", data={"target_type": "user", "target_id": user.pk}
)
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200, response.data)
self.assertEqual(len(response.data), 2)
- self.assertIn('Link', response)
+ self.assertIn("Link", response)
self.assertEqual(
- response['Link'],
- (f'; rel="next",'
- ' ; rel="last"'))
+ response["Link"],
+ (
+ f"; rel="next",'
+ " ; rel="last"'
+ ),
+ )
@override_settings(USE_TZ=False)
def test_messaging_timestamp_filter(self):
@@ -317,265 +315,286 @@ def test_messaging_timestamp_filter(self):
message_one = self._create_message(user)
message_two = self._create_message(user)
- view = MessagingViewSet.as_view({'get': 'list'})
- message_one_timestamp = message_one['timestamp']
+ view = MessagingViewSet.as_view({"get": "list"})
+ message_one_timestamp = message_one["timestamp"]
target_id = user.id
request = self.factory.get(
- f'/messaging?timestamp={message_one_timestamp}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp={message_one_timestamp}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0].get('id'), message_one['id'])
+ self.assertEqual(response.data[0].get("id"), message_one["id"])
# Test able to filter using gt & gte lookups
request = self.factory.get(
- f'/messaging?timestamp__gt={message_one_timestamp}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__gt={message_one_timestamp}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0].get('id'), message_two['id'])
+ self.assertEqual(response.data[0].get("id"), message_two["id"])
request = self.factory.get(
- f'/messaging?timestamp__gte={message_one_timestamp}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__gte={message_one_timestamp}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Test able to filter using lt & lte lookups
- message_two_timestamp = message_two['timestamp']
+ message_two_timestamp = message_two["timestamp"]
request = self.factory.get(
- f'/messaging?timestamp__lt={message_two_timestamp}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__lt={message_two_timestamp}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 1)
- self.assertEqual(
- response.data[0].get('id'), message_one['id'])
+ self.assertEqual(response.data[0].get("id"), message_one["id"])
- message_two_timestamp = message_two['timestamp']
+ message_two_timestamp = message_two["timestamp"]
request = self.factory.get(
- f'/messaging?timestamp__lte={message_two_timestamp}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__lte={message_two_timestamp}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Test able to use day filters
- day = Action.objects.get(
- id=message_one['id']).timestamp.day
+ day = Action.objects.get(id=message_one["id"]).timestamp.day
request = self.factory.get(
- f'/messaging?timestamp__day={day}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__day={day}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__day__gt={day}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__day__gt={day}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__day__gte={day}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__day__gte={day}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__day__lt={day}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__day__lt={day}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__day__lte={day}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__day__lte={day}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Test able to use month filters
- month = Action.objects.get(
- id=message_one['id']).timestamp.month
+ month = Action.objects.get(id=message_one["id"]).timestamp.month
request = self.factory.get(
- f'/messaging?timestamp__month={month}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__month={month}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__month__gt={month}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__month__gt={month}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__month__gte={month}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__month__gte={month}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__month__lt={month}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__month__lt={month}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__month__lte={month}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__month__lte={month}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Test able to use year filters
- year = Action.objects.get(
- id=message_one['id']).timestamp.year
+ year = Action.objects.get(id=message_one["id"]).timestamp.year
request = self.factory.get(
- f'/messaging?timestamp__year={year}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__year={year}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__year__gt={year}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__year__gt={year}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__year__gte={year}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__year__gte={year}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__year__lt={year}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__year__lt={year}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__year__lte={year}&'
- f'target_type=user&target_id={target_id}')
+ f"/messaging?timestamp__year__lte={year}&"
+ f"target_type=user&target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Test able to use hour & minute filters
- hour = Action.objects.get(
- id=message_one['id']).timestamp.hour
- minute = Action.objects.get(
- id=message_one['id']).timestamp.minute
+ hour = Action.objects.get(id=message_one["id"]).timestamp.hour
+ minute = Action.objects.get(id=message_one["id"]).timestamp.minute
request = self.factory.get(
- f'/messaging?timestamp__hour={hour}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__hour={hour}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__hour__lt={hour}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__hour__lt={hour}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__hour__gt={hour}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__hour__gt={hour}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__hour__lte={hour}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__hour__lte={hour}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__hour__gte={hour}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__hour__gte={hour}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__minute__gt={minute}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__minute__gt={minute}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__minute__lt={minute}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__minute__lt={minute}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)
request = self.factory.get(
- f'/messaging?timestamp__minute__gte={minute}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__minute__gte={minute}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
request = self.factory.get(
- f'/messaging?timestamp__minute__lte={minute}&target_type=user&'
- f'target_id={target_id}')
+ f"/messaging?timestamp__minute__lte={minute}&target_type=user&"
+ f"target_id={target_id}"
+ )
force_authenticate(request, user=user)
response = view(request=request)
self.assertEqual(response.status_code, 200)
diff --git a/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py b/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py
index af2052e068..a4e46ecc9b 100644
--- a/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py
+++ b/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py
@@ -108,7 +108,7 @@ def test_retrieve_textit_services(self):
_id = response_data.get("id")
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.view(request, pk=_id)
expected_dict = {
"name": "textit",
@@ -239,18 +239,18 @@ def test_delete(self):
def test_retrieve(self):
"""Test retrieving a service via API."""
rest = RestService(
- name="testservice", service_url="http://serviec.io", xform=self.xform
+ name="testservice", service_url="http://service.io", xform=self.xform
)
rest.save()
- request = self.factory.get("/", **self.extra)
+ request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra)
response = self.view(request, pk=rest.pk)
data = {
"id": rest.pk,
"xform": self.xform.pk,
"name": "testservice",
- "service_url": "http://serviec.io",
+ "service_url": "http://service.io",
"active": True,
"inactive_reason": "",
}
diff --git a/onadata/libs/filters.py b/onadata/libs/filters.py
index e210ad658f..22726b4a7d 100644
--- a/onadata/libs/filters.py
+++ b/onadata/libs/filters.py
@@ -332,9 +332,11 @@ def _add_instance_prefix_to_dataview_filter_kwargs(self, filter_kwargs):
return prefixed_filter_kwargs
- def _xform_filter(self, request, view, keyword):
+ def _xform_filter(self, request, view, keyword, queryset=None):
"""Use XForm permissions"""
xform = request.query_params.get("xform")
+ if xform is None and "xform" in request.data:
+ xform = request.data.get("xform")
dataview = request.query_params.get("dataview")
merged_xform = request.query_params.get("merged_xform")
filename = request.query_params.get("filename")
@@ -344,17 +346,19 @@ def _xform_filter(self, request, view, keyword):
if dataview:
int_or_parse_error(
dataview,
- "Invalid value for dataview ID. It must be a positive integer."
+ "Invalid value for dataview ID. It must be a positive integer.",
)
self.dataview = get_object_or_404(DataView, pk=dataview)
# filter with fitlered dataset query
dataview_kwargs = self._add_instance_prefix_to_dataview_filter_kwargs(
- get_filter_kwargs(self.dataview.query))
+ get_filter_kwargs(self.dataview.query)
+ )
xform_qs = XForm.objects.filter(pk=self.dataview.xform.pk)
elif merged_xform:
int_or_parse_error(
merged_xform,
- "Invalid value for Merged Dataset ID. It must be a positive integer.")
+ "Invalid value for Merged Dataset ID. It must be a positive integer.",
+ )
self.merged_xform = get_object_or_404(MergedXForm, pk=merged_xform)
xform_qs = self.merged_xform.xforms.all()
elif xform:
@@ -365,26 +369,41 @@ def _xform_filter(self, request, view, keyword):
xform_qs = XForm.objects.filter(pk=self.xform.pk)
public_forms = XForm.objects.filter(pk=self.xform.pk, shared_data=True)
elif filename:
- attachment_id = view.kwargs.get("pk")
- attachment = get_object_or_404(Attachment, pk=attachment_id)
- self.xform = attachment.instance.xform
+ attachment = get_object_or_404(Attachment, pk=view.kwargs.get("pk"))
+ self.xform = (
+ attachment.instance.xform
+ if attachment.xform is None
+ else attachment.xform
+ )
xform_qs = XForm.objects.filter(pk=self.xform.pk)
public_forms = XForm.objects.filter(pk=self.xform.pk, shared_data=True)
else:
- xform_qs = XForm.objects.all()
+ if queryset is not None and "pk" in view.kwargs:
+ xform_ids = list(
+ set(
+ queryset.filter(pk=view.kwargs.get("pk")).values_list(
+ f"{keyword}", flat=True
+ )
+ )
+ )
+ xform_qs = XForm.objects.filter(pk__in=xform_ids)
+ elif queryset is not None and "formid" in view.kwargs:
+ xform_qs = XForm.objects.filter(
+ pk=view.kwargs.get("formid"), deleted_at__isnull=True
+ )
+ else:
+ # No form filter supplied - return empty list.
+ xform_qs = XForm.objects.none()
xform_qs = xform_qs.filter(deleted_at=None)
if request.user.is_anonymous:
xforms = xform_qs.filter(shared_data=True)
else:
xforms = super().filter_queryset(request, xform_qs, view) | public_forms
- return {
- **{f"{keyword}__in": xforms},
- **dataview_kwargs
- }
+ return {**{f"{keyword}__in": xforms}, **dataview_kwargs}
def _xform_filter_queryset(self, request, queryset, view, keyword):
- kwarg = self._xform_filter(request, view, keyword)
+ kwarg = self._xform_filter(request, view, keyword, queryset)
return queryset.filter(**kwarg)
@@ -495,7 +514,7 @@ def filter_queryset(self, request, queryset, view):
# generate queries
xform_content_type = ContentType.objects.get_for_model(XForm)
- xform_kwarg = self._xform_filter(request, view, keyword)
+ xform_kwarg = self._xform_filter(request, view, keyword, queryset)
xform_kwarg["content_type"] = xform_content_type
project_content_type = ContentType.objects.get_for_model(Project)
@@ -530,16 +549,18 @@ class AttachmentFilter(XFormPermissionFilterMixin, ObjectPermissionsFilter):
"""Attachment filter."""
def filter_queryset(self, request, queryset, view):
- queryset = self._xform_filter_queryset(
- request, queryset, view, "instance__xform"
- )
+ queryset = self._xform_filter_queryset(request, queryset, view, "xform")
+ xform = getattr(self, "xform", None)
# Ensure queryset is filtered by XForm meta permissions
- xform_ids = set(queryset.values_list("instance__xform", flat=True))
- for xform_id in xform_ids:
- xform = XForm.objects.get(id=xform_id)
- user = request.user
+ if xform is None:
+ xform_ids = list(set(queryset.values_list("xform", flat=True)))
+ if xform_ids:
+ # only the first form xform_ids[0]
+ xform = XForm.objects.get(pk=xform_ids[0])
+
+ if xform is not None:
queryset = exclude_items_from_queryset_using_xform_meta_perms(
- xform, user, queryset
+ xform, request.user, queryset
)
instance_id = request.query_params.get("instance")
diff --git a/onadata/libs/permissions.py b/onadata/libs/permissions.py
index 0913f918fe..00d174a2b7 100644
--- a/onadata/libs/permissions.py
+++ b/onadata/libs/permissions.py
@@ -12,7 +12,6 @@
import six
from guardian.shortcuts import assign_perm, get_perms, get_users_with_perms, remove_perm
-from onadata.apps.logger.models.attachment import Attachment
from onadata.apps.logger.models.project import (
Project,
ProjectGroupObjectPermission,
@@ -571,15 +570,17 @@ def get_object_users_with_permissions(
except UserProfile.DoesNotExist:
profile = UserProfile.objects.create(user=user)
- result.append({
- "user": user.username if username else user,
- "first_name": user.first_name,
- "last_name": user.last_name,
- "role": get_role(permissions, obj),
- "is_org": is_organization(profile),
- "gravatar": profile.gravatar,
- "metadata": profile.metadata,
- })
+ result.append(
+ {
+ "user": user.username if username else user,
+ "first_name": user.first_name,
+ "last_name": user.last_name,
+ "role": get_role(permissions, obj),
+ "is_org": is_organization(profile),
+ "gravatar": profile.gravatar,
+ "metadata": profile.metadata,
+ }
+ )
return result
@@ -615,8 +616,6 @@ def exclude_items_from_queryset_using_xform_meta_perms(xform, user, queryset):
):
return queryset
if user.has_perm(CAN_VIEW_XFORM_DATA, xform):
- if queryset.model is Attachment:
- return queryset.exclude(~Q(instance__user=user), instance__xform=xform)
return queryset.exclude(~Q(user=user), xform=xform)
return queryset.none()
diff --git a/onadata/libs/serializers/attachment_serializer.py b/onadata/libs/serializers/attachment_serializer.py
index 174fc4ca6f..5b0c72acd6 100644
--- a/onadata/libs/serializers/attachment_serializer.py
+++ b/onadata/libs/serializers/attachment_serializer.py
@@ -52,7 +52,7 @@ class AttachmentSerializer(serializers.HyperlinkedModelSerializer):
download_url = serializers.SerializerMethodField()
small_download_url = serializers.SerializerMethodField()
medium_download_url = serializers.SerializerMethodField()
- xform = serializers.ReadOnlyField(source="instance.xform.pk")
+ xform = serializers.SerializerMethodField()
instance = serializers.PrimaryKeyRelatedField(queryset=Instance.objects.all())
filename = serializers.ReadOnlyField(source="media_file.name")
@@ -71,6 +71,16 @@ class Meta:
)
model = Attachment
+ @check_obj
+ def get_xform(self, obj):
+ """
+ Return xform_id - old forms xform id is in submission instance xform_id
+ """
+ if obj.xform is None:
+ return obj.instance.xform_id
+
+ return obj.xform_id
+
@check_obj
def get_download_url(self, obj):
"""
diff --git a/onadata/libs/utils/logger_tools.py b/onadata/libs/utils/logger_tools.py
index 412af0ac03..e90d745c8a 100644
--- a/onadata/libs/utils/logger_tools.py
+++ b/onadata/libs/utils/logger_tools.py
@@ -114,9 +114,11 @@ def create_xform_version(xform: XForm, user: User) -> XFormVersion:
versioned_xform = XFormVersion.objects.create(
xform=xform,
xls=xform.xls,
- json=xform.json
- if isinstance(xform.json, str)
- else json.dumps(xform.json),
+ json=(
+ xform.json
+ if isinstance(xform.json, str)
+ else json.dumps(xform.json)
+ ),
version=xform.version,
created_by=user,
xml=xform.xml,
@@ -421,17 +423,21 @@ def save_attachments(xform, instance, media_files, remove_deleted_media=False):
if len(filename) > 100:
raise AttachmentNameError(filename)
media_in_submission = filename in instance.get_expected_media() or [
- instance.xml.decode("utf-8").find(filename) != -1
- if isinstance(instance.xml, bytes)
- else instance.xml.find(filename) != -1
+ (
+ instance.xml.decode("utf-8").find(filename) != -1
+ if isinstance(instance.xml, bytes)
+ else instance.xml.find(filename) != -1
+ )
]
if media_in_submission:
Attachment.objects.get_or_create(
+ xform=xform,
instance=instance,
media_file=f,
mimetype=content_type,
name=filename,
extension=extension,
+ user=instance.user,
)
if remove_deleted_media:
instance.soft_delete_attachments()