Skip to content

Commit

Permalink
Add NEPTUNE_SYNC_AFTER_STOP_TIMEOUT environment variable (#1260)
Browse files Browse the repository at this point in the history
Co-authored-by: Rafal Jankowski <[email protected]>
  • Loading branch information
shnela and Raalsky authored Mar 15, 2023
1 parent 54801d2 commit 07edd7f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- Added ability to provide repository path with `GitRef` to `init_run` ([#1292](https://github.com/neptune-ai/neptune-client/pull/1292))
- Added `SupportsNamespaces` interface in `neptune.typing` for proper type annotations of Handler and Neptune objects ([#1280](https://github.com/neptune-ai/neptune-client/pull/1280))
- Added `NEPTUNE_SYNC_AFTER_STOP_TIMEOUT` environment variable ([#1260](https://github.com/neptune-ai/neptune-client/pull/1260))
- `Run`, `Model`, `ModelVersion` and `Project` could be created with constructor in addition to `init_*` functions ([#1246](https://github.com/neptune-ai/neptune-client/pull/1246))

### Fixes
Expand Down
3 changes: 3 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"NEPTUNE_SYNC_BATCH_TIMEOUT_ENV",
"NEPTUNE_SUBPROCESS_KILL_TIMEOUT",
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
"NEPTUNE_REQUEST_TIMEOUT",
]

Expand Down Expand Up @@ -54,6 +55,8 @@

NEPTUNE_FETCH_TABLE_STEP_SIZE = "NEPTUNE_FETCH_TABLE_STEP_SIZE"

NEPTUNE_SYNC_AFTER_STOP_TIMEOUT = "NEPTUNE_SYNC_AFTER_STOP_TIMEOUT"

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ASYNC_DIRECTORY,
NEPTUNE_DATA_DIRECTORY,
)
from neptune.envs import NEPTUNE_SYNC_AFTER_STOP_TIMEOUT
from neptune.exceptions import NeptuneSynchronizationAlreadyStoppedException
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
Expand All @@ -49,7 +50,7 @@

class AsyncOperationProcessor(OperationProcessor):
STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = 300
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300"))

def __init__(
self,
Expand Down Expand Up @@ -148,6 +149,9 @@ def _wait_for_queue_empty(self, initial_queue_size: int, seconds: Optional[float

while True:
if seconds is None:
if self._consumer.last_backoff_time == 0:
# reset `waiting_start` on successful action
waiting_start = monotonic()
wait_time = self.STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS
else:
wait_time = max(
Expand Down

0 comments on commit 07edd7f

Please sign in to comment.