Skip to content

Commit

Permalink
add ordered argument to insert method
Browse files Browse the repository at this point in the history
  • Loading branch information
omaxx committed Sep 30, 2021
1 parent 5fe9436 commit 79e0366
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 17 deletions.
55 changes: 39 additions & 16 deletions mongoengine/queryset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,12 @@ def first(self):
return result

def insert(
self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None
self,
doc_or_docs,
load_bulk=True,
write_concern=None,
signal_kwargs=None,
ordered=True,
):
"""bulk insert documents
Expand All @@ -309,6 +314,11 @@ def insert(
each server being written to.
:param signal_kwargs: (optional) kwargs dictionary to be passed to
the signal calls.
:param ordered (optional): If True (the default) documents will be
inserted on the server serially, in the order provided. If an error
occurs all remaining inserts are aborted. If False, documents will
be inserted on the server in arbitrary order, possibly in parallel,
and all document inserts will be attempted.
By default returns document instances, set ``load_bulk`` to False to
return just ``ObjectIds``
Expand Down Expand Up @@ -341,12 +351,14 @@ def insert(

with set_write_concern(self._collection, write_concern) as collection:
insert_func = collection.insert_many
insert_func_kwargs = {"ordered": ordered}
if return_one:
raw = raw[0]
insert_func = collection.insert_one
insert_func_kwargs = {}

try:
inserted_result = insert_func(raw)
inserted_result = insert_func(raw, **insert_func_kwargs)
ids = (
[inserted_result.inserted_id]
if return_one
Expand All @@ -358,6 +370,17 @@ def insert(
except pymongo.errors.BulkWriteError as err:
# inserting documents that already have an _id field will
# give huge performance debt or raise
if ordered:
inserted = err.details["nInserted"]
for doc, raw_doc in zip(docs[:inserted], raw[:inserted]):
doc.pk = raw_doc["_id"]
else:
not_writed_ids = [
error["op"]["_id"] for error in err.details["writeErrors"]
]
for doc, raw_doc in zip(docs, raw):
if raw_doc["_id"] not in not_writed_ids:
doc.pk = raw_doc["_id"]
message = "Bulk write error: (%s)"
raise BulkWriteError(message % err.details)
except pymongo.errors.OperationFailure as err:
Expand Down Expand Up @@ -1715,29 +1738,29 @@ def no_dereference(self):

def _item_frequencies_map_reduce(self, field, normalize=False):
map_func = """
function() {
var path = '{{~%(field)s}}'.split('.');
function() {{
var path = '{{{{~{field}}}}}'.split('.');
var field = this;
for (p in path) {
for (p in path) {{
if (typeof field != 'undefined')
field = field[path[p]];
else
break;
}
if (field && field.constructor == Array) {
field.forEach(function(item) {
}}
if (field && field.constructor == Array) {{
field.forEach(function(item) {{
emit(item, 1);
});
} else if (typeof field != 'undefined') {
}});
}} else if (typeof field != 'undefined') {{
emit(field, 1);
} else {
}} else {{
emit(null, 1);
}
}
""" % {
"field": field
}
}}
}}
""".format(
field=field
)
reduce_func = """
function(key, values) {
var total = 0;
Expand Down
29 changes: 28 additions & 1 deletion tests/queryset/test_queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from mongoengine import *
from mongoengine.connection import get_db
from mongoengine.context_managers import query_counter, switch_db
from mongoengine.errors import InvalidQueryError
from mongoengine.errors import BulkWriteError, InvalidQueryError
from mongoengine.mongodb_support import (
MONGODB_36,
get_mongodb_version,
Expand Down Expand Up @@ -1067,6 +1067,33 @@ class Comment(Document):
com2 = Comment(id=1)
Comment.objects.insert([com1, com2])

def test_bulk_insert_ordered(self):
class Comment(Document):
name = StringField(unique=True)

Comment.drop_collection()
Comment.objects.insert(Comment(name="b"), ordered=True)
comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")]
with pytest.raises(BulkWriteError):
Comment.objects.insert(comments, ordered=True)
Comment.objects.get(name="a")
with pytest.raises(DoesNotExist):
Comment.objects.get(name="c")
assert comments[0].pk is not None
assert comments[1].pk is None
assert comments[2].pk is None

Comment.drop_collection()
Comment.objects.insert(Comment(name="b"), ordered=False)
comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")]
with pytest.raises(BulkWriteError):
Comment.objects.insert(comments, ordered=False)
Comment.objects.get(name="a")
Comment.objects.get(name="c")
assert comments[0].pk is not None
assert comments[1].pk is None
assert comments[2].pk is not None

def test_insert_raise_if_duplicate_in_constraint(self):
class Comment(Document):
id = IntField(primary_key=True)
Expand Down

0 comments on commit 79e0366

Please sign in to comment.