Skip to content

Commit

Permalink
Retry the file if parser fails
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Jan 9, 2025
1 parent 0db8e2d commit e31fb6e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
19 changes: 18 additions & 1 deletion cohere/compass/clients/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,24 @@

# 3rd party imports
import requests
from requests.exceptions import InvalidSchema
from tenacity import (
retry,
retry_if_not_exception_type,
stop_after_attempt,
wait_fixed,
)

# Local imports
from cohere.compass import (
ProcessFileParameters,
)
from cohere.compass.constants import DEFAULT_MAX_ACCEPTED_FILE_SIZE_BYTES
from cohere.compass.constants import (
DEFAULT_MAX_ACCEPTED_FILE_SIZE_BYTES,
DEFAULT_MAX_RETRIES,
DEFAULT_SLEEP_RETRY_SECONDS,
)
from cohere.compass.exceptions import CompassClientError
from cohere.compass.models import (
CompassDocument,
MetadataConfig,
Expand Down Expand Up @@ -195,6 +207,11 @@ def _get_metadata(
else:
return custom_context

@retry(
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
wait=wait_fixed(DEFAULT_SLEEP_RETRY_SECONDS),
retry=retry_if_not_exception_type((InvalidSchema, CompassClientError)),
)
def process_file(
self,
*,
Expand Down
16 changes: 12 additions & 4 deletions cohere/compass/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python imports
import base64
import glob
import logging
import os
import uuid
from collections.abc import Iterable, Iterator
Expand All @@ -23,18 +24,18 @@
T = TypeVar("T")
U = TypeVar("U")

logger = logging.getLogger(__name__)


def imap_queued(
executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int
) -> Iterator[U]:
"""
Similar to Python's `map`, but uses an executor to parallelize the calls.
:param executor: the executor to use.
:param f: the function to call.
:param it: the iterable to map over.
:param max_queued: the maximum number of futures to keep in flight.
:returns: an iterator over the results.
"""
assert max_queued >= 1
Expand All @@ -46,11 +47,18 @@ def imap_queued(
done, futures_set = futures.wait(
futures_set, return_when=futures.FIRST_COMPLETED
)

for future in done:
yield future.result()
try:
yield future.result()
except Exception as e:
logger.exception(f"Error in processing file: {e}")

for future in futures.as_completed(futures_set):
yield future.result()
try:
yield future.result()
except Exception as e:
logger.exception(f"Error in processing file: {e}")


def get_fs(document_path: str) -> AbstractFileSystem:
Expand Down

0 comments on commit e31fb6e

Please sign in to comment.