Skip to content

Commit

Permalink
fix(backend): Fix error pin output not being propagated into the next…
Browse files Browse the repository at this point in the history
… nodes
  • Loading branch information
majdyz committed Oct 23, 2024
1 parent d9b8e0d commit 2ff50a9
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,19 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
# credentials is not supported.
credentials = creds_lock = None
creds_lock = None
if CREDENTIALS_FIELD_NAME in input_data:
credentials_meta = CredentialsMetaInput(**input_data[CREDENTIALS_FIELD_NAME])
credentials, creds_lock = creds_manager.acquire(user_id, credentials_meta.id)
extra_exec_kwargs["credentials"] = credentials

output_size = 0
try:
credit = db_client.get_or_refill_credit(user_id)
if credit < 0:
raise ValueError(f"Insufficient credit: {credit}")
end_status = ExecutionStatus.COMPLETED
credit = db_client.get_or_refill_credit(user_id)
if credit < 0:
raise ValueError(f"Insufficient credit: {credit}")

try:
for output_name, output_data in node_block.execute(
input_data, **extra_exec_kwargs
):
Expand All @@ -185,31 +186,41 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
):
yield execution

# Release lock on credentials ASAP
if creds_lock:
creds_lock.release()

r = update_execution(ExecutionStatus.COMPLETED)
s = input_size + output_size
t = (
(r.end_time - r.start_time).total_seconds()
if r.end_time and r.start_time
else 0
)
db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t)

except Exception as e:
end_status = ExecutionStatus.FAILED
error_msg = str(e)
log_metadata.exception(f"Node execution failed with error {error_msg}")
db_client.upsert_execution_output(node_exec_id, "error", error_msg)
update_execution(ExecutionStatus.FAILED)

raise e
for execution in _enqueue_next_nodes(
db_client=db_client,
node=node,
output=("error", error_msg),
user_id=user_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
log_metadata=log_metadata,
):
yield execution

raise e
finally:
# Ensure credentials are released even if execution fails
if creds_lock:
creds_lock.release()

# Update execution status and spend credits
res = update_execution(end_status)
if end_status == ExecutionStatus.COMPLETED:
s = input_size + output_size
t = (
(res.end_time - res.start_time).total_seconds()
if res.end_time and res.start_time
else 0
)
db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t)

# Update execution stats
if execution_stats is not None:
execution_stats.update(node_block.execution_stats)
execution_stats["input_size"] = input_size
Expand Down

0 comments on commit 2ff50a9

Please sign in to comment.