From 14c48e866458849a689b4f1474a7ab530ed442d3 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 3 Jan 2025 22:41:27 -0300 Subject: [PATCH 1/8] feat(pacer): add command to fetch docs filtered by page count from PACER --- .../management/commands/pacer_bulk_fetch.py | 199 ++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 cl/search/management/commands/pacer_bulk_fetch.py diff --git a/cl/search/management/commands/pacer_bulk_fetch.py b/cl/search/management/commands/pacer_bulk_fetch.py new file mode 100644 index 0000000000..13fe3ec942 --- /dev/null +++ b/cl/search/management/commands/pacer_bulk_fetch.py @@ -0,0 +1,199 @@ +import logging +import time +from datetime import datetime + +from django.contrib.auth.models import User +from django.core.management.base import CommandError +from django.db.models import Q + +from cl.lib.command_utils import VerboseCommand +from cl.recap.models import REQUEST_TYPE, PacerFetchQueue +from cl.recap.tasks import do_pacer_fetch +from cl.search.models import Court, RECAPDocument + +logger = logging.getLogger(__name__) + + +class Command(VerboseCommand): + help = "Download multiple documents from PACER with rate limiting" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.user = None + self.recap_documents = None + self.courts_with_docs = {} + self.total_launched = 0 + self.total_errors = 0 + + def add_arguments(self, parser) -> None: + parser.add_argument( + "--request-interval", + type=float, + help="Seconds between requests", + ) + parser.add_argument( + "--min-page-count", + type=int, + help="Get docs with this number of pages or more", + ) + parser.add_argument( + "--max-page-count", + type=int, + help="Get docs with this number of pages or less", + ) + parser.add_argument( + "--username", + type=str, + required=True, + help="Username to associate with the processing queues", + ) + parser.add_argument( + "--testing", + type=str, + help="Prevents creation of log file", + ) + + @staticmethod + def setup_logging(testing: bool = False) -> None: + if not testing: + logging.basicConfig( + filename=f'pacer_bulk_fetch_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + ) + + def set_user(self, username: str) -> None: + """Get user or raise CommandError""" + if not username: + raise CommandError( + "No username provided, cannot create PacerFetchQueues." + ) + try: + self.user = User.objects.get(username=username) + except User.DoesNotExist: + raise CommandError(f"User {username} does not exist") + + def identify_documents(self, options: dict) -> None: + """Get eligible documents grouped by court""" + filters = [ + Q(pacer_doc_id__isnull=False), + Q(is_available=False), + ] + if options.get("min_page_count"): + filters.append(Q(page_count__gte=options["min_page_count"])) + if options.get("max_page_count"): + filters.append(Q(page_count__lte=options["max_page_count"])) + + self.recap_documents = ( + RECAPDocument.objects.filter(*filters) + .values( + "id", + "page_count", + "docket_entry__docket__court_id", + "pacer_doc_id", + ) + .order_by("-page_count") + ) + + courts = ( + Court.objects.filter( + dockets__docket_entries__recap_documents__in=[ + recap_doc_id["id"] for recap_doc_id in self.recap_documents + ] + ) + .order_by("pk") + .distinct() + ) + + for court in courts: + self.courts_with_docs[court.pk] = [ + doc + for doc in self.recap_documents + if doc["docket_entry__docket__court_id"] == court.pk + ] + + def enqueue_pacer_fetch(self, doc: dict) -> None: + fq = PacerFetchQueue.objects.create( + request_type=REQUEST_TYPE.PDF, + recap_document_id=doc.get("id"), + user_id=self.user.pk, + ) + do_pacer_fetch(fq) + + self.total_launched += 1 + logger.info( + f"Launched download for doc {doc.get('id')} from court {doc.get('docket_entry__docket__court_id')}" + f"Progress: {self.total_launched}/{len(self.recap_documents)}" + ) + + def execute_round( + self, remaining_courts: dict, options: dict, is_last_round: bool + ) -> dict: + remaining_courts_copy = ( + remaining_courts.copy() + ) # don't remove elements from list we're iterating over + court_keys = remaining_courts.keys() + for court_index, court_id in enumerate(court_keys): + doc = remaining_courts[court_id].pop(0) + + try: + self.enqueue_pacer_fetch(doc) + except Exception as e: + self.total_errors += 1 + logger.error( + f"Error queuing document {doc.get("id")}: {str(e)}", + exc_info=True, + ) + finally: + # If this court doesn't have any more docs, remove from dict: + if len(remaining_courts[court_id]) == 0: + remaining_courts_copy.pop(court_id) + # Don't sleep in very last iteration: + if not is_last_round or court_index < len(court_keys) - 1: + time.sleep(float(options.get("request_interval", 2.0))) + + return remaining_courts_copy + + def process_documents(self, options: dict) -> None: + """Process documents in round-robin fashion by court""" + remaining_courts = self.courts_with_docs + court_doc_counts = [ + len(self.courts_with_docs[court_id]) + for court_id in self.courts_with_docs.keys() + ] + rounds = max(court_doc_counts) + + for i in range(rounds): + is_last_round = i == rounds - 1 + remaining_courts = self.execute_round( + remaining_courts, options, is_last_round + ) + + if self.total_errors: + logger.error( + f"Finished processing with {self.total_errors} error{"s" if self.total_errors > 1 else ""}." + ) + + def handle(self, *args, **options) -> None: + self.setup_logging(options.get("testing", False)) + + logger.info("Starting pacer_bulk_fetch command") + + try: + self.set_user(options.get("username", "")) + + self.identify_documents(options) + + logger.info( + f"{self.user} found {len(self.recap_documents)} documents across {len(self.courts_with_docs)} courts." + ) + + self.process_documents(options) + + logger.info( + f"Created {self.total_launched} processing queues for a total of {len(self.recap_documents)} docs found." + ) + + except Exception as e: + logger.error(f"Fatal error in command: {str(e)}", exc_info=True) + raise From b85778a92dfb1988fc5e67cd56eca18377041b89 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Mon, 6 Jan 2025 23:45:36 -0300 Subject: [PATCH 2/8] test(pacer): introduce tests for command to fetch docs from PACER --- cl/search/tests/test_pacer_bulk_fetch.py | 192 +++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 cl/search/tests/test_pacer_bulk_fetch.py diff --git a/cl/search/tests/test_pacer_bulk_fetch.py b/cl/search/tests/test_pacer_bulk_fetch.py new file mode 100644 index 0000000000..254b6ed841 --- /dev/null +++ b/cl/search/tests/test_pacer_bulk_fetch.py @@ -0,0 +1,192 @@ +import random +from unittest.mock import patch + +from cl.recap.models import PacerFetchQueue +from cl.search.factories import ( + CourtFactory, + DocketEntryFactory, + DocketFactory, + RECAPDocumentFactory, +) +from cl.search.management.commands.pacer_bulk_fetch import Command +from cl.search.models import Docket, RECAPDocument +from cl.tests.cases import TestCase +from cl.users.factories import UserFactory + + +class BulkFetchPacerDocsTest(TestCase): + @classmethod + def setUpTestData(cls): + cls.user = UserFactory() + + cls.courts = [CourtFactory() for _ in range(6)] + + dockets_per_court = 15 + entries_per_docket = 8 + + page_count_ranges = [ + (1000, 2000), + (500, 999), + (100, 499), + (1, 99), + ] + cls.big_page_count = 1000 + cls.big_docs_count = 0 + + for court in cls.courts: + [DocketFactory(court=court) for _ in range(dockets_per_court)] + + for docket in Docket.objects.all(): + docket_entries = [ + DocketEntryFactory(docket=docket) + for _ in range(entries_per_docket) + ] + + for de in docket_entries: + min_pages, max_pages = random.choice(page_count_ranges) + page_count = random.randint(min_pages, max_pages) + cls.big_docs_count += 1 if page_count >= 1000 else 0 + RECAPDocumentFactory( + docket_entry=de, + page_count=page_count, + is_available=False, + ) + + def setUp(self): + self.command = Command() + self.big_docs_created = RECAPDocument.objects.filter( + page_count__gte=self.big_page_count, + is_available=False, + pacer_doc_id__isnull=False, + ) + self.assertEqual(self.big_docs_count, self.big_docs_created.count()) + + @patch("time.sleep") + @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") + def test_document_filtering( + self, + mock_fetch, + mock_sleep, + ): + """Test document filtering according to command arguments passed.""" + self.command.handle( + min_page_count=self.big_page_count, + request_interval=1.0, + username=self.user.username, + testing=True, + ) + + self.assertEqual( + mock_fetch.call_count, + self.big_docs_count, + f"Expected {self.big_docs_count} documents to be processed", + ) + + fetch_queues = PacerFetchQueue.objects.all() + self.assertEqual( + fetch_queues.count(), + self.big_docs_count, + f"Expected {self.big_docs_count} fetch queues", + ) + + enqueued_doc_ids = [fq.recap_document_id for fq in fetch_queues] + big_doc_ids = self.big_docs_created.values_list("id", flat=True) + self.assertSetEqual(set(enqueued_doc_ids), set(big_doc_ids)) + + @patch("time.sleep") + @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") + def test_rate_limiting(self, mock_fetch, mock_sleep): + """Test rate limiting.""" + interval = 2.0 + self.command.handle( + min_page_count=1000, + request_interval=interval, + username=self.user.username, + testing=True, + ) + + self.assertEqual( + mock_sleep.call_count, + mock_fetch.call_count - 1, + "Sleep should be called between each fetch", + ) + + for call in mock_sleep.call_args_list: + self.assertEqual( + call.args[0], + interval, + f"Expected sleep interval of {interval} seconds", + ) + + @patch("time.sleep") + @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") + def test_error_handling(self, mock_fetch, mock_sleep): + """Test that errors are handled gracefully""" + mock_fetch.side_effect = Exception("PACER API error") + + self.command.handle( + min_page_count=1000, + request_interval=1.0, + username=self.user.username, + testing=True, + ) + + self.assertEqual( + PacerFetchQueue.objects.count(), + self.big_docs_count, + ) + + @patch("time.sleep") + @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") + def test_round_robin(self, mock_fetch, mock_sleep): + """ + Verify that each call to 'execute_round' never processes the same court + more than once. + """ + calls_per_round = [] + original_execute_round = self.command.execute_round + + def track_rounds_side_effect(remaining_courts, options, is_last_round): + """ + Compares the mock_fetch calls before and after calling execute_round, + then saves new calls that occurred during this round. + """ + start_index = len(mock_fetch.call_args_list) + updated_remaining = original_execute_round( + remaining_courts, options, is_last_round + ) + end_index = len(mock_fetch.call_args_list) + current_round_calls = mock_fetch.call_args_list[ + start_index:end_index + ] + calls_per_round.append(current_round_calls) + + return updated_remaining + + with patch.object( + Command, "execute_round", side_effect=track_rounds_side_effect + ): + # Run command with patched execute_round to save do_pacer_fetch + # calls in each round + self.command.handle( + min_page_count=1000, + request_interval=1.0, + username=self.user.username, + testing=True, + ) + + for round_index, round_calls in enumerate(calls_per_round, start=1): + court_ids_this_round = [] + + for call in round_calls: + fetch_queue_obj = call.args[0] + court_id = ( + fetch_queue_obj.recap_document.docket_entry.docket.court_id + ) + court_ids_this_round.append(court_id) + + self.assertEqual( + len(court_ids_this_round), + len(set(court_ids_this_round)), + f"Round {round_index} had duplicate courts: {court_ids_this_round}", + ) From 9dca38355181ce6a1f191eb9d9bc43296fe21428 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Tue, 7 Jan 2025 18:54:59 -0300 Subject: [PATCH 3/8] refactor(recap): abstract PACER doc fetch chain build from do_pacer_fetch - introduces new build_pdf_retrieval_task_chain method - refactors do_pacer_fetch to now use that new method instead --- cl/recap/tasks.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/cl/recap/tasks.py b/cl/recap/tasks.py index 026b1ca2ef..13f7bf00f4 100644 --- a/cl/recap/tasks.py +++ b/cl/recap/tasks.py @@ -141,6 +141,16 @@ async def process_recap_upload(pq: ProcessingQueue) -> None: docket = await process_recap_acms_docket(pq.pk) +def build_pdf_retrieval_task_chain(fq: PacerFetchQueue): + # Request by recap_document_id + rd_pk = fq.recap_document_id + return chain( + fetch_pacer_doc_by_rd.si(rd_pk, fq.pk), + extract_recap_pdf.si(rd_pk), + mark_fq_successful.si(fq.pk), + ) + + def do_pacer_fetch(fq: PacerFetchQueue): """Process a request made by a user to get an item from PACER. @@ -156,13 +166,7 @@ def do_pacer_fetch(fq: PacerFetchQueue): ) result = c.apply_async() elif fq.request_type == REQUEST_TYPE.PDF: - # Request by recap_document_id - rd_pk = fq.recap_document_id - result = chain( - fetch_pacer_doc_by_rd.si(rd_pk, fq.pk), - extract_recap_pdf.si(rd_pk), - mark_fq_successful.si(fq.pk), - ).apply_async() + result = build_pdf_retrieval_task_chain(fq).apply_async() elif fq.request_type == REQUEST_TYPE.ATTACHMENT_PAGE: result = fetch_attachment_page.apply_async(args=(fq.pk,)) return result From 6e4b14b97568b5eea7750f8173b4f2671b637552 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Wed, 8 Jan 2025 17:28:18 -0300 Subject: [PATCH 4/8] refactor(pacer): bulk fetch command now uses CeleryThrottle --- .../management/commands/pacer_bulk_fetch.py | 51 +++++++++++++++---- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/cl/search/management/commands/pacer_bulk_fetch.py b/cl/search/management/commands/pacer_bulk_fetch.py index 13fe3ec942..ea0d19fc3a 100644 --- a/cl/search/management/commands/pacer_bulk_fetch.py +++ b/cl/search/management/commands/pacer_bulk_fetch.py @@ -1,14 +1,16 @@ import logging -import time from datetime import datetime from django.contrib.auth.models import User from django.core.management.base import CommandError from django.db.models import Q +from cl import settings +from cl.lib.celery_utils import CeleryThrottle from cl.lib.command_utils import VerboseCommand +from cl.lib.pacer_session import get_or_cache_pacer_cookies from cl.recap.models import REQUEST_TYPE, PacerFetchQueue -from cl.recap.tasks import do_pacer_fetch +from cl.recap.tasks import build_pdf_retrieval_task_chain from cl.search.models import Court, RECAPDocument logger = logging.getLogger(__name__) @@ -24,6 +26,10 @@ def __init__(self, *args, **kwargs): self.courts_with_docs = {} self.total_launched = 0 self.total_errors = 0 + self.pacer_username = None + self.pacer_password = None + self.throttle = None + self.queue_name = None def add_arguments(self, parser) -> None: parser.add_argument( @@ -44,8 +50,12 @@ def add_arguments(self, parser) -> None: parser.add_argument( "--username", type=str, - required=True, - help="Username to associate with the processing queues", + help="Username to associate with the processing queues (defaults to 'recap-email')", + ) + parser.add_argument( + "--queue-name", + type=str, + help="Celery queue name used for processing tasks", ) parser.add_argument( "--testing", @@ -62,6 +72,25 @@ def setup_logging(testing: bool = False) -> None: format="%(asctime)s - %(levelname)s - %(message)s", ) + def setup_celery(self, options) -> None: + """Setup Celery by setting the queue_name and throttle.""" + self.queue_name = options.get("queue_name", "pacer_bulk_fetch") + self.throttle = CeleryThrottle(queue_name=self.queue_name) + + def handle_pacer_session(self, options) -> None: + """Make sure we have an active PACER session for the user.""" + self.pacer_username = options.get( + "pacer_username", settings.PACER_USERNAME + ) + self.pacer_password = options.get( + "pacer_password", settings.PACER_PASSWORD + ) + get_or_cache_pacer_cookies( + self.user.pk, + username=self.pacer_username, + password=self.pacer_password, + ) + def set_user(self, username: str) -> None: """Get user or raise CommandError""" if not username: @@ -113,17 +142,18 @@ def identify_documents(self, options: dict) -> None: ] def enqueue_pacer_fetch(self, doc: dict) -> None: + self.throttle.maybe_wait() + fq = PacerFetchQueue.objects.create( request_type=REQUEST_TYPE.PDF, recap_document_id=doc.get("id"), user_id=self.user.pk, ) - do_pacer_fetch(fq) - + build_pdf_retrieval_task_chain(fq).apply_async(queue=self.queue_name) self.total_launched += 1 logger.info( f"Launched download for doc {doc.get('id')} from court {doc.get('docket_entry__docket__court_id')}" - f"Progress: {self.total_launched}/{len(self.recap_documents)}" + f"\nProgress: {self.total_launched}/{len(self.recap_documents)}" ) def execute_round( @@ -148,9 +178,6 @@ def execute_round( # If this court doesn't have any more docs, remove from dict: if len(remaining_courts[court_id]) == 0: remaining_courts_copy.pop(court_id) - # Don't sleep in very last iteration: - if not is_last_round or court_index < len(court_keys) - 1: - time.sleep(float(options.get("request_interval", 2.0))) return remaining_courts_copy @@ -176,11 +203,13 @@ def process_documents(self, options: dict) -> None: def handle(self, *args, **options) -> None: self.setup_logging(options.get("testing", False)) + self.setup_celery(options) logger.info("Starting pacer_bulk_fetch command") try: - self.set_user(options.get("username", "")) + self.set_user(options.get("username", "recap-email")) + self.handle_pacer_session(options) self.identify_documents(options) From 88df4316e64c9008c4b987d0a2ccf4f29e2c94b2 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Wed, 8 Jan 2025 17:30:24 -0300 Subject: [PATCH 5/8] test(pacer): update tests for new implementation --- cl/search/tests/test_pacer_bulk_fetch.py | 131 +++++++++++++++-------- 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/cl/search/tests/test_pacer_bulk_fetch.py b/cl/search/tests/test_pacer_bulk_fetch.py index 254b6ed841..1998cbc104 100644 --- a/cl/search/tests/test_pacer_bulk_fetch.py +++ b/cl/search/tests/test_pacer_bulk_fetch.py @@ -1,5 +1,5 @@ import random -from unittest.mock import patch +from unittest.mock import MagicMock, patch from cl.recap.models import PacerFetchQueue from cl.search.factories import ( @@ -61,14 +61,26 @@ def setUp(self): ) self.assertEqual(self.big_docs_count, self.big_docs_created.count()) - @patch("time.sleep") - @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") + @patch( + "cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.build_pdf_retrieval_task_chain" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies" + ) def test_document_filtering( self, - mock_fetch, - mock_sleep, + mock_pacer_cookies, + mock_chain_builder, + mock_throttle, ): """Test document filtering according to command arguments passed.""" + # Setup mock chain + mock_chain = MagicMock() + mock_chain_builder.return_value = mock_chain + self.command.handle( min_page_count=self.big_page_count, request_interval=1.0, @@ -77,7 +89,7 @@ def test_document_filtering( ) self.assertEqual( - mock_fetch.call_count, + mock_chain.apply_async.call_count, self.big_docs_count, f"Expected {self.big_docs_count} documents to be processed", ) @@ -93,40 +105,58 @@ def test_document_filtering( big_doc_ids = self.big_docs_created.values_list("id", flat=True) self.assertSetEqual(set(enqueued_doc_ids), set(big_doc_ids)) - @patch("time.sleep") - @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") - def test_rate_limiting(self, mock_fetch, mock_sleep): + @patch( + "cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.build_pdf_retrieval_task_chain" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies" + ) + def test_rate_limiting( + self, + mock_pacer_cookies, + mock_chain_builder, + mock_throttle, + ): """Test rate limiting.""" - interval = 2.0 + # Setup mock chain + mock_chain = MagicMock() + mock_chain_builder.return_value = mock_chain + self.command.handle( min_page_count=1000, - request_interval=interval, username=self.user.username, testing=True, ) self.assertEqual( - mock_sleep.call_count, - mock_fetch.call_count - 1, - "Sleep should be called between each fetch", + mock_throttle.call_count, + self.big_docs_count, + "CeleryThrottle.maybe_wait should be called for each document", ) - for call in mock_sleep.call_args_list: - self.assertEqual( - call.args[0], - interval, - f"Expected sleep interval of {interval} seconds", - ) - - @patch("time.sleep") - @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") - def test_error_handling(self, mock_fetch, mock_sleep): + @patch( + "cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.build_pdf_retrieval_task_chain" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies" + ) + def test_error_handling( + self, + mock_pacer_cookies, + mock_chain_builder, + mock_throttle, + ): """Test that errors are handled gracefully""" - mock_fetch.side_effect = Exception("PACER API error") + mock_chain_builder.side_effect = Exception("Chain building error") self.command.handle( min_page_count=1000, - request_interval=1.0, username=self.user.username, testing=True, ) @@ -134,40 +164,56 @@ def test_error_handling(self, mock_fetch, mock_sleep): self.assertEqual( PacerFetchQueue.objects.count(), self.big_docs_count, + "PacerFetchQueue objects should still be created even if chain building fails", ) - @patch("time.sleep") - @patch("cl.search.management.commands.pacer_bulk_fetch.do_pacer_fetch") - def test_round_robin(self, mock_fetch, mock_sleep): + @patch( + "cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.build_pdf_retrieval_task_chain" + ) + @patch( + "cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies" + ) + def test_round_robin( + self, + mock_pacer_cookies, + mock_chain_builder, + mock_throttle, + ): """ Verify that each call to 'execute_round' never processes the same court more than once. """ + mock_chain = MagicMock() + mock_chain_builder.return_value = mock_chain + calls_per_round = [] original_execute_round = self.command.execute_round def track_rounds_side_effect(remaining_courts, options, is_last_round): """ - Compares the mock_fetch calls before and after calling execute_round, - then saves new calls that occurred during this round. + Tracks PacerFetchQueue creation before and after calling execute_round + to identify which courts were processed in each round. """ - start_index = len(mock_fetch.call_args_list) + start_count = PacerFetchQueue.objects.count() updated_remaining = original_execute_round( remaining_courts, options, is_last_round ) - end_index = len(mock_fetch.call_args_list) - current_round_calls = mock_fetch.call_args_list[ - start_index:end_index + end_count = PacerFetchQueue.objects.count() + + # Get the fetch queues created in this round + current_round_queues = PacerFetchQueue.objects.order_by("pk")[ + start_count:end_count ] - calls_per_round.append(current_round_calls) + calls_per_round.append(current_round_queues) return updated_remaining with patch.object( Command, "execute_round", side_effect=track_rounds_side_effect ): - # Run command with patched execute_round to save do_pacer_fetch - # calls in each round self.command.handle( min_page_count=1000, request_interval=1.0, @@ -175,14 +221,11 @@ def track_rounds_side_effect(remaining_courts, options, is_last_round): testing=True, ) - for round_index, round_calls in enumerate(calls_per_round, start=1): + for round_index, round_queues in enumerate(calls_per_round, start=1): court_ids_this_round = [] - for call in round_calls: - fetch_queue_obj = call.args[0] - court_id = ( - fetch_queue_obj.recap_document.docket_entry.docket.court_id - ) + for queue in round_queues: + court_id = queue.recap_document.docket_entry.docket.court_id court_ids_this_round.append(court_id) self.assertEqual( From 2915cb9397a1f2e70859da555fd6442c5c3665ea Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Thu, 9 Jan 2025 11:42:36 -0300 Subject: [PATCH 6/8] fix(pacer): enable rate limiting for task to fetch doc from PACER --- cl/recap/tasks.py | 11 ++++++++--- .../management/commands/pacer_bulk_fetch.py | 17 +++++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/cl/recap/tasks.py b/cl/recap/tasks.py index 13f7bf00f4..d9d95f4279 100644 --- a/cl/recap/tasks.py +++ b/cl/recap/tasks.py @@ -141,11 +141,15 @@ async def process_recap_upload(pq: ProcessingQueue) -> None: docket = await process_recap_acms_docket(pq.pk) -def build_pdf_retrieval_task_chain(fq: PacerFetchQueue): - # Request by recap_document_id +def build_pdf_retrieval_task_chain( + fq: PacerFetchQueue, rate_limit: str = None +): rd_pk = fq.recap_document_id + pacer_fetch_task = fetch_pacer_doc_by_rd.si(rd_pk, fq.pk) + if rate_limit: + pacer_fetch_task = pacer_fetch_task.set(rate_limit=rate_limit) return chain( - fetch_pacer_doc_by_rd.si(rd_pk, fq.pk), + pacer_fetch_task, extract_recap_pdf.si(rd_pk), mark_fq_successful.si(fq.pk), ) @@ -166,6 +170,7 @@ def do_pacer_fetch(fq: PacerFetchQueue): ) result = c.apply_async() elif fq.request_type == REQUEST_TYPE.PDF: + # Request by recap_document_id result = build_pdf_retrieval_task_chain(fq).apply_async() elif fq.request_type == REQUEST_TYPE.ATTACHMENT_PAGE: result = fetch_attachment_page.apply_async(args=(fq.pk,)) diff --git a/cl/search/management/commands/pacer_bulk_fetch.py b/cl/search/management/commands/pacer_bulk_fetch.py index ea0d19fc3a..140ec5395c 100644 --- a/cl/search/management/commands/pacer_bulk_fetch.py +++ b/cl/search/management/commands/pacer_bulk_fetch.py @@ -30,12 +30,13 @@ def __init__(self, *args, **kwargs): self.pacer_password = None self.throttle = None self.queue_name = None + self.rate_limit = None def add_arguments(self, parser) -> None: parser.add_argument( - "--request-interval", + "--rate-limit", type=float, - help="Seconds between requests", + help="The maximum rate for requests, e.g. '1/m', or '10/2h' or similar. Defaults to 1/2s", ) parser.add_argument( "--min-page-count", @@ -50,7 +51,7 @@ def add_arguments(self, parser) -> None: parser.add_argument( "--username", type=str, - help="Username to associate with the processing queues (defaults to 'recap-email')", + help="Username to associate with the processing queues (defaults to 'recap')", ) parser.add_argument( "--queue-name", @@ -73,8 +74,9 @@ def setup_logging(testing: bool = False) -> None: ) def setup_celery(self, options) -> None: - """Setup Celery by setting the queue_name and throttle.""" + """Setup Celery by setting the queue_name, rate_limit and throttle.""" self.queue_name = options.get("queue_name", "pacer_bulk_fetch") + self.rate_limit = options.get("rate_limit", "1/2s") self.throttle = CeleryThrottle(queue_name=self.queue_name) def handle_pacer_session(self, options) -> None: @@ -149,7 +151,10 @@ def enqueue_pacer_fetch(self, doc: dict) -> None: recap_document_id=doc.get("id"), user_id=self.user.pk, ) - build_pdf_retrieval_task_chain(fq).apply_async(queue=self.queue_name) + build_pdf_retrieval_task_chain( + fq, + rate_limit=self.rate_limit, + ).apply_async(queue=self.queue_name) self.total_launched += 1 logger.info( f"Launched download for doc {doc.get('id')} from court {doc.get('docket_entry__docket__court_id')}" @@ -208,7 +213,7 @@ def handle(self, *args, **options) -> None: logger.info("Starting pacer_bulk_fetch command") try: - self.set_user(options.get("username", "recap-email")) + self.set_user(options.get("username", "recap")) self.handle_pacer_session(options) self.identify_documents(options) From e7e7d14250e8b54bc2ac86f5980e56106c901453 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Thu, 9 Jan 2025 11:44:46 -0300 Subject: [PATCH 7/8] test(pacer): test rate limiting --- cl/search/tests/test_pacer_bulk_fetch.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cl/search/tests/test_pacer_bulk_fetch.py b/cl/search/tests/test_pacer_bulk_fetch.py index 1998cbc104..c5ee60e6e6 100644 --- a/cl/search/tests/test_pacer_bulk_fetch.py +++ b/cl/search/tests/test_pacer_bulk_fetch.py @@ -125,12 +125,24 @@ def test_rate_limiting( mock_chain = MagicMock() mock_chain_builder.return_value = mock_chain + rate_limit = "10/m" self.command.handle( min_page_count=1000, + rate_limit=rate_limit, username=self.user.username, testing=True, ) + # Verify the rate limit was passed correctly + for call in mock_chain_builder.call_args_list: + with self.subTest(call=call): + _, kwargs = call + self.assertEqual( + kwargs.get("rate_limit"), + rate_limit, + "Rate limit should be passed to chain builder", + ) + self.assertEqual( mock_throttle.call_count, self.big_docs_count, From 00eabb5574f57b813cb71dd29b6fd629405d1ef0 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Thu, 9 Jan 2025 11:45:52 -0300 Subject: [PATCH 8/8] test(pacer): enhance test by adding subTest for round-robin asserts --- cl/search/tests/test_pacer_bulk_fetch.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cl/search/tests/test_pacer_bulk_fetch.py b/cl/search/tests/test_pacer_bulk_fetch.py index c5ee60e6e6..ec5219a65a 100644 --- a/cl/search/tests/test_pacer_bulk_fetch.py +++ b/cl/search/tests/test_pacer_bulk_fetch.py @@ -240,8 +240,12 @@ def track_rounds_side_effect(remaining_courts, options, is_last_round): court_id = queue.recap_document.docket_entry.docket.court_id court_ids_this_round.append(court_id) - self.assertEqual( - len(court_ids_this_round), - len(set(court_ids_this_round)), - f"Round {round_index} had duplicate courts: {court_ids_this_round}", - ) + with self.subTest( + court_ids_this_round=court_ids_this_round, + round_index=round_index, + ): + self.assertEqual( + len(court_ids_this_round), + len(set(court_ids_this_round)), + f"Round {round_index} had duplicate courts: {court_ids_this_round}", + )