From 23059d56bb7e2931174748e867fde0d5021ec1a6 Mon Sep 17 00:00:00 2001 From: Bastien Gerard Date: Fri, 27 Sep 2024 23:37:01 +0200 Subject: [PATCH 1/3] Use geonear or collstats as first steps and add warning in aggregate doc (to emphasize the flaws of current design in case of inheritance or skip/limit/etc as our pipeline will have an arbitrary ordering) --- mongoengine/queryset/base.py | 21 +++++++++- tests/queryset/test_queryset_aggregation.py | 43 +++++++++++++++++++-- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index 049aff22b..718e2c971 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -1343,6 +1343,14 @@ def from_json(self, json_data): def aggregate(self, pipeline, *suppl_pipeline, **kwargs): """Perform an aggregate function based on your queryset params + If the queryset contains a query or skip/limit/sort or if the target Document class + uses inheritance, this method will add steps prior to the provided pipeline in an arbitrary order. + This may affect the performance or outcome of the aggregation, so use it consciously. + + For complex/critical pipelines, we recommend using the aggregation framework of Pymongo directly; + it is available through the collection object (YourDocument._collection.aggregate) and will guarantee + that you have full control over the pipeline. 
+ :param pipeline: list of aggregation commands, see: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/ :param suppl_pipeline: unpacked list of pipeline (added to support deprecation of the old interface) @@ -1380,7 +1388,18 @@ def aggregate(self, pipeline, *suppl_pipeline, **kwargs): if self._skip is not None: initial_pipeline.append({"$skip": self._skip}) - final_pipeline = initial_pipeline + user_pipeline + # geoNear and collStats must be the first stages in the pipeline if present + first_step = [] + new_user_pipeline = [] + for step_step in user_pipeline: + if "$geoNear" in step_step: + first_step.append(step_step) + elif "$collStats" in step_step: + first_step.append(step_step) + else: + new_user_pipeline.append(step_step) + + final_pipeline = first_step + initial_pipeline + new_user_pipeline collection = self._collection if self._read_preference is not None or self._read_concern is not None: diff --git a/tests/queryset/test_queryset_aggregation.py b/tests/queryset/test_queryset_aggregation.py index ecfa0b6f3..040b38dd0 100644 --- a/tests/queryset/test_queryset_aggregation.py +++ b/tests/queryset/test_queryset_aggregation.py @@ -1,4 +1,3 @@ -import unittest import warnings from pymongo.read_preferences import ReadPreference @@ -294,6 +293,44 @@ class Person(Document): assert list(data) == [] + def test_aggregate_geo_near_used_as_initial_step_before_cls_implicit_step(self): + class BaseClass(Document): + meta = {"allow_inheritance": True} -if __name__ == "__main__": - unittest.main() + class Aggr(BaseClass): + name = StringField() + c = PointField() + + BaseClass.drop_collection() + + x = Aggr(name="X", c=[10.634584, 35.8245029]).save() + y = Aggr(name="Y", c=[10.634584, 35.8245029]).save() + + pipeline = [ + { + "$geoNear": { + "near": {"type": "Point", "coordinates": [10.634584, 35.8245029]}, + "distanceField": "c", + "spherical": True, + } + } + ] + res = list(Aggr.objects.aggregate(*pipeline)) + assert res == [ + {"_cls": "BaseClass.Aggr", 
"_id": x.id, "c": 0.0, "name": "X"}, + {"_cls": "BaseClass.Aggr", "_id": y.id, "c": 0.0, "name": "Y"}, + ] + + def test_aggregate_collstats_used_as_initial_step_before_cls_implicit_step(self): + class SomeDoc(Document): + name = StringField() + + SomeDoc.drop_collection() + + SomeDoc(name="X").save() + SomeDoc(name="Y").save() + + pipeline = [{"$collStats": {"count": {}}}] + res = list(SomeDoc.objects.aggregate(pipeline)) + assert len(res) == 1 + assert res[0]["count"] == 2 From 3b6d437c4f864bf3c5d5fd03838e07a1a17d4389 Mon Sep 17 00:00:00 2001 From: Bastien Gerard Date: Fri, 27 Sep 2024 23:39:17 +0200 Subject: [PATCH 2/3] update changelog --- docs/changelog.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 4beaa4a48..163d89d04 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -12,6 +12,7 @@ Development - make sure to read https://www.mongodb.com/docs/manual/core/transactions-in-applications/#callback-api-vs-core-api - run_in_transaction context manager relies on Pymongo coreAPI, it will retry automatically in case of `UnknownTransactionCommitResult` but not `TransientTransactionError` exceptions - Using .count() in a transaction will always use Collection.count_document (as estimated_document_count is not supported in transactions) +- Fix use of $geoNear or $collStats in aggregate #2493 Changes in 0.29.0 ================= From 25d759d305ea8c9d6acc1af6fdda92eeca387345 Mon Sep 17 00:00:00 2001 From: Bastien Gerard Date: Sat, 28 Sep 2024 12:19:09 +0200 Subject: [PATCH 3/3] Fix deprecated use of pipeline args --- tests/queryset/test_queryset_aggregation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queryset/test_queryset_aggregation.py b/tests/queryset/test_queryset_aggregation.py index 789a639aa..7e390e35a 100644 --- a/tests/queryset/test_queryset_aggregation.py +++ b/tests/queryset/test_queryset_aggregation.py @@ -354,7 +354,7 @@ class Aggr(BaseClass): } } ] - res = 
list(Aggr.objects.aggregate(*pipeline)) + res = list(Aggr.objects.aggregate(pipeline)) assert res == [ {"_cls": "BaseClass.Aggr", "_id": x.id, "c": 0.0, "name": "X"}, {"_cls": "BaseClass.Aggr", "_id": y.id, "c": 0.0, "name": "Y"},