Added best model and last model extraction technique in docs/workflow_interface.rst

Signed-off-by: Parth Mandaliya <[email protected]>
ParthM-GitHub committed Sep 28, 2023
1 parent d9d513a commit b699a1c
Showing 1 changed file with 68 additions and 17 deletions.
85 changes: 68 additions & 17 deletions docs/workflow_interface.rst
@@ -181,9 +181,9 @@ A :code:`Runtime` defines where the flow will be executed, who the participants
local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators)
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information or data specific to its role or requirements. As the name suggests, these *private attributes* are accessible only to that particular participant, and they are appropriately inserted into or filtered out of the current Flow state when transferring between Participants. For example, Collaborator private attributes are inserted into :code:`flow` when transitioning from Aggregator to Collaborator and are filtered out when transitioning from Collaborator to Aggregator.

In the above :code:`FederatedFlow`, each collaborator accesses its train and test datasets via the *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* need to be set using a user-defined callback function while instantiating the participant. Participant *private attributes* are returned by the callback function as a dictionary, where the key is the name of the attribute and the value is the object.
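As a minimal sketch (the MNIST datasets, sharding scheme, and loader settings below are illustrative assumptions; only the returned dictionary matters to the framework), such a callback might look like:

.. code-block:: python

    import torch
    from torchvision import datasets, transforms

    def callable_to_initialize_collaborator_private_attributes(index, n_collaborators):
        # Illustrative datasets; any objects can be returned as private attributes
        transform = transforms.ToTensor()
        train_dataset = datasets.MNIST("files/", train=True, download=True, transform=transform)
        test_dataset = datasets.MNIST("files/", train=False, download=True, transform=transform)

        # Give each collaborator a disjoint shard of the data
        local_train = torch.utils.data.Subset(
            train_dataset, range(index, len(train_dataset), n_collaborators))
        local_test = torch.utils.data.Subset(
            test_dataset, range(index, len(test_dataset), n_collaborators))

        # The keys below become the attribute names on the collaborator
        return {
            "train_loader": torch.utils.data.DataLoader(local_train, batch_size=32, shuffle=True),
            "test_loader": torch.utils.data.DataLoader(local_test, batch_size=32, shuffle=False),
        }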

In this example, the callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader`, which are accessed by the collaborator steps (:code:`aggregated_model_validation`, :code:`train` and :code:`local_model_validation`). Some important points to remember while creating the callback function and private attributes are:

@@ -221,7 +221,7 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
.. code-block:: python
# Aggregator
aggregator_ = Aggregator(num_gpus=0.2)
collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
@@ -234,7 +234,7 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
collaborators.append(
Collaborator(
name=collaborator_name,
num_gpus=0.2, # Fraction of a GPU allocated to this Participant
private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
index=idx,
n_collaborators=len(collaborator_names),
@@ -247,7 +247,9 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
# The Ray Backend will now be used for local execution
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')
In the above example, we have used :code:`num_gpus=0.2` while instantiating the Aggregator and Collaborators to specify that each participant is allotted 1/5th of a GPU - as a result, a single GPU is shared by the 4 collaborators and 1 Aggregator. Users can tune these arguments based on their Federated Learning requirements and the available hardware resources. Configurations where a single Participant is split across multiple GPUs are not supported. For example, trying to run 5 participants on 2-GPU hardware with :code:`num_gpus=0.4` will not work, since 80% of each GPU is allocated to 4 participants and the 5th participant has no GPU with enough remaining capacity to use.

**Note:** It is not necessary for ALL participants to use GPUs. For example, only the Collaborators may be allocated GPUs. In this scenario the user should ensure that artifacts returned by the Collaborators to the Aggregator (e.g. the locally trained model object) are loaded back onto the CPU before exiting the collaborator step (i.e. before the join step). TensorFlow manages object placement by default, so this step is needed only for PyTorch.
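As a sketch of that offloading (a fragment of a flow definition; the step body and attribute names are assumptions modelled on the flow above, and :code:`import torch` plus the placement decorators are assumed in scope), the tail of a PyTorch collaborator step might look like:

.. code-block:: python

    @collaborator
    def train(self):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)
        ...  # training loop runs on `device`

        # Move the model back to the CPU before leaving the collaborator step,
        # so the artifact handed to the aggregator in `join` never references a GPU
        self.model = self.model.to("cpu")
        self.next(self.local_model_validation)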

Debugging with the Metaflow Client
==================================
@@ -266,19 +268,38 @@ After the flow has started running, you can use the Metaflow Client to get inter

.. code-block:: python
from metaflow import Metaflow, Flow, Step, Task
# Initialize Metaflow object and obtain list of executed flows:
m = Metaflow()
list(m)
> [Flow('FederatedFlow'), Flow('AggregatorValidationFlow'), Flow('FederatedFlow_MNIST_Watermarking')]
# Identify the Flow name (the name of the flow is the name of the flow class)
flow_name = 'FederatedFlow'
# List all instances of FederatedFlow executed under distinct run IDs
flow = Flow(flow_name)
list(flow)
> [Run('FederatedFlow/1692946840822001'),
Run('FederatedFlow/1692946796234386'),
Run('FederatedFlow/1692902602941163'),
Run('FederatedFlow/1692902559123920'),]
# Retrieve the latest run of the FederatedFlow
run = Flow(flow_name).latest_run
print(run)
> Run('FederatedFlow/1692946840822001')
list(run)
> [Step('FederatedFlow/1692946840822001/end'),
Step('FederatedFlow/1692946840822001/join'),
Step('FederatedFlow/1692946840822001/local_model_validation'),
Step('FederatedFlow/1692946840822001/train'),
Step('FederatedFlow/1692946840822001/aggregated_model_validation'),
Step('FederatedFlow/1692946840822001/start')]
step = Step('FederatedFlow/1692946840822001/aggregated_model_validation')
for task in step:
if task.data.input == 'Portland':
print(task.data)
@@ -308,6 +329,37 @@ And if we wanted to get the log or error message for that task, you can just run:
print(portland_task.stderr)
> [No output]
Also, if you want to get the best model and the last model, you can just run:

.. code-block:: python
# Choose the specific step containing the desired models (e.g., 'join' step):
step = Step('FederatedFlow/1692946840822001/join')
list(step)
> [Task('FederatedFlow/1692946840822001/join/12'),  --> Round 3
   Task('FederatedFlow/1692946840822001/join/9'),   --> Round 2
   Task('FederatedFlow/1692946840822001/join/6'),   --> Round 1
   Task('FederatedFlow/1692946840822001/join/3')]   --> Round 0

# The tasks are listed newest first: the most recent task corresponds to the
# final round, and each preceding entry to an earlier round in reverse
# chronological order. To determine the best model, analyze the command-line
# logs and the model accuracy of each round, then use the task ID of that round.
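# Alternatively - a sketch, assuming every join task records the
# `aggregated_model_accuracy` attribute used below - the best round can be
# selected programmatically instead of by reading the logs:
best_task = max(step, key=lambda t: t.data.aggregated_model_accuracy)
print(best_task)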
task = Task('FederatedFlow/1692946840822001/join/9')
# Access the best model and its associated data
best_model = task.data.model
best_local_model_accuracy = task.data.local_model_accuracy
best_aggregated_model_accuracy = task.data.aggregated_model_accuracy
# To retrieve the last model, select the most recent Task, i.e. the last round
task = Task('FederatedFlow/1692946840822001/join/12')
last_model = task.data.model
# Save the chosen models using a suitable framework (e.g., PyTorch in this
# example); use distinct (illustrative) paths so the two checkpoints do not
# overwrite each other
import torch

torch.save(last_model.state_dict(), "last_model.pth")
torch.save(best_model.state_dict(), "best_model.pth")
While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, :code:`checkpoint` is disabled by default.
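Conversely, to make these intermediate artifacts available to the Metaflow Client, checkpointing can be enabled when the flow is constructed. A sketch (the :code:`model`, :code:`optimizer` and :code:`rounds` arguments are assumptions carried over from the earlier examples; the relevant part is :code:`checkpoint=True`):

.. code-block:: python

    # Enable checkpointing so intermediate state is saved after each step
    flow = FederatedFlow(model, optimizer, rounds=3, checkpoint=True)
    flow.runtime = local_runtime
    flow.run()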

Runtimes: Future Plans
======================
@@ -327,4 +379,3 @@ Our goal is to make it a one line change to configure where and how a flow is ex
federated_runtime = FederatedRuntime(...)
flow.runtime = federated_runtime
flow.run()
