Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

XP-480 revise message names #222

Merged
merged 7 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .isort.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[settings]
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
use_parentheses=True
line_length=100
force_sort_within_sections=True
include_trailing_comma=True
known_third_party=xain_proto,xain_sdk
janpetschexain marked this conversation as resolved.
Show resolved Hide resolved
line_length=100
multi_line_output=3
use_parentheses=True
67 changes: 31 additions & 36 deletions docs/network_architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ A **Rendezvous** method that allows *Participants* to register with a
about the *Participant* in order to keep track of what the *Participant* is
doing.

A **StartTraining** method that allows *Participants* to get the current global
A **StartTrainingRound** method that allows *Participants* to get the current global
model as well as signaling their intent to participate in a given round.

An **EndTraining** method that allows *Participants* to submit their updated
An **EndTrainingRound** method that allows *Participants* to submit their updated
models after they finished their training task.


Expand Down Expand Up @@ -192,7 +192,7 @@ either:

# rest of the method logic

*Periodically check if a the rcp call is active:*
*Periodically check if the rcp call is active:*
janpetschexain marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python

Expand All @@ -218,7 +218,7 @@ it's participant list.

.. code-block:: python

def Hearthbeat(self, request, context):
def Heartbeat(self, request, context):
self.participants[context.peer()].expires = time.now() + KEEPALIVE_TIME
return PingResponse()

Expand Down Expand Up @@ -276,12 +276,12 @@ having finished training.

Once all :math:`N` have finished training, :math:`C` collects together all the
trained data and aggregates them generating a new global model. It either
increments the round and repeats, or if there are more rounds to go, it
increments the round and repeats, or if there are no more rounds to go, it
transitions to the **FINISHED** state signaling the participants to disconnect.

So far we've only discussed the lifecycle of a *successful* interaction with
all participants i.e. without faults, dropouts, etc. The true picture (taking
into account of `fault tolerance <https://hackmd.io/gzGSJZ2xQTyERNjTpqguqg>`_)
into account `fault tolerance <https://hackmd.io/gzGSJZ2xQTyERNjTpqguqg>`_)
will be more complicated than above but this is still useful to give the basic
structure.

Expand All @@ -300,33 +300,31 @@ single-machine "prototype" :code:`fl/Coordinator`:
.. code-block:: python

# note: code updated since time of writing but idea remains the same
def train_local(p, theta, epochs, epoch_base):
theta_update, history = p.train_round(theta, epochs, epoch_base)
def train_local(p, weights, epochs, epoch_base):
weights_update, history = p.train_round(weights, epochs, epoch_base)
metrics = p.metrics()
return theta_update, history, metrics
return weights_update, history, metrics

To do its training, :math:`P` will invoke its own :code:`train_round` function.
For this, it requires the following data (annotated with their types) from
:math:`C`

* :code:`theta: KerasWeights` where :code:`KerasWeights = List[ndarray]`
* :code:`weights: List[ndarray]`
* :code:`epochs: int`
* :code:`epoch_base: int`

In return :math:`P` sends back a pair of data

* :code:`theta_update: Tuple[KerasWeights, int]`
* :code:`history: KerasHistory` where :code:`KerasHistory = Dict[str, List[float]]`
* :code:`weights_update: Tuple[List[ndarray], int]`
* :code:`history: Dict[str, List[float]]`

After a :code:`train_round`, :math:`C` also needs from :math:`P`
a :code:`metrics` of type

* :code:`Metrics = Tuple[int, VolumeByClass]` where :code:`VolumeByClass = List[int]`
a :code:`metrics` of type :code:`Dict[str, ndarray]`.

.. note::

It is worth bearing in mind that since we are working with gRPC, all service
calls must be initiated by the client (as discussed above) i.e. :math:`P`. This
calls must be initiated by the client (as discussed above), i.e. :math:`P`. This
is completely unlike the code excerpt above, where it is naturally :math:`C`
that calls :math:`P`.

Expand All @@ -342,30 +340,28 @@ exchanging training data. The Coordinator exposes two service methods

.. code-block:: proto

rpc StartTraining(StartTrainingRequest) returns (StartTrainingReply) {}
rpc EndTraining(EndTrainingRequest) returns (EndTrainingReply) {}
rpc StartTrainingRound(StartTrainingRoundRequest) returns (StartTrainingRoundResponse) {}
rpc EndTrainingRound(EndTrainingRoundRequest) returns (EndTrainingRoundResponse) {}

where the request and reply data are given as the following protobuf messages:
where the request and response data are given as the following protobuf messages:

.. code-block:: proto

message StartTrainingRequest {}
message StartTrainingRoundRequest {}

message StartTrainingReply {
repeated numproto.protobuf.NDArray theta = 1;
message StartTrainingRoundResponse {
repeated numproto.protobuf.NDArray weights = 1;
int32 epochs = 2;
int32 epoch_base = 3;
}

message EndTrainingRequest {
ThetaUpdate theta_update = 1;
map<string, HistoryValue> history = 2;
Metrics metrics = 3; /* also bundled here */
message EndTrainingRoundRequest {
repeated numproto.protobuf.NDArray weights = 1;
int32 number_samples = 2;
map<string, numproto.protobuf.NDArray> metrics = 3;
}

/* message defns for ThetaUpdate, HistoryValue, Metrics omitted */

message EndTrainingReply {}
message EndTrainingRoundResponse {}


Note that while most of the Python data types to be exchanged can be
Expand All @@ -376,14 +372,13 @@ this conversion.
Training Round Communication
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The communication is
summarised in the following sequence diagram.
The communication is summarised in the following sequence diagram.
In a training round, :math:`C` is in the state :code:`ROUND`. The selected
participant :math:`P` is in :code:`TRAINING` state. The first message is by
:math:`P` essentially kicks off the exchange. :math:`C` responds with the global
model :math:`\theta` (and other data as specified in
:code:`StartTrainingReply`). Then :math:`P` carries out the training locally.
When complete, it sends the updated model :math:`\theta'` (and other metadata)
model :math:`\weights` (and other data as specified in
:code:`StartTrainingRoundResponse`). Then :math:`P` carries out the training locally.
When complete, it sends the updated model :math:`\weights'` (and other metadata)
back. :math:`C` responds with an acknowledgement.

.. image:: _static/sequence2.png
janpetschexain marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -427,8 +422,8 @@ After a successful rendezvous, :math:`P` is in **Wait for Selection**. :math:`P`
this state as long as it keeps receiving :code:`STANDBY` heartbeats. At some round
:math:`i`, :math:`C` may select :math:`P` for the round by responding with a :code:`ROUND` :math:`i`
heartbeat. At this point, :math:`P` moves to **Training** where the above sequence of
training messages (:code:`StartTraining` :math:`\rightarrow \theta \rightarrow \theta'
\rightarrow` :code:`EndTraining`) occur. Having received the :code:`EndTraining` reply from
training messages (:code:`StartTrainingRound` :math:`\rightarrow \weights \rightarrow \weights'
\rightarrow` :code:`EndTrainingRound`) occur. Having received the :code:`EndTrainingRound` response from
:math:`C`, :math:`P` makes an "internal" transition to **Post-training** where it waits
until the start of the next round. If it has been selected again, it will
observe :code:`ROUND` :math:`i+1`. If not, it observes :code:`STANDBY`. Alternatively, if round
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"numproto==0.3", # Apache License 2.0
"structlog==19.2.0", # Apache License 2.0
# TODO: change xain-proto requirement to "xain-proto==0.2.0" once it is released
"xain-proto @ git+https://github.com/xainag/xain-proto.git@0e52b2fd1ceabbcccd443b05e2438a9ce0c65178#egg=xain_proto-0.2.0&subdirectory=python", # Apache License 2.0
"xain-proto @ git+https://github.com/xainag/xain-proto.git@master#egg=xain_proto-0.1.0&subdirectory=python", # Apache License 2.0
janpetschexain marked this conversation as resolved.
Show resolved Hide resolved
"boto3==1.10.48", # Apache License 2.0
]

Expand All @@ -49,7 +49,7 @@
"pytest==5.3.2", # MIT license
"pytest-cov==2.8.1", # MIT
"pytest-watch==4.2.0", # MIT
"xain-sdk @ git+https://github.com/xainag/xain-sdk.git@development#egg=xain_sdk-0.2.0",
"xain-sdk @ git+https://github.com/xainag/xain-sdk.git@development#egg=xain_sdk-0.1.0",
janpetschexain marked this conversation as resolved.
Show resolved Hide resolved
]

docs_require = ["Sphinx==2.2.1", "m2r==0.2.1", "sphinxcontrib-mermaid==0.3.1"]
Expand Down
Loading