Skip to content

Commit

Permalink
Merge pull request #26 from teams-notifier/feat-rework-draft-ready-tr…
Browse files Browse the repository at this point in the history
…ansition

feat: rework draft handling and draft to ready transition
  • Loading branch information
babs authored Jan 12, 2025
2 parents 95482c0 + eec082b commit eb7fb80
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ repos:
additional_dependencies:
- pydantic
- types-PyYAML
- types-python-dateutil
args:
- --check-untyped-defs
- --disallow-any-generics
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
python-dotenv
python-dateutil
pydantic
asyncpg
httpx
Expand Down
81 changes: 61 additions & 20 deletions webhook/merge_request.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#!/usr/bin/env python3
import datetime
import hashlib
import logging
import uuid
from typing import Any

import asyncpg
import dateutil.parser
import httpx
from pydantic import BaseModel

Expand All @@ -14,6 +17,8 @@
from db import dbh
from gitlab_model import MergeRequestPayload

logger = logging.getLogger(__name__)


class MRMessRef(BaseModel):
merge_request_message_ref_id: int
Expand Down Expand Up @@ -138,11 +143,9 @@ async def merge_request(
conversation_tokens: list[str],
participant_ids_filter: list[int],
):
payload_fingerprint = hashlib.sha256(mr.model_dump_json().encode("utf8")).hexdigest()
logger.debug("payload fingerprint: %s", payload_fingerprint)
mri = await dbh.get_merge_request_ref_infos(mr)
convtoken_to_msgrefs = await get_or_create_message_refs(
mri.merge_request_ref_id,
conversation_tokens,
)

participant_found = True
if participant_ids_filter:
Expand All @@ -159,7 +162,7 @@ async def merge_request(
connection: asyncpg.Connection

if mr.object_attributes.action in ("update"):
... # Update MR info (head_pipeline_id)
# Update MR info (head_pipeline_id)
async with await database.acquire() as connection:
row = await connection.fetchrow(
"""UPDATE merge_request_ref
Expand All @@ -173,6 +176,42 @@ async def merge_request(
if row is not None:
mri.merge_request_extra_state = row["merge_request_extra_state"]

# if it's a transition from draft to ready
# - Delete all messages related to this MR prior to the current event update
# then create_or_update_message will re-post new message
# to have cards being the most recent in the feeds.
# Rows are used as lock to avoid race condition when multiple instances can receive hooks
# for the same MR (multiple webhook same project, multiple instances [kube?])
if mr.changes and "draft" in mr.changes and not mr.object_attributes.draft:
assert mr.object_attributes.updated_at is not None
update_ref_datetime = dateutil.parser.parse(mr.object_attributes.updated_at)
message_expiration = datetime.timedelta(seconds=0)
async with connection.transaction():
res = await connection.fetch(
"""SELECT merge_request_message_ref_id, message_id
FROM merge_request_message_ref
WHERE merge_request_ref_id = $1 AND created_at < $2
FOR UPDATE""",
mri.merge_request_ref_id,
update_ref_datetime,
)
for row in res:
message_id = row.get("message_id")
if message_id is not None:
await connection.execute(
"""INSERT INTO msg_to_delete
(message_id, expire_at)
VALUES
($1, now()+$2::INTERVAL)""",
str(message_id),
message_expiration,
)
await connection.execute(
"DELETE FROM merge_request_message_ref WHERE merge_request_message_ref_id = $1",
row.get("merge_request_message_ref_id"),
)
periodic_cleanup.reschedule()

if mr.object_attributes.action in ("approved", "unapproved"):
v = mr.user.model_dump()
v["status"] = mr.object_attributes.action
Expand All @@ -189,13 +228,23 @@ async def merge_request(
)

mri = await dbh.get_merge_request_ref_infos(mr)
card = render(mri)
should_be_collapsed: bool = mr.object_attributes.draft or mr.object_attributes.work_in_progress
card = render(
mri,
collapsed=should_be_collapsed,
show_collapsible=should_be_collapsed,
)
summary = (
f"MR {mri.merge_request_payload.object_attributes.state}:"
f" {mri.merge_request_payload.object_attributes.title}\n"
f"on {mri.merge_request_payload.project.path_with_namespace}"
)

convtoken_to_msgrefs = await get_or_create_message_refs(
mri.merge_request_ref_id,
conversation_tokens,
)

if mr.object_attributes.action in ("open", "reopen") or True:
async with httpx.AsyncClient() as client:
for ct in conversation_tokens:
Expand All @@ -215,21 +264,13 @@ async def merge_request(
or not participant_found,
)

if (
mr.object_attributes.action
in (
"merge",
"close",
)
or mr.object_attributes.state
in (
"closed",
"merged",
)
or mr.object_attributes.draft
or mr.object_attributes.work_in_progress
if mr.object_attributes.action in (
"merge",
"close",
) or mr.object_attributes.state in (
"closed",
"merged",
):

message_expiration = datetime.timedelta(seconds=30)
async with await database.acquire() as connection:
res = await connection.fetch(
Expand Down

0 comments on commit eb7fb80

Please sign in to comment.