From 814ace612b0a01cc7c623207ac7348df52956ea2 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Mon, 25 May 2015 12:31:38 +0300 Subject: [PATCH 01/18] Handle update_many * Update filter_objects to re-query only for first=True * BaseMixin._update_many now uses Query.update if passed items are Query instance and are not limit()'ed, offset() and so on. * Connect signal handler to reindex objects on update_many --- nefertari_sqla/documents.py | 23 +++++++++++++++++++---- nefertari_sqla/signals.py | 21 +++++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 2aad5cb..4591200 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -6,6 +6,7 @@ from sqlalchemy.orm.collections import InstrumentedList from sqlalchemy.exc import InvalidRequestError, IntegrityError from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound +from sqlalchemy.orm.query import Query from pyramid_sqlalchemy import Session, BaseObject from nefertari.json_httpexceptions import ( @@ -239,10 +240,9 @@ def filter_objects(cls, objects, first=False, **params): if first: params['_limit'] = 1 params['__raise_on_empty'] = True - params['query_set'] = query_set.from_self() - query_set = cls.get_collection(**params) + params['query_set'] = query_set.from_self() + query_set = cls.get_collection(**params) - if first: first_obj = query_set.first() if not first_obj: msg = "'{}({}={})' resource not found".format( @@ -479,7 +479,22 @@ def _delete_many(cls, items): session.flush() @classmethod - def _update_many(cls, items, **params): + def _update_many(cls, items, synchronize_session='fetch', **params): + """ Update :items: queryset or objects list. + + When queryset passed, Query.update() is used to update it. Note that + queryset may not jave limit(), offset(), order_by(), group_by(), or + distinct() called on it. + + Or some of above methods were called, or :items: is not a Query + instance, one-by-one items update is performed. + """ + if isinstance(items, Query): + try: + return items.update( + params, synchronize_session=synchronize_session) + except Exception as ex: + log.error(str(ex)) for item in items: item.update(params) diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index 8546089..3b35ac3 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -3,6 +3,9 @@ from sqlalchemy import event from sqlalchemy.ext.declarative import DeclarativeMeta from sqlalchemy.orm import object_session, class_mapper +from pyramid_sqlalchemy import Session + +from nefertari.utils import to_dicts log = logging.getLogger(__name__) @@ -43,6 +46,21 @@ def on_after_delete(mapper, connection, target): es.index_refs(target) +def on_bulk_update(update_context): + objects = update_context.query.all() + if not objects: + return + + model_cls = type(objects[0]) + if not getattr(model_cls, '_index_enabled', False): + return + + from nefertari.elasticsearch import ES + es = ES(source=model_cls.__name__) + documents = to_dicts(objects) + es.index(documents) + + def setup_es_signals_for(source_cls): event.listen(source_cls, 'after_insert', on_after_insert) event.listen(source_cls, 'after_update', on_after_update) @@ -50,6 +68,9 @@ def setup_es_signals_for(source_cls): log.info('setup_sqla_es_signals_for: %r' % source_cls) +event.listen(Session, 'after_bulk_update', on_bulk_update) + + class ESMetaclass(DeclarativeMeta): def __init__(self, name, bases, attrs): self._index_enabled = True From 80952703c7080782cce8947fa925390af393b864 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Mon, 25 May 2015 13:30:14 +0300 Subject: [PATCH 02/18] Handle delete_many BaseMixin._delete_many now runs Query.delete() if passed items are a queryset. Added handler for after_delete_many signal to reindex deleted objects. --- nefertari_sqla/documents.py | 8 +++++++- nefertari_sqla/signals.py | 27 ++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 4591200..07c2a43 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -472,7 +472,13 @@ def _delete(cls, **params): object_session(obj).delete(obj) @classmethod - def _delete_many(cls, items): + def _delete_many(cls, items, synchronize_session=False): + if isinstance(items, Query): + try: + return items.delete( + synchronize_session=synchronize_session) + except Exception as ex: + log.error(str(ex)) session = Session() for item in items: session.delete(item) diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index 3b35ac3..c42d87f 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -1,4 +1,5 @@ import logging +from collections import defaultdict from sqlalchemy import event from sqlalchemy.ext.declarative import DeclarativeMeta @@ -47,18 +48,37 @@ def on_after_delete(mapper, connection, target): def on_bulk_update(update_context): + model_cls = update_context.mapper.entity + if not getattr(model_cls, '_index_enabled', False): + return + objects = update_context.query.all() if not objects: return - model_cls = type(objects[0]) + from nefertari.elasticsearch import ES + es = ES(source=model_cls.__name__) + documents = to_dicts(objects) + es.index(documents) + + +def on_bulk_delete(delete_context): + model_cls = delete_context.mapper.entity if not getattr(model_cls, '_index_enabled', False): return + # Run SQL statements again, to fetch objects that will be + # deleted on session commit + query = delete_context.query.statement.execute() + values_tuples = query.fetchall() + + pk_field = model_cls.pk_field() + pk_index = query.keys().index(pk_field) + ids = [tup[pk_index] for tup in values_tuples] + from nefertari.elasticsearch import ES es = ES(source=model_cls.__name__) - documents = to_dicts(objects) - es.index(documents) + es.delete(ids) def setup_es_signals_for(source_cls): @@ -69,6 +89,7 @@ def setup_es_signals_for(source_cls): event.listen(Session, 'after_bulk_update', on_bulk_update) +event.listen(Session, 'after_bulk_delete', on_bulk_delete) class ESMetaclass(DeclarativeMeta): From 7b512ce922a42a478d4755e25eeced561fb3e85e Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Mon, 25 May 2015 14:47:12 +0300 Subject: [PATCH 03/18] Rework bulk update and delete signal handlers Bulk update handler now reindexes references of updated objects. Bulk delete handler is called explicitly from BaseMixin._delete_many. --- nefertari_sqla/documents.py | 27 ++++++++++++++++++++++----- nefertari_sqla/signals.py | 22 +++++++++++----------- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 07c2a43..884d8a0 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -14,7 +14,7 @@ from nefertari.utils import ( process_fields, process_limit, _split, dictset, DataProxy) -from .signals import ESMetaclass +from .signals import ESMetaclass, on_bulk_delete from .fields import ListField, DictField, DateTimeField, IntegerField from . import types @@ -473,10 +473,27 @@ def _delete(cls, **params): @classmethod def _delete_many(cls, items, synchronize_session=False): + """ Delete :items: queryset or objects list. + + When queryset passed, Query.delete() is used to delete it. Note that + queryset may not have limit(), offset(), order_by(), group_by(), or + distinct() called on it. + + If some of the methods listed above were called, or :items: is not + a Query instance, one-by-one items update is performed. + + `on_bulk_delete` function is called to delete objects from index + and to reindex relationships. This is done explicitly because it is + impossible to get access to deleted objects in signal handler for + 'after_bulk_delete' ORM event. + """ if isinstance(items, Query): try: - return items.delete( + delete_items = items.all() + items.delete( synchronize_session=synchronize_session) + on_bulk_delete(cls, delete_items) + return except Exception as ex: log.error(str(ex)) session = Session() @@ -489,11 +506,11 @@ def _update_many(cls, items, synchronize_session='fetch', **params): """ Update :items: queryset or objects list. When queryset passed, Query.update() is used to update it. Note that - queryset may not jave limit(), offset(), order_by(), group_by(), or + queryset may not have limit(), offset(), order_by(), group_by(), or distinct() called on it. - Or some of above methods were called, or :items: is not a Query - instance, one-by-one items update is performed. + If some of the methods listed above were called, or :items: is not + a Query instance, one-by-one items update is performed. """ if isinstance(items, Query): try: diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index c42d87f..ef51ef0 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -1,9 +1,9 @@ import logging -from collections import defaultdict from sqlalchemy import event from sqlalchemy.ext.declarative import DeclarativeMeta from sqlalchemy.orm import object_session, class_mapper +from sqlalchemy.orm.query import Query from pyramid_sqlalchemy import Session from nefertari.utils import to_dicts @@ -61,25 +61,26 @@ def on_bulk_update(update_context): documents = to_dicts(objects) es.index(documents) + # Reindex relationships + for obj in objects: + es.index_refs(obj) + -def on_bulk_delete(delete_context): - model_cls = delete_context.mapper.entity +def on_bulk_delete(model_cls, objects): if not getattr(model_cls, '_index_enabled', False): return - # Run SQL statements again, to fetch objects that will be - # deleted on session commit - query = delete_context.query.statement.execute() - values_tuples = query.fetchall() - pk_field = model_cls.pk_field() - pk_index = query.keys().index(pk_field) - ids = [tup[pk_index] for tup in values_tuples] + ids = [getattr(obj, pk_field) for obj in objects] from nefertari.elasticsearch import ES es = ES(source=model_cls.__name__) es.delete(ids) + # Reindex relationships + for obj in objects: + es.index_refs(obj) + def setup_es_signals_for(source_cls): event.listen(source_cls, 'after_insert', on_after_insert) @@ -89,7 +90,6 @@ def setup_es_signals_for(source_cls): event.listen(Session, 'after_bulk_update', on_bulk_update) -event.listen(Session, 'after_bulk_delete', on_bulk_delete) class ESMetaclass(DeclarativeMeta): From 68516ba29325351fdc723219bbf72b6b63d431de Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Mon, 25 May 2015 15:01:27 +0300 Subject: [PATCH 04/18] Add tests --- nefertari_sqla/tests/test_documents.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index 4070476..a6a7d22 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -378,11 +378,30 @@ def test_underscore_delete_many(self, mock_session): assert mock_session().delete.call_count == 2 mock_session().flush.assert_called_once_with() + @patch.object(docs, 'on_bulk_delete') + def test_underscore_delete_many_query(self, mock_on_bulk): + from sqlalchemy.orm.query import Query + items = Query('asd') + items.all = Mock(return_value=[1, 2, 3]) + items.delete = Mock() + docs.BaseMixin._delete_many(items) + items.delete.assert_called_once_with( + synchronize_session=False) + mock_on_bulk.assert_called_once_with(docs.BaseMixin, [1, 2, 3]) + def test_underscore_update_many(self): item = Mock() docs.BaseMixin._update_many([item], foo='bar') item.update.assert_called_once_with({'foo': 'bar'}) + def test_underscore_update_many_query(self): + from sqlalchemy.orm.query import Query + items = Query('asd') + items.update = Mock() + docs.BaseMixin._update_many(items, foo='bar') + items.update.assert_called_once_with( + {'foo': 'bar'}, synchronize_session='fetch') + def test_repr(self): obj = docs.BaseMixin() obj.id = 3 From fd28b5212f08b0eab3b38e8b4fbd3af9a5ded67d Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Wed, 27 May 2015 10:40:24 +0300 Subject: [PATCH 05/18] Migrate BaseMixin._delete to be instance method BaseDOcument.delete --- nefertari_sqla/documents.py | 8 +++----- nefertari_sqla/tests/test_documents.py | 15 +++++++-------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index db77233..6118020 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -464,11 +464,6 @@ def _update(self, params, **kw): setattr(self, key, new_value) return self - @classmethod - def _delete(cls, **params): - obj = cls.get(**params) - object_session(obj).delete(obj) - @classmethod def _delete_many(cls, items, synchronize_session=False): """ Delete :items: queryset or objects list. @@ -726,6 +721,9 @@ def update(self, params): self.__class__.__name__), extra={'data': e}) + def delete(self): + object_session(self).delete(self) + def clean(self, force_all=False): """ Apply field processors to all changed fields And perform custom field values cleaning before running DB validation. diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index fed104e..29b79ae 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -358,14 +358,6 @@ class MyModel(docs.BaseDocument): assert newobj.name == 'bar' assert newobj.settings == {'sett1': 'val1'} - @patch.object(docs.BaseMixin, 'get') - @patch.object(docs, 'object_session') - def test_underscore_delete(self, obj_session, mock_get): - docs.BaseMixin._delete(foo='bar') - mock_get.assert_called_once_with(foo='bar') - obj_session.assert_called_once_with(mock_get()) - obj_session().delete.assert_called_once_with(mock_get()) - @patch.object(docs, 'Session') def test_underscore_delete_many(self, mock_session): docs.BaseMixin._delete_many(['foo', 'bar']) @@ -658,6 +650,13 @@ def test_update_error(self, mock_upd, simple_model, memory_db): simple_model(id=4).update({'name': 'q'}) assert 'There was a conflict' in str(ex.value) + @patch.object(docs, 'object_session') + def test_delete(self, obj_session): + obj = docs.BaseDocument() + obj.delete() + obj_session.assert_called_once_with(obj) + obj_session().delete.assert_called_once_with(obj) + def test_clean_new_object(self, memory_db): processor = lambda instance, new_value: 'foobar' From 684fd4dd6c9c921b8ec4e94bc0a0b286b369b558 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Wed, 27 May 2015 11:56:33 +0300 Subject: [PATCH 06/18] Receive refresh_index arg in all document CRUD methods. Set flag on docs --- nefertari_sqla/documents.py | 32 +++++++++++++++----------- nefertari_sqla/signals.py | 2 +- nefertari_sqla/tests/test_documents.py | 10 ++++---- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 6118020..b5292e1 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -465,7 +465,8 @@ def _update(self, params, **kw): return self @classmethod - def _delete_many(cls, items, synchronize_session=False): + def _delete_many(cls, items, synchronize_session=False, + refresh_index=False): """ Delete :items: queryset or objects list. When queryset passed, Query.delete() is used to delete it. Note that @@ -485,17 +486,19 @@ def _delete_many(cls, items, synchronize_session=False): delete_items = items.all() items.delete( synchronize_session=synchronize_session) - on_bulk_delete(cls, delete_items) + on_bulk_delete(cls, delete_items, refresh_index=refresh_index) return except Exception as ex: log.error(str(ex)) session = Session() for item in items: + item._refresh_index = refresh_index session.delete(item) session.flush() @classmethod - def _update_many(cls, items, synchronize_session='fetch', **params): + def _update_many(cls, items, synchronize_session='fetch', + refresh_index=False, **params): """ Update :items: queryset or objects list. When queryset passed, Query.update() is used to update it. Note that @@ -507,11 +510,13 @@ def _update_many(cls, items, synchronize_session='fetch', **params): """ if isinstance(items, Query): try: + items._refresh_index = refresh_index return items.update( params, synchronize_session=synchronize_session) except Exception as ex: log.error(str(ex)) for item in items: + item._refresh_index = refresh_index item.update(params) def __repr__(self): @@ -565,7 +570,9 @@ def to_dict(self, **kwargs): return _dict def update_iterables(self, params, attr, unique=False, - value_type=None, save=True): + value_type=None, save=True, + refresh_index=False): + self._refresh_index = refresh_index mapper = class_mapper(self.__class__) columns = {c.name: c for c in mapper.columns} is_dict = isinstance(columns.get(attr), DictField) @@ -601,9 +608,7 @@ def update_dict(update_params): setattr(self, attr, final_value) if save: - session = object_session(self) - session.add(self) - session.flush() + self.save(refresh_index=refresh_index) def update_list(update_params): final_value = getattr(self, attr, []) or [] @@ -627,9 +632,7 @@ def update_list(update_params): setattr(self, attr, final_value) if save: - session = object_session(self) - session.add(self) - session.flush() + self.save(refresh_index=refresh_index) if is_dict: update_dict(params) @@ -684,9 +687,10 @@ def _bump_version(self): self.updated_at = datetime.utcnow() self._version = (self._version or 0) + 1 - def save(self, *arg, **kw): + def save(self, refresh_index=False): session = object_session(self) self._bump_version() + self._refresh_index = refresh_index session = session or Session() try: self.clean() @@ -703,7 +707,8 @@ def save(self, *arg, **kw): self.__class__.__name__), extra={'data': e}) - def update(self, params): + def update(self, params, refresh_index=False): + self._refresh_index = refresh_index try: self._update(params) self._bump_version() @@ -721,7 +726,8 @@ def update(self, params): self.__class__.__name__), extra={'data': e}) - def delete(self): + def delete(self, refresh_index=False): + self._refresh_index = refresh_index object_session(self).delete(self) def clean(self, force_all=False): diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index ef51ef0..550eda6 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -66,7 +66,7 @@ def on_bulk_update(update_context): es.index_refs(obj) -def on_bulk_delete(model_cls, objects): +def on_bulk_delete(model_cls, objects, refresh_index=False): if not getattr(model_cls, '_index_enabled', False): return diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index 29b79ae..63bf436 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -360,10 +360,11 @@ class MyModel(docs.BaseDocument): @patch.object(docs, 'Session') def test_underscore_delete_many(self, mock_session): - docs.BaseMixin._delete_many(['foo', 'bar']) + foo = Mock() + docs.BaseMixin._delete_many([foo]) mock_session.assert_called_once_with() - mock_session().delete.assert_called_with('bar') - assert mock_session().delete.call_count == 2 + mock_session().delete.assert_called_with(foo) + assert mock_session().delete.call_count == 1 mock_session().flush.assert_called_once_with() @patch.object(docs, 'on_bulk_delete') @@ -375,7 +376,8 @@ def test_underscore_delete_many_query(self, mock_on_bulk): docs.BaseMixin._delete_many(items) items.delete.assert_called_once_with( synchronize_session=False) - mock_on_bulk.assert_called_once_with(docs.BaseMixin, [1, 2, 3]) + mock_on_bulk.assert_called_once_with( + docs.BaseMixin, [1, 2, 3], refresh_index=False) def test_underscore_update_many(self): item = Mock() From 0e452ae2d01bb8af43fa17fac38a6c8e24ebb3e2 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Wed, 27 May 2015 12:17:07 +0300 Subject: [PATCH 07/18] Pass refresh_index from signals to ES --- nefertari_sqla/signals.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index 550eda6..031d7d8 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -12,42 +12,47 @@ log = logging.getLogger(__name__) -def index_object(obj, with_refs=True): +def index_object(obj, with_refs=True, **kwargs): from nefertari.elasticsearch import ES es = ES(obj.__class__.__name__) - es.index(obj.to_dict()) + es.index(obj.to_dict(), **kwargs) if with_refs: - es.index_refs(obj) + es.index_refs(obj, **kwargs) def on_after_insert(mapper, connection, target): # Reload `target` to get access to back references and processed # fields values + refresh_index = getattr(target, '_refresh_index', False) model_cls = target.__class__ pk_field = target.pk_field() reloaded = model_cls.get(**{pk_field: getattr(target, pk_field)}) - index_object(reloaded) + index_object(reloaded, refresh_index=refresh_index) def on_after_update(mapper, connection, target): + refresh_index = getattr(target, '_refresh_index', False) session = object_session(target) # Reload `target` to get access to processed fields values attributes = [c.name for c in class_mapper(target.__class__).columns] session.expire(target, attribute_names=attributes) - index_object(target) + index_object(target, refresh_index=refresh_index) def on_after_delete(mapper, connection, target): from nefertari.elasticsearch import ES + refresh_index = getattr(target, '_refresh_index', False) model_cls = target.__class__ es = ES(model_cls.__name__) obj_id = getattr(target, model_cls.pk_field()) - es.delete(obj_id) - es.index_refs(target) + es.delete(obj_id, refresh_index=refresh_index) + es.index_refs(target, refresh_index=refresh_index) def on_bulk_update(update_context): + refresh_index = getattr( + update_context.query, '_refresh_index', False) model_cls = update_context.mapper.entity if not getattr(model_cls, '_index_enabled', False): return @@ -59,11 +64,11 @@ def on_bulk_update(update_context): from nefertari.elasticsearch import ES es = ES(source=model_cls.__name__) documents = to_dicts(objects) - es.index(documents) + es.index(documents, refresh_index=refresh_index) # Reindex relationships for obj in objects: - es.index_refs(obj) + es.index_refs(obj, refresh_index=refresh_index) def on_bulk_delete(model_cls, objects, refresh_index=False): @@ -75,11 +80,11 @@ def on_bulk_delete(model_cls, objects, refresh_index=False): from nefertari.elasticsearch import ES es = ES(source=model_cls.__name__) - es.delete(ids) + es.delete(ids, refresh_index=refresh_index) # Reindex relationships for obj in objects: - es.index_refs(obj) + es.index_refs(obj, refresh_index=refresh_index) def setup_es_signals_for(source_cls): From 564afcd55ca889fa676d37ab581e892f7af4b2ad Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Wed, 27 May 2015 13:54:21 +0300 Subject: [PATCH 08/18] Make refresh_index None by default --- nefertari_sqla/documents.py | 15 +++++++-------- nefertari_sqla/signals.py | 11 +++++------ nefertari_sqla/tests/test_documents.py | 5 +++-- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index b5292e1..61d0207 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -466,7 +466,7 @@ def _update(self, params, **kw): @classmethod def _delete_many(cls, items, synchronize_session=False, - refresh_index=False): + refresh_index=None): """ Delete :items: queryset or objects list. When queryset passed, Query.delete() is used to delete it. Note that @@ -498,7 +498,7 @@ def _delete_many(cls, items, synchronize_session=False, @classmethod def _update_many(cls, items, synchronize_session='fetch', - refresh_index=False, **params): + refresh_index=None, **params): """ Update :items: queryset or objects list. When queryset passed, Query.update() is used to update it. Note that @@ -516,8 +516,7 @@ def _update_many(cls, items, synchronize_session='fetch', except Exception as ex: log.error(str(ex)) for item in items: - item._refresh_index = refresh_index - item.update(params) + item.update(params, refresh_index=refresh_index) def __repr__(self): parts = [] @@ -571,7 +570,7 @@ def to_dict(self, **kwargs): def update_iterables(self, params, attr, unique=False, value_type=None, save=True, - refresh_index=False): + refresh_index=None): self._refresh_index = refresh_index mapper = class_mapper(self.__class__) columns = {c.name: c for c in mapper.columns} @@ -687,7 +686,7 @@ def _bump_version(self): self.updated_at = datetime.utcnow() self._version = (self._version or 0) + 1 - def save(self, refresh_index=False): + def save(self, refresh_index=None): session = object_session(self) self._bump_version() self._refresh_index = refresh_index @@ -707,7 +706,7 @@ def save(self, refresh_index=False): self.__class__.__name__), extra={'data': e}) - def update(self, params, refresh_index=False): + def update(self, params, refresh_index=None): self._refresh_index = refresh_index try: self._update(params) @@ -726,7 +725,7 @@ def update(self, params, refresh_index=False): self.__class__.__name__), extra={'data': e}) - def delete(self, refresh_index=False): + def delete(self, refresh_index=None): self._refresh_index = refresh_index object_session(self).delete(self) diff --git a/nefertari_sqla/signals.py b/nefertari_sqla/signals.py index 031d7d8..49777ff 100644 --- a/nefertari_sqla/signals.py +++ b/nefertari_sqla/signals.py @@ -3,7 +3,6 @@ from sqlalchemy import event from sqlalchemy.ext.declarative import DeclarativeMeta from sqlalchemy.orm import object_session, class_mapper -from sqlalchemy.orm.query import Query from pyramid_sqlalchemy import Session from nefertari.utils import to_dicts @@ -23,7 +22,7 @@ def index_object(obj, with_refs=True, **kwargs): def on_after_insert(mapper, connection, target): # Reload `target` to get access to back references and processed # fields values - refresh_index = getattr(target, '_refresh_index', False) + refresh_index = getattr(target, '_refresh_index', None) model_cls = target.__class__ pk_field = target.pk_field() reloaded = model_cls.get(**{pk_field: getattr(target, pk_field)}) @@ -31,7 +30,7 @@ def on_after_insert(mapper, connection, target): def on_after_update(mapper, connection, target): - refresh_index = getattr(target, '_refresh_index', False) + refresh_index = getattr(target, '_refresh_index', None) session = object_session(target) # Reload `target` to get access to processed fields values @@ -42,7 +41,7 @@ def on_after_update(mapper, connection, target): def on_after_delete(mapper, connection, target): from nefertari.elasticsearch import ES - refresh_index = getattr(target, '_refresh_index', False) + refresh_index = getattr(target, '_refresh_index', None) model_cls = target.__class__ es = ES(model_cls.__name__) obj_id = getattr(target, model_cls.pk_field()) @@ -52,7 +51,7 @@ def on_after_delete(mapper, connection, target): def on_bulk_update(update_context): refresh_index = getattr( - update_context.query, '_refresh_index', False) + update_context.query, '_refresh_index', None) model_cls = update_context.mapper.entity if not getattr(model_cls, '_index_enabled', False): return @@ -71,7 +70,7 @@ def on_bulk_update(update_context): es.index_refs(obj, refresh_index=refresh_index) -def on_bulk_delete(model_cls, objects, refresh_index=False): +def on_bulk_delete(model_cls, objects, refresh_index=None): if not getattr(model_cls, '_index_enabled', False): return diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index 63bf436..e205fca 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -377,12 +377,13 @@ def test_underscore_delete_many_query(self, mock_on_bulk): items.delete.assert_called_once_with( synchronize_session=False) mock_on_bulk.assert_called_once_with( - docs.BaseMixin, [1, 2, 3], refresh_index=False) + docs.BaseMixin, [1, 2, 3], refresh_index=None) def test_underscore_update_many(self): item = Mock() docs.BaseMixin._update_many([item], foo='bar') - item.update.assert_called_once_with({'foo': 'bar'}) + item.update.assert_called_once_with( + {'foo': 'bar'}, refresh_index=None) def test_underscore_update_many_query(self): from sqlalchemy.orm.query import Query From 4302ebadef3bd1a331ac8b1a348b19c17fdbe45e Mon Sep 17 00:00:00 2001 From: Jonathan Stoikovitch Date: Wed, 27 May 2015 12:51:19 -0400 Subject: [PATCH 09/18] added missing fixes to 0.2.1 changelog --- docs/source/changelog.rst | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index be8a4a9..deefd57 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -2,9 +2,11 @@ Changelog ========= * :release:`0.2.2 <2015-05-27>` -* :bug:`-` fixes login issue -* :bug:`-` fixes posting to singular resources e.g. /api/users//profile -* :bug:`-` fixes multiple foreign keys to same model +* :bug:`-` Fixed login issue +* :bug:`-` Fixed posting to singular resources e.g. /api/users//profile +* :bug:`-` Fixed multiple foreign keys to same model +* :bug:`-` Fixed ES mapping error when values of field were all null +* :bug:`-` Fixed a bug whereby Relationship could not be created without a backref * :release:`0.2.1 <2015-05-20>` * :bug:`-` Fixed slow queries to backrefs From 511e2b28bd290e52cd197209582eec33373c3737 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Thu, 28 May 2015 10:44:17 +0300 Subject: [PATCH 10/18] Remove `Processable` prefix from type names --- nefertari_sqla/documents.py | 18 +++++----- nefertari_sqla/fields.py | 40 ++++++++++----------- nefertari_sqla/tests/test_types.py | 56 +++++++++++++++--------------- nefertari_sqla/types.py | 26 +++++++------- 4 files changed, 69 insertions(+), 71 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 53254ce..2193d7e 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -67,11 +67,11 @@ def process_bools(_dict): types.LimitedText: {'type': 'string'}, types.LimitedUnicode: {'type': 'string'}, types.LimitedUnicodeText: {'type': 'string'}, - types.ProcessableChoice: {'type': 'string'}, + types.Choice: {'type': 'string'}, - types.ProcessableBoolean: {'type': 'boolean'}, - types.ProcessableLargeBinary: {'type': 'object'}, - types.ProcessableDict: {'type': 'object'}, + types.Boolean: {'type': 'boolean'}, + types.LargeBinary: {'type': 'object'}, + types.Dict: {'type': 'object'}, types.LimitedNumeric: {'type': 'double'}, types.LimitedFloat: {'type': 'double'}, @@ -79,11 +79,11 @@ def process_bools(_dict): types.LimitedInteger: {'type': 'long'}, types.LimitedBigInteger: {'type': 'long'}, types.LimitedSmallInteger: {'type': 'long'}, - types.ProcessableInterval: {'type': 'long'}, + types.Interval: {'type': 'long'}, - types.ProcessableDateTime: {'type': 'date', 'format': 'dateOptionalTime'}, - types.ProcessableDate: {'type': 'date', 'format': 'dateOptionalTime'}, - types.ProcessableTime: {'type': 'date', 'format': 'HH:mm:ss'}, + types.DateTime: {'type': 'date', 'format': 'dateOptionalTime'}, + types.Date: {'type': 'date', 'format': 'dateOptionalTime'}, + types.Time: {'type': 'date', 'format': 'HH:mm:ss'}, } @@ -124,7 +124,7 @@ def get_es_mapping(cls): for name, column in columns.items(): column_type = column.type - if isinstance(column_type, types.ProcessableChoiceArray): + if isinstance(column_type, types.ChoiceArray): column_type = column_type.impl.item_type column_type = type(column_type) if column_type not in TYPES_MAP: diff --git a/nefertari_sqla/fields.py b/nefertari_sqla/fields.py index 5a6342d..85af5c7 100644 --- a/nefertari_sqla/fields.py +++ b/nefertari_sqla/fields.py @@ -12,16 +12,16 @@ LimitedFloat, LimitedNumeric, LimitedUnicodeText, - ProcessableDateTime, - ProcessableBoolean, - ProcessableDate, - ProcessableInterval, - ProcessableLargeBinary, - ProcessablePickleType, - ProcessableTime, - ProcessableChoice, - ProcessableDict, - ProcessableChoiceArray, + DateTime, + Boolean, + Date, + Interval, + LargeBinary, + PickleType, + Time, + Choice, + Dict, + ChoiceArray, ) @@ -142,7 +142,7 @@ class BigIntegerField(ProcessableMixin, BaseField): class BooleanField(ProcessableMixin, BaseField): - _sqla_type = ProcessableBoolean + _sqla_type = Boolean _type_unchanged_kwargs = ('create_constraint') def process_type_args(self, kwargs): @@ -159,17 +159,17 @@ def process_type_args(self, kwargs): class DateField(ProcessableMixin, BaseField): - _sqla_type = ProcessableDate + _sqla_type = Date _type_unchanged_kwargs = () class DateTimeField(ProcessableMixin, BaseField): - _sqla_type = ProcessableDateTime + _sqla_type = DateTime _type_unchanged_kwargs = ('timezone',) class ChoiceField(ProcessableMixin, BaseField): - _sqla_type = ProcessableChoice + _sqla_type = Choice _type_unchanged_kwargs = ( 'collation', 'convert_unicode', 'unicode_error', '_warn_on_bytestring', 'choices') @@ -195,13 +195,13 @@ class IdField(IntegerField): class IntervalField(ProcessableMixin, BaseField): - _sqla_type = ProcessableInterval + _sqla_type = Interval _type_unchanged_kwargs = ( 'native', 'second_precision', 'day_precision') class BinaryField(ProcessableMixin, BaseField): - _sqla_type = ProcessableLargeBinary + _sqla_type = LargeBinary _type_unchanged_kwargs = ('length',) # Since SQLAlchemy 1.0.0 @@ -217,7 +217,7 @@ class DecimalField(ProcessableMixin, BaseField): class PickleField(ProcessableMixin, BaseField): - _sqla_type = ProcessablePickleType + _sqla_type = PickleType _type_unchanged_kwargs = ( 'protocol', 'pickler', 'comparator') @@ -251,7 +251,7 @@ class TextField(StringField): class TimeField(DateTimeField): - _sqla_type = ProcessableTime + _sqla_type = Time class UnicodeField(StringField): @@ -263,7 +263,7 @@ class UnicodeTextField(StringField): class DictField(BaseField): - _sqla_type = ProcessableDict + _sqla_type = Dict _type_unchanged_kwargs = () def process_type_args(self, kwargs): @@ -274,7 +274,7 @@ def process_type_args(self, kwargs): class ListField(BaseField): - _sqla_type = ProcessableChoiceArray + _sqla_type = ChoiceArray _type_unchanged_kwargs = ( 'as_tuple', 'dimensions', 'zero_indexes', 'choices') diff --git a/nefertari_sqla/tests/test_types.py b/nefertari_sqla/tests/test_types.py index 95a7fd5..64c61ba 100644 --- a/nefertari_sqla/tests/test_types.py +++ b/nefertari_sqla/tests/test_types.py @@ -114,10 +114,10 @@ def test_min_and_max_value(self): raise Exception('Unexpected exception') -class TestProcessableChoice(object): +class TestChoice(object): def test_no_choices(self): - field = types.ProcessableChoice() + field = types.Choice() field._column_name = 'foo' with pytest.raises(ValueError) as ex: field.process_bind_param('foo', None) @@ -125,14 +125,14 @@ def test_no_choices(self): 'Field `foo`: Got an invalid choice `foo`. Valid choices: ()' def test_none_value(self): - field = types.ProcessableChoice() + field = types.Choice() try: field.process_bind_param(None, None) except ValueError: raise Exception('Unexpected error') def test_value_not_in_choices(self): - field = types.ProcessableChoice(choices=['foo']) + field = types.Choice(choices=['foo']) field._column_name = 'foo' with pytest.raises(ValueError) as ex: field.process_bind_param('bar', None) @@ -140,38 +140,38 @@ def test_value_not_in_choices(self): 'Field `foo`: Got an invalid choice `bar`. Valid choices: (foo)' def test_value_in_choices(self): - field = types.ProcessableChoice(choices=['foo']) + field = types.Choice(choices=['foo']) try: field.process_bind_param('foo', None) except ValueError: raise Exception('Unexpected error') def test_choices_not_sequence(self): - field = types.ProcessableChoice(choices='foo') + field = types.Choice(choices='foo') try: field.process_bind_param('foo', None) except ValueError: raise Exception('Unexpected error') -class TestProcessableInterval(object): +class TestInterval(object): def test_passing_seconds(self): - field = types.ProcessableInterval() + field = types.Interval() value = field.process_bind_param(36000, None) assert isinstance(value, datetime.timedelta) assert value.seconds == 36000 def test_passing_timedelta(self): - field = types.ProcessableInterval() + field = types.Interval() value = field.process_bind_param(datetime.timedelta(seconds=60), None) assert isinstance(value, datetime.timedelta) -class TestProcessableDict(object): +class TestDict(object): def test_load_dialect_impl_postgresql(self): - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'postgresql' field.load_dialect_impl(dialect=dialect) @@ -180,7 +180,7 @@ def test_load_dialect_impl_postgresql(self): def test_load_dialect_impl_not_postgresql(self): from sqlalchemy.types import UnicodeText - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'some_other' field.load_dialect_impl(dialect=dialect) @@ -188,36 +188,36 @@ def test_load_dialect_impl_not_postgresql(self): dialect.type_descriptor.assert_called_once_with(UnicodeText) def test_process_bind_param_postgres(self): - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'postgresql' assert {'q': 'f'} == field.process_bind_param({'q': 'f'}, dialect) def test_process_bind_param_not_postgres(self): - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'some_other' assert '{"q": "f"}' == field.process_bind_param({'q': 'f'}, dialect) def test_process_result_value_postgres(self): - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'postgresql' assert {'q': 'f'} == field.process_result_value({'q': 'f'}, dialect) def test_process_result_value_not_postgres(self): - field = types.ProcessableDict() + field = types.Dict() dialect = Mock() dialect.name = 'some_other' assert {'q': 'f'} == field.process_result_value('{"q": "f"}', dialect) -class TestProcessableChoiceArray(object): +class TestChoiceArray(object): @patch.object(types, 'ARRAY') @patch.object(types.types, 'UnicodeText') def test_load_dialect_impl_postgresql(self, mock_unic, mock_array): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'postgresql' field.load_dialect_impl(dialect=dialect) @@ -228,7 +228,7 @@ def test_load_dialect_impl_postgresql(self, mock_unic, mock_array): @patch.object(types, 'ARRAY') @patch.object(types.types, 'UnicodeText') def test_load_dialect_impl_not_postgresql(self, mock_unic, mock_array): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'some_other' field.load_dialect_impl(dialect=dialect) @@ -237,12 +237,12 @@ def test_load_dialect_impl_not_postgresql(self, mock_unic, mock_array): assert not mock_array.called def test_choices_not_sequence(self): - field = types.ProcessableChoiceArray( + field = types.ChoiceArray( item_type=fields.StringField, choices='foo') assert field.choices == ['foo'] def test_validate_choices_no_choices(self): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) assert field.choices is None try: field._validate_choices(['foo']) @@ -250,7 +250,7 @@ def test_validate_choices_no_choices(self): raise Exception('Unexpected error') def test_validate_choices_no_value(self): - field = types.ProcessableChoiceArray( + field = types.ChoiceArray( item_type=fields.StringField, choices=['foo']) try: field._validate_choices(None) @@ -258,7 +258,7 @@ def test_validate_choices_no_value(self): raise Exception('Unexpected error') def test_validate_choices_valid(self): - field = types.ProcessableChoiceArray( + field = types.ChoiceArray( item_type=fields.StringField, choices=['foo', 'bar']) try: @@ -267,7 +267,7 @@ def test_validate_choices_valid(self): raise Exception('Unexpected error') def test_validate_choices_invalid(self): - field = types.ProcessableChoiceArray( + field = types.ChoiceArray( item_type=fields.StringField, choices=['foo', 'bar']) field._column_name = 'mycol' @@ -278,25 +278,25 @@ def test_validate_choices_invalid(self): 'Valid choices: (foo, bar)') def test_process_bind_param_postgres(self): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'postgresql' assert ['q'] == field.process_bind_param(['q'], dialect) def test_process_bind_param_not_postgres(self): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'some_other' assert '["q"]' == field.process_bind_param(['q'], dialect) def test_process_result_value_postgres(self): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'postgresql' assert ['q'] == field.process_result_value(['q'], dialect) def test_process_result_value_not_postgres(self): - field = types.ProcessableChoiceArray(item_type=fields.StringField) + field = types.ChoiceArray(item_type=fields.StringField) dialect = Mock() dialect.name = 'some_other' assert ['q'] == field.process_result_value('["q"]', dialect) diff --git a/nefertari_sqla/types.py b/nefertari_sqla/types.py index 2fa7fed..49cb576 100644 --- a/nefertari_sqla/types.py +++ b/nefertari_sqla/types.py @@ -97,21 +97,19 @@ class LimitedNumeric(SizeLimitedNumberMixin, types.TypeDecorator): impl = types.Numeric -# Types that support running processors - -class ProcessableDateTime(types.TypeDecorator): +class DateTime(types.TypeDecorator): impl = types.DateTime -class ProcessableBoolean(types.TypeDecorator): +class Boolean(types.TypeDecorator): impl = types.Boolean -class ProcessableDate(types.TypeDecorator): +class Date(types.TypeDecorator): impl = types.Date -class ProcessableChoice(types.TypeDecorator): +class Choice(types.TypeDecorator): """ Type that represents value from a particular set of choices. Value may be any number of choices from a provided set of @@ -124,7 +122,7 @@ def __init__(self, *args, **kwargs): self.choices = kwargs.pop('choices', ()) if not isinstance(self.choices, (list, tuple, set)): self.choices = [self.choices] - super(ProcessableChoice, self).__init__(*args, **kwargs) + super(Choice, self).__init__(*args, **kwargs) def process_bind_param(self, value, dialect): if (value is not None) and (value not in self.choices): @@ -134,7 +132,7 @@ def process_bind_param(self, value, dialect): return value -class ProcessableInterval(types.TypeDecorator): +class Interval(types.TypeDecorator): impl = types.Interval def process_bind_param(self, value, dialect): @@ -144,19 +142,19 @@ def process_bind_param(self, value, dialect): return value -class ProcessableLargeBinary(types.TypeDecorator): +class LargeBinary(types.TypeDecorator): impl = types.LargeBinary -class ProcessablePickleType(types.TypeDecorator): +class PickleType(types.TypeDecorator): impl = types.PickleType -class ProcessableTime(types.TypeDecorator): +class Time(types.TypeDecorator): impl = types.Time -class ProcessableDict(types.TypeDecorator): +class Dict(types.TypeDecorator): """ Represents a dictionary of values. @@ -193,7 +191,7 @@ def process_result_value(self, value, dialect): return value -class ProcessableChoiceArray(types.TypeDecorator): +class ChoiceArray(types.TypeDecorator): """ Represents a list of values. If 'postgresql' is used, postgress.ARRAY type is used for db column @@ -211,7 +209,7 @@ def __init__(self, *args, **kwargs): self.choices, (list, tuple, set)): self.choices = [self.choices] self.kwargs = kwargs - super(ProcessableChoiceArray, self).__init__(*args, **kwargs) + super(ChoiceArray, self).__init__(*args, **kwargs) def load_dialect_impl(self, dialect): """ Based on :dialect.name: determine type to be used. From 459e2d36b03e61cb2d14d06010f45221a993e0c0 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Thu, 28 May 2015 10:54:28 +0300 Subject: [PATCH 11/18] Split processors to pre/post. Update apply_processors method --- nefertari_sqla/fields.py | 12 +++++++++--- nefertari_sqla/tests/test_documents.py | 8 ++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/nefertari_sqla/fields.py b/nefertari_sqla/fields.py index 85af5c7..303eae1 100644 --- a/nefertari_sqla/fields.py +++ b/nefertari_sqla/fields.py @@ -30,11 +30,17 @@ class ProcessableMixin(object): is being set on a field. """ def __init__(self, *args, **kwargs): - self.processors = kwargs.pop('processors', ()) + self.pre_processors = kwargs.pop('pre_processors', ()) + self.post_processors = kwargs.pop('post_processors', ()) super(ProcessableMixin, self).__init__(*args, **kwargs) - def apply_processors(self, instance, new_value): - for proc in self.processors: + def apply_processors(self, instance, new_value, pre=False, post=False): + processors = [] + if pre: + processors += list(self.pre_processors) + if post: + processors += list(self.post_processors) + for proc in processors: new_value = proc(instance=instance, new_value=new_value) return new_value diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index 32f75eb..d9e7cdd 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -645,8 +645,8 @@ def test_clean_new_object(self, memory_db): class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) - name = fields.StringField(processors=[processor]) - email = fields.StringField(processors=[processor]) + name = fields.StringField(pre_processors=[processor]) + email = fields.StringField(pre_processors=[processor]) memory_db() obj = MyModel(name='myname') @@ -660,8 +660,8 @@ def test_clean_existing_object(self, memory_db): class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) - name = fields.StringField(processors=[processor]) - email = fields.StringField(processors=[processor]) + name = fields.StringField(pre_processors=[processor]) + email = fields.StringField(pre_processors=[processor]) memory_db() obj = MyModel(id=1, name='myname', email='FOO').save() From e2cb672c8aa533c50ca69457be30cd64833900f6 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Thu, 28 May 2015 12:01:02 +0300 Subject: [PATCH 12/18] Apply new processors --- nefertari_sqla/documents.py | 65 +++++++++++++++++-------- nefertari_sqla/tests/test_documents.py | 67 ++++++++++++++++++++++---- 2 files changed, 102 insertions(+), 30 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 2193d7e..e8f458d 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -439,8 +439,7 @@ def get_or_create(cls, **params): except NoResultFound: defaults.update(params) new_obj = cls(**defaults) - query_set.session.add(new_obj) - query_set.session.flush() + new_obj.save() return new_obj, True except MultipleResultsFound: raise JHTTPBadRequest('Bad or Insufficient Params') @@ -568,9 +567,7 @@ def update_dict(update_params): setattr(self, attr, final_value) if save: - session = object_session(self) - session.add(self) - session.flush() + self.save() def update_list(update_params): final_value = getattr(self, attr, []) or [] @@ -594,9 +591,7 @@ def update_list(update_params): setattr(self, attr, final_value) if save: - session = object_session(self) - session.add(self) - session.flush() + self.save() if is_dict: update_dict(params) @@ -656,10 +651,11 @@ def save(self, *arg, **kw): self._bump_version() session = session or Session() try: - self.clean() + self.apply_pre_processors() session.add(self) session.flush() session.expire(self) + self.apply_post_processors() return self except (IntegrityError,) as e: if 'duplicate' not in e.message: @@ -674,10 +670,11 @@ def update(self, params): try: self._update(params) self._bump_version() - self.clean() + self.apply_pre_processors() session = object_session(self) session.add(self) session.flush() + self.apply_post_processors() return self except (IntegrityError,) as e: if 'duplicate' not in e.message: @@ -688,9 +685,30 @@ def update(self, params): self.__class__.__name__), extra={'data': e}) - def clean(self, force_all=False): - """ Apply field processors to all changed fields And perform custom - field values cleaning before running DB validation. + def apply_processors(self, column_names=None, pre=False, post=False): + """ Apply processors to columns with :column_names: names. + + Arguments: + :column_names: List of string names of changed columns. + :pre: Boolean indicating whether to apply pre-processors. + :post: Boolean indicating whether to apply post-processors. + """ + columns = {c.key: c for c in class_mapper(self.__class__).columns} + if column_names is None: + column_names = columns.keys() + + for name in column_names: + column = columns.get(name) + if column is not None and hasattr(column, 'apply_processors'): + new_value = getattr(self, name) + processed_value = column.apply_processors( + instance=self, new_value=new_value, + pre=pre, post=post) + setattr(self, name, processed_value) + + def apply_pre_processors(self): + """ Determine changed columns and run `self.apply_processors` to + apply needed processors. Note that at this stage, field values are in the exact same state you posted/set them. E.g. if you set time_field='11/22/2000', @@ -699,18 +717,23 @@ def clean(self, force_all=False): columns = {c.key: c for c in class_mapper(self.__class__).columns} state = attributes.instance_state(self) - if state.persistent and not force_all: + if state.persistent: changed_columns = state.committed_state.keys() else: # New object changed_columns = columns.keys() - for name in changed_columns: - column = columns.get(name) - if column is not None and hasattr(column, 'apply_processors'): - new_value = getattr(self, name) - processed_value = column.apply_processors( - instance=self, new_value=new_value) - setattr(self, name, processed_value) + self._columns_to_process = changed_columns + self.apply_processors(changed_columns, pre=True) + + def apply_post_processors(self): + """ Run `self.apply_processors` with columns names determined by + `self.apply_pre_processors`. + + Note that at this stage, field values are in the exact same state + you posted/set them. E.g. if you set time_field='11/22/2000', + self.time_field will be equal to '11/22/2000' here. + """ + self.apply_processors(self._columns_to_process, post=True) class ESBaseDocument(BaseDocument): diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index d9e7cdd..0837fed 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -338,8 +338,6 @@ def test_get_or_create_existing_created( one, created = simple_model.get_or_create( defaults={'id': 7}, _limit=2, name='q') assert created - assert queryset.session.add.call_count == 1 - assert queryset.session.flush.call_count == 1 assert one.id == 7 assert one.name == 'q' @@ -639,41 +637,92 @@ def test_update_error(self, mock_upd, simple_model, memory_db): simple_model(id=4).update({'name': 'q'}) assert 'There was a conflict' in str(ex.value) - def test_clean_new_object(self, memory_db): + def test_apply_pre_processors_new_object(self, memory_db): processor = lambda instance, new_value: 'foobar' + processor2 = lambda instance, new_value: new_value + '+' class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) - name = fields.StringField(pre_processors=[processor]) + name = fields.StringField( + pre_processors=[processor], + post_processors=[processor2]) email = fields.StringField(pre_processors=[processor]) memory_db() obj = MyModel(name='myname') - obj.clean() + obj.apply_pre_processors() assert obj.name == 'foobar' assert obj.email == 'foobar' - def test_clean_existing_object(self, memory_db): + def test_apply_pre_processors_existing_object(self, memory_db): processor = lambda instance, new_value: new_value + '-' + processor2 = lambda instance, new_value: new_value + '+' class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) - name = fields.StringField(pre_processors=[processor]) + name = fields.StringField( + pre_processors=[processor], + post_processors=[processor2]) email = fields.StringField(pre_processors=[processor]) memory_db() obj = MyModel(id=1, name='myname', email='FOO').save() - assert obj.name == 'myname-' + assert obj.name == 'myname-+' assert obj.email == 'FOO-' obj = MyModel.get(id=1) + assert obj.name == 'myname-+' obj.name = 'supername' - obj.clean() + obj.apply_pre_processors() assert obj.name == 'supername-' assert obj.email == 'FOO-' + def test_apply_post_processors(self, memory_db): + memory_db() + obj = docs.BaseDocument() + obj.apply_processors = Mock() + obj._columns_to_process = [1, 2, 3] + obj.apply_post_processors() + obj.apply_processors.assert_called_once_with( + [1, 2, 3], post=True) + + def test_apply_pre_processors(self, memory_db): + class MyModel(docs.BaseDocument): + __tablename__ = 'mymodel' + id = fields.IdField(primary_key=True) + memory_db() + + obj = MyModel(id=1) + obj.apply_processors = Mock() + obj.apply_pre_processors() + obj.apply_processors.assert_called_once_with( + ['id', 'updated_at', '_version'], pre=True) + + def test_apply_processors(self, memory_db): + class MyModel(docs.BaseDocument): + __tablename__ = 'mymodel' + name = fields.StringField( + primary_key=True, + pre_processors=[lambda instance, new_value: new_value + '-'], + post_processors=[ + lambda instance, new_value: new_value + '+']) + memory_db() + obj = MyModel(name='foo') + + obj.apply_processors(pre=True) + assert obj.name == 'foo-' + + obj.apply_processors(post=True) + assert obj.name == 'foo-+' + + obj.apply_processors() + assert obj.name == 'foo-+' + + obj.apply_processors(column_names=['name'], pre=True, post=True) + assert obj.name == 'foo-+-+' + class TestGetCollection(object): From 241ee0529febcfebc4a4a2c4a63039d2d7593082 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Thu, 28 May 2015 12:58:57 +0300 Subject: [PATCH 13/18] Add docstring to ProcessableMixin --- nefertari_sqla/fields.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nefertari_sqla/fields.py b/nefertari_sqla/fields.py index 303eae1..8526354 100644 --- a/nefertari_sqla/fields.py +++ b/nefertari_sqla/fields.py @@ -30,6 +30,12 @@ class ProcessableMixin(object): is being set on a field. """ def __init__(self, *args, **kwargs): + """ Pop pre/post processors + + :pre_processors: Processors that are run before session.flush() + :post_processors: Processors that are run after session.flush() but + before session.commit() + """ self.pre_processors = kwargs.pop('pre_processors', ()) self.post_processors = kwargs.pop('post_processors', ()) super(ProcessableMixin, self).__init__(*args, **kwargs) From 59c41f4f7a53108d6483014483035c0dd8020d36 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Mon, 1 Jun 2015 13:16:13 +0300 Subject: [PATCH 14/18] Fix reindexation of related obj on creation --- nefertari_sqla/documents.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index 61d0207..cfc7bf8 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -650,8 +650,11 @@ def get_reference_documents(self): # If 'Many' side should be indexed, its value is already a list. if value is None or isinstance(value, list): continue - session = object_session(value) - session.refresh(value) + try: + session = object_session(value) + session.refresh(value) + except InvalidRequestError: + pass yield (value.__class__, [value.to_dict()]) def _is_modified(self): From 645809de6ec38360f538d6c4ef3aa0064d9ddd44 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Tue, 2 Jun 2015 10:22:21 +0300 Subject: [PATCH 15/18] Rename pre/post processors to before/after validation --- nefertari_sqla/documents.py | 28 +++++++++-------- nefertari_sqla/fields.py | 23 +++++++------- nefertari_sqla/tests/test_documents.py | 42 +++++++++++++------------- 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index e8f458d..b174e54 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -651,11 +651,11 @@ def save(self, *arg, **kw): self._bump_version() session = session or Session() try: - self.apply_pre_processors() + self.apply_before_validation() session.add(self) session.flush() session.expire(self) - self.apply_post_processors() + self.apply_after_validation() return self except (IntegrityError,) as e: if 'duplicate' not in e.message: @@ -670,11 +670,11 @@ def update(self, params): try: self._update(params) self._bump_version() - self.apply_pre_processors() + self.apply_before_validation() session = object_session(self) session.add(self) session.flush() - self.apply_post_processors() + self.apply_after_validation() return self except (IntegrityError,) as e: if 'duplicate' not in e.message: @@ -685,13 +685,15 @@ def update(self, params): self.__class__.__name__), extra={'data': e}) - def apply_processors(self, column_names=None, pre=False, post=False): + def apply_processors(self, column_names=None, before=False, after=False): """ Apply processors to columns with :column_names: names. Arguments: :column_names: List of string names of changed columns. - :pre: Boolean indicating whether to apply pre-processors. - :post: Boolean indicating whether to apply post-processors. + :before: Boolean indicating whether to apply before_validation + processors. + :after: Boolean indicating whether to apply after_validation + processors. """ columns = {c.key: c for c in class_mapper(self.__class__).columns} if column_names is None: @@ -703,10 +705,10 @@ def apply_processors(self, column_names=None, pre=False, post=False): new_value = getattr(self, name) processed_value = column.apply_processors( instance=self, new_value=new_value, - pre=pre, post=post) + before=before, after=after) setattr(self, name, processed_value) - def apply_pre_processors(self): + def apply_before_validation(self): """ Determine changed columns and run `self.apply_processors` to apply needed processors. @@ -723,17 +725,17 @@ def apply_pre_processors(self): changed_columns = columns.keys() self._columns_to_process = changed_columns - self.apply_processors(changed_columns, pre=True) + self.apply_processors(changed_columns, before=True) - def apply_post_processors(self): + def apply_after_validation(self): """ Run `self.apply_processors` with columns names determined by - `self.apply_pre_processors`. + `self.apply_before_validation`. Note that at this stage, field values are in the exact same state you posted/set them. E.g. if you set time_field='11/22/2000', self.time_field will be equal to '11/22/2000' here. """ - self.apply_processors(self._columns_to_process, post=True) + self.apply_processors(self._columns_to_process, after=True) class ESBaseDocument(BaseDocument): diff --git a/nefertari_sqla/fields.py b/nefertari_sqla/fields.py index 8526354..43ca971 100644 --- a/nefertari_sqla/fields.py +++ b/nefertari_sqla/fields.py @@ -30,22 +30,23 @@ class ProcessableMixin(object): is being set on a field. """ def __init__(self, *args, **kwargs): - """ Pop pre/post processors + """ Pop before/after validation processors - :pre_processors: Processors that are run before session.flush() - :post_processors: Processors that are run after session.flush() but - before session.commit() + :before_validation: Processors that are run before session.flush() + :after_validation: Processors that are run after session.flush() + but before session.commit() """ - self.pre_processors = kwargs.pop('pre_processors', ()) - self.post_processors = kwargs.pop('post_processors', ()) + self.before_validation = kwargs.pop('before_validation', ()) + self.after_validation = kwargs.pop('after_validation', ()) super(ProcessableMixin, self).__init__(*args, **kwargs) - def apply_processors(self, instance, new_value, pre=False, post=False): + def apply_processors(self, instance, new_value, + before=False, after=False): processors = [] - if pre: - processors += list(self.pre_processors) - if post: - processors += list(self.post_processors) + if before: + processors += list(self.before_validation) + if after: + processors += list(self.after_validation) for proc in processors: new_value = proc(instance=instance, new_value=new_value) return new_value diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index 0837fed..df34b3b 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -637,7 +637,7 @@ def test_update_error(self, mock_upd, simple_model, memory_db): simple_model(id=4).update({'name': 'q'}) assert 'There was a conflict' in str(ex.value) - def test_apply_pre_processors_new_object(self, memory_db): + def test_apply_before_validation_new_object(self, memory_db): processor = lambda instance, new_value: 'foobar' processor2 = lambda instance, new_value: new_value + '+' @@ -645,17 +645,17 @@ class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) name = fields.StringField( - pre_processors=[processor], - post_processors=[processor2]) - email = fields.StringField(pre_processors=[processor]) + before_validation=[processor], + after_validation=[processor2]) + email = fields.StringField(before_validation=[processor]) memory_db() obj = MyModel(name='myname') - obj.apply_pre_processors() + obj.apply_before_validation() assert obj.name == 'foobar' assert obj.email == 'foobar' - def test_apply_pre_processors_existing_object(self, memory_db): + def test_apply_before_validation_existing_object(self, memory_db): processor = lambda instance, new_value: new_value + '-' processor2 = lambda instance, new_value: new_value + '+' @@ -663,9 +663,9 @@ class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) name = fields.StringField( - pre_processors=[processor], - post_processors=[processor2]) - email = fields.StringField(pre_processors=[processor]) + before_validation=[processor], + after_validation=[processor2]) + email = fields.StringField(before_validation=[processor]) memory_db() obj = MyModel(id=1, name='myname', email='FOO').save() @@ -675,20 +675,20 @@ class MyModel(docs.BaseDocument): obj = MyModel.get(id=1) assert obj.name == 'myname-+' obj.name = 'supername' - obj.apply_pre_processors() + obj.apply_before_validation() assert obj.name == 'supername-' assert obj.email == 'FOO-' - def test_apply_post_processors(self, memory_db): + def test_apply_after_validation(self, memory_db): memory_db() obj = docs.BaseDocument() obj.apply_processors = Mock() obj._columns_to_process = [1, 2, 3] - obj.apply_post_processors() + obj.apply_after_validation() obj.apply_processors.assert_called_once_with( - [1, 2, 3], post=True) + [1, 2, 3], after=True) - def test_apply_pre_processors(self, memory_db): + def test_apply_before_validation(self, memory_db): class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' id = fields.IdField(primary_key=True) @@ -696,31 +696,31 @@ class MyModel(docs.BaseDocument): obj = MyModel(id=1) obj.apply_processors = Mock() - obj.apply_pre_processors() + obj.apply_before_validation() obj.apply_processors.assert_called_once_with( - ['id', 'updated_at', '_version'], pre=True) + ['id', 'updated_at', '_version'], before=True) def test_apply_processors(self, memory_db): class MyModel(docs.BaseDocument): __tablename__ = 'mymodel' name = fields.StringField( primary_key=True, - pre_processors=[lambda instance, new_value: new_value + '-'], - post_processors=[ + before_validation=[lambda instance, new_value: new_value + '-'], + after_validation=[ lambda instance, new_value: new_value + '+']) memory_db() obj = MyModel(name='foo') - obj.apply_processors(pre=True) + obj.apply_processors(before=True) assert obj.name == 'foo-' - obj.apply_processors(post=True) + obj.apply_processors(after=True) assert obj.name == 'foo-+' obj.apply_processors() assert obj.name == 'foo-+' - obj.apply_processors(column_names=['name'], pre=True, post=True) + obj.apply_processors(column_names=['name'], before=True, after=True) assert obj.name == 'foo-+-+' From 2ab064038990d3850364c4cb4c45dda1728f3e90 Mon Sep 17 00:00:00 2001 From: Artem Kostiuk Date: Tue, 2 Jun 2015 12:44:07 +0300 Subject: [PATCH 16/18] Generate mapping for relationship fields --- nefertari_sqla/documents.py | 9 ++++++++ nefertari_sqla/tests/test_documents.py | 29 ++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/nefertari_sqla/documents.py b/nefertari_sqla/documents.py index cfc7bf8..18d64dc 100644 --- a/nefertari_sqla/documents.py +++ b/nefertari_sqla/documents.py @@ -120,6 +120,7 @@ def get_es_mapping(cls): } mapper = class_mapper(cls) columns = {c.name: c for c in mapper.columns} + relationships = {r.key: r for r in mapper.relationships} # Replace field 'id' with primary key field columns['id'] = columns.get(cls.pk_field()) @@ -132,6 +133,14 @@ def get_es_mapping(cls): continue properties[name] = TYPES_MAP[column_type] + for name, column in relationships.items(): + if name in cls._nested_relationships: + column_type = {'type': 'object'} + else: + rel_pk_field = column.mapper.class_.pk_field_type() + column_type = TYPES_MAP[rel_pk_field] + properties[name] = column_type + properties['_type'] = {'type': 'string'} return mapping diff --git a/nefertari_sqla/tests/test_documents.py b/nefertari_sqla/tests/test_documents.py index e205fca..acfc11a 100644 --- a/nefertari_sqla/tests/test_documents.py +++ b/nefertari_sqla/tests/test_documents.py @@ -79,10 +79,20 @@ class MyModel(docs.BaseDocument): groups = fields.ListField( item_type=fields.StringField, choices=['admin', 'user']) + + class MyModel2(docs.BaseDocument): + _nested_relationships = ['myself'] + __tablename__ = 'mymodel2' + name = fields.StringField(primary_key=True) + myself = fields.Relationship( + document='MyModel', backref_name='parent', + uselist=False, backref_uselist=False) + child_id = fields.ForeignKeyField( + ref_document='MyModel', ref_column='mymodel.name', + ref_column_type=fields.StringField) memory_db() - mapping = MyModel.get_es_mapping() - assert mapping == { + assert MyModel.get_es_mapping() == { 'mymodel': { 'properties': { '_type': {'type': 'string'}, @@ -91,6 +101,21 @@ class MyModel(docs.BaseDocument): 'id': {'type': 'string'}, 'my_id': {'type': 'long'}, 'name': {'type': 'string'}, + 'parent': {'type': 'string'}, + 'updated_at': {'format': 'dateOptionalTime', + 'type': 'date'} + } + } + } + assert MyModel2.get_es_mapping() == { + 'mymodel2': { + 'properties': { + '_type': {'type': 'string'}, + '_version': {'type': 'long'}, + 'child_id': {'type': 'string'}, + 'id': {'type': 'string'}, + 'name': {'type': 'string'}, + 'myself': {'type': 'object'}, 'updated_at': {'format': 'dateOptionalTime', 'type': 'date'} } From 8fd3d5f4a080c1dc5151a0b43dfa410b4797f8f8 Mon Sep 17 00:00:00 2001 From: Chris Hart Date: Wed, 3 Jun 2015 16:47:29 -0400 Subject: [PATCH 17/18] updated changelog for release 0.2.3 --- docs/source/changelog.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index deefd57..137596f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,6 +1,11 @@ Changelog ========= +* :release:`0.2.3 <2015-06-03>` +* :bug:`-` Fixed password minimum length support by adding before and after validation processors +* :bug:`-` Fixed bug with Elasticsearch indexing of nested relationships +* :bug:`-` Fixed race condition in Elasticsearch indexing + * :release:`0.2.2 <2015-05-27>` * :bug:`-` Fixed login issue * :bug:`-` Fixed posting to singular resources e.g. /api/users//profile From 55e7bb98d48f501dc0a63da409ce412f59cd772a Mon Sep 17 00:00:00 2001 From: Chris Hart Date: Wed, 3 Jun 2015 16:55:28 -0400 Subject: [PATCH 18/18] bumped versions --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 71d4f5e..4179076 100644 --- a/setup.py +++ b/setup.py @@ -9,13 +9,13 @@ 'sqlalchemy_utils', 'elasticsearch', 'pyramid_tm', - 'nefertari==0.3.1' + 'nefertari==0.3.2' ] setup( name='nefertari_sqla', - version="0.2.2", + version="0.2.3", description='sqla engine for nefertari', classifiers=[ "Programming Language :: Python",