Skip to content

Commit

Permalink
Periodic chatops proxy sync (#4565)
Browse files Browse the repository at this point in the history
Adds a periodic job to sync tenants with chatops-proxy. The register request
will behave as an upsert, allowing us to backfill the new tenant columns:
stack_id and stack_slug. Upsert is not yet merged on chatops-proxy, which is
why the task handles the 409 status on the /tenants/register request.
On top of that, I did a small refactoring and introduced a new
register_oncall_tenant func, which receives org as an argument so that we
don't have to write `register_tenant(org.uuid, org.stack_id, org.stack_slug,.....)`
every time.
Part of grafana/oncall-gateway#247
Needs to be merged after #4559.
  • Loading branch information
Konstantinov-Innokentii authored Jun 24, 2024
1 parent 5cf921b commit 48b7eca
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 30 deletions.
18 changes: 18 additions & 0 deletions engine/apps/chatops_proxy/register_oncall_tenant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# register_oncall_tenant moved to separate file from engine/apps/chatops_proxy/utils.py to avoid circular imports.
from django.conf import settings

from apps.chatops_proxy.client import SERVICE_TYPE_ONCALL, ChatopsProxyAPIClient


def register_oncall_tenant(org):
    """
    Register the given OnCall organization as a tenant in chatops-proxy.

    The tenant is sent as the organization's uuid (converted to a string),
    together with the backend region from settings, the OnCall service type
    and the organization's stack_id and stack_slug.

    Exceptions raised by the chatops-proxy client are not handled here and
    propagate to the caller.
    """
    proxy_client = ChatopsProxyAPIClient(
        settings.ONCALL_GATEWAY_URL,
        settings.ONCALL_GATEWAY_API_TOKEN,
    )
    tenant_id = str(org.uuid)
    proxy_client.register_tenant(
        tenant_id,
        settings.ONCALL_BACKEND_REGION,
        SERVICE_TYPE_ONCALL,
        org.stack_id,
        org.stack_slug,
    )
78 changes: 70 additions & 8 deletions engine/apps/chatops_proxy/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from functools import partial

from celery.utils.log import get_task_logger
from django.conf import settings

from common.custom_celery_tasks import shared_dedicated_queue_retry_task

from .client import ChatopsProxyAPIClient, ChatopsProxyAPIException
from .register_oncall_tenant import register_oncall_tenant

task_logger = get_task_logger(__name__)

Expand All @@ -19,27 +22,41 @@ def register_oncall_tenant_async(**kwargs):
service_type = kwargs.get("service_type")
stack_id = kwargs.get("stack_id")
stack_slug = kwargs.get("stack_slug")
org_id = kwargs.get("org_id")

client = ChatopsProxyAPIClient(settings.ONCALL_GATEWAY_URL, settings.ONCALL_GATEWAY_API_TOKEN)
# Temporary hack to support both old and new set of arguments
if org_id:
from apps.user_management.models import Organization

try:
org = Organization.objects.get(pk=org_id)
except Organization.DoesNotExist:
task_logger.info(f"register_oncall_tenant_async: organization {org_id} was not found")
return
register_func = partial(register_oncall_tenant, org)
else:
client = ChatopsProxyAPIClient(settings.ONCALL_GATEWAY_URL, settings.ONCALL_GATEWAY_API_TOKEN)
register_func = partial(
client.register_tenant, service_tenant_id, cluster_slug, service_type, stack_id, stack_slug
)
try:
client.register_tenant(service_tenant_id, cluster_slug, service_type, stack_id, stack_slug)
register_func()
except ChatopsProxyAPIException as api_exc:
task_logger.error(
f'msg="Failed to register OnCall tenant: {api_exc.msg}" service_tenant_id={service_tenant_id} cluster_slug={cluster_slug}'
)
# TODO: remove this check once new upsert tenant api is released
if api_exc.status == 409:
# 409 Indicates that it's impossible to register tenant, because tenant already registered.
# Not retrying in this case, because manual conflict-resolution needed.
task_logger.info(f"register_oncall_tenant_async: tenant for organization {org_id} already exists")
return
else:
# Otherwise keep retrying task
task_logger.error(
f"register_oncall_tenant_async: failed to register tenant for organization {org_id}: {api_exc.msg}"
)
raise api_exc
except Exception as e:
# Keep retrying task for any other exceptions too
task_logger.error(
f"Failed to register OnCall tenant: {e} service_tenant_id={service_tenant_id} cluster_slug={cluster_slug}"
)
task_logger.error(f"register_oncall_tenant_async: failed to register tenant for organization {org_id}: {e}")
raise e


Expand Down Expand Up @@ -122,3 +139,48 @@ def unlink_slack_team_async(**kwargs):
f'msg="Failed to unlink slack_team: {e}" service_tenant_id={service_tenant_id} slack_team_id={slack_team_id}'
)
raise e


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=0,
)
def start_sync_org_with_chatops_proxy():
    """
    Schedule one sync_org_with_chatops_proxy task per organization.

    Each task gets a countdown equal to its position in the queryset modulo
    the spread window, so the tasks are distributed over at most 30 minutes
    instead of all starting at once.
    """
    # Imported locally, same as in the other tasks of this module.
    from apps.user_management.models import Organization

    spread_window = 60 * 30  # 30 minutes, feel free to adjust
    pks = Organization.objects.all().values_list("pk", flat=True)

    for position, pk in enumerate(pks):
        sync_org_with_chatops_proxy.apply_async(
            kwargs={"org_id": pk},
            countdown=position % spread_window,
        )


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=3,
)
def sync_org_with_chatops_proxy(**kwargs):
    """
    Register a single organization as a tenant in chatops-proxy.

    Expects the organization primary key in kwargs["org_id"]. If the
    organization no longer exists the task logs that and exits. A 409 response
    from chatops-proxy (tenant already registered) is logged and treated as
    done; any other ChatopsProxyAPIException is re-raised.
    """
    # Imported locally, same as in the other tasks of this module.
    from apps.user_management.models import Organization

    org_id = kwargs.get("org_id")
    task_logger.info(f"sync_org_with_chatops_proxy: started org_id={org_id}")

    try:
        organization = Organization.objects.get(pk=org_id)
    except Organization.DoesNotExist:
        task_logger.info(f"sync_org_with_chatops_proxy: organization {org_id} was not found")
        return

    try:
        register_oncall_tenant(organization)
    except ChatopsProxyAPIException as api_exc:
        # TODO: once tenants upsert api is released, remove this check
        if api_exc.status != 409:
            raise api_exc
        # 409 Indicates that it's impossible to register tenant, because tenant already registered.
        task_logger.info(f"sync_org_with_chatops_proxy: tenant for organization {org_id} already exists")
29 changes: 11 additions & 18 deletions engine/apps/chatops_proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.conf import settings

from .client import PROVIDER_TYPE_SLACK, SERVICE_TYPE_ONCALL, ChatopsProxyAPIClient, ChatopsProxyAPIException
from .register_oncall_tenant import register_oncall_tenant
from .tasks import (
link_slack_team_async,
register_oncall_tenant_async,
Expand Down Expand Up @@ -48,32 +49,24 @@ def get_slack_oauth_response_from_chatops_proxy(stack_id) -> dict:
return slack_installation.oauth_response


def register_oncall_tenant(service_tenant_id: str, cluster_slug: str, stack_id: int, stack_slug: str):
def register_oncall_tenant_with_async_fallback(org):
"""
register_oncall_tenant tries to register oncall tenant synchronously and fall back to task in case of any exceptions
to make sure that tenant is registered.
First attempt is synchronous to register tenant ASAP to not miss any chatops requests.
"""
client = ChatopsProxyAPIClient(settings.ONCALL_GATEWAY_URL, settings.ONCALL_GATEWAY_API_TOKEN)
try:
client.register_tenant(
service_tenant_id,
cluster_slug,
SERVICE_TYPE_ONCALL,
stack_id,
stack_slug,
)
register_oncall_tenant(org)
except Exception as e:
logger.error(
f"create_oncall_connector: failed "
f"oncall_org_id={service_tenant_id} backend={cluster_slug} stack_id={stack_id} exc={e}"
)
logger.error(f"create_oncall_connector: failed organization_id={org} exc={e}")
register_oncall_tenant_async.apply_async(
kwargs={
"service_tenant_id": service_tenant_id,
"cluster_slug": cluster_slug,
"service_tenant_id": str(org.uuid),
"cluster_slug": settings.ONCALL_BACKEND_REGION,
"service_type": SERVICE_TYPE_ONCALL,
"stack_id": stack_id,
"stack_id": org.stack_id,
"stack_slug": org.stack_slug,
"org_id": org.id,
},
countdown=2,
)
Expand Down Expand Up @@ -147,7 +140,7 @@ def unlink_slack_team(service_tenant_id: str, slack_team_id: str):
def uninstall_slack(stack_id: int, grafana_user_id: int) -> bool:
"""
uninstall_slack uninstalls slack integration from chatops-proxy and returns bool indicating if it was removed.
If such installation does not exist - returns True as well.s
If such installation does not exist - returns True as well.
"""
client = ChatopsProxyAPIClient(settings.ONCALL_GATEWAY_URL, settings.ONCALL_GATEWAY_API_TOKEN)
try:
Expand All @@ -161,4 +154,4 @@ def uninstall_slack(stack_id: int, grafana_user_id: int) -> bool:
)
return False

return removed is True
return removed
10 changes: 6 additions & 4 deletions engine/apps/user_management/models/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from mirage import fields as mirage_fields

from apps.alerts.models import MaintainableObject
from apps.chatops_proxy.utils import register_oncall_tenant, unlink_slack_team, unregister_oncall_tenant
from apps.chatops_proxy.utils import (
register_oncall_tenant_with_async_fallback,
unlink_slack_team,
unregister_oncall_tenant,
)
from apps.user_management.subscription_strategy import FreePublicBetaSubscriptionStrategy
from apps.user_management.types import AlertGroupTableColumn
from common.insight_log import ChatOpsEvent, ChatOpsTypePlug, write_chatops_insight_log
Expand Down Expand Up @@ -61,9 +65,7 @@ class OrganizationQuerySet(models.QuerySet):
def create(self, **kwargs):
instance = super().create(**kwargs)
if settings.FEATURE_MULTIREGION_ENABLED:
register_oncall_tenant(
str(instance.uuid), settings.ONCALL_BACKEND_REGION, instance.stack_id, instance.stack_slug
)
register_oncall_tenant_with_async_fallback(instance)
return instance

def delete(self):
Expand Down
7 changes: 7 additions & 0 deletions engine/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,13 @@ class BrokerTypes:
},
}

if FEATURE_MULTIREGION_ENABLED:
    # Periodic job that fans out per-organization tenant syncs with chatops-proxy.
    CELERY_BEAT_SCHEDULE["start_sync_org_with_chatops_proxy"] = {
        "task": "apps.chatops_proxy.tasks.start_sync_org_with_chatops_proxy",
        # The minute must be pinned explicitly: crontab(hour="*/24") keeps the default
        # minute="*", which fires on every minute of hour 0 (60 runs a day) rather than
        # once every 24 hours.
        "schedule": crontab(minute=0, hour=0),  # Once every 24 hours (at 00:00), feel free to adjust
        "args": (),
    }

if ESCALATION_AUDITOR_ENABLED:
CELERY_BEAT_SCHEDULE["check_escalations"] = {
"task": "apps.alerts.tasks.check_escalation_finished.check_escalation_finished_task",
Expand Down
2 changes: 2 additions & 0 deletions engine/settings/celery_task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
"apps.chatops_proxy.tasks.unlink_slack_team_async": {"queue": "default"},
"apps.chatops_proxy.tasks.register_oncall_tenant_async": {"queue": "default"},
"apps.chatops_proxy.tasks.unregister_oncall_tenant_async": {"queue": "default"},
"apps.chatops_proxy.tasks.start_sync_org_with_chatops_proxy": {"queue": "default"},
"apps.chatops_proxy.tasks.sync_org_with_chatops_proxy": {"queue": "default"},
# CRITICAL
"apps.alerts.tasks.acknowledge_reminder.acknowledge_reminder_task": {"queue": "critical"},
"apps.alerts.tasks.acknowledge_reminder.unacknowledge_timeout_task": {"queue": "critical"},
Expand Down

0 comments on commit 48b7eca

Please sign in to comment.