From e253a6af279ecb30d42c20fe19b944befafb80f8 Mon Sep 17 00:00:00 2001 From: PhilippPlank <32519998+PhilippPlank@users.noreply.github.com> Date: Wed, 16 Feb 2022 19:48:05 +0100 Subject: [PATCH] Refactoring py/ports (#131) * - refactor creation of ImplicitVarPort into a common function * - refactor creation of ImplicitVarPort into a common function * - refactor inheritage of PyInPort and PyOutPort to a common parent class and removed anonymous argument list for their init functions * - Adressed requested changes of PR (rename of class, adding comments) * - fix unit tests - try resolving conflict * - Adressed requested changes of PR (adding comments) --- src/lava/magma/compiler/builders/builder.py | 4 +- src/lava/magma/compiler/compiler.py | 17 +- src/lava/magma/core/model/py/ports.py | 522 +++++++++++++++++-- src/lava/magma/core/process/ports/ports.py | 84 +-- tests/lava/magma/core/model/py/test_ports.py | 9 +- tests/lava/magma/core/model/test_py_model.py | 8 +- 6 files changed, 535 insertions(+), 109 deletions(-) diff --git a/src/lava/magma/compiler/builders/builder.py b/src/lava/magma/compiler/builders/builder.py index 6cba83b68..5275b745b 100644 --- a/src/lava/magma/compiler/builders/builder.py +++ b/src/lava/magma/compiler/builders/builder.py @@ -29,7 +29,7 @@ from lava.magma.compiler.utils import VarInitializer, PortInitializer, \ VarPortInitializer from lava.magma.core.model.py.ports import ( - AbstractPyPort, + AbstractPyIOPort, PyInPort, PyOutPort, PyRefPort, @@ -340,7 +340,7 @@ def build(self): for name, p in self.py_ports.items(): # Build PyPort lt = self._get_lava_type(name) - port_cls = ty.cast(ty.Type[AbstractPyPort], lt.cls) + port_cls = ty.cast(ty.Type[AbstractPyIOPort], lt.cls) csp_ports = [] if name in self.csp_ports: csp_ports = self.csp_ports[name] diff --git a/src/lava/magma/compiler/compiler.py b/src/lava/magma/compiler/compiler.py index 9a1cbbd2f..962f40694 100644 --- a/src/lava/magma/compiler/compiler.py +++ b/src/lava/magma/compiler/compiler.py @@ -34,7 +34,7 @@ from lava.magma.core.model.py.ports import RefVarTypeMapping from lava.magma.core.model.sub.model import AbstractSubProcessModel from lava.magma.core.process.ports.ports import AbstractPort, VarPort, \ - ImplicitVarPort + ImplicitVarPort, RefPort from lava.magma.core.process.process import AbstractProcess from lava.magma.core.resources import CPU, NeuroCore from lava.magma.core.run_configs import RunConfig @@ -217,21 +217,10 @@ def _propagate_var_ports(proc: AbstractProcess): for vp in proc.var_ports: v = vp.var.aliased_var if v is not None: - sub_proc = v.process # Create an implicit Var port in the sub process - new_vp = ImplicitVarPort(v) - # Propagate name and parent process of Var to VarPort - new_vp.name = "_" + v.name + "_implicit_port" - new_vp.process = sub_proc - # VarPort name could shadow existing attribute - if hasattr(sub_proc, new_vp.name): - raise AssertionError( - "Name of implicit VarPort might conflict" - " with existing attribute.") - setattr(sub_proc, new_vp.name, new_vp) - sub_proc.var_ports.add_members({new_vp.name: new_vp}) + imp_vp = RefPort.create_implicit_var_port(v) # Connect the VarPort to the new VarPort - vp.connect(new_vp) + vp.connect(imp_vp) def _expand_sub_proc_model(self, model_cls: ty.Type[AbstractSubProcessModel], diff --git a/src/lava/magma/core/model/py/ports.py b/src/lava/magma/core/model/py/ports.py index e2c35bc9f..faf0e5103 100644 --- a/src/lava/magma/core/model/py/ports.py +++ b/src/lava/magma/core/model/py/ports.py @@ -9,20 +9,127 @@ from lava.magma.compiler.channels.interfaces import AbstractCspPort from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort from lava.magma.core.model.interfaces import AbstractPortImplementation +from lava.magma.core.model.model import AbstractProcessModel from lava.magma.runtime.mgmt_token_enums import enum_to_np, enum_equal class AbstractPyPort(AbstractPortImplementation): + """Abstract class for Ports implemented in Python. + + Ports at the Process level provide an interface to connect + Processes with each other. Once two Processes have been connected by Ports, + they can exchange data. + Lava provides four types of Ports: InPorts, OutPorts, RefPorts and VarPorts. + An OutPort of a Process can be connected to one or multiple InPorts of other + Processes to transfer data from the OutPort to the InPorts. A RefPort of a + Process can be connected to a VarPort of another Process. The difference to + In-/OutPorts is that a VarPort is directly linked to a Var and via a + RefPort the Var can be directly modified from a different Process. + To exchange data, PyPorts provide an interface to send and receive messages + via channels implemented by a backend messaging infrastructure, which has + been inspired by the Communicating Sequential Processes (CSP) paradigm. + Thus, a channel denotes a CSP channel of the messaging infrastructure and + CSP Ports denote the low level ports also used in the messaging + infrastructure. PyPorts are the implementation for message exchange in + Python, using the low level CSP Ports of the backend messaging + infrastructure. A PyPort may have one or multiple connection to other + PyPorts. These connections are represented by csp_ports, which is a list of + CSP ports corresponding to the connected PyPorts. + """ @property @abstractmethod def csp_ports(self) -> ty.List[AbstractCspPort]: - """Returns all csp ports of the port.""" + """ + Abstract property to get a list of the corresponding CSP Ports of all + connected PyPorts. The CSP Port is the low level interface of the + backend messaging infrastructure which is used to send and receive data. + + Returns + ------- + A list of all CSP Ports connected to the PyPort. + """ pass -class PyInPort(AbstractPyPort): +class AbstractPyIOPort(AbstractPyPort): + """Abstract class of an input/output Port implemented in python. + + A PyIOPort can either be an input or an output Port and is the common + abstraction of PyInPort/PyOutPort. + _csp_ports is a list of CSP Ports which are used to send/receive data by + connected PyIOPorts. + + Parameters + ---------- + csp_ports : list + A list of CSP Ports used by this IO Port. + + process_model : AbstractProcessModel + The process model used by the process of the Port. + + shape : tuple + The shape of the Port. + + d_type: type + The data type of the Port. + + Attributes + ---------- + _csp_ports : list + A list of CSP Ports used by this IO Port. + """ + def __init__(self, + csp_ports: ty.List[AbstractCspPort], + process_model: AbstractProcessModel, + shape: ty.Tuple[int, ...], + d_type: type): + + self._csp_ports = csp_ports + super().__init__(process_model, shape, d_type) + + @property + def csp_ports(self) -> ty.List[AbstractCspPort]: + """Property to get the corresponding CSP Ports of all connected + PyPorts (csp_ports). The CSP Port is the low level interface of the + backend messaging infrastructure which is used to send and receive data. + + Returns + ------- + A list of all CSP Ports connected to the PyPort. + """ + return self._csp_ports + + +class PyInPort(AbstractPyIOPort): """Python implementation of InPort used within AbstractPyProcessModel. - If buffer is empty, recv() will be blocking. + + PyInPort is an input Port that can be used in a Process to receive data sent + from a connected PyOutPort of another Process over a channel. PyInPort can + receive (recv()) the data, which removes it from the channel, look (peek()) + at the data which keeps it on the channel or check (probe()) if there is + data on the channel. The different class attributes are used to select the + type of OutPorts via LavaPyType declarations in PyProcModels, e.g., + LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=24) creates a PyInPort. + A PyOutPort (source) can be connected to one or multiple PyInPorts (target). + + Class attributes + ---------------- + VEC_DENSE : PyInPortVectorDense, default=None + Type of PyInPort. CSP Port sends data as dense vector. + + VEC_SPARSE : PyInPortVectorSparse, default=None + Type of PyInPort. CSP Port sends data as sparse vector (data + indices), + so only entries which have changed in a vector need to be communicated. + + SCALAR_DENSE : PyInPortScalarDense, default=None + Type of PyInPort. CSP Port sends data element by element for the whole + data structure. So the CSP channel does need less memory to transfer + data. + + SCALAR_SPARSE : PyInPortScalarSparse, default=None + Type of PyInPort. CSP Port sends data element by element, but after each + element the index of the data entry is also given. So only entries which + need to be changed need to be communicated. """ VEC_DENSE: ty.Type["PyInPortVectorDense"] = None @@ -30,75 +137,121 @@ class PyInPort(AbstractPyPort): SCALAR_DENSE: ty.Type["PyInPortScalarDense"] = None SCALAR_SPARSE: ty.Type["PyInPortScalarSparse"] = None - def __init__(self, csp_recv_ports: ty.List[CspRecvPort], *args): - self._csp_recv_ports = csp_recv_ports - super().__init__(*args) - - @property - def csp_ports(self) -> ty.List[AbstractCspPort]: - """Returns all csp ports of the port.""" - return self._csp_recv_ports - @abstractmethod def recv(self): + """Abstract method to receive data (vectors/scalars) sent from connected + OutPorts (source Ports). Removes the retrieved data from the channel. + Expects data on the channel and will block execution if there is no data + to retrieve on the channel. + + Returns + ------- + The scalar or vector received from a connected OutPort. If the InPort is + connected to several OutPorts, their input is added in a point-wise + fashion. + """ pass @abstractmethod def peek(self): + """Abstract method to receive data (vectors/scalars) sent from connected + OutPorts (source Ports). Keeps the data on the channel. + + Returns + ------- + The scalar or vector received from a connected OutPort. If the InPort is + connected to several OutPorts, their input is added in a point-wise + fashion. + """ pass def probe(self) -> bool: - """Executes probe method of all csp ports and accumulates the returned - bool values with AND operation. The accumulator acc is initialized to - True. + """Method to check (probe) if there is data (vectors/scalars) + to receive from connected OutPorts (source Ports). + + Returns + ------- + result : bool + Returns True only when there is data to receive from all connected + OutPort channels. - Returns the accumulated bool value. """ - # Returns True only when probe returns True for all _csp_recv_ports. return ft.reduce( lambda acc, csp_port: acc and csp_port.probe(), - self._csp_recv_ports, + self.csp_ports, True, ) class PyInPortVectorDense(PyInPort): + """Python implementation of PyInPort for dense vector data.""" def recv(self) -> np.ndarray: + """Method to receive data (vectors/scalars) sent from connected + OutPorts (source Ports). Removes the retrieved data from the channel. + Expects data on the channel and will block execution if there is no data + to retrieve on the channel. + + Returns + ------- + result : ndarray of shape _shape + The vector received from a connected OutPort. If the InPort is + connected to several OutPorts, their input is added in a point-wise + fashion. + """ return ft.reduce( lambda acc, csp_port: acc + csp_port.recv(), - self._csp_recv_ports, + self.csp_ports, np.zeros(self._shape, self._d_type), ) def peek(self) -> np.ndarray: + """Method to receive data (vectors) sent from connected + OutPorts (source Ports). Keeps the data on the channel. + + Returns + ------- + result : ndarray of shape _shape + The vector received from a connected OutPort. If the InPort is + connected to several OutPorts, their input is added in a point-wise + fashion. + """ return ft.reduce( lambda acc, csp_port: acc + csp_port.peek(), - self._csp_recv_ports, + self.csp_ports, np.zeros(self._shape, self._d_type), ) class PyInPortVectorSparse(PyInPort): + """Python implementation of PyInPort for sparse vector data.""" def recv(self) -> ty.Tuple[np.ndarray, np.ndarray]: + """TBD""" pass def peek(self) -> ty.Tuple[np.ndarray, np.ndarray]: + """TBD""" pass class PyInPortScalarDense(PyInPort): + """Python implementation of PyInPort for dense scalar data.""" def recv(self) -> int: + """TBD""" pass def peek(self) -> int: + """TBD""" pass class PyInPortScalarSparse(PyInPort): + """Python implementation of PyInPort for sparse scalar data.""" def recv(self) -> ty.Tuple[int, int]: + """TBD""" pass def peek(self) -> ty.Tuple[int, int]: + """TBD""" pass @@ -108,50 +261,93 @@ def peek(self) -> ty.Tuple[int, int]: PyInPort.SCALAR_SPARSE = PyInPortScalarSparse -class PyOutPort(AbstractPyPort): - """Python implementation of OutPort used within AbstractPyProcessModels.""" +class PyOutPort(AbstractPyIOPort): + """Python implementation of OutPort used within AbstractPyProcessModels. + + PyOutPort is an output Port sending data to a connected input Port + (PyInPort) over a channel. PyOutPort can send (send()) the data by adding it + to the channel, or it can clear (flush()) the channel to remove any data + from it. The different class attributes are used to select the type of + OutPorts via LavaPyType declarations in PyProcModels, e.g., LavaPyType( + PyOutPort.VEC_DENSE, np.int32, precision=24) creates a PyOutPort. + A PyOutPort (source) can be connected to one or multiple PyInPorts (target). + + Class attributes + ---------------- + VEC_DENSE : PyOutPortVectorDense, default=None + Type of PyInPort. CSP Port sends data as dense vector. + + VEC_SPARSE : PyOutPortVectorSparse, default=None + Type of PyInPort. CSP Port sends data as sparse vector (data + indices), + so only entries which have changed in a vector need to be communicated. + + SCALAR_DENSE : PyOutPortScalarDense, default=None + Type of PyInPort. CSP Port sends data element by element for the whole + data structure. So the CSP channel does need less memory to transfer + data. + + SCALAR_SPARSE : PyOutPortScalarSparse, default=None + Type of PyInPort. CSP Port sends data element by element, but after each + element the index of the data entry is also given. So only entries which + need to be changed need to be communicated. + """ VEC_DENSE: ty.Type["PyOutPortVectorDense"] = None VEC_SPARSE: ty.Type["PyOutPortVectorSparse"] = None SCALAR_DENSE: ty.Type["PyOutPortScalarDense"] = None SCALAR_SPARSE: ty.Type["PyOutPortScalarSparse"] = None - def __init__(self, csp_send_ports: ty.List[CspSendPort], *args): - self._csp_send_ports = csp_send_ports - super().__init__(*args) - - @property - def csp_ports(self) -> ty.List[AbstractCspPort]: - """Returns all csp ports of the port.""" - return self._csp_send_ports - @abstractmethod def send(self, data: ty.Union[np.ndarray, int]): + """Abstract method to send data to the connected Port PyInPort (target). + + Parameters + ---------- + data : ndarray or int + The data (vector or scalar) to be sent to the PyInPort (target). + """ pass def flush(self): + """TBD""" pass class PyOutPortVectorDense(PyOutPort): + """Python implementation of PyOutPort for dense vector data.""" + def send(self, data: np.ndarray): - """Sends data only if port is not dangling.""" - for csp_port in self._csp_send_ports: + """Abstract method to send data to the connected in Port (target). + + Sends data only if the OutPort is connected to at least one InPort. + + Parameters + ---------- + data : ndarray + The data vector to be sent to the in Port (target). + """ + for csp_port in self.csp_ports: csp_port.send(data) class PyOutPortVectorSparse(PyOutPort): + """Python implementation of PyOutPort for sparse vector data.""" def send(self, data: np.ndarray, idx: np.ndarray): + """TBD""" pass class PyOutPortScalarDense(PyOutPort): + """Python implementation of PyOutPort for dense scalar data.""" def send(self, data: int): + """TBD""" pass class PyOutPortScalarSparse(PyOutPort): + """Python implementation of PyOutPort for sparse scalar data.""" def send(self, data: int, idx: int): + """TBD""" pass @@ -162,12 +358,67 @@ def send(self, data: int, idx: int): class VarPortCmd: + """Helper class to specify constants. Used for communication between + PyRefPorts and PyVarPorts.""" GET = enum_to_np(0) SET = enum_to_np(1) class PyRefPort(AbstractPyPort): - """Python implementation of RefPort used within AbstractPyProcessModels.""" + """Python implementation of RefPort used within AbstractPyProcessModels. + + A PyRefPort is a Port connected to a VarPort of a variable Var of another + Process. It is used to get or set the value of the referenced Var across + Processes. A PyRefPort is connected via two CSP channels and corresponding + CSP ports to a PyVarPort. One channel is used to send data from the + PyRefPort to the PyVarPort and the other channel is used to receive data + from the PyVarPort. PyRefPorts can get the value of a referenced Var + (read()) or set the value of a referenced Var (write()). + + Parameters + ---------- + csp_send_port : CspSendPort or None + Used to send data to the referenced Port PyVarPort (target). + + csp_recv_port: CspRecvPort or None + Used to receive data from the referenced Port PyVarPort (source). + + process_model : AbstractProcessModel + The process model used by the process of the Port. + + shape : tuple, default=tuple() + The shape of the Port. + + d_type: type, default=int + The data type of the Port. + + Attributes + ---------- + _csp_send_port : CspSendPort + Used to send data to the referenced Port PyVarPort (target). + + _csp_recv_port : CspRecvPort + Used to receive data from the referenced Port PyVarPort (source). + + Class attributes + ---------------- + VEC_DENSE : PyRefPortVectorDense, default=None + Type of PyInPort. CSP Port sends data as dense vector. + + VEC_SPARSE : PyRefPortVectorSparse, default=None + Type of PyInPort. CSP Port sends data as sparse vector (data + indices), + so only entries which have changed in a vector need to be communicated. + + SCALAR_DENSE : PyRefPortScalarDense, default=None + Type of PyInPort. CSP Port sends data element by element for the whole + data structure. So the CSP channel does need less memory to transfer + data. + + SCALAR_SPARSE : PyRefPortScalarSparse, default=None + Type of PyInPort. CSP Port sends data element by element, but after each + element the index of the data entry is also given. So only entries which + need to be changed need to be communicated. + """ VEC_DENSE: ty.Type["PyRefPortVectorDense"] = None VEC_SPARSE: ty.Type["PyRefPortVectorSparse"] = None @@ -176,27 +427,44 @@ class PyRefPort(AbstractPyPort): def __init__(self, csp_send_port: ty.Optional[CspSendPort], - csp_recv_port: ty.Optional[CspRecvPort], *args): + csp_recv_port: ty.Optional[CspRecvPort], + process_model: AbstractProcessModel, + shape: ty.Tuple[int, ...] = tuple(), + d_type: type = int): self._csp_recv_port = csp_recv_port self._csp_send_port = csp_send_port - super().__init__(*args) + super().__init__(process_model, shape, d_type) @property def csp_ports(self) -> ty.List[AbstractCspPort]: - """Returns all csp ports of the port.""" + """Property to get the corresponding CSP Ports of all connected + PyPorts (csp_ports). The CSP Port is the low level interface of the + backend messaging infrastructure which is used to send and receive data. + + Returns + ------- + A list of all CSP Ports connected to the PyPort. + """ if self._csp_send_port is not None and self._csp_recv_port is not None: return [self._csp_send_port, self._csp_recv_port] else: - # In this case the port was not connected + # In this case the Port was not connected return [] + @abstractmethod def read( self, ) -> ty.Union[ np.ndarray, ty.Tuple[np.ndarray, np.ndarray], int, ty.Tuple[int, int] ]: + """Abstract method to request and return data from a VarPort. + Returns + ------- + The value of the referenced var. + """ pass + @abstractmethod def write( self, data: ty.Union[ @@ -206,12 +474,27 @@ def write( ty.Tuple[int, int], ], ): + """Abstract method to write data to a VarPort to set its Var. + + Parameters + ---------- + data : ndarray, tuple of ndarray, int, tuple of int + The new value of the referenced Var. + """ pass class PyRefPortVectorDense(PyRefPort): + """Python implementation of RefPort for dense vector data.""" def read(self) -> np.ndarray: - """Requests the data from a VarPort and returns the data.""" + """Method to request and return data from a referenced Var using a + PyVarPort. + + Returns + ------- + result : ndarray of shape _shape + The value of the referenced Var. + """ if self._csp_send_port and self._csp_recv_port: header = np.ones(self._csp_send_port.shape) * VarPortCmd.GET self._csp_send_port.send(header) @@ -221,7 +504,14 @@ def read(self) -> np.ndarray: return np.zeros(self._shape, self._d_type) def write(self, data: np.ndarray): - """Sends the data to a VarPort to set its Var.""" + """Abstract method to write data to a VarPort to set the value of the + referenced Var. + + Parameters + ---------- + data : ndarray + The data to send via _csp_send_port. + """ if self._csp_send_port: header = np.ones(self._csp_send_port.shape) * VarPortCmd.SET self._csp_send_port.send(header) @@ -229,26 +519,35 @@ def write(self, data: np.ndarray): class PyRefPortVectorSparse(PyRefPort): + """Python implementation of RefPort for sparse vector data.""" def read(self) -> ty.Tuple[np.ndarray, np.ndarray]: + """TBD""" pass def write(self, data: np.ndarray, idx: np.ndarray): + """TBD""" pass class PyRefPortScalarDense(PyRefPort): + """Python implementation of RefPort for dense scalar data.""" def read(self) -> int: + """TBD""" pass def write(self, data: int): + """TBD""" pass class PyRefPortScalarSparse(PyRefPort): + """Python implementation of RefPort for sparse scalar data.""" def read(self) -> ty.Tuple[int, int]: + """TBD""" pass def write(self, data: int, idx: int): + """TBD""" pass @@ -260,6 +559,64 @@ def write(self, data: int, idx: int): class PyVarPort(AbstractPyPort): """Python implementation of VarPort used within AbstractPyProcessModel. + + A PyVarPort is a Port linked to a variable Var of a Process and might be + connected to a RefPort of another process. It is used to get or set the + value of the referenced Var across Processes. A PyVarPort is connected via + two channels to a PyRefPort. One channel is used to send data from the + PyRefPort to the PyVarPort and the other is used to receive data from the + PyVarPort. PyVarPorts set or send the value of the linked Var (service()) + given the command VarPortCmd received by a connected PyRefPort. + + Parameters + ---------- + var_name : str + The name of the Var linked to this VarPort. + + csp_send_port : CspSendPort or None + Csp Port used to send data to the referenced in Port (target). + + csp_recv_port: CspRecvPort or None + Csp Port used to receive data from the referenced Port (source). + + process_model : AbstractProcessModel + The process model used by the process of the Port. + + shape : tuple, default=tuple() + The shape of the Port. + + d_type: type, default=int + The data type of the Port. + + Attributes + ---------- + var_name : str + The name of the Var linked to this VarPort. + + _csp_send_port : CspSendPort + Used to send data to the referenced Port PyRefPort (target). + + _csp_recv_port : CspRecvPort + Used to receive data from the referenced Port PyRefPort (source). + + Class attributes + ---------------- + VEC_DENSE : PyVarPortVectorDense, default=None + Type of PyInPort. CSP Port sends data as dense vector. + + VEC_SPARSE : PyVarPortVectorSparse, default=None + Type of PyInPort. CSP Port sends data as sparse vector (data + indices), + so only entries which have changed in a vector need to be communicated. + + SCALAR_DENSE : PyVarPortScalarDense, default=None + Type of PyInPort. CSP Port sends data element by element for the whole + data structure. So the CSP channel does need less memory to transfer + data. + + SCALAR_SPARSE : PyVarPortScalarSparse, default=None + Type of PyInPort. CSP Port sends data element by element, but after each + element the index of the data entry is also given. So only entries which + need to be changed need to be communicated. """ VEC_DENSE: ty.Type["PyVarPortVectorDense"] = None @@ -270,30 +627,51 @@ class PyVarPort(AbstractPyPort): def __init__(self, var_name: str, csp_send_port: ty.Optional[CspSendPort], - csp_recv_port: ty.Optional[CspRecvPort], *args): + csp_recv_port: ty.Optional[CspRecvPort], + process_model: AbstractProcessModel, + shape: ty.Tuple[int, ...] = tuple(), + d_type: type = int): self._csp_recv_port = csp_recv_port self._csp_send_port = csp_send_port self.var_name = var_name - super().__init__(*args) + super().__init__(process_model, shape, d_type) @property def csp_ports(self) -> ty.List[AbstractCspPort]: - """Returns all csp ports of the port.""" + """Property to get the corresponding CSP Ports of all connected + PyPorts (csp_ports). The CSP Port is the low level interface of the + backend messaging infrastructure which is used to send and receive data. + + Returns + ------- + A list of all CSP Ports connected to the PyPort. + """ if self._csp_send_port is not None and self._csp_recv_port is not None: return [self._csp_send_port, self._csp_recv_port] else: - # In this case the port was not connected + # In this case the Port was not connected return [] + @abstractmethod def service(self): + """Abstract method to set the value of the linked Var of the VarPort, + received from the connected RefPort, or to send the value of the linked + Var of the VarPort to the connected RefPort. The connected RefPort + determines whether it will perform a read() or write() operation by + sending a command VarPortCmd. + """ pass class PyVarPortVectorDense(PyVarPort): + """Python implementation of VarPort for dense vector data.""" def service(self): - """Sets the received value to the given var or sends the value of the - var to the csp_send_port, depending on the received header information - of the csp_recv_port.""" + """Method to set the value of the linked Var of the VarPort, + received from the connected RefPort, or to send the value of the linked + Var of the VarPort to the connected RefPort. The connected RefPort + determines whether it will perform a read() or write() operation by + sending a command VarPortCmd. + """ # Inspect incoming data if self._csp_send_port is not None and self._csp_recv_port is not None: @@ -314,26 +692,47 @@ def service(self): class PyVarPortVectorSparse(PyVarPort): + """Python implementation of VarPort for sparse vector data.""" def recv(self) -> ty.Tuple[np.ndarray, np.ndarray]: + """TBD""" pass def peek(self) -> ty.Tuple[np.ndarray, np.ndarray]: + """TBD""" + pass + + def service(self): + """TBD""" pass class PyVarPortScalarDense(PyVarPort): + """Python implementation of VarPort for dense scalar data.""" def recv(self) -> int: + """TBD""" pass def peek(self) -> int: + """TBD""" + pass + + def service(self): + """TBD""" pass class PyVarPortScalarSparse(PyVarPort): + """Python implementation of VarPort for sparse scalar data.""" def recv(self) -> ty.Tuple[int, int]: + """TBD""" pass def peek(self) -> ty.Tuple[int, int]: + """TBD""" + pass + + def service(self): + """TBD""" pass @@ -344,7 +743,20 @@ def peek(self) -> ty.Tuple[int, int]: class RefVarTypeMapping: - """Class to get the mapping of PyRefPort types to PyVarPortTypes.""" + """Class to get the mapping of PyRefPort types to PyVarPort types. + + PyRefPorts and PyVarPorts can be implemented as different subtypes, defining + the format of the data to process. To connect PyRefPorts and PyVarPorts they + need to have a compatible data format. + This class maps the fitting data format between PyRefPorts and PyVarPorts. + + Class attributes + ---------------- + mapping : dict + Dictionary containing the mapping of compatible PyRefPort types to + PyVarPort types. + + """ mapping: ty.Dict[PyRefPort, PyVarPort] = { PyRefPortVectorDense: PyVarPortVectorDense, @@ -354,4 +766,18 @@ class RefVarTypeMapping: @classmethod def get(cls, ref_port: PyRefPort): + """Class method to return the compatible PyVarPort type given the + PyRefPort type. + + Parameters + ---------- + ref_port : PyRefPort + PyRefPort type to be mapped to a PyVarPort type. + + Returns + ------- + result : PyVarPort + PyVarPort type compatible to given PyRefPort type. + + """ return cls.mapping[ref_port] diff --git a/src/lava/magma/core/process/ports/ports.py b/src/lava/magma/core/process/ports/ports.py index 82a6ce39e..4d4a651a5 100644 --- a/src/lava/magma/core/process/ports/ports.py +++ b/src/lava/magma/core/process/ports/ports.py @@ -46,11 +46,11 @@ def __init__(self, shape: ty.Tuple): self.out_connections: ty.List[AbstractPort] = [] def _validate_ports( - self, - ports: ty.List["AbstractPort"], - port_type: ty.Type["AbstractPort"], - assert_same_shape: bool = True, - assert_same_type: bool = False, + self, + ports: ty.List["AbstractPort"], + port_type: ty.Type["AbstractPort"], + assert_same_shape: bool = True, + assert_same_type: bool = False, ): """Checks that each port in 'ports' is of type 'port_type' and that shapes of each port is identical to this port's shape.""" @@ -87,11 +87,11 @@ def _add_outputs(self, outputs: ty.List["AbstractPort"]): self.out_connections += outputs def _connect_forward( - self, - ports: ty.List["AbstractPort"], - port_type: ty.Type["AbstractPort"], - assert_same_shape: bool = True, - assert_same_type: bool = True, + self, + ports: ty.List["AbstractPort"], + port_type: ty.Type["AbstractPort"], + assert_same_shape: bool = True, + assert_same_type: bool = True, ): """Creates a forward connection from this AbstractPort to other ports by adding other ports to this AbstractPort's out_connection and @@ -107,11 +107,11 @@ def _connect_forward( p._add_inputs([self]) def _connect_backward( - self, - ports: ty.List["AbstractPort"], - port_type: ty.Type["AbstractPort"], - assert_same_shape: bool = True, - assert_same_type: bool = True, + self, + ports: ty.List["AbstractPort"], + port_type: ty.Type["AbstractPort"], + assert_same_shape: bool = True, + assert_same_type: bool = True, ): """Creates a backward connection from other ports to this AbstractPort by adding other ports to this AbstractPort's @@ -181,9 +181,9 @@ def flatten(self) -> "ReshapePort": return self.reshape((self.size,)) def concat_with( - self, - ports: ty.Union["AbstractPort", ty.List["AbstractPort"]], - axis: int, + self, + ports: ty.Union["AbstractPort", ty.List["AbstractPort"]], + axis: int, ) -> "ConcatPort": """Concatenates this port with other ports in given order along given axis by deriving and returning a new virtual ConcatPort. This implies @@ -254,7 +254,7 @@ class OutPort(AbstractIOPort, AbstractSrcPort): """ def connect( - self, ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]] + self, ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]] ): """Connects this OutPort to other InPort(s) of another process or to OutPort(s) of its parent process. @@ -287,9 +287,9 @@ class InPort(AbstractIOPort, AbstractDstPort): """ def __init__( - self, - shape: ty.Tuple, - reduce_op: ty.Optional[ty.Type[AbstractReduceOp]] = None, + self, + shape: ty.Tuple, + reduce_op: ty.Optional[ty.Type[AbstractReduceOp]] = None, ): super().__init__(shape) self._reduce_op = reduce_op @@ -305,7 +305,7 @@ def connect(self, ports: ty.Union["InPort", ty.List["InPort"]]): self._connect_forward(to_list(ports), InPort) def connect_from( - self, ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]] + self, ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]] ): """Connects other OutPort(s) to this InPort or connects other InPort(s) of parent process to this InPort. @@ -339,7 +339,7 @@ class RefPort(AbstractRVPort, AbstractSrcPort): RefPort to a Var via the connect_var(..) method.""" def connect( - self, ports: ty.Union["AbstractRVPort", ty.List["AbstractRVPort"]] + self, ports: ty.Union["AbstractRVPort", ty.List["AbstractRVPort"]] ): """Connects this RefPort to other VarPort(s) of another process or to RefPort(s) of its parent process. @@ -440,19 +440,7 @@ def connect_var(self, variables: ty.Union[Var, ty.List[Var]]): if var_shape != v.shape: raise AssertionError("All 'vars' must have same shape.") # Create a VarPort to wrap Var - vp = ImplicitVarPort(v) - # Propagate name and parent process of Var to VarPort - vp.name = "_" + v.name + "_implicit_port" - if v.process is not None: - # Only assign when parent process is already assigned - vp.process = v.process - # VarPort name could shadow existing attribute - if hasattr(v.process, vp.name): - raise AssertionError( - "Name of implicit VarPort might conflict" - " with existing attribute.") - setattr(v.process, vp.name, vp) - v.process.var_ports.add_members({vp.name: vp}) + vp = self.create_implicit_var_port(v) var_ports.append(vp) # Connect RefPort to VarPorts that wrap Vars self.connect(var_ports) @@ -461,6 +449,26 @@ def get_dst_vars(self) -> ty.List[Var]: """Returns destination Vars this RefPort is connected to.""" return [ty.cast(VarPort, p).var for p in self.get_dst_ports()] + @staticmethod + def create_implicit_var_port(var: Var) -> "ImplicitVarPort": + """Creates and returns an ImplicitVarPort for the given Var.""" + # Create a VarPort to wrap Var + vp = ImplicitVarPort(var) + # Propagate name and parent process of Var to VarPort + vp.name = "_" + var.name + "_implicit_port" + if var.process is not None: + # Only assign when parent process is already assigned + vp.process = var.process + # VarPort name could shadow existing attribute + if hasattr(var.process, vp.name): + raise AssertionError( + "Name of implicit VarPort might conflict" + " with existing attribute.") + setattr(var.process, vp.name, vp) + var.process.var_ports.add_members({vp.name: vp}) + + return vp + # TODO: (PP) enable connecting multiple VarPorts/RefPorts to a VarPort class VarPort(AbstractRVPort, AbstractDstPort): @@ -521,7 +529,7 @@ def connect(self, ports: ty.Union["VarPort", ty.List["VarPort"]]): self._connect_forward(to_list(ports), VarPort) def connect_from( - self, ports: ty.Union["AbstractRVPort", ty.List["AbstractRVPort"]] + self, ports: ty.Union["AbstractRVPort", ty.List["AbstractRVPort"]] ): """Connects other RefPort(s) to this VarPort or connects other VarPort(s) of parent process to this VarPort. diff --git a/tests/lava/magma/core/model/py/test_ports.py b/tests/lava/magma/core/model/py/test_ports.py index a929e135b..e34c799db 100644 --- a/tests/lava/magma/core/model/py/test_ports.py +++ b/tests/lava/magma/core/model/py/test_ports.py @@ -53,12 +53,15 @@ def probe_test_routine(self, cls): # Create two different PyOutPort send_py_port_1: PyOutPort = \ - PyOutPortVectorDense([send_csp_port_1], None) + PyOutPortVectorDense([send_csp_port_1], None, data.shape, + data.dtype) send_py_port_2: PyOutPort = \ - PyOutPortVectorDense([send_csp_port_2], None) + PyOutPortVectorDense([send_csp_port_2], None, data.shape, + data.dtype) # Create PyInPort with current implementation recv_py_port: PyInPort = \ - cls([recv_csp_port_1, recv_csp_port_2], None) + cls([recv_csp_port_1, recv_csp_port_2], None, data.shape, + data.dtype) recv_py_port.start() send_py_port_1.start() diff --git a/tests/lava/magma/core/model/test_py_model.py b/tests/lava/magma/core/model/test_py_model.py index af8f0f681..107c8f93f 100644 --- a/tests/lava/magma/core/model/test_py_model.py +++ b/tests/lava/magma/core/model/test_py_model.py @@ -432,14 +432,14 @@ def test_build_with_dangling_ports(self): # Validate that the Process with no OutPorts indeed has no output # CspPort self.assertIsInstance( - pm_with_no_out_ports.in_port._csp_recv_ports[0], FakeCspPort) - self.assertEqual(pm_with_no_out_ports.out_port._csp_send_ports, []) + pm_with_no_out_ports.in_port.csp_ports[0], FakeCspPort) + self.assertEqual(pm_with_no_out_ports.out_port.csp_ports, []) # Validate that the Process with no InPorts indeed has no input # CspPort - self.assertEqual(pm_with_no_in_ports.in_port._csp_recv_ports, []) + self.assertEqual(pm_with_no_in_ports.in_port.csp_ports, []) self.assertIsInstance( - pm_with_no_in_ports.out_port._csp_send_ports[0], FakeCspPort) + pm_with_no_in_ports.out_port.csp_ports[0], FakeCspPort) def test_set_ref_var_ports(self): """Check RefPorts and VarPorts can be set."""