Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdown method to Flare Client API #3152

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

YuanTingHsieh
Copy link
Collaborator

@YuanTingHsieh YuanTingHsieh commented Jan 17, 2025

Issue

  • Flare API does not close the Flare agent and CellPipe resources
  • LauncherExecutor does not shutdown its threadpoolexecutor
  • SubprocessLauncher have a racing issue when multiple threads try to stop/start the process and monitor the process

Description

  • Adds the shutdown functionality to the Flare API.
  • In addition, introduces the context concept so prepare for future if multiple connections need to be made.
  • Existing API usage is still compatible.
  • Makes corresponding changes to tracking and lightning API
  • Adds shutdown of threadpoolexecutor in LauncherExecutor
  • Adds a lock in SubprocessLauncher

New API Usage:

import nvflare.client as flare
from nvflare.client import FLModel

with flare.init() as ctx:
    input_model = flare.receive(ctx=ctx)
    # do some training
    flare.send(FLModel(xxx), ctx=ctx)

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

Copy link
Collaborator

@yanchengnv yanchengnv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments and questions.

nvflare/client/__init__.py Outdated Show resolved Hide resolved
nvflare/client/__init__.py Outdated Show resolved Hide resolved
nvflare/client/api.py Outdated Show resolved Hide resolved
nvflare/client/api.py Outdated Show resolved Hide resolved
nvflare/client/ex_process/api.py Show resolved Hide resolved
Copy link
Collaborator

@yanchengnv yanchengnv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be able to shutdown the API.

nvflare/client/api.py Outdated Show resolved Hide resolved
@YuanTingHsieh YuanTingHsieh force-pushed the add_flare_client_context branch from 0ece3c2 to ca7b69c Compare January 28, 2025 05:09
@YuanTingHsieh YuanTingHsieh changed the title Enhance Flare Client API to Support Multiple Contexts and Server Connections Add shutdown method to Flare Client API Jan 28, 2025
@YuanTingHsieh YuanTingHsieh force-pushed the add_flare_client_context branch 2 times, most recently from 83f8165 to 94544fe Compare January 30, 2025 05:47
Copy link
Collaborator

@yanchengnv yanchengnv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments for corrections and improvement.

nvflare/client/api.py Outdated Show resolved Hide resolved
return client_api.receive(timeout)
global default_context
local_ctx = ctx if ctx else default_context
return local_ctx.api.receive(timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check local_ctx is not None. If none, raise exception.

return client_api.send(model, clear_cache)
global default_context
local_ctx = ctx if ctx else default_context
return local_ctx.api.send(model, clear_cache)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check local_ctx is not None. If none, raise exception.

return client_api.system_info()
global default_context
local_ctx = ctx if ctx else default_context
return local_ctx.api.system_info()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check local_ctx is not None. If none, raise exception.

return client_api.get_config()
global default_context
local_ctx = ctx if ctx else default_context
return local_ctx.api.get_config()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check local_ctx is not None. If none, raise exception.

def _create_client_api(self, api_type: ClientAPIType) -> APISpec:
"""Creates a new client_api based on the provided API type."""
if api_type == ClientAPIType.IN_PROCESS_API:
return data_bus.get_data(CLIENT_API_KEY)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it doesn't exist in data_bus? Should raise exception.

@@ -206,3 +205,7 @@ def clear(self):
model_registry = self.get_model_registry()
model_registry.clear()
self.receive_called = False

def shutdown(self):
model_registry = self.get_model_registry()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the app calls shutdown multiple times?
How do you ensure that cell agent is closed? You need to keep flare_agent in "self" and then call self.flare_agent.stop().


def shutdown(self):
self.stop = True
self.stop_reason = "API shutdown called."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any resources to clean up? Anything to stop?

@@ -114,6 +114,6 @@ def clear(self) -> None:
def __str__(self):
return f"{self.__class__.__name__}(config: {self.config.get_config()})"

def __del__(self):
def shutdown(self):
Copy link
Collaborator

@yanchengnv yanchengnv Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does task registry have flare_agent? This design seems very odd. It is created when init the API, hence it should belong to the API, and shutdown there too.

In general, the creator of a resource is responsible for its lifecycle (create and shutdown/close/stop/destroy).

class SummaryWriter:
class _BaseWriter:
def __init__(self, ctx: Optional[APIContext] = None):
global default_context
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should minimize use of gloabl.
You can do: ctx = api.get_default_ctx().

@holgerroth holgerroth self-requested a review February 5, 2025 02:54
@YuanTingHsieh YuanTingHsieh force-pushed the add_flare_client_context branch from 71ffe16 to df38c1c Compare February 7, 2025 05:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants