From d8e99272feeaa465ca26468da486777b239a4b74 Mon Sep 17 00:00:00 2001 From: Eben Freeman Date: Mon, 8 Sep 2014 19:37:38 -0700 Subject: [PATCH] Improve syncback concurrency limit. Summary: There's obviously more work to be done here, but in the meantime this commit removes the global limit on the number of syncback actions that can be tried concurrently. Instead, it bounds the number of concurrent syncback greenlets for each (account_id, action_type) pair. The reason for this is that previously, if say archiving was broken for a particular account, but there were a bunch of archive actions outstanding for it, the entire syncback service could grind to a halt. Not ideal. Smaller associated changes: * Make the SyncbackWorker class just be a simple function. * Refactor to hoist sleeping-before-retrying out of the database session scope. * Remove nested database session in delete_draft. Test Plan: Run some syncback actions. Reviewers: charles Reviewed By: charles Differential Revision: https://review.inboxapp.com/D440 --- inbox/actions/__init__.py | 31 ++++++------- inbox/transactions/actions.py | 84 ++++++++++++++++------------------- 2 files changed, 52 insertions(+), 63 deletions(-) diff --git a/inbox/actions/__init__.py b/inbox/actions/__init__.py index f739a270a..ee740e82a 100644 --- a/inbox/actions/__init__.py +++ b/inbox/actions/__init__.py @@ -31,7 +31,6 @@ from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound from inbox.models import Account, Message, Thread -from inbox.models.session import session_scope from inbox.models.action_log import schedule_action from inbox.sendmail.base import (generate_attachments, get_sendmail_client, SendMailException) @@ -134,23 +133,21 @@ def save_draft(account_id, message_id, db_session): def delete_draft(account_id, draft_id, db_session, args): """ Delete a draft from the remote backend. """ inbox_uid = args.get('inbox_uid') + account = db_session.query(Account).get(account_id) - with session_scope(ignore_soft_deletes=False) as db_session: - account = db_session.query(Account).get(account_id) - - # Non-Inbox created draft, therefore standard delete - if inbox_uid is None: - draft = db_session.query(Message).get(draft_id) - remote_delete = \ - module_registry[account.provider].remote_delete - remote_delete(account, account.drafts_folder.name, - draft.thread_id, db_session) - # Inbox created draft, therefore use X-INBOX header - else: - remote_delete_draft = \ - module_registry[account.provider].remote_delete_draft - remote_delete_draft(account, account.drafts_folder.name, - inbox_uid, db_session) + # Non-Inbox created draft, therefore standard delete + if inbox_uid is None: + draft = db_session.query(Message).get(draft_id) + remote_delete = \ + module_registry[account.provider].remote_delete + remote_delete(account, account.drafts_folder.name, + draft.thread_id, db_session) + # Inbox created draft, therefore use X-INBOX header + else: + remote_delete_draft = \ + module_registry[account.provider].remote_delete_draft + remote_delete_draft(account, account.drafts_folder.name, + inbox_uid, db_session) def send_directly(account_id, draft_id, db_session): diff --git a/inbox/transactions/actions.py b/inbox/transactions/actions.py index ce32b22ad..d0e87da24 100644 --- a/inbox/transactions/actions.py +++ b/inbox/transactions/actions.py @@ -5,7 +5,9 @@ * Make this more robust across multiple machines. If you started two instances talking to the same database backend things could go really badly. """ +from collections import defaultdict import gevent +from gevent.coros import BoundedSemaphore from sqlalchemy import asc from inbox.util.concurrency import retry_with_logging @@ -43,14 +45,18 @@ } +CONCURRENCY_LIMIT = 3 + + class SyncbackService(gevent.Greenlet): """Asynchronously consumes the action log and executes syncback actions.""" - def __init__(self, poll_interval=1, chunk_size=100, max_pool_size=22): + def __init__(self, poll_interval=1, chunk_size=100): + semaphore_factory = lambda: BoundedSemaphore(CONCURRENCY_LIMIT) + self.semaphore_map = defaultdict(semaphore_factory) self.keep_running = True self.running = False self.log = logger.new(component='syncback') - self.worker_pool = gevent.pool.Pool(max_pool_size) self.poll_interval = poll_interval self.chunk_size = chunk_size self._scheduled_actions = set() @@ -72,15 +78,15 @@ def _process_log(self): namespace = db_session.query(Namespace). \ get(log_entry.namespace_id) self._scheduled_actions.add(log_entry.id) - worker = SyncbackWorker(action_function, log_entry.id, - log_entry.record_id, - namespace.account_id, - syncback_service=self, - extra_args=log_entry.extra_args) self.log.info('delegating action', action_id=log_entry.id, msg=log_entry.action) - self.worker_pool.start(worker) + semaphore = self.semaphore_map[(namespace.account_id, + log_entry.action)] + gevent.spawn(syncback_worker, semaphore, action_function, + log_entry.id, log_entry.record_id, + namespace.account_id, syncback_service=self, + extra_args=log_entry.extra_args) def remove_from_schedule(self, log_entry_id): self._scheduled_actions.discard(log_entry_id) @@ -117,46 +123,32 @@ def stop(self): gevent.sleep() -class SyncbackWorker(gevent.Greenlet): - """A greenlet spawned to execute a single syncback action.""" - def __init__(self, func, action_log_id, record_id, account_id, - syncback_service, retry_interval=30, extra_args=None): - self.func = func - self.action_log_id = action_log_id - self.record_id = record_id - self.account_id = account_id - self.syncback_service = syncback_service - self.retry_interval = retry_interval - self.extra_args = extra_args - - self.log = logger.new(record_id=record_id, action_log_id=action_log_id, - action=self.func, account_id=self.account_id, - extra_args=extra_args) - gevent.Greenlet.__init__(self) - - def _run(self): - # Not ignoring soft-deleted objects here because if you, say, delete a - # draft, we still need to access the object to delete it on the remote. - with session_scope(ignore_soft_deletes=False) as db_session: +def syncback_worker(semaphore, func, action_log_id, record_id, account_id, + syncback_service, retry_interval=30, extra_args=None): + with semaphore: + log = logger.new(record_id=record_id, action_log_id=action_log_id, + action=func, account_id=account_id, + extra_args=extra_args) + # Not ignoring soft-deleted objects here because if you, say, + # delete a draft, we still need to access the object to delete it + # on the remote. try: - if self.extra_args: - self.func(self.account_id, self.record_id, db_session, - self.extra_args) - else: - self.func(self.account_id, self.record_id, db_session) + with session_scope(ignore_soft_deletes=False) as db_session: + if extra_args: + func(account_id, record_id, db_session, extra_args) + else: + func(account_id, record_id, db_session) + action_log_entry = db_session.query(ActionLog).get( + action_log_id) + action_log_entry.executed = True + db_session.commit() + log.info('syncback action completed', + action_id=action_log_id) + syncback_service.remove_from_schedule(action_log_id) except Exception: - log_uncaught_errors(self.log) + log_uncaught_errors(log) # Wait for a bit, then remove the log id from the scheduled set # so that it can be retried. - gevent.sleep(self.retry_interval) - self.syncback_service.remove_from_schedule(self.action_log_id) + gevent.sleep(retry_interval) + syncback_service.remove_from_schedule(action_log_id) raise - else: - action_log_entry = db_session.query(ActionLog).get( - self.action_log_id) - action_log_entry.executed = True - db_session.commit() - - self.log.info('syncback action completed', - action_id=self.action_log_id) - self.syncback_service.remove_from_schedule(self.action_log_id)