Skip to content

Commit

Permalink
Refactor to allow cancellation of sleeps, and react to shutdown.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkaye committed Sep 19, 2023
1 parent c610a0f commit f0a1279
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
1 change: 1 addition & 0 deletions trafficlight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async def startup() -> None:
@app.after_serving
async def shutdown() -> None:
trafficlight.http.adapter.stop_background_tasks = True
await trafficlight.http.adapter.interrupt_tasks()
if kiwi.kiwi_client:
await kiwi.kiwi_client.end_run()
await adapter_shutdown()
Expand Down
27 changes: 24 additions & 3 deletions trafficlight/http/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, cast
from typing import Any, Dict, Set, cast

from quart import Blueprint, current_app, request
from werkzeug.utils import secure_filename
Expand Down Expand Up @@ -63,6 +63,7 @@ async def run() -> None:

current_app.add_background_task(run)
return

logger.debug(
"Not enough client_types to run any test(have %s)",
[str(item) for item in available_adapters],
Expand Down Expand Up @@ -115,20 +116,40 @@ async def cleanup_unresponsive_adapters() -> None:
)


sleeping_tasks: Set[asyncio.Future[None]] = set()


async def interrupt_tasks() -> None:
for task in sleeping_tasks:
task.cancel()


async def loop_cleanup_unresponsive_adapters() -> None:
while not stop_background_tasks:
logging.info("Running sweep for idle adapters")
await cleanup_unresponsive_adapters()
await asyncio.sleep(30)

sleep_task: asyncio.Future[None] = asyncio.ensure_future(asyncio.sleep(30))
try:
sleeping_tasks.add(sleep_task)
except asyncio.CancelledError:
pass # we don't mind this task being cancelled.
finally:
sleeping_tasks.remove(sleep_task)
logging.info("Finished sweep task")


async def loop_check_for_new_tests() -> None:
while not stop_background_tasks:
logging.info("Running sweep for new tests")
await check_for_new_tests()
await asyncio.sleep(30)
sleep_task: asyncio.Future[None] = asyncio.ensure_future(asyncio.sleep(30))
try:
sleeping_tasks.add(sleep_task)
except asyncio.CancelledError:
pass # we don't mind this task being cancelled.
finally:
sleeping_tasks.remove(sleep_task)
logging.info("Finished new test task")


Expand Down

0 comments on commit f0a1279

Please sign in to comment.