Skip to content

Commit

Permalink
#2053 - Stop Celery Chains on Expected Aborts (#2116)
Browse files Browse the repository at this point in the history
  • Loading branch information
k-macmillan authored Nov 12, 2024
1 parent a7a340b commit 00ba3fa
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 57 deletions.
20 changes: 10 additions & 10 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,10 @@ fileignoreconfig:
checksum: ae4e31c6eb56d91ec80ae09d13baf4558cf461c65f08893b93fee43f036a17a7
- filename: app/template/rest.py
checksum: 1e5bdac8bc694d50f8f656dec127dd036b7b1b5b6156e3282d3411956c71ba0b
- filename: cd/application-deployment/dev/dev.env
checksum: a6bed7de359c7cec67940c1f0113826365400684c5a3bd182e8237d48ad5c1f1
- filename: cd/application-deployment/dev/vaec-api-task-definition.json
checksum: f328ff821339b802eb1d82559e624d5b719857c813d427da5aaa39b240331ddd
- filename: cd/application-deployment/perf/perf.env
checksum: 1b3b7539dd80b0661594082956e61fd86451692946d845cfe676798aac75618d
- filename: cd/application-deployment/prod/prod.env
checksum: 55252b1cb0e16b02301ae8bffb1015f7da5286d4bce0b415a95842cdb368c275
- filename: cd/application-deployment/staging/staging.env
checksum: 9e5161e8a0a13974d9b67d8a7e61d1b3fed9657a7e2dfeb6d82fd8ace64e2715
- filename: ci/docker-compose-test.yml
checksum: e3efec2749e8c19e60f5bfc68eafabe24eba647530a482ceccfc4e0e62cff424
- filename: ci/.local.env
checksum: 8caee8cf67974ad5195c835d3d266d81dbc4c635c547b7cc49704c0593e7833b
- filename: lambda_functions/pinpoint_callback/pinpoint_callback_lambda.py
checksum: 7bd4900e14b1fa789bbb2568b8a8d7a400e3c8350ba32fb44cc0b5b66a2df037
- filename: lambda_functions/ses_callback/ses_callback_lambda.py
Expand Down Expand Up @@ -81,4 +71,14 @@ fileignoreconfig:
checksum: 7f8a30dd84b3ceb0d08bae949b5b127fd408ee2fd8097eb7d4b291ede61f8d0f
- filename: tests/app/celery/test_process_delivery_status_result_tasks.py
checksum: 62fa6216b62971d62c2e53f6b31aeeb659d7a1e404665362ee89cb3ec04793a6
- filename: cd/application-deployment/dev/dev.env
checksum: 2ecdf2787dd15a7971471b5b6b799adf478a8021552e690de76b682f5e7344a3
- filename: cd/application-deployment/perf/perf.env
checksum: 06912f9617483c19b076d92f0036c125f0a2f80e10a1665e5ddc5ce31a354a5c
- filename: cd/application-deployment/prod/prod.env
checksum: 64f46f118e9f652c663bc53225ddf39f1ef85040ea3e24fceb71ea752984d46c
- filename: cd/application-deployment/staging/staging.env
checksum: ce893a6a8405ad66d7b8efa8e1b1991c9ae6bebb245d500e93d148e1142b714d
- filename: ci/.local.env
checksum: 806fc75f59d611f5fd02af9653eaee61db0c920cc49ff70c1aebb92d2aa7a9db
version: "1.0"
29 changes: 24 additions & 5 deletions app/celery/contact_information_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
update_notification_status_by_id,
)
from app.exceptions import NotificationTechnicalFailureException, NotificationPermanentFailureException
from app.feature_flags import FeatureFlag, is_feature_enabled
from app.models import (
Notification,
RecipientIdentifier,
Expand Down Expand Up @@ -106,7 +107,7 @@ def get_profile_result(
)


def handle_lookup_contact_info_exception(
def handle_lookup_contact_info_exception( # noqa: C901
lookup_task: Task, notification: Notification, recipient_identifier: RecipientIdentifier, e: Exception
):
"""
Expand Down Expand Up @@ -146,7 +147,11 @@ def handle_lookup_contact_info_exception(
notification.id, NOTIFICATION_PERMANENT_FAILURE, status_reason=e.failure_reason
)
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message) from e
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
lookup_task.request.chain = None
else:
raise NotificationPermanentFailureException(message) from e
elif isinstance(e, (VAProfileIDNotFoundException, VAProfileNonRetryableException)):
current_app.logger.exception(e)
message = (
Expand All @@ -157,7 +162,11 @@ def handle_lookup_contact_info_exception(
notification.id, NOTIFICATION_PERMANENT_FAILURE, status_reason=e.failure_reason
)
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message) from e
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
lookup_task.request.chain = None
else:
raise NotificationPermanentFailureException(message) from e
elif isinstance(e, CommunicationItemNotFoundException):
current_app.logger.info(
'Communication item for recipient %s not found on notification %s',
Expand All @@ -170,12 +179,22 @@ def handle_lookup_contact_info_exception(
status=NOTIFICATION_PERMANENT_FAILURE,
status_reason='No recipient opt-in found for explicit preference',
)
raise e
check_and_queue_callback_task(notification)
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
lookup_task.request.chain = None
else:
raise e
else:
# Means the default_send is True and this does not require an explicit opt-in
return None
elif isinstance(e, NotificationPermanentFailureException):
raise e
# check_and_queue_callback_task is called upstream
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
lookup_task.request.chain = None
else:
raise e
else:
current_app.logger.exception(f'Unhandled exception for notification {notification.id}: {e}')
raise e
Expand Down
12 changes: 8 additions & 4 deletions app/celery/lookup_va_profile_id_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from celery import Task
from flask import current_app
from notifications_utils.statsd_decorators import statsd

Expand All @@ -9,6 +10,7 @@
from app.celery.exceptions import AutoRetryException
from app.dao import notifications_dao
from app import mpi_client
from app.feature_flags import FeatureFlag, is_feature_enabled
from app.va.identifier import IdentifierType, UnsupportedIdentifierException
from app.va.mpi import (
MpiRetryableException,
Expand All @@ -32,7 +34,7 @@
)
@statsd(namespace='tasks')
def lookup_va_profile_id(
self,
self: Task,
notification_id,
):
current_app.logger.info(f'Retrieving VA Profile ID from MPI for notification {notification_id}')
Expand Down Expand Up @@ -62,7 +64,6 @@ def lookup_va_profile_id(
msg = handle_max_retries_exceeded(notification_id, 'lookup_va_profile_id')
check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(msg)

except (
BeneficiaryDeceasedException,
IdentifierNotFound,
Expand All @@ -81,8 +82,11 @@ def lookup_va_profile_id(
notification_id, NOTIFICATION_PERMANENT_FAILURE, status_reason=e.failure_reason
)
check_and_queue_callback_task(notification)
raise NotificationPermanentFailureException(message) from e

if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
self.request.chain = None
else:
raise NotificationPermanentFailureException(message) from e
except Exception as e:
message = (
f'Failed to retrieve VA Profile ID from MPI for notification: {notification_id} '
Expand Down
25 changes: 18 additions & 7 deletions app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
NotificationTechnicalFailureException,
InvalidProviderException,
)
from app.feature_flags import FeatureFlag, is_feature_enabled
from app.v2.errors import RateLimitError

from celery import Task
from flask import current_app
from notifications_utils.field import NullValueForNonConditionalPlaceholderException
from notifications_utils.recipients import InvalidEmailError, InvalidPhoneError
Expand All @@ -39,7 +42,7 @@
)
@statsd(namespace='tasks')
def deliver_sms(
self,
self: Task,
notification_id,
sms_sender_id=None,
):
Expand Down Expand Up @@ -74,14 +77,18 @@ def deliver_sms(
)
raise NotificationTechnicalFailureException from e
except NonRetryableException as e:
# Max retries exceeded, celery raised exception
# Likely an opted out from pinpoint
log_and_update_permanent_failure(
notification.id,
'deliver_sms',
e,
'ERROR: NonRetryableException - permanent failure, not retrying',
)
raise NotificationPermanentFailureException from e
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
self.request.chain = None
else:
raise NotificationPermanentFailureException from e
except (NullValueForNonConditionalPlaceholderException, AttributeError, RuntimeError) as e:
log_and_update_technical_failure(notification_id, 'deliver_sms', e)
raise NotificationTechnicalFailureException(f'Found {type(e).__name__}, NOT retrying...', e, e.args)
Expand All @@ -107,7 +114,7 @@ def deliver_sms(
retry_backoff_max=60,
)
@statsd(namespace='tasks')
def deliver_sms_with_rate_limiting(
def deliver_sms_with_rate_limiting( # noqa: C901
self,
notification_id,
sms_sender_id=None,
Expand Down Expand Up @@ -148,14 +155,18 @@ def deliver_sms_with_rate_limiting(
)
raise NotificationPermanentFailureException from e
except NonRetryableException as e:
# Max retries exceeded, celery raised exception
# Likely an opted out from pinpoint
log_and_update_permanent_failure(
notification.id,
'deliver_sms_with_rate_limiting',
e,
'ERROR: NonRetryableException - permanent failure, not retrying',
)
raise NotificationTechnicalFailureException from e
if is_feature_enabled(FeatureFlag.CLEAR_CELERY_CHAIN):
# Expected chain termination
self.request.chain = None
else:
raise NotificationTechnicalFailureException from e
except RateLimitError:
retry_time = sms_sender.rate_limit_interval / sms_sender.rate_limit
current_app.logger.info(
Expand Down Expand Up @@ -194,7 +205,7 @@ def deliver_sms_with_rate_limiting(
)
@statsd(namespace='tasks')
def deliver_email(
self,
self: Task,
notification_id: str,
sms_sender_id=None,
):
Expand Down
1 change: 1 addition & 0 deletions app/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class FeatureFlag(Enum):
V3_ENABLED = 'V3_ENABLED'
COMP_AND_PEN_MESSAGES_ENABLED = 'COMP_AND_PEN_MESSAGES_ENABLED'
VA_PROFILE_EMAIL_STATUS_ENABLED = 'VA_PROFILE_EMAIL_STATUS_ENABLED'
CLEAR_CELERY_CHAIN = 'CLEAR_CELERY_CHAIN'


def accept_recipient_identifiers_enabled():
Expand Down
1 change: 1 addition & 0 deletions cd/application-deployment/dev/dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ AWS_PINPOINT_APP_ID=df55c01206b742d2946ef226410af94f
AWS_SES_EMAIL_FROM_USER=dev-do-not-reply
CHECK_GITHUB_SCOPE_ENABLED=True
CHECK_TEMPLATE_NAME_EXISTS_ENABLED=True
CLEAR_CELERY_CHAIN=True
COMP_AND_PEN_DYNAMODB_NAME=dev-bip-payment-notification-table
COMP_AND_PEN_MESSAGES_ENABLED=True
DD_ENV=dev
Expand Down
1 change: 1 addition & 0 deletions cd/application-deployment/perf/perf.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ AWS_PINPOINT_APP_ID=f8cab892fe2740c2901560b55a398440
AWS_SES_EMAIL_FROM_USER=perf-do-not-reply
CHECK_GITHUB_SCOPE_ENABLED=False
CHECK_TEMPLATE_NAME_EXISTS_ENABLED=False
CLEAR_CELERY_CHAIN=True
COMP_AND_PEN_DYNAMODB_NAME=perf-bip-payment-notification-table
COMP_AND_PEN_MESSAGES_ENABLED=True
COMP_AND_PEN_PERF_TO_NUMBER=+14254147755
Expand Down
1 change: 1 addition & 0 deletions cd/application-deployment/prod/prod.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ AWS_PINPOINT_APP_ID=9535150638b04a49b49755af2b2d316b
AWS_SES_EMAIL_FROM_USER=do-not-reply
CHECK_GITHUB_SCOPE_ENABLED=False
CHECK_TEMPLATE_NAME_EXISTS_ENABLED=False
CLEAR_CELERY_CHAIN=False
COMP_AND_PEN_DYNAMODB_NAME=prod-bip-payment-notification-table
COMP_AND_PEN_MESSAGES_ENABLED=True
DD_ENV=prod
Expand Down
1 change: 1 addition & 0 deletions cd/application-deployment/staging/staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ AWS_PINPOINT_APP_ID=164e77155a7a45299b3bc15562732540
AWS_SES_EMAIL_FROM_USER=staging-do-not-reply
CHECK_GITHUB_SCOPE_ENABLED=False
CHECK_TEMPLATE_NAME_EXISTS_ENABLED=False
CLEAR_CELERY_CHAIN=True
COMP_AND_PEN_DYNAMODB_NAME=staging-bip-payment-notification-table
COMP_AND_PEN_MESSAGES_ENABLED=True
DD_ENV=staging
Expand Down
1 change: 1 addition & 0 deletions ci/.local.env
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ VETEXT_URL=http://host.docker.internal:7008/api/vetext/pub

# Feature flags
ACCEPT_RECIPIENT_IDENTIFIERS_ENABLED=True
CLEAR_CELERY_CHAIN=True
NIGHTLY_NOTIF_CSV_ENABLED=True
NOTIFICATION_FAILURE_REASON_ENABLED=True
PROVIDER_STRATEGIES_ENABLED=True
Expand Down
21 changes: 10 additions & 11 deletions tests/app/celery/test_contact_information_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from app.celery.contact_information_tasks import lookup_contact_info
from app.celery.exceptions import AutoRetryException
from app.constants import EMAIL_TYPE, NOTIFICATION_PERMANENT_FAILURE, NOTIFICATION_TECHNICAL_FAILURE, SMS_TYPE
from app.exceptions import NotificationTechnicalFailureException, NotificationPermanentFailureException
from app.exceptions import NotificationTechnicalFailureException
from app.models import RecipientIdentifier
from app.va.identifier import IdentifierType
from app.va.va_profile import (
Expand Down Expand Up @@ -130,8 +130,7 @@ def test_should_not_retry_on_non_retryable_exception(client, mocker, sample_temp
'app.celery.contact_information_tasks.update_notification_status_by_id'
)

with pytest.raises(NotificationPermanentFailureException):
lookup_contact_info(notification.id)
lookup_contact_info(notification.id)

mocked_va_profile_client.get_email.assert_called_with(mocker.ANY, notification)
recipient_identifier = mocked_va_profile_client.get_email.call_args[0][0]
Expand Down Expand Up @@ -257,8 +256,7 @@ def test_should_update_notification_to_permanent_failure_on_no_contact_info_exce
'app.celery.contact_information_tasks.update_notification_status_by_id'
)

with pytest.raises(NotificationPermanentFailureException):
lookup_contact_info(notification.id)
lookup_contact_info(notification.id)

mocked_va_profile_client.get_email.assert_called_with(mocker.ANY, notification)
recipient_identifier = mocked_va_profile_client.get_email.call_args[0][0]
Expand All @@ -283,13 +281,13 @@ def test_should_update_notification_to_permanent_failure_on_no_contact_info_exce
),
(
NoContactInfoException,
NotificationPermanentFailureException,
None,
NOTIFICATION_PERMANENT_FAILURE,
NoContactInfoException.failure_reason,
),
(
VAProfileNonRetryableException,
NotificationPermanentFailureException,
None,
NOTIFICATION_PERMANENT_FAILURE,
VAProfileNonRetryableException.failure_reason,
),
Expand Down Expand Up @@ -387,6 +385,7 @@ def test_get_email_or_sms_with_permission_utilizes_default_send(
profile['communicationPermissions'][0]['communicationChannelId'] = notification_type.id

mocker.patch('app.va.va_profile.va_profile_client.VAProfileClient.get_profile', return_value=profile)
mock_handle_exception = mocker.patch('app.celery.contact_information_tasks.handle_lookup_contact_info_exception')

if default_send:
# Leaving this logic so it's easier to understand
Expand All @@ -395,13 +394,13 @@ def test_get_email_or_sms_with_permission_utilizes_default_send(
lookup_contact_info(notification.id)
else:
# Implicit + user has opted out
with pytest.raises(NotificationPermanentFailureException):
lookup_contact_info(notification.id)
lookup_contact_info(notification.id)
mock_handle_exception.assert_called_once()
else:
if user_set:
# Explicit + User has opted in - this command will execute and not raise an exception
lookup_contact_info(notification.id)
else:
# Explicit + User has not defined opted in
with pytest.raises(NotificationPermanentFailureException):
lookup_contact_info(notification.id)
lookup_contact_info(notification.id)
mock_handle_exception.assert_called_once()
Loading

0 comments on commit 00ba3fa

Please sign in to comment.