re-arrange getting started examples #2805

Merged: 4 commits, Aug 17, 2024
Binary file added docs/resources/controller_executor_no_filter.png
29 changes: 29 additions & 0 deletions examples/advanced/job_api/README.md
@@ -0,0 +1,29 @@
# Additional Examples for NVIDIA FLARE Job API

You have probably already looked at the [getting started](../../getting_started) examples
and the [hello-world](../../hello-world) examples. Here are additional examples for advanced algorithms.

### Basic Concepts
At the heart of NVFlare lies the concept of collaboration through "tasks." An FL controller assigns tasks
(e.g., training on local data) to one or more FL clients, processes returned results (e.g., model weight updates),
and may assign additional tasks based on these results and other factors (e.g., a pre-configured number of training rounds).
The clients run executors which can listen for tasks and perform the necessary computations locally, such as model training.
This task-based interaction repeats until the experiment’s objectives are met.
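
The task loop described above can be sketched without any framework code. This is only a toy illustration: `Task`, `ToyExecutor`, and `ToyController` are hypothetical names and not part of the NVFlare API, "training" is a single arithmetic step toward a per-site target, and the controller uses a plain unweighted average.

```python
# Toy, framework-free sketch of the controller/executor task loop.
# All class and task names below are hypothetical, not NVFlare APIs.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Task:
    name: str
    data: Dict[str, float]  # e.g., the current global weights


class ToyExecutor:
    """Runs on a client: receives a task and computes a local result."""

    def __init__(self, site_name: str, local_target: float):
        self.site_name = site_name
        self.local_target = local_target  # stands in for local data

    def execute(self, task: Task) -> Dict[str, float]:
        if task.name != "train":
            raise ValueError(f"unsupported task: {task.name}")
        # "Local training": move the weight halfway toward the local target.
        w = task.data["w"]
        return {"w": w + 0.5 * (self.local_target - w)}


class ToyController:
    """Runs on the server: assigns tasks and processes returned results."""

    def __init__(self, executors: List[ToyExecutor], num_rounds: int):
        self.executors = executors
        self.num_rounds = num_rounds

    def run(self, initial: Dict[str, float]) -> Dict[str, float]:
        global_weights = dict(initial)
        for _ in range(self.num_rounds):  # pre-configured number of rounds
            task = Task(name="train", data=global_weights)
            results = [ex.execute(task) for ex in self.executors]
            # Process results: unweighted average of the client updates.
            global_weights = {"w": sum(r["w"] for r in results) / len(results)}
        return global_weights


executors = [ToyExecutor("site-1", 1.0), ToyExecutor("site-2", 3.0)]
final_weights = ToyController(executors, num_rounds=20).run({"w": 0.0})
print(final_weights)
```

With targets of 1.0 and 3.0, the global weight converges toward 2.0, the midpoint of the two sites.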

We can also add data filters (for example, for [homomorphic encryption](https://www.usenix.org/conference/atc20/presentation/zhang-chengliang)
or [differential privacy filters](https://arxiv.org/abs/1910.00962)) to the task data
or results received or produced by the server or clients.
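
To make the idea of a filter concrete, here is a minimal sketch of a filter chain applied to a client's result before it leaves the site. It is an assumption-laden illustration: the function names are hypothetical, the clip-and-add-noise scheme is a generic stand-in rather than the method of the linked paper, and it does not use NVFlare's filter classes.

```python
# Hypothetical filter chain: each filter maps a weight update to a new update.
import random
from typing import Callable, Dict, List

Weights = Dict[str, float]
Filter = Callable[[Weights], Weights]


def make_clip_filter(max_abs: float) -> Filter:
    """Return a filter that clips every value to [-max_abs, max_abs]."""

    def clip(update: Weights) -> Weights:
        return {k: max(-max_abs, min(max_abs, v)) for k, v in update.items()}

    return clip


def make_gaussian_noise_filter(sigma: float, seed: int = 0) -> Filter:
    """Return a filter that adds zero-mean Gaussian noise to every value."""
    rng = random.Random(seed)  # seeded only to keep the sketch reproducible

    def add_noise(update: Weights) -> Weights:
        return {k: v + rng.gauss(0.0, sigma) for k, v in update.items()}

    return add_noise


def apply_filters(update: Weights, filters: List[Filter]) -> Weights:
    """Apply the filters in order, as a chain."""
    for f in filters:
        update = f(update)
    return update


raw_update = {"layer1": 5.0, "layer2": -0.2}
clipped = apply_filters(raw_update, [make_clip_filter(1.0)])
noisy = apply_filters(
    raw_update, [make_clip_filter(1.0), make_gaussian_noise_filter(0.01)]
)
print(clipped)
print(noisy)
```

The same chain could be applied on the server side to task data before it is sent to clients; only where the chain runs changes, not its shape.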

![NVIDIA FLARE Overview](../../../docs/resources/nvflare_overview.svg)

### Examples
We have several examples to illustrate the Job API.
Each example folder includes basic job configurations for running different FL algorithms,
such as [FedOpt](https://arxiv.org/abs/2003.00295) or [SCAFFOLD](https://arxiv.org/abs/1910.06378).

### 1. [PyTorch Examples](./pt/README.md)
### 2. [TensorFlow Examples](./tf/README.md)
### 3. [Scikit-Learn Examples](./sklearn/README.md)

> [!NOTE]
> More examples can be found at https://nvidia.github.io/NVFlare.
53 changes: 53 additions & 0 deletions examples/advanced/job_api/pt/README.md
@@ -0,0 +1,53 @@
# Advanced Job API Examples with PyTorch

[![PyTorch Logo](https://upload.wikimedia.org/wikipedia/commons/c/c6/PyTorch_logo_black.svg)](https://pytorch.org)

We provide several advanced examples with NVFlare's Job API.
All examples in this folder are based on using [PyTorch](https://pytorch.org/) as the model training framework.
Furthermore, we support [PyTorch Lightning](https://lightning.ai).

## Setup environment
First, install nvflare and dependencies:
```commandline
pip install -r requirements.txt
```

## Examples
You can run any of the scripts below directly using
```commandline
python "script_name.py"
```

For example, to run the FedAvg example with PyTorch Lightning:

```commandline
python fedavg_script_executor_lightning_cifar10.py
```
### 1. [Federated averaging using the script executor](./fedavg_script_executor_cifar10.py)
Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html).
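
As a rough illustration of the averaging step in FedAvg, the sketch below computes a weighted average of client weights. It assumes each client reports a step count (the training script in this folder sends `NUM_STEPS_CURRENT_ROUND` in its metadata) and that this count is used as the aggregation weight. The `fedavg` helper is hypothetical and operates on plain Python lists rather than tensors.

```python
# Hypothetical weighted averaging of client weights, as in FedAvg.
from typing import Dict, List, Tuple

Weights = Dict[str, List[float]]


def fedavg(client_results: List[Tuple[Weights, int]]) -> Weights:
    """Average client weights, weighting each client by its step count."""
    total_steps = sum(steps for _, steps in client_results)
    if total_steps <= 0:
        raise ValueError("total number of steps must be positive")

    first_weights = client_results[0][0]
    averaged: Weights = {}
    for key, values in first_weights.items():
        averaged[key] = [
            sum(weights[key][i] * steps for weights, steps in client_results)
            / total_steps
            for i in range(len(values))
        ]
    return averaged


results = [
    ({"fc.weight": [1.0, 2.0]}, 100),  # (weights, steps) from one site
    ({"fc.weight": [3.0, 6.0]}, 300),  # a site that trained for more steps
]
global_weights = fedavg(results)
print(global_weights)  # {'fc.weight': [2.5, 5.0]}
```

The site with 300 steps contributes three times as much as the site with 100 steps, which is why the result sits closer to its weights.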


### 2. [Federated averaging using script executor and differential privacy filter](./fedavg_script_executor_dp_filter_cifar10.py)
Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html)
with additional [differential privacy filters](https://arxiv.org/abs/1910.00962) on the client side.
```commandline
python fedavg_script_executor_dp_filter_cifar10.py
```
### 3. [Swarm learning using script executor](./swarm_script_executor_cifar10.py)
Implementation of [swarm learning](https://www.nature.com/articles/s41586-021-03583-3) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html).
```commandline
python swarm_script_executor_cifar10.py
```
### 4. [Cyclic weight transfer using script executor](./cyclic_cc_script_executor_cifar10.py)
Implementation of [cyclic weight transfer](https://arxiv.org/abs/1709.05929) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html).
```commandline
python cyclic_cc_script_executor_cifar10.py
```
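
For intuition, cyclic weight transfer can be sketched as passing a single model from site to site, with each site training on it in turn. The sketch below is a simplified illustration under stated assumptions: the helper names are hypothetical, the visiting order is fixed, and "training" is one arithmetic step toward a per-site target. It is not how the script above is implemented.

```python
# Hypothetical sketch of cyclic weight transfer: one model visits each site in turn.
from typing import Callable, Dict, List, Tuple

Weights = Dict[str, float]
LocalTrain = Callable[[Weights], Weights]


def make_local_train(target: float, lr: float = 0.5) -> LocalTrain:
    """Return a toy training function that nudges the weight toward `target`."""

    def train(weights: Weights) -> Weights:
        return {"w": weights["w"] + lr * (target - weights["w"])}

    return train


def cyclic_weight_transfer(
    initial: Weights, clients: Dict[str, LocalTrain], num_cycles: int
) -> Tuple[Weights, List[str]]:
    """Pass the model through every client, in order, for `num_cycles` cycles."""
    weights = dict(initial)
    visit_order: List[str] = []
    for _ in range(num_cycles):
        for name, train in clients.items():
            # No averaging: the next site continues from the previous site's result.
            weights = train(weights)
            visit_order.append(name)
    return weights, visit_order


clients = {"site-1": make_local_train(1.0), "site-2": make_local_train(3.0)}
final_weights, order = cyclic_weight_transfer({"w": 0.0}, clients, num_cycles=2)
print(final_weights)  # {'w': 2.1875}
print(order)  # ['site-1', 'site-2', 'site-1', 'site-2']
```

Unlike FedAvg, there is no server-side aggregation step here; the result depends on the order in which sites are visited.
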
### 5. [Federated averaging using model learner](./fedavg_model_learner_xsite_val_cifar10.py)
Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [model learner class](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/model_learner.html),
followed by [cross site validation](https://nvflare.readthedocs.io/en/main/programming_guide/controllers/cross_site_model_evaluation.html)
for federated model evaluation.
```commandline
python fedavg_model_learner_xsite_val_cifar10.py
```

> [!NOTE]
> More examples can be found at https://nvidia.github.io/NVFlare.
@@ -29,16 +29,19 @@
        num_rounds=num_rounds,
    )
    job.to(controller, "server")
    # job.to_server(controller)

    # Define the initial global model and send to server
    job.to(Net(), "server")
    # job.to_server(Net())

    # Add clients
    for i in range(n_clients):
        executor = ScriptExecutor(
            task_script_path=train_script, task_script_args=""  # f"--batch_size 32 --data_path /tmp/data/site-{i}"
        )
        job.to(executor, f"site-{i}", gpu=0)
        # job.to_clients(executor)

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
Binary file added examples/advanced/job_api/pt/figs/tb_loss.png
5 changes: 5 additions & 0 deletions examples/advanced/job_api/pt/requirements.txt
@@ -0,0 +1,5 @@
nvflare~=2.5.0rc
torch
torchvision
pytorch_lightning
tensorboard
137 changes: 137 additions & 0 deletions examples/advanced/job_api/pt/src/cifar10_fl.py
@@ -0,0 +1,137 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from net import Net

# (1) import nvflare client API
import nvflare.client as flare

# (optional) metrics
from nvflare.client.tracking import SummaryWriter

# (optional) set a fixed location so we don't need to download every time
DATASET_PATH = "/tmp/nvflare/data"
# If available, we use GPU to speed things up.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def main():
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    batch_size = 4
    epochs = 2

    trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

    net = Net()

    # (2) initializes NVFlare client API
    flare.init()

    summary_writer = SummaryWriter()
    while flare.is_running():
        # (3) receives FLModel from NVFlare
        input_model = flare.receive()
        print(f"current_round={input_model.current_round}")

        # (4) loads model from NVFlare
        net.load_state_dict(input_model.params)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

        # (optional) use GPU to speed things up
        net.to(DEVICE)
        # (optional) calculate total steps
        steps = epochs * len(trainloader)
        for epoch in range(epochs):  # loop over the dataset multiple times

            running_loss = 0.0
            for i, data in enumerate(trainloader, 0):
                # get the inputs; data is a list of [inputs, labels]
                # (optional) use GPU to speed things up
                inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # print statistics
                running_loss += loss.item()
                if i % 2000 == 1999:  # print every 2000 mini-batches
                    print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                    global_step = input_model.current_round * steps + epoch * len(trainloader) + i

                    summary_writer.add_scalar(tag="loss_for_each_batch", scalar=running_loss, global_step=global_step)
                    running_loss = 0.0

        print("Finished Training")

        PATH = "./cifar_net.pth"
        torch.save(net.state_dict(), PATH)

        # (5) wraps evaluation logic into a method to re-use for
        #     evaluation on both trained and received model
        def evaluate(input_weights):
            net = Net()
            net.load_state_dict(input_weights)
            # (optional) use GPU to speed things up
            net.to(DEVICE)

            correct = 0
            total = 0
            # since we're not training, we don't need to calculate the gradients for our outputs
            with torch.no_grad():
                for data in testloader:
                    # (optional) use GPU to speed things up
                    images, labels = data[0].to(DEVICE), data[1].to(DEVICE)
                    # calculate outputs by running images through the network
                    outputs = net(images)
                    # the class with the highest energy is what we choose as prediction
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            print(f"Accuracy of the network on the 10000 test images: {100 * correct // total} %")
            return 100 * correct // total

        # (6) evaluate on received model for model selection
        accuracy = evaluate(input_model.params)
        summary_writer.add_scalar(tag="global_model_accuracy", scalar=accuracy, global_step=input_model.current_round)
        # (7) construct trained FL model
        output_model = flare.FLModel(
            params=net.cpu().state_dict(),
            metrics={"accuracy": accuracy},
            meta={"NUM_STEPS_CURRENT_ROUND": steps},
        )
        # (8) send model back to NVFlare
        flare.send(output_model)


if __name__ == "__main__":
    main()
108 changes: 108 additions & 0 deletions examples/advanced/job_api/pt/src/cifar10_lightning_fl.py
@@ -0,0 +1,108 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torchvision
import torchvision.transforms as transforms
from lit_net import LitNet
from pytorch_lightning import LightningDataModule, Trainer, seed_everything
from torch.utils.data import DataLoader, random_split

# (1) import nvflare lightning client API
import nvflare.client.lightning as flare

seed_everything(7)


DATASET_PATH = "/tmp/nvflare/data"
BATCH_SIZE = 4

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])


class CIFAR10DataModule(LightningDataModule):
    def __init__(self, data_dir: str = DATASET_PATH, batch_size: int = BATCH_SIZE):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

    def prepare_data(self):
        torchvision.datasets.CIFAR10(root=self.data_dir, train=True, download=True, transform=transform)
        torchvision.datasets.CIFAR10(root=self.data_dir, train=False, download=True, transform=transform)

    def setup(self, stage: str):
        # Assign train/val datasets for use in dataloaders
        if stage == "fit" or stage == "validate":
            cifar_full = torchvision.datasets.CIFAR10(
                root=self.data_dir, train=True, download=False, transform=transform
            )
            self.cifar_train, self.cifar_val = random_split(cifar_full, [0.8, 0.2])

        # Assign test dataset for use in dataloader(s)
        if stage == "test" or stage == "predict":
            self.cifar_test = torchvision.datasets.CIFAR10(
                root=self.data_dir, train=False, download=False, transform=transform
            )

    def train_dataloader(self):
        return DataLoader(self.cifar_train, batch_size=self.batch_size)

    def val_dataloader(self):
        return DataLoader(self.cifar_val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.cifar_test, batch_size=self.batch_size)

    def predict_dataloader(self):
        return DataLoader(self.cifar_test, batch_size=self.batch_size)


def main():
    model = LitNet()
    cifar10_dm = CIFAR10DataModule()
    if torch.cuda.is_available():
        trainer = Trainer(max_epochs=1, accelerator="gpu", devices=1 if torch.cuda.is_available() else None)
    else:
        trainer = Trainer(max_epochs=1, devices=None)

    # (2) patch the lightning trainer
    flare.patch(trainer)

    while flare.is_running():
        # (3) receives FLModel from NVFlare
        # Note that we don't need to pass this input_model to trainer
        # because after flare.patch the trainer.fit/validate will get the
        # global model internally
        input_model = flare.receive()
        print(f"\n[Current Round={input_model.current_round}, Site = {flare.get_site_name()}]\n")

        # (4) evaluate the current global model to allow server-side model selection
        print("--- validate global model ---")
        trainer.validate(model, datamodule=cifar10_dm)

        # perform local training starting with the received global model
        print("--- train new model ---")
        trainer.fit(model, datamodule=cifar10_dm)

        # test local model
        print("--- test new model ---")
        trainer.test(ckpt_path="best", datamodule=cifar10_dm)

        # get predictions
        print("--- prediction with new best model ---")
        trainer.predict(ckpt_path="best", datamodule=cifar10_dm)


if __name__ == "__main__":
    main()