Skip to content

Commit

Permalink
feat(api): added auto-retries to batch scoring (#773)
Browse files Browse the repository at this point in the history
* feat(api): added auto-retries to batch scoring

* removed duplicate test file
lucianHymer authored Jan 28, 2025
1 parent 6ceeb92 commit f055ed6
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from scorer.settings import (
BULK_MODEL_SCORE_BATCH_SIZE,
BULK_MODEL_SCORE_REQUESTS_RESULTS_FOLDER,
BULK_MODEL_SCORE_RETRY_SLEEP,
BULK_SCORE_REQUESTS_ADDRESS_LIST_FOLDER,
BULK_SCORE_REQUESTS_BUCKET_NAME,
S3_BUCKET,
Expand Down Expand Up @@ -169,39 +170,52 @@ def update_average_duration(self, duration):
) / self.total_lambda_calls

async def process_address(self, address, model_list):
    """Score a single address, retrying transient failures.

    Attempts ``self._process_address`` up to ``num_attempts`` times,
    sleeping ``BULK_MODEL_SCORE_RETRY_SLEEP`` seconds between attempts.

    Args:
        address: the address to score.
        model_list: the models to run the analysis with.

    Returns:
        ``(address, result_json)`` on success, or ``(address, exception)``
        after the final failed attempt.
    """
    num_attempts = 5
    last_attempt = num_attempts - 1

    for attempt in range(num_attempts):
        try:
            return await self._process_address(address, model_list)
        except Exception as e:
            if attempt == last_attempt:
                # Out of retries: log the error and surface the
                # exception object to the caller instead of a result.
                self.stderr.write(
                    self.style.ERROR(
                        f"Error processing address {address}: {str(e)}"
                    )
                )
                return address, e
            # Back off before the next attempt.
            await asyncio.sleep(BULK_MODEL_SCORE_RETRY_SLEEP)

async def _process_address(self, address, model_list):
    """Run one scoring attempt for ``address`` and serialize the result.

    Calls ``handle_get_analysis``, folds the call duration into the
    rolling average via ``self.update_average_duration``, logs progress,
    and returns ``(address, json_string)`` where the JSON payload holds
    the per-model score details.

    Raises:
        Exception: whatever ``handle_get_analysis`` raises; the caller
        (``process_address``) handles retries.
    """
    start_time = time.time()
    analysis = await handle_get_analysis(address, model_list, False, True)
    end_time = time.time()
    duration = end_time - start_time

    self.update_average_duration(duration)

    details_dict = {
        "models": {
            model: {
                "score": score.score,
                "num_transactions": score.num_transactions,
                "first_funder": score.first_funder,
                "first_funder_amount": score.first_funder_amount,
                "first_funder_timestamp": score.first_funder_timestamp,
            }
            for model, score in analysis.details.models.items()
        }
    }
    result = json.dumps(details_dict)

    self.stdout.write(f"Processed address {address}:")
    self.stdout.write(f" Duration: {duration:.2f} seconds")
    self.stdout.write(
        f" Average Duration: {self.average_lambda_duration:.2f} seconds"
    )

    return address, result

async def create_and_upload_results_csv(self, request_id, results, filename):
csv_buffer = StringIO()
Expand Down
90 changes: 0 additions & 90 deletions api/registry/test/test_command_process_batch_address_upload.py

This file was deleted.

1 change: 1 addition & 0 deletions api/scorer/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@
"BULK_MODEL_SCORE_REQUESTS_RESULTS_FOLDER", default="model-score-results"
)
BULK_MODEL_SCORE_BATCH_SIZE = env("BULK_MODEL_SCORE_BATCH_SIZE", default=50)
# Seconds to sleep between retry attempts when bulk model scoring fails
# (consumed via asyncio.sleep in the batch scoring command).
BULK_MODEL_SCORE_RETRY_SLEEP = env("BULK_MODEL_SCORE_RETRY_SLEEP", default=10)

S3_BUCKET = env("S3_BUCKET", default="bulk-score-requests")
S3_OBJECT_KEY = env("S3_OBJECT_KEY", default="test_file.csv")
Expand Down
Loading

0 comments on commit f055ed6

Please sign in to comment.