From 2ff50a94ec7e61092247b3aa9f059899418ee76c Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Wed, 23 Oct 2024 19:14:32 +0700 Subject: [PATCH] fix(backend): Fix error pin output not being propagated into the next nodes --- .../backend/backend/executor/manager.py | 51 +++++++++++-------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 033d2bec3de1..58a6a7af4cfe 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -155,18 +155,19 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult: # changes during execution. ⚠️ This means a set of credentials can only be used by # one (running) block at a time; simultaneous execution of blocks using same # credentials is not supported. - credentials = creds_lock = None + creds_lock = None if CREDENTIALS_FIELD_NAME in input_data: credentials_meta = CredentialsMetaInput(**input_data[CREDENTIALS_FIELD_NAME]) credentials, creds_lock = creds_manager.acquire(user_id, credentials_meta.id) extra_exec_kwargs["credentials"] = credentials output_size = 0 - try: - credit = db_client.get_or_refill_credit(user_id) - if credit < 0: - raise ValueError(f"Insufficient credit: {credit}") + end_status = ExecutionStatus.COMPLETED + credit = db_client.get_or_refill_credit(user_id) + if credit < 0: + raise ValueError(f"Insufficient credit: {credit}") + try: for output_name, output_data in node_block.execute( input_data, **extra_exec_kwargs ): @@ -185,31 +186,41 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult: ): yield execution - # Release lock on credentials ASAP - if creds_lock: - creds_lock.release() - - r = update_execution(ExecutionStatus.COMPLETED) - s = input_size + output_size - t = ( - (r.end_time - r.start_time).total_seconds() - if r.end_time and r.start_time - else 0 - ) - db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t) - except Exception as e: + end_status = ExecutionStatus.FAILED error_msg = str(e) log_metadata.exception(f"Node execution failed with error {error_msg}") db_client.upsert_execution_output(node_exec_id, "error", error_msg) - update_execution(ExecutionStatus.FAILED) - raise e + for execution in _enqueue_next_nodes( + db_client=db_client, + node=node, + output=("error", error_msg), + user_id=user_id, + graph_exec_id=graph_exec_id, + graph_id=graph_id, + log_metadata=log_metadata, + ): + yield execution + raise e finally: # Ensure credentials are released even if execution fails if creds_lock: creds_lock.release() + + # Update execution status and spend credits + res = update_execution(end_status) + if end_status == ExecutionStatus.COMPLETED: + s = input_size + output_size + t = ( + (res.end_time - res.start_time).total_seconds() + if res.end_time and res.start_time + else 0 + ) + db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t) + + # Update execution stats if execution_stats is not None: execution_stats.update(node_block.execution_stats) execution_stats["input_size"] = input_size