Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API for Pythonic Job Creation #2563

Merged
merged 39 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9e7d82d
Job API and examples
holgerroth May 6, 2024
e97c28a
fix import
holgerroth May 6, 2024
166b206
don't export by default. fix typo
holgerroth May 7, 2024
677cb06
allow providing objects directly instead of ids
holgerroth May 8, 2024
d2d4e7c
Merge branch 'main' of github.com:NVIDIA/NVFlare into pythonic_gettin…
holgerroth May 8, 2024
19b5ce8
restructure files
holgerroth May 8, 2024
73789ba
use optional import
holgerroth May 8, 2024
274acc9
move ScriptExecutor
holgerroth May 8, 2024
576145c
use optional import in ScriptExecutor
holgerroth May 8, 2024
11da0b1
add license header
holgerroth May 8, 2024
3822d4a
Merge branch 'main' of github.com:NVIDIA/NVFlare into pythonic_gettin…
holgerroth May 9, 2024
44389db
make kmeans preprocessing optional
holgerroth May 10, 2024
18c29f6
introduce as_id() command and revert some changes
holgerroth May 10, 2024
a9031d4
enable recursive referencing
holgerroth May 10, 2024
bfd04d1
fix component count
holgerroth May 10, 2024
14e5322
increase count starting at 1
holgerroth May 10, 2024
3b1d9e2
rename files
holgerroth May 10, 2024
abd4b4a
Merge branch 'main' of github.com:NVIDIA/NVFlare into pythonic_gettin…
holgerroth May 10, 2024
64387de
improve docstring, list clients only once
holgerroth May 13, 2024
7f28755
Merge branch 'pythonic_getting_started' of github.com:holgerroth/NVFl…
holgerroth May 14, 2024
6e0517b
Moved the get_component_init_parameters to class_utils.
yhwen May 14, 2024
afa37c9
merge
holgerroth May 14, 2024
30e4139
check constructor args only
holgerroth May 14, 2024
5ceb717
add tf example; only add components once
holgerroth May 14, 2024
4b1b522
save tf net args
holgerroth May 14, 2024
78b4ad8
add tensorboard persistor
holgerroth May 15, 2024
d5a155d
Merge branch 'pythonic_getting_started' of github.com:holgerroth/NVFl…
holgerroth May 16, 2024
70df39b
merge
holgerroth May 16, 2024
643538f
add requirements; support tf in script executor
holgerroth May 16, 2024
2018c8a
update tasks for converter
holgerroth May 16, 2024
f1092ab
Merge branch 'main' into pythonic_getting_started
holgerroth May 16, 2024
77e284a
reformatting; move ScriptExecutor
holgerroth May 16, 2024
8c15094
address comments
holgerroth May 17, 2024
1a66384
Merge branch 'main' into pythonic_getting_started
holgerroth May 17, 2024
6200faa
Merge branch 'main' into pythonic_getting_started
holgerroth May 20, 2024
67d86e9
Merge branch 'main' into pythonic_getting_started
holgerroth May 21, 2024
5648ecb
Merge branch 'main' into pythonic_getting_started
holgerroth May 22, 2024
eb40e32
fix imports for inprocess executor; update TF network
holgerroth May 22, 2024
d3837d8
Merge branch 'main' into pythonic_getting_started
YuanTingHsieh May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions examples/hello-world/job_api/pt/client_api_kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import os

from src.kmeans_assembler import KMeansAssembler
from src.split_csv import distribute_header_file, split_csv

from nvflare import FedJob, ScriptExecutor
from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator
from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor
from nvflare.client.config import ExchangeFormat


def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample_rate, site_name_prefix="site-"):
    """Split the HIGGS csv into per-site shards and copy the header file to each site.

    Args:
        input_data_path: path to the full HIGGS csv file.
        input_header_path: path to the csv file holding the column headers.
        output_dir: directory where the per-site splits are written.
        site_num: number of parts (one per client site) to split into.
        sample_rate: fraction of rows sampled into each split.
        site_name_prefix: prefix used to name the per-site output files.
    """
    # The original body re-assigned every argument to a same-named local
    # (e.g. `sample_rate = sample_rate`), which is a no-op; pass them through directly.
    split_csv(input_data_path, output_dir, site_num, site_name_prefix, sample_rate)
    distribute_header_file(input_header_path, output_dir, site_num, site_name_prefix)


if __name__ == "__main__":
    # End-to-end example: download HIGGS, split it across sites, then build and
    # run a federated k-means job with the Pythonic Job API.
    n_clients = 3
    num_rounds = 2
    train_script = "src/kmeans_fl.py"
    data_input_dir = "/tmp/nvflare/higgs/data"
    data_output_dir = "/tmp/nvflare/higgs/split_data"

    # Download data (skipped if the zip is already present).
    os.makedirs(data_input_dir, exist_ok=True)
    higgs_zip_file = os.path.join(data_input_dir, "higgs.zip")
    if not os.path.exists(higgs_zip_file):
        os.system(
            f"curl -o {higgs_zip_file} https://archive.ics.uci.edu/static/public/280/higgs.zip"
        )  # This might take a while. The file is 2.8 GB.
        os.system(f"unzip -d {data_input_dir} {higgs_zip_file}")
        os.system(
            f"gunzip -c {os.path.join(data_input_dir, 'HIGGS.csv.gz')} > {os.path.join(data_input_dir, 'higgs.csv')}"
        )

    # Generate the csv header file.
    # Column names for the HIGGS dataset (label + 28 features).
    features = [
        "label",
        "lepton_pt",
        "lepton_eta",
        "lepton_phi",
        "missing_energy_magnitude",
        "missing_energy_phi",
        "jet_1_pt",
        "jet_1_eta",
        "jet_1_phi",
        "jet_1_b_tag",
        "jet_2_pt",
        "jet_2_eta",
        "jet_2_phi",
        "jet_2_b_tag",
        "jet_3_pt",
        "jet_3_eta",
        "jet_3_phi",
        "jet_3_b_tag",
        "jet_4_pt",
        "jet_4_eta",
        "jet_4_phi",
        "jet_4_b_tag",
        "m_jj",
        "m_jjj",
        "m_lv",
        "m_jlv",
        "m_bb",
        "m_wbb",
        "m_wwbb",
    ]

    # Specify the file path
    file_path = os.path.join(data_input_dir, "headers.csv")

    with open(file_path, "w", newline="") as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(features)

    print(f"features written to {file_path}")

    # Split the data
    split_higgs(
        input_data_path=os.path.join(data_input_dir, "higgs.csv"),
        input_header_path=os.path.join(data_input_dir, "headers.csv"),
        output_dir=data_output_dir,
        site_num=n_clients,
        sample_rate=0.3,
    )

    # Create the federated learning job
    job = FedJob(name="kmeans")

    controller = ScatterAndGather(
        min_clients=n_clients,
        num_rounds=num_rounds,
        wait_time_after_min_received=0,
        aggregator_id="aggregator",
        persistor_id="persistor",  # TODO: Allow adding python objects rather than ids
        shareable_generator_id="shareable_generator",
        train_task_name="train",  # Client will start training once received such task.
        train_timeout=0,
    )
    job.to(controller, "server")

    # For kmeans with sklean, we need a custom persistor
    # JoblibModelParamPersistor is a persistor which save/read the model to/from file with JobLib format.
    persistor = JoblibModelParamPersistor(initial_params={"n_clusters": 2})
    # When assigning the persistor to the server, we need to specify the id that's expected
    # by the ScatterAndGather controller.
    job.to(persistor, "server")

    # Similarly, ScatterAndGather expects a "shareable_generator" which we need to assign to the server.
    job.to(FullModelShareableGenerator(), "server")

    # ScatterAndGather also expects an "aggregator" which we define here.
    # The actual aggregation function is defined by an "assembler" to specify how to handle the collected updates.
    aggregator = CollectAndAssembleAggregator(
        assembler_id="kmeans_assembler"
    )  # TODO: Allow adding KMeansAssembler() directly
    job.to(aggregator, "server")

    # This is the assembler designed for k-Means algorithm.
    # As CollectAndAssembleAggregator expects an assembler_id, we need to specify it here.
    job.to(KMeansAssembler(), "server", id="kmeans_assembler")

    # Add clients
    for i in range(n_clients):
        # NOTE: the pasted source contained GitHub review-UI text inside this call
        # ("holgerroth marked this conversation as resolved. / Show resolved Hide resolved"),
        # which would be a syntax error; it has been removed here.
        executor = ScriptExecutor(
            task_script_path=train_script,
            task_script_args=f"--data_root_dir {data_output_dir}",
            params_exchange_format=ExchangeFormat.RAW,  # kmeans requires raw values only rather than PyTorch Tensors (the default)
        )
        job.to(executor, f"site-{i+1}", gpu=0)  # HIGGs data splitter assumes site names start from 1

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
44 changes: 44 additions & 0 deletions examples/hello-world/job_api/pt/client_api_lightning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.lit_net import LitNet

from nvflare import FedAvg, FedJob, ScriptExecutor

if __name__ == "__main__":
    # Federated averaging of a PyTorch Lightning model on CIFAR-10,
    # assembled with the Pythonic Job API and run in the simulator.
    n_clients = 2
    num_rounds = 2
    train_script = "src/cifar10_lightning_fl.py"

    job = FedJob(name="cifar10_fedavg_lightning")

    # Server side: the FedAvg workflow plus the initial global model.
    job.to(FedAvg(min_clients=n_clients, num_rounds=num_rounds), "server")
    job.to(LitNet(), "server")

    # Client side: each site runs the training script through a ScriptExecutor.
    for client_idx in range(n_clients):
        script_runner = ScriptExecutor(
            task_script_path=train_script,
            task_script_args="",  # f"--batch_size 32 --data_path /tmp/data/site-{client_idx}"
        )
        job.to(script_runner, f"site-{client_idx}", gpu=0)

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
44 changes: 44 additions & 0 deletions examples/hello-world/job_api/pt/client_api_pt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.net import Net

from nvflare import FedAvg, FedJob, ScriptExecutor

if __name__ == "__main__":
    # Federated averaging of a PyTorch model on CIFAR-10, assembled with the
    # Pythonic Job API and executed in the simulator.
    n_clients = 2
    num_rounds = 2
    train_script = "src/cifar10_fl.py"

    job = FedJob(name="cifar10_fedavg")

    # Server side: the FedAvg workflow plus the initial global model.
    job.to(FedAvg(min_clients=n_clients, num_rounds=num_rounds), "server")
    job.to(Net(), "server")

    # Client side: each site runs the training script through a ScriptExecutor.
    for client_idx in range(n_clients):
        script_runner = ScriptExecutor(
            task_script_path=train_script,
            task_script_args="",  # f"--batch_size 32 --data_path /tmp/data/site-{client_idx}"
        )
        job.to(script_runner, f"site-{client_idx}", gpu=0)

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
47 changes: 47 additions & 0 deletions examples/hello-world/job_api/pt/client_api_pt_cyclic_cc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.net import Net

from nvflare import FedJob, ScriptExecutor
from nvflare.app_common.ccwf import CyclicClientController, CyclicServerController
from nvflare.app_common.ccwf.comps.simple_model_shareable_generator import SimpleModelShareableGenerator
from nvflare.app_opt.pt.file_model_persistor import PTFileModelPersistor

if __name__ == "__main__":
    # Cyclic (relay-style) federated training on CIFAR-10 using the
    # client-controlled workflow (ccwf) components.
    n_clients = 2
    num_rounds = 3
    train_script = "src/cifar10_fl.py"

    job = FedJob(name="cifar10_cyclic")

    # Server side: the cyclic server controller coordinates the relay.
    job.to(CyclicServerController(num_rounds=num_rounds, max_status_report_interval=300), "server")

    for client_idx in range(n_clients):
        site_name = f"site-{client_idx}"

        # Training executor running the client script.
        job.to(
            ScriptExecutor(
                task_script_path=train_script,
                task_script_args="",  # f"--batch_size 32 --data_path /tmp/data/site-{client_idx}"
            ),
            site_name,
            gpu=0,
        )

        # Client-side controller that handles the cyclic workflow tasks.
        job.to(CyclicClientController(), site_name, tasks=["cyclic_*"])

        # The cyclic client controller expects a model persistor and a
        # shareable generator on every client site.
        job.to(PTFileModelPersistor(model=Net()), site_name, id="persistor")
        job.to(SimpleModelShareableGenerator(), site_name, id="shareable_generator")

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
46 changes: 46 additions & 0 deletions examples/hello-world/job_api/pt/client_api_pt_dp_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.net import Net

from nvflare import FedAvg, FedJob, FilterType, ScriptExecutor
from nvflare.app_common.filters.percentile_privacy import PercentilePrivacy

if __name__ == "__main__":
    # FedAvg on CIFAR-10 with a percentile-privacy filter applied to each
    # client's task results before they leave the site.
    n_clients = 2
    num_rounds = 2
    train_script = "src/cifar10_fl.py"

    job = FedJob(name="cifar10_fedavg_privacy")

    # Server side: the FedAvg workflow plus the initial global model.
    job.to(FedAvg(min_clients=n_clients, num_rounds=num_rounds), "server")
    job.to(Net(), "server")

    for client_idx in range(n_clients):
        site_name = f"site-{client_idx}"

        # Training executor bound to the "train" task.
        job.to(
            ScriptExecutor(task_script_path=train_script, task_script_args=""),
            site_name,
            tasks=["train"],
            gpu=0,
        )

        # Privacy filter applied to outgoing task results of the "train" task.
        job.to(
            PercentilePrivacy(percentile=10, gamma=0.01),
            site_name,
            tasks=["train"],
            filter_type=FilterType.TASK_RESULT,
        )

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir")
Loading
Loading