Skip to content

Commit

Permalink
fix(pacer): enable rate limiting for task to fetch doc from PACER
Browse files Browse the repository at this point in the history
  • Loading branch information
elisa-a-v committed Jan 9, 2025
1 parent 88df431 commit 2915cb9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
11 changes: 8 additions & 3 deletions cl/recap/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,15 @@ async def process_recap_upload(pq: ProcessingQueue) -> None:
docket = await process_recap_acms_docket(pq.pk)


def build_pdf_retrieval_task_chain(
    fq: PacerFetchQueue, rate_limit: str | None = None
):
    """Build a Celery chain to fetch a PACER PDF by recap_document_id.

    :param fq: The PacerFetchQueue whose recap_document_id identifies the
        document to download.
    :param rate_limit: Optional Celery rate-limit string (e.g. "1/2s",
        "10/m") applied to the fetch task; when omitted, no task-level
        rate limiting is applied.
    :return: A Celery chain that fetches the doc from PACER, extracts the
        PDF's text, then marks the fetch queue successful.
    """
    rd_pk = fq.recap_document_id
    pacer_fetch_task = fetch_pacer_doc_by_rd.si(rd_pk, fq.pk)
    if rate_limit:
        # Signature.set attaches Celery task options; rate_limit throttles
        # how often workers may execute this task.
        pacer_fetch_task = pacer_fetch_task.set(rate_limit=rate_limit)
    return chain(
        pacer_fetch_task,
        extract_recap_pdf.si(rd_pk),
        mark_fq_successful.si(fq.pk),
    )
Expand All @@ -166,6 +170,7 @@ def do_pacer_fetch(fq: PacerFetchQueue):
)
result = c.apply_async()
elif fq.request_type == REQUEST_TYPE.PDF:
# Request by recap_document_id
result = build_pdf_retrieval_task_chain(fq).apply_async()
elif fq.request_type == REQUEST_TYPE.ATTACHMENT_PAGE:
result = fetch_attachment_page.apply_async(args=(fq.pk,))
Expand Down
17 changes: 11 additions & 6 deletions cl/search/management/commands/pacer_bulk_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ def __init__(self, *args, **kwargs):
self.pacer_password = None
self.throttle = None
self.queue_name = None
self.rate_limit = None

def add_arguments(self, parser) -> None:
parser.add_argument(
"--request-interval",
"--rate-limit",
type=float,
help="Seconds between requests",
help="The maximum rate for requests, e.g. '1/m', or '10/2h' or similar. Defaults to 1/2s",
)
parser.add_argument(
"--min-page-count",
Expand All @@ -50,7 +51,7 @@ def add_arguments(self, parser) -> None:
parser.add_argument(
"--username",
type=str,
help="Username to associate with the processing queues (defaults to 'recap-email')",
help="Username to associate with the processing queues (defaults to 'recap')",
)
parser.add_argument(
"--queue-name",
Expand All @@ -73,8 +74,9 @@ def setup_logging(testing: bool = False) -> None:
)

def setup_celery(self, options) -> None:
    """Setup Celery by setting the queue_name, rate_limit and throttle.

    NOTE(review): argparse stores ``None`` in the options dict for flags
    the user omitted (no ``default=`` is visible for these arguments), so
    ``dict.get(key, fallback)`` would find the key and return ``None``
    instead of the fallback. Use ``or`` so the documented defaults
    actually apply.
    """
    self.queue_name = options.get("queue_name") or "pacer_bulk_fetch"
    # Default matches the help text: at most one request every 2 seconds.
    self.rate_limit = options.get("rate_limit") or "1/2s"
    self.throttle = CeleryThrottle(queue_name=self.queue_name)

def handle_pacer_session(self, options) -> None:
Expand Down Expand Up @@ -149,7 +151,10 @@ def enqueue_pacer_fetch(self, doc: dict) -> None:
recap_document_id=doc.get("id"),
user_id=self.user.pk,
)
build_pdf_retrieval_task_chain(fq).apply_async(queue=self.queue_name)
build_pdf_retrieval_task_chain(
fq,
rate_limit=self.rate_limit,
).apply_async(queue=self.queue_name)
self.total_launched += 1
logger.info(
f"Launched download for doc {doc.get('id')} from court {doc.get('docket_entry__docket__court_id')}"
Expand Down Expand Up @@ -208,7 +213,7 @@ def handle(self, *args, **options) -> None:
logger.info("Starting pacer_bulk_fetch command")

try:
self.set_user(options.get("username", "recap-email"))
self.set_user(options.get("username", "recap"))
self.handle_pacer_session(options)

self.identify_documents(options)
Expand Down

0 comments on commit 2915cb9

Please sign in to comment.