Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ModelController documentation #2707

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions docs/programming_guide/controllers/model_controller.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
.. _model_controller:

###################
ModelController API
SYangster marked this conversation as resolved.
Show resolved Hide resolved
###################

The FLARE :mod:`ModelController<nvflare.app_common.workflows.model_controller>` API provides an easy way for users to write and customize FLModel-based controller workflows.

* Highly flexible with a simple API (run routine and basic communication and utility functions)
* :ref:`fl_model`for the communication data structure, everything else is pure Python
* Option to support pre-existing components and FLARE-specific functionalities

.. note::

The ModelController API is a high-level API meant to simplify writing workflows.
If users prefer or need the full flexibility of the Controller with all the capabilites of FLARE functions, refer to the :ref:`controllers`.


Core Concepts
=============

As an example, we can take a look at the popular federated learning workflow, "FedAvg" which has the following steps:

#. FL server initializes an initial model
#. For each round (global iteration):

#. FL server sends the global model to clients
#. Each FL client starts with this global model and trains on their own data
#. Each FL client sends back their trained model
#. FL server aggregates all the models and produces a new global model


To implement this workflow using the ModelController there are a few essential parts:

* Import and subclass the :class:`nvflare.app_common.workflows.model_controller.ModelController`.
* Implement the ``run()`` routine for the workflow logic.
* Utilize ``send_model()`` / ``send_model_and_wait()`` for communication to send tasks with FLModel to target clients, and receive FLModel results.
* Customize workflow using predefined utility functions and components, or implement your own logics.


Here is an example of the FedAvg workflow using the :class:`BaseFedAvg<nvflare.app_common.workflows.base_fedavg.BaseFedAvg>` base class:

.. code-block:: python

# BaseFedAvg subclasses ModelController and defines common functions and variables such as aggregate(), update_model(), self.start_round, self.num_rounds
class FedAvg(BaseFedAvg):

# run routine that user must implement
def run(self) -> None:
self.info("Start FedAvg.")

# load model (by default uses persistor, can provide custom method)
model = self.load_model()
model.start_round = self.start_round
model.total_rounds = self.num_rounds

# for each round (global iteration)
for self.current_round in range(self.start_round, self.start_round + self.num_rounds):
self.info(f"Round {self.current_round} started.")
model.current_round = self.current_round

# obtain self.num_clients clients
clients = self.sample_clients(self.num_clients)

# send model to target clients with default train task, wait to receive results
results = self.send_model_and_wait(targets=clients, data=model)

# use BaseFedAvg aggregate function
aggregate_results = self.aggregate(
results, aggregate_fn=self.aggregate_fn
) # using default aggregate_fn with `WeightedAggregationHelper`. Can overwrite self.aggregate_fn with signature Callable[List[FLModel], FLModel]

# update global model with agggregation results
model = self.update_model(model, aggregate_results)

# save model (by default uses persistor, can provide custom method)
self.save_model(model)

self.info("Finished FedAvg.")


Below is a comprehensive table overview of the :class:`ModelController<nvflare.app_common.workflows.model_controller.ModelController>` API:


.. list-table:: ModelController API
:widths: 25 35 50
:header-rows: 1

* - API
- Description
- API Doc Link
* - run
- Run routine for workflow.
- :func:`run<nvflare.app_common.workflows.model_controller.ModelController.run>`
* - send_model_and_wait
- Send a task with data to targets (blocking) and wait for results..
- :func:`send_model_and_wait<nvflare.app_common.workflows.model_controller.ModelController.send_model_and_wait>`
* - send_model
- Send a task with data to targets (non-blocking) with callback.
- :func:`send_model<nvflare.app_common.workflows.model_controller.ModelController.send_model>`
* - sample_clients
- Returns a list of num_clients clients.
- :func:`sample_clients<nvflare.app_common.workflows.model_controller.ModelController.sample_clients>`
* - save_model
- Save model with persistor.
- :func:`save_model<nvflare.app_common.workflows.model_controller.ModelController.save_model>`
* - load_model
- Load model from persistor.
- :func:`load_model<nvflare.app_common.workflows.model_controller.ModelController.load_model>`


Communication
=============

The ModelController uses a task based communication where tasks are sent to targets, and targets execute the tasks and return results.
The :ref:`fl_model` is standardized data structure object that is sent along with each task, and :ref:`fl_model` responses are received for the results.

.. note::

The :ref:`fl_model` object can be any type of data depending on the specific task.
For example, in the "train" and "validate" tasks we send the model parameters along with the task so the target clients can train and validate the model.
However in many other tasks that do not involve sending the model (e.g. "submit_model"), the :ref:`fl_model` can contain any type of data (e.g. metadata, metrics etc.) or may be not be needed at all.


send_model_and_wait
-------------------
:func:`send_model_and_wait<nvflare.app_common.workflows.model_controller.ModelController.send_model_and_wait>` is the core communication function which enables users to send tasks to targets, and wait for responses.

The ``data`` is an :ref:`fl_model` object, and the ``task_name`` is the task for the target executors to execute (Client API executors by default support "train", "validate", and "submit_model", however executors can be written for any arbitrary task name).

``targets`` can be chosen from client names obtained with ``sample_clients()``.

Returns the :ref:`fl_model` responses from the target clients once the task is completed (``min_responses`` have been received, or ``timeout`` time has passed).

send_model
SYangster marked this conversation as resolved.
Show resolved Hide resolved
----------
:func:`send_model<nvflare.app_common.workflows.model_controller.ModelController.send_model>` is the non-blocking version of
:func:`send_model_and_wait<nvflare.app_common.workflows.model_controller.ModelController.send_model_and_wait>` with a user-defined callback when receiving responses.

A callback with the signature ``Callable[[FLModel], None]`` can be passed in, which will be called when a response is received from each target.

The task is standing until either ``min_responses`` have been received, or ``timeout`` time has passed.
Since this call is asynchronous, the Controller :func:`get_num_standing_tasks<nvflare.apis.impl.controller.Controller.get_num_standing_tasks>` method can be used to get the number of standing tasks for synchronization purposes.


Saving & Loading
================

persistor
---------
The :func:`save_model<nvflare.app_common.workflows.model_controller.ModelController.save_model>` and :func:`load_model<nvflare.app_common.workflows.model_controller.ModelController.load_model>`
functions utilize the configured :class:`ModelPersistor<nvflare.app_common.abstract.model_persistor.ModelPersistor>` set in the ModelController ``persistor_id: str = "persistor"`` init argument.

custom save & load
------------------
Users can also choose to instead create their own custom save and load functions rather than use a persistor.

For example we can use PyTorch's save and load functions for the model parameters, and save the FLModel metadata with :mod:`FOBS<nvflare.fuel.utils.fobs>` separately to different filepaths.

.. code-block:: python

import torch
from nvflare.fuel.utils import fobs

class MyController(ModelController):
...
def save_model(self, model, filepath=""):
params = model.params
# PyTorch save
torch.save(params, filepath)

# save FLModel metadata
model.params = {}
fobs.dumpf(model, filepath + ".metadata")
model.params = params

def load_model(self, filepath=""):
# PyTorch load
params = torch.load(filepath)

# load FLModel metadata
model = fobs.loadf(filepath + ".metadata")
model.params = params
return model


Note: for non-primitive data types such as ``torch.nn.Module`` (used for the initial PyTorch model), we must configure a corresponding FOBS decomposer for serialization and deserialization.
Read more at :github_nvflare_link:`Flare Object Serializer (FOBS) <nvflare/fuel/utils/fobs/README.rst>`.

.. code-block:: python

from nvflare.app_opt.pt.decomposers import TensorDecomposer

fobs.register(TensorDecomposer)


Additional Functionalities
==========================

In some cases, more advanced FLARE-specific functionalities may be of use.

The :mod:`BaseModelController<nvflare.app_common.workflows.base_model_controller>` class provides access to the engine ``self.engine`` and FLContext ``self.fl_ctx`` if needed.
Functions such as ``get_component()`` and ``build_component()`` can be used to load or dynamically build components.

Furthermore, the underlying :mod:`Controller<nvflare.apis.impl.controller>` class offers additional communication functions and task related utilities.
Many of our pre-existing workflows are based on this lower-level Controller API.
For more details refer to the :ref:`controllers` section.

Examples
========

Examples of basic workflows using the ModelController API:

* :github_nvflare_link:`Cyclic <nvflare/app_common/workflows/cyclic.py>`
* :github_nvflare_link:`BaseFedAvg <nvflare/app_common/workflows/base_fedavg.py>`
* :github_nvflare_link:`FedAvg <nvflare/app_common/workflows/fedavg.py>`

Advanced examples:

* :github_nvflare_link:`Scaffold <nvflare/app_common/workflows/scaffold.py>`
* :github_nvflare_link:`FedOpt <nvflare/app_opt/pt/fedopt_ctl.py>`
* :github_nvflare_link:`PTFedAvgEarlyStopping <nvflare/app_opt/pt/fedavg_early_stopping.py>`
* :github_nvflare_link:`Kaplan-Meier <examples/advanced/kaplan-meier-he/src/kaplan_meier_wf_he.py>`
* :github_nvflare_link:`Logistic Regression Newton Raphson <examples/advanced/lr-newton-raphson/job/newton_raphson/app/custom/newton_raphson_workflow.py>`
* :github_nvflare_link:`FedBPT <research/fed-bpt/src/global_es.py>`
4 changes: 2 additions & 2 deletions docs/programming_guide/fl_model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
FLModel
=======

We define a standard data structure :mod:`FLModel<nvflare.app_common.abstract.FLModel>`
We define a standard data structure :mod:`FLModel<nvflare.app_common.abstract.fl_model>`
that captures the common attributes needed for exchanging learning results.

This is particularly useful when NVFlare system needs to exchange learning
Expand All @@ -14,4 +14,4 @@ information from received FLModel, run local training, and put the results
in a new FLModel to be sent back.

For a detailed explanation of each attributes, please refer to the API doc:
:mod:`FLModel<nvflare.app_common.abstract.FLModel>`
:mod:`FLModel<nvflare.app_common.abstract.fl_model>`
37 changes: 35 additions & 2 deletions docs/programming_guide/workflows_and_controllers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,49 @@ A workflow has one or more controllers, each implementing a specific coordinatio
CrossSiteValidation controller implements a strategy to let every client site evaluate every other site's model. You can put together
a workflow that uses any number of controllers.

We have implemented several server controlled federated learning workflows (fed-average, cyclic controller, cross-site evaluation) with the server-side :ref:`controllers <controllers>`.
We provide the FLModel-based :ref:`model_controller` which provides a straightforward way for users to write controllers.
We also have the original :ref:`Controller API <controllers>` with more FLARE-specific functionalities, which many of our existing workflows are based upon.

We have implemented several server controlled federated learning workflows (fed-average, cyclic controller, cross-site evaluation) with the server-side controllers.
In these workflows, FL clients get tasks assigned by the controller, execute the tasks, and submit results back to the server.

In certain cases, if the server cannot be trusted, it should not be involved in communication with sensitive information.
To address this concern, NVFlare introduces Client Controlled Workflows (CCWF) to facilitate peer-to-peer communication among clients.

Please refer to the following sections for more details.

Controllers can be configured in ``config_fed_server.json`` in the workflows section:

.. code-block:: json

workflows = [
{
id = "fedavg_ctl",
name = "FedAvg",
args {
min_clients = 2,
num_rounds = 3,
persistor_id = "persistor"
}
}
]

To configure controllers using the JobAPI, define the controller and send it to the server.
This code will automatically generate the server configuration for the controller:

.. code-block:: python

controller = FedAvg(
num_clients=2,
num_rounds=3,
persistor_id = "persistor"
)
job.to(controller, "server")

Please refer to the following sections for more details about the different types of controllers.

.. toctree::
:maxdepth: 3

controllers/model_controller
controllers/controllers
controllers/client_controlled_workflows
6 changes: 3 additions & 3 deletions nvflare/app_common/workflows/base_model_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ def save_model(self, model):
else:
self.error("persistor not configured, model will not be saved")

def sample_clients(self, num_clients=None):
clients = self.engine.get_clients()
def sample_clients(self, num_clients: int = None) -> List[str]:
clients = [client.name for client in self.engine.get_clients()]
SYangster marked this conversation as resolved.
Show resolved Hide resolved

if num_clients:
check_positive_int("num_clients", num_clients)
Expand All @@ -375,7 +375,7 @@ def sample_clients(self, num_clients=None):
f"num_clients ({num_clients}) is greater than the number of available clients. Returning all ({len(clients)}) available clients."
)

self.info(f"Sampled clients: {[client.name for client in clients]}")
self.info(f"Sampled clients: {clients}")

return clients

Expand Down
8 changes: 4 additions & 4 deletions nvflare/app_common/workflows/model_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ def send_model(
callback=callback,
)

def load_model(self):
def load_model(self) -> FLModel:
"""Load initial model from persistor. If persistor is not configured, returns empty FLModel.

Returns:
FLModel
"""
return super().load_model()

def save_model(self, model: FLModel):
def save_model(self, model: FLModel) -> None:
"""Saves model with persistor. If persistor is not configured, does not save.

Args:
Expand All @@ -122,12 +122,12 @@ def save_model(self, model: FLModel):
"""
super().save_model(model)

def sample_clients(self, num_clients=None):
def sample_clients(self, num_clients: int = None) -> List[str]:
"""Returns a list of `num_clients` clients.

Args:
num_clients: number of clients to return. If None or > number available clients, returns all available clients. Defaults to None.

Returns: list of clients.
Returns: list of clients names.
"""
return super().sample_clients(num_clients)
Loading