From 2da093c6ea6bd65c98a3780df7d9e16328c8b419 Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Thu, 24 Feb 2022 16:17:47 -0800 Subject: [PATCH] Add NxSDKRuntimeService (#182) * Address review comments Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Improve mnist tutorial (#147) * Minor: Removed a divide by zero warning from the fixed point LIF ProcessModel Signed-off-by: Risbud, Sumedh * Improved to MNIST end-to-end tutorial - uses fixed point bit-accurate ProcessModels for LIF and Dense - resets internal neural state of all LIF neurons - these changes are needed to make the pre-trained networks parameters work, because the network was trained with these assumptions Signed-off-by: Risbud, Sumedh * Post code review @awintel and @phstratmann Signed-off-by: Risbud, Sumedh * Post code review @awintel and @phstratmann Signed-off-by: Risbud, Sumedh * Post re-review by @phstratmann Signed-off-by: Risbud, Sumedh Co-authored-by: PhilippPlank <32519998+PhilippPlank@users.noreply.github.com> * Check process lineage before join (#177) Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Add NxSDKRuntimeService Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Fix unit test, linting Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Remove comments Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Handle nxsdk import exception Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Fix indentation issue Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Uncomment board.run in nc proc model Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Address review, rework NxSdkRuntime Service Signed-off-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> * Fix unit tests, merge with main Signed-off-by: Marcus G K Williams * Remove nc/ports.py again Remove commented code in compiler.py Signed-off-by: Marcus G K Williams * Update comments logging Signed-off-by: Marcus G K Williams * Update test Utils method name and document Signed-off-by: Marcus G K Williams * Update test name and docs for nxsdkruntimeservice Signed-off-by: Marcus G K Williams * Update docstrings for RuntimeService Signed-off-by: Marcus G K Williams * Update logging Signed-off-by: Marcus G K Williams * Remove unneeded logging import Signed-off-by: Marcus G K Williams Co-authored-by: Risbud, Sumedh Co-authored-by: PhilippPlank <32519998+PhilippPlank@users.noreply.github.com> Co-authored-by: Marcus G K Williams --- src/lava/magma/compiler/builders/builder.py | 278 ++++++--- .../magma/compiler/builders/interfaces.py | 10 +- src/lava/magma/compiler/compiler.py | 108 +++- src/lava/magma/core/model/interfaces.py | 5 + src/lava/magma/core/model/model.py | 27 +- src/lava/magma/core/model/nc/model.py | 50 +- src/lava/magma/core/model/nc/type.py | 9 + src/lava/magma/core/model/py/model.py | 8 +- src/lava/magma/core/process/process.py | 31 +- src/lava/magma/core/resources.py | 4 + src/lava/magma/core/run_configs.py | 32 +- .../core/sync/protocols/async_protocol.py | 3 +- .../core/sync/protocols/loihi_protocol.py | 21 +- src/lava/magma/runtime/runtime.py | 57 +- .../magma/runtime/runtime_services/enums.py | 44 ++ .../runtime/runtime_services/interfaces.py | 41 ++ 
.../runtime_services/runtime_service.py | 590 ++++++++++++++++++ tests/lava/magma/compiler/test_compiler.py | 39 +- .../magma/core/process/test_lif_dense_lif.py | 2 +- .../magma/runtime/test_exception_handling.py | 34 +- tests/lava/magma/runtime/test_get_set_var.py | 1 - .../runtime/test_nxsdkruntimeservice_loihi.py | 71 +++ .../lava/magma/runtime/test_ref_var_ports.py | 1 - tests/lava/magma/runtime/test_runtime.py | 34 +- .../magma/runtime/test_runtime_service.py | 121 +++- tests/lava/proc/io/test_dataloader.py | 1 - tests/lava/test_utils/__init__.py | 0 tests/lava/test_utils/utils.py | 22 + 28 files changed, 1430 insertions(+), 214 deletions(-) create mode 100644 src/lava/magma/core/model/nc/type.py create mode 100644 src/lava/magma/runtime/runtime_services/enums.py create mode 100644 src/lava/magma/runtime/runtime_services/interfaces.py create mode 100644 src/lava/magma/runtime/runtime_services/runtime_service.py create mode 100644 tests/lava/magma/runtime/test_nxsdkruntimeservice_loihi.py create mode 100644 tests/lava/test_utils/__init__.py create mode 100644 tests/lava/test_utils/utils.py diff --git a/src/lava/magma/compiler/builders/builder.py b/src/lava/magma/compiler/builders/builder.py index 5275b745b..22d02bf6b 100644 --- a/src/lava/magma/compiler/builders/builder.py +++ b/src/lava/magma/compiler/builders/builder.py @@ -1,7 +1,8 @@ -# Copyright (C) 2021 Intel Corporation +# Copyright (C) 2022 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import logging import typing as ty import numpy as np @@ -10,12 +11,14 @@ from lava.magma.core.sync.protocol import AbstractSyncProtocol from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\ import MessageInfrastructureInterface -from lava.magma.runtime.runtime_service import PyRuntimeService, \ - AbstractRuntimeService +from lava.magma.runtime.runtime_services.enums import LoihiVersion +from lava.magma.runtime.runtime_services.runtime_service import ( + AbstractRuntimeService, + NxSdkRuntimeService +) if ty.TYPE_CHECKING: from lava.magma.core.process.process import AbstractProcess - from lava.magma.core.model.model import AbstractProcessModel from lava.magma.runtime.runtime import Runtime from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort @@ -24,6 +27,9 @@ AbstractRuntimeServiceBuilder, AbstractChannelBuilder ) +from lava.magma.core.model.model import AbstractProcessModel +from lava.magma.core.model.nc.model import AbstractNcProcessModel +from lava.magma.core.model.nc.type import LavaNcType from lava.magma.core.model.py.model import AbstractPyProcessModel from lava.magma.core.model.py.type import LavaPyType from lava.magma.compiler.utils import VarInitializer, PortInitializer, \ @@ -39,49 +45,21 @@ ChannelType -class PyProcessBuilder(AbstractProcessBuilder): - """A PyProcessBuilder instantiates and initializes a PyProcessModel. - - The compiler creates a PyProcessBuilder for each PyProcessModel. In turn, - the runtime, loads a PyProcessBuilder onto a compute node where it builds - the PyProcessModel and its associated ports. - - In order to build the PyProcessModel, the builder inspects all LavaType - class variables of a PyProcessModel, creates the corresponding data type - with the specified properties, the shape and the initial value provided by - the Lava Var. In addition, the builder creates the required PyPort - instances. Finally, the builder assigns both port and variable - implementations to the PyProcModel. 
- - Once the PyProcessModel is built, it is the RuntimeService's job to - connect channels to ports and start the process. - - Note: For unit testing it should be possible to build processes locally - instead of on a remote node. For pure atomic unit testing a ProcessModel - locally, PyInPorts and PyOutPorts must be fed manually with data. - """ +class _AbstractProcessBuilder(AbstractProcessBuilder): + """A _AbstractProcessBuilder instantiates and initializes + an AbstractProcessModel but is not meant to be used + directly but inherited from""" def __init__( - self, proc_model: ty.Type[AbstractPyProcessModel], - model_id: int, - proc_params: ty.Dict[str, ty.Any] = None): - super(PyProcessBuilder, self).__init__( + self, proc_model: ty.Type[AbstractProcessModel], + model_id: int): + super(_AbstractProcessBuilder, self).__init__( proc_model=proc_model, model_id=model_id ) - if not issubclass(proc_model, AbstractPyProcessModel): - raise AssertionError("Is not a subclass of AbstractPyProcessModel") - self.vars: ty.Dict[str, VarInitializer] = {} - self.py_ports: ty.Dict[str, PortInitializer] = {} - self.ref_ports: ty.Dict[str, PortInitializer] = {} - self.var_ports: ty.Dict[str, VarPortInitializer] = {} - self.csp_ports: ty.Dict[str, ty.List[AbstractCspPort]] = {} - self.csp_rs_send_port: ty.Dict[str, CspSendPort] = {} - self.csp_rs_recv_port: ty.Dict[str, CspRecvPort] = {} - self.proc_params = proc_params @property - def proc_model(self) -> ty.Type[AbstractPyProcessModel]: + def proc_model(self) -> ty.Type[AbstractProcessModel]: return self._proc_model # ToDo: Perhaps this should even be done in Compiler? @@ -137,6 +115,68 @@ def _check_not_assigned_yet( f"Member '{key}' already found in {m_type}." ) + # ToDo: Also check that Vars are initializable with var.value provided + def set_variables(self, variables: ty.List[VarInitializer]): + """Appends the given list of variables to the ProcessModel. Used by the + compiler to create a ProcessBuilder during the compilation of + ProcessModels. + + Parameters + ---------- + variables : ty.List[VarInitializer] + + """ + self._check_members_exist(variables, "Var") + new_vars = {v.name: v for v in variables} + self._check_not_assigned_yet(self.vars, new_vars.keys(), "vars") + self.vars.update(new_vars) + + +class PyProcessBuilder(_AbstractProcessBuilder): + """A PyProcessBuilder instantiates and initializes a PyProcessModel. + + The compiler creates a PyProcessBuilder for each PyProcessModel. In turn, + the runtime, loads a PyProcessBuilder onto a compute node where it builds + the PyProcessModel and its associated ports. + + In order to build the PyProcessModel, the builder inspects all LavaType + class variables of a PyProcessModel, creates the corresponding data type + with the specified properties, the shape and the initial value provided by + the Lava Var. In addition, the builder creates the required PyPort + instances. Finally, the builder assigns both port and variable + implementations to the PyProcModel. + + Once the PyProcessModel is built, it is the RuntimeService's job to + connect channels to ports and start the process. + + Note: For unit testing it should be possible to build processes locally + instead of on a remote node. For pure atomic unit testing a ProcessModel + locally, PyInPorts and PyOutPorts must be fed manually with data. 
+ """ + + def __init__( + self, proc_model: ty.Type[AbstractPyProcessModel], + model_id: int, + proc_params: ty.Dict[str, ty.Any] = None): + super(PyProcessBuilder, self).__init__( + proc_model=proc_model, + model_id=model_id + ) + if not issubclass(proc_model, AbstractPyProcessModel): + raise AssertionError("Is not a subclass of AbstractPyProcessModel") + self.vars: ty.Dict[str, VarInitializer] = {} + self.py_ports: ty.Dict[str, PortInitializer] = {} + self.ref_ports: ty.Dict[str, PortInitializer] = {} + self.var_ports: ty.Dict[str, VarPortInitializer] = {} + self.csp_ports: ty.Dict[str, ty.List[AbstractCspPort]] = {} + self.csp_rs_send_port: ty.Dict[str, CspSendPort] = {} + self.csp_rs_recv_port: ty.Dict[str, CspRecvPort] = {} + self.proc_params = proc_params + + @property + def proc_model(self) -> ty.Type[AbstractPyProcessModel]: + return self._proc_model + def check_all_vars_and_ports_set(self): """Checks that Vars and PyPorts assigned from Process have a corresponding LavaPyType. @@ -191,22 +231,6 @@ def check_lava_py_types(self): f"PyRefPort in '{self.proc_model.__name__}'." ) - # ToDo: Also check that Vars are initializable with var.value provided - def set_variables(self, variables: ty.List[VarInitializer]): - """Appends the given list of variables to the ProcessModel. Used by the - compiler to create a ProcessBuilder during the compilation of - ProcessModels. - - Parameters - ---------- - variables : ty.List[VarInitializer] - - """ - self._check_members_exist(variables, "Var") - new_vars = {v.name: v for v in variables} - self._check_not_assigned_yet(self.vars, new_vars.keys(), "vars") - self.vars.update(new_vars) - def set_py_ports(self, py_ports: ty.List[PortInitializer], check=True): """Appends the given list of PyPorts to the ProcessModel. Used by the compiler to create a ProcessBuilder during the compilation of @@ -425,16 +449,107 @@ def build(self): return pm -class CProcessBuilder(AbstractProcessBuilder): +class CProcessBuilder(_AbstractProcessBuilder): """C Process Builder""" pass -class NcProcessBuilder(AbstractProcessBuilder): - """Neuromorphic Core Process Builder""" +class NcProcessBuilder(_AbstractProcessBuilder): + """NcProcessBuilder instantiates and initializes an NcProcessModel. - pass + The compiler creates a NcProcessBuilder for each NcProcessModel. In turn, + the runtime, loads a NcProcessBuilder onto a compute node where it builds + the NcProcessModel and its associated vars. + + In order to build the NcProcessModel, the builder inspects all LavaType + class variables of a NcProcessModel, creates the corresponding data type + with the specified properties, the shape and the initial value provided by + the Lava Var. Finally, the builder assigns variable + implementations to the NcProcModel.""" + def __init__( + self, proc_model: ty.Type[AbstractNcProcessModel], + model_id: int, + proc_params: ty.Dict[str, ty.Any] = None): + super(NcProcessBuilder, self).__init__( + proc_model=proc_model, + model_id=model_id + ) + if not issubclass(proc_model, AbstractNcProcessModel): + raise AssertionError("Is not a subclass of AbstractNcProcessModel") + self.vars: ty.Dict[str, VarInitializer] = {} + self.proc_params = proc_params + + def _get_lava_type(self, name: str) -> LavaNcType: + return getattr(self.proc_model, name) + + def check_all_vars_set(self): + """Checks that Vars assigned from Process have a + corresponding LavaNcType. 
+
+        Raises
+        ------
+        AssertionError
+            No LavaNcType found in ProcModel
+        """
+        for attr_name in dir(self.proc_model):
+            attr = getattr(self.proc_model, attr_name)
+            if isinstance(attr, LavaNcType):
+                if (
+                    attr_name not in self.vars
+                ):
+                    raise AssertionError(
+                        f"No LavaNcType '{attr_name}' found in ProcModel "
+                        f"'{self.proc_model.__name__}'."
+                    )
+
+    def set_rs_csp_ports(self, csp_ports: ty.List[AbstractCspPort]):
+        pass
+
+    def build(self):
+        """Builds an NcProcessModel at runtime within Runtime.
+
+        The Compiler initializes the NcProcBuilder with the ProcModel,
+        VarInitializers and PortInitializers.
+
+        At deployment to a node, the Builder.build(..) gets executed,
+        resulting in the following:
+          1. ProcModel gets instantiated
+          2. Vars are initialized and assigned to ProcModel
+
+        Returns
+        -------
+        AbstractNcProcessModel
+
+        Raises
+        ------
+        NotImplementedError
+        """
+
+        pm = self.proc_model(self.proc_params)
+        pm.model_id = self._model_id
+
+        # Initialize Vars
+        for name, v in self.vars.items():
+            # Build variable
+            lt = self._get_lava_type(name)
+            if issubclass(lt.cls, np.ndarray):
+                var = lt.cls(v.shape, lt.d_type)
+                var[:] = v.value
+            elif issubclass(lt.cls, (int, float)):
+                var = v.value
+            else:
+                raise NotImplementedError
+
+            # Create dynamic variable attribute on ProcModel
+            setattr(pm, name, var)
+            # Create private attribute for variable precision
+            setattr(pm, "_" + name + "_p", lt.precision)
+
+            pm.var_id_to_var_map[v.var_id] = name
+
+        return pm


 class RuntimeServiceBuilder(AbstractRuntimeServiceBuilder):
@@ -446,14 +561,19 @@ def __init__(
         protocol: ty.Type[AbstractSyncProtocol],
         runtime_service_id: int,
         model_ids: ty.List[int],
+        loihi_version: ty.Type[LoihiVersion],
+        loglevel: int = logging.WARNING
     ):
         super(RuntimeServiceBuilder, self).__init__(rs_class, protocol)
+        self.log = logging.getLogger(__name__)
+        self.log.setLevel(loglevel)
         self._runtime_service_id = runtime_service_id
         self._model_ids: ty.List[int] = model_ids
         self.csp_send_port: ty.Dict[str, CspSendPort] = {}
         self.csp_recv_port: ty.Dict[str, CspRecvPort] = {}
         self.csp_proc_send_port: ty.Dict[str, CspSendPort] = {}
         self.csp_proc_recv_port: ty.Dict[str, CspRecvPort] = {}
+        self.loihi_version: ty.Type[LoihiVersion] = loihi_version

     @property
     def runtime_service_id(self):
@@ -487,24 +607,40 @@ def set_csp_proc_ports(self, csp_ports: ty.List[AbstractCspPort]):
             if isinstance(port, CspRecvPort):
                 self.csp_proc_recv_port.update({port.name: port})

-    def build(self) -> PyRuntimeService:
-        """Build Runtime Service
+    def build(self,
+              loihi_version: LoihiVersion = LoihiVersion.N3
+              ) -> AbstractRuntimeService:
+        """Build the runtime service.

         Returns
         -------
-        PyRuntimeService
+        A concrete instance of AbstractRuntimeService
+        [PyRuntimeService or NxSdkRuntimeService]
         """
-        rs = self.rs_class(protocol=self.sync_protocol)
+
+        self.log.debug("RuntimeService Class: " + str(self.rs_class))
+        nxsdk_rts = False
+        if self.rs_class == NxSdkRuntimeService:
+            rs = self.rs_class(protocol=self.sync_protocol,
+                               loihi_version=loihi_version)
+            nxsdk_rts = True
+            self.log.debug("Initialized NxSdkRuntimeService")
+        else:
+            rs = self.rs_class(protocol=self.sync_protocol)
+            self.log.debug("Initialized PyRuntimeService")

         rs.runtime_service_id = self._runtime_service_id
         rs.model_ids = self._model_ids

-        for port in self.csp_proc_send_port.values():
-            if "service_to_process" in port.name:
-                rs.service_to_process.append(port)
+        if not nxsdk_rts:
+            for port in self.csp_proc_send_port.values():
+                if "service_to_process" in port.name:
+
rs.service_to_process.append(port) - for port in self.csp_proc_recv_port.values(): - if "process_to_service" in port.name: - rs.process_to_service.append(port) + for port in self.csp_proc_recv_port.values(): + if "process_to_service" in port.name: + rs.process_to_service.append(port) + + self.log.debug("Setup 'RuntimeService <--> Rrocess; ports") for port in self.csp_send_port.values(): if "service_to_runtime" in port.name: @@ -514,6 +650,8 @@ def build(self) -> PyRuntimeService: if "runtime_to_service" in port.name: rs.runtime_to_service = port + self.log.debug("Setup 'Runtime <--> RuntimeService' ports") + return rs diff --git a/src/lava/magma/compiler/builders/interfaces.py b/src/lava/magma/compiler/builders/interfaces.py index e367d2747..10b377f54 100644 --- a/src/lava/magma/compiler/builders/interfaces.py +++ b/src/lava/magma/compiler/builders/interfaces.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021 Intel Corporation +# Copyright (C) 2022 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ @@ -6,10 +6,10 @@ import typing as ty -from lava.magma.compiler.channels.interfaces import AbstractCspPort from lava.magma.core.model.model import AbstractProcessModel from lava.magma.core.sync.protocol import AbstractSyncProtocol -from lava.magma.runtime.runtime_service import AbstractRuntimeService +from lava.magma.runtime.runtime_services.runtime_service import \ + AbstractRuntimeService class AbstractProcessBuilder(ABC): @@ -31,10 +31,6 @@ def __init__( self._proc_model = proc_model self._model_id = model_id - @abstractmethod - def set_csp_ports(self, csp_ports: ty.List[AbstractCspPort]): - pass - @property @abstractmethod def proc_model(self) -> "AbstractProcessModel": diff --git a/src/lava/magma/compiler/compiler.py b/src/lava/magma/compiler/compiler.py index 962f40694..da1eb22d4 100644 --- a/src/lava/magma/compiler/compiler.py +++ b/src/lava/magma/compiler/compiler.py @@ -1,6 +1,7 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import logging import importlib import importlib.util as import_utils import inspect @@ -18,7 +19,7 @@ import lava.magma.compiler.exec_var as exec_var from lava.magma.compiler.builders.builder import ChannelBuilderMp from lava.magma.compiler.builders.builder import PyProcessBuilder, \ - AbstractRuntimeServiceBuilder, RuntimeServiceBuilder, \ + NcProcessBuilder, AbstractRuntimeServiceBuilder, RuntimeServiceBuilder, \ AbstractChannelBuilder, ServiceChannelBuilderMp from lava.magma.compiler.builders.builder import RuntimeChannelBuilderMp from lava.magma.compiler.channels.interfaces import ChannelType @@ -29,25 +30,37 @@ from lava.magma.core import resources from lava.magma.core.model.c.model import AbstractCProcessModel from lava.magma.core.model.model import AbstractProcessModel -from lava.magma.core.model.nc.model import AbstractNcProcessModel +from lava.magma.core.model.nc.model import ( + AbstractNcProcessModel, + NcProcessModel +) from lava.magma.core.model.py.model import AbstractPyProcessModel from lava.magma.core.model.py.ports import RefVarTypeMapping from lava.magma.core.model.sub.model import AbstractSubProcessModel from lava.magma.core.process.ports.ports import AbstractPort, VarPort, \ ImplicitVarPort, RefPort from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.resources import CPU, NeuroCore +from lava.magma.core.resources import ( + CPU, + Loihi1NeuroCore, + Loihi2NeuroCore, + NeuroCore +) from lava.magma.core.run_configs 
import RunConfig from lava.magma.core.sync.domain import SyncDomain from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol from lava.magma.runtime.runtime import Runtime +from lava.magma.runtime.runtime_services.enums import LoihiVersion PROC_MAP = ty.Dict[AbstractProcess, ty.Type[AbstractProcessModel]] # ToDo: (AW) Document all class methods and class class Compiler: - def __init__(self, compile_cfg: ty.Optional[ty.Dict[str, ty.Any]] = None): + def __init__(self, loglevel: int = logging.WARNING, + compile_cfg: ty.Optional[ty.Dict[str, ty.Any]] = None): + self.log = logging.getLogger(__name__) + self.log.setLevel(loglevel) self._compile_config = {"pypy_channel_size": 64} if compile_cfg: self._compile_config.update(compile_cfg) @@ -195,12 +208,12 @@ def _select_proc_models( run_cfg: RunConfig) -> ty.Type[AbstractProcessModel]: """Selects a ProcessModel from list of provided models given RunCfg.""" selected_proc_model = run_cfg.select(proc, models) - err_msg = f"RunConfig {run_cfg.__class__.__qualname__}.select() must " \ - f"return a sub-class of AbstractProcessModel. Got" \ - f" {type(selected_proc_model)} instead." - if not isinstance(selected_proc_model, type): - raise AssertionError(err_msg) - if not issubclass(selected_proc_model, AbstractProcessModel): + + if not isinstance(selected_proc_model, type) \ + or not issubclass(selected_proc_model, AbstractProcessModel): + err_msg = f"RunConfig {run_cfg.__class__.__qualname__}.select()" \ + f" must return a sub-class of AbstractProcessModel. Got" \ + f" {type(selected_proc_model)} instead." raise AssertionError(err_msg) return selected_proc_model @@ -375,8 +388,16 @@ def _compile_proc_models( elif issubclass(pm, AbstractCProcessModel): raise NotImplementedError elif issubclass(pm, AbstractNcProcessModel): - # ToDo: This needs to call NeuroCoreCompiler - raise NotImplementedError + for p in procs: + b = NcProcessBuilder(pm, p.id, p.proc_params) + # Create VarInitializers from lava.process Vars + v = [VarInitializer(v.name, v.shape, v.init, v.id) + for v in p.vars] + + # Assigns initializers to builder + b.set_variables(v) + b.check_all_vars_set() + nc_builders[p] = b else: raise TypeError("Non-supported ProcessModel type {}" .format(pm)) @@ -391,7 +412,8 @@ def _compile_proc_models( @staticmethod def _create_sync_domains( - proc_map: PROC_MAP, run_cfg: RunConfig, node_cfgs + proc_map: PROC_MAP, run_cfg: RunConfig, node_cfgs, + log: logging.getLoggerClass() ) -> ty.Tuple[ty.List[SyncDomain], ty.Dict[Node, ty.List[SyncDomain]]]: """Validates custom sync domains provided by run_cfg and otherwise creates default sync domains. @@ -406,7 +428,6 @@ def _create_sync_domains( unassigned processes to those default sync domains based on the sync protocol that the chosen process model implements. 
""" - proc_to_domain_map = OrderedDict() sync_domains = OrderedDict() @@ -422,13 +443,16 @@ def _create_sync_domains( # Validate and map all processes in sync domain for p in sd.processes: + log.debug("Process: " + str(p)) pm = proc_map[p] # Auto-assign AsyncProtocol if none was assigned if not pm.implements_protocol: proto = AsyncProtocol + log.debug("Protocol: AsyncProtocol") else: proto = pm.implements_protocol + log.debug("Protocol: " + proto.__name__) # Check that SyncProtocols of process model and sync domain # are compatible @@ -460,8 +484,10 @@ def _create_sync_domains( # Auto-assign AsyncProtocol if none was assigned if not pm.implements_protocol: proto = AsyncProtocol + log.debug("Protocol: AsyncProtocol") else: proto = pm.implements_protocol + log.debug("Protocol: " + proto.__name__) # Add process to existing or new default sync domain if not part # of custom sync domain @@ -481,13 +507,16 @@ def _create_sync_domains( defaultdict(list) for node_cfg in node_cfgs: for node in node_cfg: + log.debug("Node: " + str(node.node_type.__name__)) node_to_sync_domain_dict[node].extend( [proc_to_domain_map[proc] for proc in node.processes]) return list(sync_domains.values()), node_to_sync_domain_dict # ToDo: (AW) Implement the general NodeConfig generation algorithm @staticmethod - def _create_node_cfgs(proc_map: PROC_MAP) -> ty.List[NodeConfig]: + def _create_node_cfgs(proc_map: PROC_MAP, + log: logging.getLoggerClass() + ) -> ty.List[NodeConfig]: """Creates and returns a list of NodeConfigs from the AbstractResource requirements of all process's ProcessModels where each NodeConfig is a set of Nodes that satisfies the resource @@ -562,10 +591,27 @@ def _create_node_cfgs(proc_map: PROC_MAP) -> ty.List[NodeConfig]: Finally, we are left with a list of (the best) legal NodeCfgs. 
""" procs = list(proc_map.keys()) + if log.level == logging.DEBUG: + for proc in procs: + log.debug("Proc Name: " + proc.name + " Proc: " + str(proc)) + proc_models = list(proc_map.items()) + for procm in proc_models: + log.debug("ProcModels: " + str(procm[1])) + n = Node(node_type=resources.HeadNode, processes=procs) ncfg = NodeConfig() ncfg.append(n) + # Until NodeConfig generation algorithm is present + # check if NcProcessModel is present in proc_map, + # if so add hardcoded Node for OheoGulch + for proc_model in proc_map.items(): + if issubclass(proc_model[1], NcProcessModel): + n1 = Node(node_type=resources.OheoGulch, processes=procs) + ncfg.append(n1) + log.debug("OheoGulch Node Added to NodeConfig: " + + str(n1.node_type)) + return [ncfg] @staticmethod @@ -671,26 +717,45 @@ def _create_channel_builders(self, proc_map: PROC_MAP) \ # ToDo: (AW) Fix type resolution issues @staticmethod def _create_runtime_service_as_py_process_model( - node_to_sync_domain_dict: ty.Dict[Node, ty.List[SyncDomain]]) \ + node_to_sync_domain_dict: ty.Dict[Node, ty.List[SyncDomain]], + log: logging.getLoggerClass() = logging.getLogger()) \ -> ty.Tuple[ ty.Dict[SyncDomain, AbstractRuntimeServiceBuilder], ty.Dict[int, int]]: rs_builders: ty.Dict[SyncDomain, AbstractRuntimeServiceBuilder] = {} proc_id_to_runtime_service_id_map: ty.Dict[int, int] = {} rs_id: int = 0 + loihi_version: LoihiVersion = LoihiVersion.N3 for node, sync_domains in node_to_sync_domain_dict.items(): sync_domain_set = set(sync_domains) for sync_domain in sync_domain_set: + if log.level == logging.DEBUG: + for resource in node.node_type.resources: + log.debug("node.node_type.resources: " + + resource.__name__) if NeuroCore in node.node_type.resources: rs_class = sync_domain.protocol.runtime_service[NeuroCore] + elif Loihi1NeuroCore in node.node_type.resources: + log.debug("sync_domain.protocol. " + + "runtime_service[Loihi1NeuroCore]") + rs_class = sync_domain.protocol. \ + runtime_service[Loihi1NeuroCore] + loihi_version: LoihiVersion = LoihiVersion.N2 + elif Loihi2NeuroCore in node.node_type.resources: + log.debug("sync_domain.protocol. " + + "runtime_service[Loihi2NeuroCore]") + rs_class = sync_domain.protocol. \ + runtime_service[Loihi2NeuroCore] else: rs_class = sync_domain.protocol.runtime_service[CPU] + log.debug("RuntimeService Class: " + str(rs_class.__name__)) model_ids: ty.List[int] = [p.id for p in sync_domain.processes] rs_builder = \ RuntimeServiceBuilder(rs_class=rs_class, protocol=sync_domain.protocol, runtime_service_id=rs_id, - model_ids=model_ids) + model_ids=model_ids, + loihi_version=loihi_version) rs_builders[sync_domain] = rs_builder for p in sync_domain.processes: proc_id_to_runtime_service_id_map[p.id] = rs_id @@ -767,8 +832,7 @@ def _create_exec_vars(self, elif issubclass(pm, AbstractCProcessModel): ev = exec_var.CExecVar(v, node_id, run_srv_id) elif issubclass(pm, AbstractNcProcessModel): - raise NotImplementedError( - "NcProcessModel not yet supported.") + ev = exec_var.PyExecVar(v, node_id, run_srv_id) else: raise NotImplementedError("Illegal ProcessModel type.") exec_vars[v.id] = ev @@ -992,11 +1056,11 @@ def compile(self, proc: AbstractProcess, run_cfg: RunConfig) -> Executable: exe = self._compile_proc_models(proc_groups) # 4. Create NodeConfigs (just pick one manually for now): - node_cfgs = self._create_node_cfgs(proc_map) + node_cfgs = self._create_node_cfgs(proc_map, self.log) # 5. 
Create SyncDomains sync_domains, node_to_sync_domain_dict = self._create_sync_domains( - proc_map, run_cfg, node_cfgs) + proc_map, run_cfg, node_cfgs, self.log) # 6. Create Channel builders channel_builders = self._create_channel_builders(proc_map) @@ -1004,7 +1068,7 @@ def compile(self, proc: AbstractProcess, run_cfg: RunConfig) -> Executable: # 7. Create Runtime Service builders runtime_service_builders, proc_id_to_runtime_service_id_map = \ self._create_runtime_service_as_py_process_model( - node_to_sync_domain_dict) + node_to_sync_domain_dict, self.log) # 8. Create ExecVars self._create_exec_vars(node_cfgs, diff --git a/src/lava/magma/core/model/interfaces.py b/src/lava/magma/core/model/interfaces.py index 4170f89a6..65200b949 100644 --- a/src/lava/magma/core/model/interfaces.py +++ b/src/lava/magma/core/model/interfaces.py @@ -32,3 +32,8 @@ def join(self): """Join all csp ports""" for csp_port in self.csp_ports: csp_port.join() + + +class AbstractNodeGroup: + def alloc(self, *args, **kwargs): + pass diff --git a/src/lava/magma/core/model/model.py b/src/lava/magma/core/model/model.py index 4b1baa1c7..713d5d8dc 100644 --- a/src/lava/magma/core/model/model.py +++ b/src/lava/magma/core/model/model.py @@ -4,6 +4,7 @@ from __future__ import annotations import typing as ty +import logging from abc import ABC if ty.TYPE_CHECKING: @@ -57,7 +58,11 @@ class level attributes with the same name if they exist. required_resources: ty.List[ty.Type[AbstractResource]] = [] tags: ty.List[str] = [] - def __init__(self, proc_params: ty.Dict[str, ty.Any]) -> None: + def __init__(self, + proc_params: ty.Dict[str, ty.Any], + loglevel: int = logging.WARNING) -> None: + self.log = logging.getLogger(__name__) + self.log.setLevel(loglevel) self.proc_params: ty.Dict[str, ty.Any] = proc_params def __repr__(self): @@ -78,23 +83,3 @@ def __repr__(self): + " has tags " + tags ) - - # ToDo: (AW) Should AbstractProcessModel even have a run() method? What - # if a sub class like AbstractCProcessModel for a LMT does not even need - # a 'run'? - def run(self): - raise NotImplementedError("'run' method is not implemented.") - - def add_ports_for_polling(self): - raise NotImplementedError( - "'add_ports_for_polling' method is not implemented.") - - # ToDo: What does this function do here? The AbstractProcModel can't - # depend on one specific Python implementation of ports/channels. It can - # probably not even have a start function. Because for a CProcModel - # running on LMT there might not even be Python start function to call. - # Starting the ports is likely the RuntimeService's or Builder's job - # which is what makes a process run on a certain compute resource. - def start(self): - # Store the list of csp_ports. Start them here. - raise NotImplementedError diff --git a/src/lava/magma/core/model/nc/model.py b/src/lava/magma/core/model/nc/model.py index 8253c7cd5..f8d246790 100644 --- a/src/lava/magma/core/model/nc/model.py +++ b/src/lava/magma/core/model/nc/model.py @@ -1,17 +1,25 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause from abc import ABC, abstractmethod +import logging +import typing as ty +from lava.magma.core.model.interfaces import AbstractNodeGroup from lava.magma.core.model.model import AbstractProcessModel -# ToDo: Move somewhere else. Just created for typing -class AbstractNodeGroup: - def alloc(self, *args, **kwargs): - pass - - class Net(ABC): + """Represents a collection of logical entities (Attribute Groups) + that consume resources on a NeuroCore. 
+ + * InputAxons + * Synapses + * DendriticAccumulator + * Compartments + * OutputAxons + * Synaptic pre traces + * Synaptic post traces + """ def __init__(self): self.out_ax = AbstractNodeGroup() self.cx = AbstractNodeGroup() @@ -29,12 +37,38 @@ def connect(self, from_thing, to_thing): class AbstractNcProcessModel(AbstractProcessModel, ABC): - """Abstract interface for a NeuroCore ProcessModels.""" + """Abstract interface for NeuroCore ProcessModels + + Example for how variables might be initialized: + u: np.ndarray = LavaNcType(np.ndarray, np.int32, precision=24) + v: np.ndarray = LavaNcType(np.ndarray, np.int32, precision=24) + bias: np.ndarray = LavaNcType(np.ndarray, np.int16, precision=12) + du: int = LavaNcType(int, np.uint16, precision=12) + """ + def __init__(self, + log: logging.getLoggerClass, + proc_params: ty.Dict[str, ty.Any], + loglevel: int = logging.WARNING) -> None: + super().__init__(proc_params, loglevel=loglevel) + self.model_id: ty.Optional[int] = None @abstractmethod def allocate(self, net: Net): """Allocates resources required by Process via Net provided by compiler. - Note: This should work as before. """ pass + + +class NcProcessModel(AbstractNcProcessModel): + def __init__(self, + proc_params: ty.Dict[str, ty.Any], + loglevel: int = logging.WARNING): + super(AbstractNcProcessModel, self).__init__(proc_params, + loglevel=loglevel) + + def allocate(self, net: Net): + pass + + def run(self): + pass diff --git a/src/lava/magma/core/model/nc/type.py b/src/lava/magma/core/model/nc/type.py new file mode 100644 index 000000000..68b5ff581 --- /dev/null +++ b/src/lava/magma/core/model/nc/type.py @@ -0,0 +1,9 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +from dataclasses import dataclass + + +@dataclass +class LavaNcType: + d_type: str + precision: int = None # If None, infinite precision is assumed diff --git a/src/lava/magma/core/model/py/model.py b/src/lava/magma/core/model/py/model.py index c9eee6753..6381ac6b3 100644 --- a/src/lava/magma/core/model/py/model.py +++ b/src/lava/magma/core/model/py/model.py @@ -4,6 +4,8 @@ import typing as ty from abc import ABC, abstractmethod +import logging + import numpy as np from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort, \ @@ -29,8 +31,10 @@ class AbstractPyProcessModel(AbstractProcessModel, ABC): du: int = LavaPyType(int, np.uint16, precision=12) """ - def __init__(self, proc_params: ty.Dict[str, ty.Any]) -> None: - super().__init__(proc_params) + def __init__(self, + proc_params: ty.Dict[str, ty.Any], + loglevel: int = logging.WARNING) -> None: + super().__init__(proc_params=proc_params, loglevel=loglevel) self.model_id: ty.Optional[int] = None self.service_to_process: ty.Optional[CspRecvPort] = None self.process_to_service: ty.Optional[CspSendPort] = None diff --git a/src/lava/magma/core/process/process.py b/src/lava/magma/core/process/process.py index 33405a3e0..5a330ad40 100644 --- a/src/lava/magma/core/process/process.py +++ b/src/lava/magma/core/process/process.py @@ -1,6 +1,7 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import logging import typing as ty from _collections import OrderedDict from lava.magma.compiler.executable import Executable @@ -227,6 +228,31 @@ def __init__(self, **kwargs): self.name: str = kwargs.pop("name", f"Process_{self.id}") + # Setup Logging + self.loglevel: int = kwargs.pop("loglevel", logging.WARNING) + self.loglevelconsole: int = 
kwargs.pop("loglevelconsole", logging.ERROR) + self.logfile: str = kwargs.pop("logfile", "lava.log") + self.logfileenable: bool = bool(kwargs.pop("logfileenable", False)) + self.log = logging.getLogger() + + formatter = logging.Formatter( + '%(asctime)s:%(levelname)s: %(name)s - %(message)s', + datefmt='%m/%d/%Y %I:%M:%S%p') + + console_handler = logging.StreamHandler() + console_handler.setLevel(self.loglevelconsole) + console_handler.setFormatter(formatter) + + if self.logfileenable: + logging.basicConfig( + filename=self.logfile, + level=self.loglevel, + format='%(asctime)s:%(levelname)s: %(name)s - %(message)s', + datefmt='%m/%d/%Y %I:%M:%S%p' + ) + + self.log.addHandler(console_handler) + # kwargs will be used for ProcessModel initialization later self.init_args: dict = kwargs @@ -343,7 +369,7 @@ def compile(self, run_cfg: RunConfig) -> Executable: ProcessModel for each compiled process. """ from lava.magma.compiler.compiler import Compiler - compiler = Compiler() + compiler = Compiler(loglevel=self.loglevel) return compiler.compile(self, run_cfg) def save(self, path: str): @@ -391,7 +417,8 @@ def run(self, if not self._runtime: executable = self.compile(run_cfg) self._runtime = Runtime(executable, - ActorType.MultiProcessing) + ActorType.MultiProcessing, + loglevel=self.loglevel) self._runtime.initialize() self._runtime.start(condition) diff --git a/src/lava/magma/core/resources.py b/src/lava/magma/core/resources.py index 84bc82854..c2e0cbb5e 100644 --- a/src/lava/magma/core/resources.py +++ b/src/lava/magma/core/resources.py @@ -114,3 +114,7 @@ class KapohoPoint(Loihi2System): class Unalaska(Loihi2System): resources = [CPU, Loihi2NeuroCore, LMT, PB] + + +class OheoGulch(Loihi2System): + resources = [Loihi2NeuroCore, LMT, PB] diff --git a/src/lava/magma/core/run_configs.py b/src/lava/magma/core/run_configs.py index 2e4d10fa6..9be4d35fa 100644 --- a/src/lava/magma/core/run_configs.py +++ b/src/lava/magma/core/run_configs.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ from __future__ import annotations +import logging import typing as ty from abc import ABC @@ -42,7 +43,10 @@ class RunConfig(ABC): """ def __init__(self, - custom_sync_domains: ty.Optional[ty.List[SyncDomain]] = None): + custom_sync_domains: ty.Optional[ty.List[SyncDomain]] = None, + loglevel: int = logging.WARNING): + self.log = logging.getLogger(__name__) + self.log.setLevel(loglevel) self.custom_sync_domains = [] if custom_sync_domains: if not isinstance(custom_sync_domains, list): @@ -122,8 +126,10 @@ def __init__(self, select_sub_proc_model: ty.Optional[bool] = False, exception_proc_model_map: ty.Optional[ty.Dict[ ty.Type[AbstractProcess], ty.Type[ - AbstractProcessModel]]] = None): - super().__init__(custom_sync_domains=custom_sync_domains) + AbstractProcessModel]]] = None, + loglevel: int = logging.WARNING): + super().__init__(custom_sync_domains=custom_sync_domains, + loglevel=loglevel) self.select_tag = select_tag self.select_sub_proc_model = select_sub_proc_model self.exception_proc_model_map = exception_proc_model_map @@ -240,11 +246,11 @@ def _ispypm(pm: ty.Type[AbstractProcessModel]) -> bool: # Assumption: User doesn't care about tags. 
We return the first # SubProcessModel found if self.select_tag is None: - print(f"[{self.__class__.__qualname__}]: Using the first " - f"SubProcessModel " - f"{proc_models[sub_pm_idxs[0]].__qualname__} " - f"available for Process " - f"{proc.name}::{proc.__class__.__qualname__}.") + self.log.info(f"[{self.__class__.__qualname__}]: Using the" + f" first SubProcessModel " + f"{proc_models[sub_pm_idxs[0]].__qualname__} " + f"available for Process " + f"{proc.name}::{proc.__class__.__qualname__}.") return proc_models[sub_pm_idxs[0]] # Case 3a(iii): User asked for a specific tag: # ------------------------------------------- @@ -276,11 +282,11 @@ def _ispypm(pm: ty.Type[AbstractProcessModel]) -> bool: # Assumption: User doesn't care about tags. We return the first # PyProcessModel found if self.select_tag is None: - print(f"[{self.__class__.__qualname__}]: Using the first " - f"PyProcessModel " - f"{proc_models[py_pm_idxs[0]].__qualname__} " - f"available for Process " - f"{proc.name}::{proc.__class__.__qualname__}.") + self.log.info(f"[{self.__class__.__qualname__}]: Using the first " + f"PyProcessModel " + f"{proc_models[py_pm_idxs[0]].__qualname__} " + f"available for Process " + f"{proc.name}::{proc.__class__.__qualname__}.") return proc_models[py_pm_idxs[0]] # Case 3b(ii): User asked for a specific tag: # ------------------------------------------ diff --git a/src/lava/magma/core/sync/protocols/async_protocol.py b/src/lava/magma/core/sync/protocols/async_protocol.py index 36c207e56..b7fe85170 100644 --- a/src/lava/magma/core/sync/protocols/async_protocol.py +++ b/src/lava/magma/core/sync/protocols/async_protocol.py @@ -5,7 +5,8 @@ from lava.magma.core.resources import CPU from lava.magma.core.sync.protocol import AbstractSyncProtocol -from lava.magma.runtime.runtime_service import AsyncPyRuntimeService +from lava.magma.runtime.runtime_services.runtime_service import \ + AsyncPyRuntimeService @dataclass diff --git a/src/lava/magma/core/sync/protocols/loihi_protocol.py b/src/lava/magma/core/sync/protocols/loihi_protocol.py index 3a7cf147b..cf3017a10 100644 --- a/src/lava/magma/core/sync/protocols/loihi_protocol.py +++ b/src/lava/magma/core/sync/protocols/loihi_protocol.py @@ -1,12 +1,21 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ from collections import namedtuple from dataclasses import dataclass -from lava.magma.core.resources import CPU, NeuroCore +from lava.magma.core.resources import ( + CPU, + NeuroCore, + Loihi1NeuroCore, + Loihi2NeuroCore +) from lava.magma.core.sync.protocol import AbstractSyncProtocol from lava.magma.runtime.mgmt_token_enums import enum_to_np -from lava.magma.runtime.runtime_service import ( +from lava.magma.runtime.runtime_services.runtime_service import ( LoihiPyRuntimeService, LoihiCRuntimeService, + NxSdkRuntimeService ) Proc_Function_With_Guard = namedtuple("Proc_Function_With_Guard", "guard func") @@ -23,7 +32,8 @@ class Phase: @dataclass class LoihiProtocol(AbstractSyncProtocol): # The phases of Loihi protocol - phases = [Phase.SPK, Phase.PRE_MGMT, Phase.LRN, Phase.POST_MGMT, Phase.HOST] + phases = [Phase.SPK, Phase.PRE_MGMT, + Phase.LRN, Phase.POST_MGMT, Phase.HOST] # Methods that processes implementing protocol may provide proc_functions = [ Proc_Function_With_Guard("pre_guard", "run_pre_mgmt"), @@ -39,4 +49,7 @@ class LoihiProtocol(AbstractSyncProtocol): @property def runtime_service(self): - return {CPU: LoihiPyRuntimeService, NeuroCore: LoihiCRuntimeService} + return {CPU: 
LoihiPyRuntimeService, + NeuroCore: LoihiCRuntimeService, + Loihi1NeuroCore: NxSdkRuntimeService, + Loihi2NeuroCore: NxSdkRuntimeService} diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index ace6e6f97..7804fd195 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -3,11 +3,14 @@ # See: https://spdx.org/licenses/ from __future__ import annotations +import logging + +import numpy as np + import sys import typing import typing as ty -import numpy as np from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort from lava.magma.compiler.exec_var import AbstractExecVar @@ -19,7 +22,8 @@ import MessageInfrastructureInterface from lava.magma.runtime.mgmt_token_enums import enum_to_np, enum_equal, \ MGMT_COMMAND, MGMT_RESPONSE -from lava.magma.runtime.runtime_service import AsyncPyRuntimeService +from lava.magma.runtime.runtime_services.runtime_service \ + import AsyncPyRuntimeService if ty.TYPE_CHECKING: from lava.magma.core.process.process import AbstractProcess @@ -96,7 +100,10 @@ class Runtime: def __init__(self, exe: Executable, - message_infrastructure_type: ActorType): + message_infrastructure_type: ActorType, + loglevel: int = logging.WARNING): + self.log = logging.getLogger(__name__) + self.log.setLevel(loglevel) self._run_cond: typing.Optional[AbstractRunCondition] = None self._executable: Executable = exe @@ -119,19 +126,10 @@ def __del__(self): if self._is_started: self.stop() - def initialize(self): + def initialize(self, node_cfg_idx: int = 0): """Initializes the runtime""" - # Right now assume there is only 1 node config - node_configs: ty.List[NodeConfig] = self._executable.node_configs - if len(node_configs) != 1: - raise AssertionError + node_config: NodeConfig = self.node_cfg[node_cfg_idx] - node_config: NodeConfig = node_configs[0] - - # Right now assume there is only 1 node in node_config with resource - # type CPU - if len(node_config) != 1: - raise AssertionError if node_config[0].node_type != HeadNode: raise AssertionError @@ -151,11 +149,10 @@ def _start_ports(self): for port in self.service_to_runtime: port.start() - # ToDo: (AW) Hack: This currently just returns the one and only NodeCfg @property - def node_cfg(self) -> NodeConfig: + def node_cfg(self) -> ty.List[NodeConfig]: """Returns the selected NodeCfg.""" - return self._executable.node_configs[0] + return self._executable.node_configs def _build_message_infrastructure(self): """Create the Messaging Infrastructure Backend given the @@ -221,15 +218,17 @@ def _build_sync_channels(self): sync_channel_builder.src_process.set_csp_proc_ports( [channel.src_port]) self._get_process_builder_for_process( - sync_channel_builder.dst_process).set_rs_csp_ports( - [channel.dst_port]) + sync_channel_builder.dst_process) \ + .set_rs_csp_ports([channel.dst_port]) else: sync_channel_builder.dst_process.set_csp_proc_ports( [channel.dst_port]) self._get_process_builder_for_process( - sync_channel_builder.src_process).set_rs_csp_ports( - [channel.src_port]) + sync_channel_builder.src_process) \ + .set_rs_csp_ports([channel.src_port]) else: + self.log.info( + sync_channel_builder.dst_process.__class__.__name__) raise ValueError("Unexpected type of Sync Channel Builder") # ToDo: (AW) Why not pass the builder as an argument to the mp.Process @@ -281,7 +280,7 @@ def _get_resp_for_run(self): actors.join() if actors.exception: _, traceback = actors.exception - print(traceback) + self.log.info(traceback) error_cnt += 1 raise RuntimeError( f"{error_cnt} 
Exception(s) occurred. See " @@ -308,7 +307,7 @@ def start(self, run_condition: AbstractRunCondition): self._is_started = True self._run(run_condition) else: - print("Runtime not initialized yet.") + self.log.info("Runtime not initialized yet.") def _run(self, run_condition: AbstractRunCondition): """ @@ -333,7 +332,7 @@ def _run(self, run_condition: AbstractRunCondition): raise ValueError(f"Wrong type of run_condition : " f"{run_condition.__class__}") else: - print("Runtime not started yet.") + self.log.info("Runtime not started yet.") def wait(self): """Waits for existing run to end. This is helpful if the execution @@ -356,7 +355,7 @@ def pause(self): actors.join() if actors.exception: _, traceback = actors.exception - print(traceback) + self.log.info(traceback) error_cnt += 1 self.stop() raise RuntimeError( @@ -379,7 +378,7 @@ def stop(self): self._is_started = False # Send messages to RuntimeServices to stop as soon as possible. else: - print("Runtime not started yet.") + self.log.info("Runtime not started yet.") finally: self._messaging_infrastructure.stop() @@ -393,7 +392,8 @@ def join(self): def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None): """Sets value of a variable with id 'var_id'.""" if self._is_running: - print("WARNING: Cannot Set a Var when the execution is going on") + self.log.info( + "WARNING: Cannot Set a Var when the execution is going on") return node_config: NodeConfig = self._executable.node_configs[0] ev: AbstractExecVar = node_config.exec_vars[var_id] @@ -439,7 +439,8 @@ def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None): def get_var(self, var_id: int, idx: np.ndarray = None) -> np.ndarray: """Gets value of a variable with id 'var_id'.""" if self._is_running: - print("WARNING: Cannot Get a Var when the execution is going on") + self.log.info( + "WARNING: Cannot Get a Var when the execution is going on") return node_config: NodeConfig = self._executable.node_configs[0] ev: AbstractExecVar = node_config.exec_vars[var_id] diff --git a/src/lava/magma/runtime/runtime_services/enums.py b/src/lava/magma/runtime/runtime_services/enums.py new file mode 100644 index 000000000..9c3c5dda5 --- /dev/null +++ b/src/lava/magma/runtime/runtime_services/enums.py @@ -0,0 +1,44 @@ +# Copyright (C) 2021 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ +from enum import IntEnum +from lava.magma.runtime.mgmt_token_enums import enum_to_np + + +class LoihiVersion(IntEnum): + """Enumerator of different Loihi Versions.""" + N2 = 2 + N3 = 3 + + +class LoihiPhase: + """Enumerator of Lava Loihi phases""" + SPK = enum_to_np(1) + PRE_MGMT = enum_to_np(2) + LRN = enum_to_np(3) + POST_MGMT = enum_to_np(4) + HOST = enum_to_np(5) + + +class NxSdkPhase: + """Enumerator phases in which snip can run in NxSDK.""" + + EMBEDDED_INIT = 1 + """INIT Phase of Embedded Snip. This executes only once.""" + EMBEDDED_SPIKING = 2 + """SPIKING Phase of Embedded Snip.""" + EMBEDDED_PRELEARN_MGMT = 3 + """Pre-Learn Management Phase of Embedded Snip.""" + EMBEDDED_MGMT = 4 + """Management Phase of Embedded Snip.""" + HOST_PRE_EXECUTION = 5 + """Host Pre Execution Phase for Host Snip.""" + HOST_POST_EXECUTION = 6 + """Host Post Execution Phase for Host Snip.""" + HOST_CONCURRENT_EXECUTION = 7 + """Concurrent Execution for Host Snip.""" + EMBEDDED_USER_CMD = 8 + """Any User Command to execute during embedded execution. 
+       (Internal Use Only)"""
+    EMBEDDED_REMOTE_MGMT = 9
+    """A management phase snip triggered remotely"""
diff --git a/src/lava/magma/runtime/runtime_services/interfaces.py b/src/lava/magma/runtime/runtime_services/interfaces.py
new file mode 100644
index 000000000..cc86542a0
--- /dev/null
+++ b/src/lava/magma/runtime/runtime_services/interfaces.py
@@ -0,0 +1,41 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+import typing as ty
+from abc import ABC, abstractmethod
+
+from lava.magma.compiler.channels.pypychannel import (
+    CspRecvPort,
+    CspSendPort
+)
+from lava.magma.core.sync.protocol import AbstractSyncProtocol
+
+
+class AbstractRuntimeService(ABC):
+    def __init__(self, protocol):
+        self.protocol: ty.Optional[AbstractSyncProtocol] = protocol
+
+        self.runtime_service_id: ty.Optional[int] = None
+
+        self.runtime_to_service: ty.Optional[CspRecvPort] = None
+        self.service_to_runtime: ty.Optional[CspSendPort] = None
+
+        self.model_ids: ty.List[int] = []
+
+    def __repr__(self):
+        return f"Synchronizer : {self.__class__}, \
+                 RuntimeServiceId : {self.runtime_service_id}, \
+                 Protocol: {self.protocol}"
+
+    def start(self):
+        self.runtime_to_service.start()
+        self.service_to_runtime.start()
+        self.run()
+
+    @abstractmethod
+    def run(self):
+        pass
+
+    def join(self):
+        self.runtime_to_service.join()
+        self.service_to_runtime.join()
diff --git a/src/lava/magma/runtime/runtime_services/runtime_service.py b/src/lava/magma/runtime/runtime_services/runtime_service.py
new file mode 100644
index 000000000..00e6065dd
--- /dev/null
+++ b/src/lava/magma/runtime/runtime_services/runtime_service.py
@@ -0,0 +1,590 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+from abc import abstractmethod
+import logging
+import typing as ty
+
+import numpy as np
+
+from lava.magma.compiler.channels.pypychannel import (
+    CspSelector,
+    CspRecvPort,
+    CspSendPort
+)
+from lava.magma.core.sync.protocol import AbstractSyncProtocol
+from lava.magma.runtime.mgmt_token_enums import (
+    enum_to_np,
+    enum_equal,
+    MGMT_RESPONSE,
+    MGMT_COMMAND,
+)
+from lava.magma.runtime.runtime_services.enums import (
+    LoihiPhase,
+    LoihiVersion
+)
+from lava.magma.runtime.runtime_services.interfaces import \
+    AbstractRuntimeService
+
+try:
+    from nxsdk.arch.base.nxboard import NxBoard
+except ImportError:
+    class NxBoard():
+        pass
+
+"""The RuntimeService interface is responsible for
+coordinating the execution of a group of process models belonging to a common
+synchronization domain. The domain will follow a SyncProtocol or will be
+asynchronous. The processes and their corresponding process models are
+selected by the Runtime depending on the RunConfiguration assigned at the
+start of execution. For each group of processes which follow the same
+protocol and execute on the same node, the Runtime creates a RuntimeService.
+Each RuntimeService coordinates all actions and commands from the Runtime,
+transmitting them to the processes under its management and
+returning action and command responses back to the Runtime.
+
+RuntimeService Types:
+
+1. PyRuntimeService: (Abstract Class) Coordinates process models executing on
+   the CPU and written in Python.
+   Concrete Implementations:
+   a. LoihiPyRuntimeService: Coordinates process models executing on
+      the CPU and written in Python and following the LoihiProtocol.
+   b. AsyncPyRuntimeService: Coordinates process models executing on
+      the CPU and written in Python and following the AsyncProtocol.
+
+2. CRuntimeService: (Abstract Class) Coordinates/Manages process models
+   executing on the CPU/Embedded CPU and written in C.
+   Concrete Implementations:
+   a. LoihiCRuntimeService: Coordinates process models executing on
+      the CPU/Embedded CPU and written in C and following the LoihiProtocol.
+3. NcRuntimeService: (Abstract Class) Coordinates/Manages process models
+   executing on a Loihi NeuroCore.
+   Concrete Implementations:
+   a. NxSdkRuntimeService: Coordinates process models executing on a Loihi
+      NeuroCore and written in Python following the LoihiProtocol.
+"""
+
+
+class PyRuntimeService(AbstractRuntimeService):
+    """Abstract RuntimeService for Python. It provides base methods
+    for start and run. It is not meant to be instantiated directly;
+    it is meant to be inherited from.
+    """
+
+    def __init__(self,
+                 protocol: ty.Type[AbstractSyncProtocol],
+                 loglevel: int = logging.WARNING,):
+        self.log = logging.getLogger(__name__)
+        self.log.setLevel(loglevel)
+        super(PyRuntimeService, self).__init__(
+            protocol=protocol
+        )
+        self.service_to_process: ty.Iterable[CspSendPort] = []
+        self.process_to_service: ty.Iterable[CspRecvPort] = []
+
+    def start(self):
+        """Start the necessary channels to coordinate with the runtime and
+        the group of processes this RuntimeService is managing"""
+        self.runtime_to_service.start()
+        self.service_to_runtime.start()
+        for i in range(len(self.service_to_process)):
+            self.service_to_process[i].start()
+            self.process_to_service[i].start()
+        self.run()
+
+    @abstractmethod
+    def run(self):
+        """Override this method to implement the runtime service. The run
+        method is invoked upon start, which is called when the execution is
+        started by the runtime."""
+        pass
+
+    def join(self):
+        """Stop the necessary channels to coordinate with the runtime and
+        the group of processes this RuntimeService is managing"""
+        self.runtime_to_service.join()
+        self.service_to_runtime.join()
+
+        for i in range(len(self.service_to_process)):
+            self.service_to_process[i].join()
+            self.process_to_service[i].join()
+
+
+class CRuntimeService(AbstractRuntimeService):
+    pass
+
+
+class NcRuntimeService(AbstractRuntimeService):
+    pass
+
+
+class LoihiPyRuntimeService(PyRuntimeService):
+    """RuntimeService that implements Loihi SyncProtocol in Python."""
+
+    def __init__(self, protocol):
+        super().__init__(protocol)
+        self.req_pre_lrn_mgmt = False
+        self.req_post_lrn_mgmt = False
+        self.req_lrn = False
+        self.req_stop = False
+        self.req_pause = False
+        self.paused = False
+        self._error = False
+
+    class Phase:
+        SPK = enum_to_np(1)
+        PRE_MGMT = enum_to_np(2)
+        LRN = enum_to_np(3)
+        POST_MGMT = enum_to_np(4)
+        HOST = enum_to_np(5)
+
+    class PMResponse:
+        STATUS_DONE = enum_to_np(0)
+        """Signifies Ack or Finished with the Command"""
+        STATUS_TERMINATED = enum_to_np(-1)
+        """Signifies Termination"""
+        STATUS_ERROR = enum_to_np(-2)
+        """Signifies Error raised"""
+        STATUS_PAUSED = enum_to_np(-3)
+        """Signifies Execution State to be Paused"""
+        REQ_PRE_LRN_MGMT = enum_to_np(-4)
+        """Signifies Request of PRE-LEARN MANAGEMENT"""
+        REQ_LEARNING = enum_to_np(-5)
+        """Signifies Request of LEARNING"""
+        REQ_POST_LRN_MGMT = enum_to_np(-6)
+        """Signifies Request of POST-LEARN MANAGEMENT"""
+        REQ_PAUSE = enum_to_np(-7)
+        """Signifies Request of PAUSE"""
+        REQ_STOP = enum_to_np(-8)
+        """Signifies Request of STOP"""
+
+    def _next_phase(self, is_last_time_step: bool):
+        """Advances the current phase to the next phase.
+ On the first time step it starts with HOST phase and advances to SPK. + Afterwards it loops: SPK -> PRE_MGMT -> LRN -> POST_MGMT -> SPK + On the last time step POST_MGMT advances to HOST phase.""" + if self.req_pre_lrn_mgmt: + self.req_pre_lrn_mgmt = False + return LoihiPyRuntimeService.Phase.PRE_MGMT + if self.req_post_lrn_mgmt: + self.req_post_lrn_mgmt = False + return LoihiPyRuntimeService.Phase.POST_MGMT + if self.req_lrn: + self.req_lrn = False + return LoihiPyRuntimeService.Phase.LRN + if self.req_pause: + self.req_pause = False + return MGMT_COMMAND.PAUSE + if self.req_stop: + self.req_stop = False + return MGMT_COMMAND.STOP + + if is_last_time_step: + return LoihiPyRuntimeService.Phase.HOST + return LoihiPyRuntimeService.Phase.SPK + + def _send_pm_cmd(self, phase: MGMT_COMMAND): + """Sends a command (phase information) to all ProcessModels.""" + for send_port in self.service_to_process: + send_port.send(phase) + + def _send_pm_req_given_model_id(self, model_id: int, *requests): + """Sends requests to a ProcessModel given by the model id.""" + process_idx = self.model_ids.index(model_id) + req_port = self.service_to_process[process_idx] + for request in requests: + req_port.send(request) + + def _get_pm_resp(self) -> ty.Iterable[MGMT_RESPONSE]: + """Retrieves responses of all ProcessModels.""" + rcv_msgs = [] + num_responses_expected = len(self.model_ids) + counter = 0 + while counter < num_responses_expected: + ptos_recv_port = self.process_to_service[counter] + rcv_msgs.append(ptos_recv_port.recv()) + counter += 1 + for idx, recv_msg in enumerate(rcv_msgs): + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.STATUS_ERROR): + self._error = True + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.REQ_PRE_LRN_MGMT): + self.req_pre_lrn_mgmt = True + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.REQ_POST_LRN_MGMT): + self.req_post_lrn_mgmt = True + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.REQ_LEARNING): + self.req_lrn = True + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.REQ_PAUSE): + # ToDo: Add some mechanism to get the exact process id + self.log.info(f"Process : {idx} has requested Pause") + self.req_pause = True + if enum_equal(recv_msg, + LoihiPyRuntimeService.PMResponse.REQ_STOP): + # ToDo: Add some mechanism to get the exact process id + self.log.info(f"Process : {idx} has requested Stop") + self.req_stop = True + return rcv_msgs + + def _relay_to_runtime_data_given_model_id(self, model_id: int): + """Relays data received from ProcessModel given by model id to the + runtime""" + process_idx = self.model_ids.index(model_id) + data_recv_port = self.process_to_service[process_idx] + data_relay_port = self.service_to_runtime + num_items = data_recv_port.recv() + data_relay_port.send(num_items) + for i in range(int(num_items[0])): + value = data_recv_port.recv() + data_relay_port.send(value) + + def _relay_to_pm_data_given_model_id(self, model_id: int) -> MGMT_RESPONSE: + """Relays data received from the runtime to the ProcessModel given by + the model id.""" + process_idx = self.model_ids.index(model_id) + data_recv_port = self.runtime_to_service + data_relay_port = self.service_to_process[process_idx] + resp_port = self.process_to_service[process_idx] + # Receive and relay number of items + num_items = data_recv_port.recv() + data_relay_port.send(num_items) + # Receive and relay data1, data2, ... 
+ for i in range(int(num_items[0].item())): + data_relay_port.send(data_recv_port.recv()) + rsp = resp_port.recv() + return rsp + + def _relay_pm_ack_given_model_id(self, model_id: int): + """Relays ack received from ProcessModel given by model id to the + runtime.""" + process_idx = self.model_ids.index(model_id) + + ack_recv_port = self.process_to_service[process_idx] + ack_relay_port = self.service_to_runtime + ack_relay_port.send(ack_recv_port.recv()) + + def _handle_pause(self): + # Inform all ProcessModels about the PAUSE command + self._send_pm_cmd(MGMT_COMMAND.PAUSE) + rsps = self._get_pm_resp() + for rsp in rsps: + if not enum_equal(rsp, + LoihiPyRuntimeService.PMResponse.STATUS_PAUSED): + raise ValueError(f"Wrong Response Received : {rsp}") + # Inform the runtime about successful pausing + self.service_to_runtime.send(MGMT_RESPONSE.PAUSED) + + def _handle_stop(self): + # Inform all ProcessModels about the STOP command + self._send_pm_cmd(MGMT_COMMAND.STOP) + rsps = self._get_pm_resp() + for rsp in rsps: + if not enum_equal(rsp, + LoihiPyRuntimeService.PMResponse.STATUS_TERMINATED + ): + raise ValueError(f"Wrong Response Received : {rsp}") + # Inform the runtime about successful termination + self.service_to_runtime.send(MGMT_RESPONSE.TERMINATED) + self.join() + + def run(self): + """Retrieves commands from the runtime. On STOP or PAUSE commands all + ProcessModels are notified and expected to TERMINATE or PAUSE, + respectively. Otherwise the number of time steps is received as command. + In this case iterate through the phases of the Loihi protocol until the + last time step is reached. The runtime is informed after the last time + step. The loop ends when receiving the STOP command from the runtime.""" + selector = CspSelector() + phase = LoihiPhase.HOST + + channel_actions = [(self.runtime_to_service, lambda: 'cmd')] + + while True: + # Probe if there is a new command from the runtime + action = selector.select(*channel_actions) + if action == 'cmd': + command = self.runtime_to_service.recv() + if enum_equal(command, MGMT_COMMAND.STOP): + self._handle_stop() + return + elif enum_equal(command, MGMT_COMMAND.PAUSE): + self._handle_pause() + self.paused = True + elif enum_equal(command, MGMT_COMMAND.GET_DATA) or \ + enum_equal(command, MGMT_COMMAND.SET_DATA): + self._handle_get_set(phase, command) + else: + self.paused = False + # The number of time steps was received ("command") + # Start iterating through Loihi phases + curr_time_step = 0 + phase = LoihiPhase.HOST + while True: + # Check if it is the last time step + is_last_ts = enum_equal(enum_to_np(curr_time_step), + command) + # Advance to the next phase + phase = self._next_phase(is_last_ts) + if enum_equal(phase, MGMT_COMMAND.STOP): + self.service_to_runtime.send( + MGMT_RESPONSE.REQ_STOP) + break + if enum_equal(phase, MGMT_COMMAND.PAUSE): + self.service_to_runtime.send( + MGMT_RESPONSE.REQ_PAUSE) + break + # Increase time step if spiking phase + if enum_equal(phase, LoihiPhase.SPK): + curr_time_step += 1 + # Inform ProcessModels about current phase + self._send_pm_cmd(phase) + # ProcessModels respond with DONE if not HOST phase + if not enum_equal( + phase, LoihiPyRuntimeService.Phase.HOST): + self._get_pm_resp() + if self._error: + # Forward error to runtime + self.service_to_runtime.send( + MGMT_RESPONSE.ERROR) + # stop all other pm + self._send_pm_cmd(MGMT_COMMAND.STOP) + return + # Check if pause or stop received from Runtime + # TODO: Do we actualy need to wait for PMs to be in + # HOST or MGMT phase to stop or 
pause them? + if self.runtime_to_service.probe(): + cmd = self.runtime_to_service.peek() + if enum_equal(cmd, MGMT_COMMAND.STOP): + self.runtime_to_service.recv() + self._handle_stop() + return + if enum_equal(cmd, MGMT_COMMAND.PAUSE): + self.runtime_to_service.recv() + self._handle_pause() + self.paused = True + break + + # If HOST phase (last time step ended) break the loop + if enum_equal( + phase, LoihiPhase.HOST): + break + if self.paused or enum_equal(phase, MGMT_COMMAND.STOP) or \ + enum_equal(phase, MGMT_COMMAND.PAUSE): + continue + # Inform the runtime that last time step was reached + self.service_to_runtime.send(MGMT_RESPONSE.DONE) + else: + self.service_to_runtime.send(MGMT_RESPONSE.ERROR) + + def _handle_get_set(self, phase, command): + if enum_equal(phase, LoihiPhase.HOST): + if enum_equal(command, MGMT_COMMAND.GET_DATA): + requests: ty.List[np.ndarray] = [command] + # recv model_id + model_id: int = int(self.runtime_to_service.recv()[0].item()) + # recv var_id + requests.append(self.runtime_to_service.recv()) + self._send_pm_req_given_model_id(model_id, *requests) + self._relay_to_runtime_data_given_model_id(model_id) + elif enum_equal(command, MGMT_COMMAND.SET_DATA): + requests: ty.List[np.ndarray] = [command] + # recv model_id + model_id: int = int(self.runtime_to_service.recv()[0].item()) + # recv var_id + requests.append(self.runtime_to_service.recv()) + self._send_pm_req_given_model_id(model_id, *requests) + rsp = self._relay_to_pm_data_given_model_id(model_id) + self.service_to_runtime.send(rsp) + else: + raise RuntimeError(f"Unknown request {command}") + + +class LoihiCRuntimeService(AbstractRuntimeService): + """RuntimeService that implements Loihi SyncProtocol in C.""" + pass + + +class AsyncPyRuntimeService(PyRuntimeService): + """RuntimeService that implements Async SyncProtocol in Py.""" + + def __init__(self, protocol): + super().__init__(protocol) + self.req_stop = False + self.req_pause = False + self._error = False + + class PMResponse: + STATUS_DONE = enum_to_np(0) + """Signfies Ack or Finished with the Command""" + STATUS_TERMINATED = enum_to_np(-1) + """Signifies Termination""" + STATUS_ERROR = enum_to_np(-2) + """Signifies Error raised""" + STATUS_PAUSED = enum_to_np(-3) + """Signifies Execution State to be Paused""" + REQ_PAUSE = enum_to_np(-4) + """Signifies Request of PAUSE""" + REQ_STOP = enum_to_np(-5) + """Signifies Request of STOP""" + + def _send_pm_cmd(self, cmd: MGMT_COMMAND): + for stop_send_port in self.service_to_process: + stop_send_port.send(cmd) + + def _get_pm_resp(self) -> ty.Iterable[MGMT_RESPONSE]: + rcv_msgs = [] + for ptos_recv_port in self.process_to_service: + rcv_msgs.append(ptos_recv_port.recv()) + return rcv_msgs + + def _handle_pause(self): + # Inform the runtime about successful pausing + self.service_to_runtime.send(MGMT_RESPONSE.PAUSED) + + def _handle_stop(self): + self._send_pm_cmd(MGMT_COMMAND.STOP) + rsps = self._get_pm_resp() + for rsp in rsps: + if not enum_equal(rsp, + LoihiPyRuntimeService.PMResponse.STATUS_TERMINATED + ): + self.service_to_runtime.send(MGMT_RESPONSE.ERROR) + raise ValueError(f"Wrong Response Received : {rsp}") + # Inform the runtime about successful termination + self.service_to_runtime.send(MGMT_RESPONSE.TERMINATED) + self.join() + + def run(self): + """Retrieves commands from the runtime and relays them to the process + models. 
Also sends the acknowledgement back to the runtime."""
+        selector = CspSelector()
+        channel_actions = [(self.runtime_to_service, lambda: 'cmd')]
+        while True:
+            # Probe if there is a new command from the runtime
+            action = selector.select(*channel_actions)
+            channel_actions = []
+            if action == 'cmd':
+                command = self.runtime_to_service.recv()
+                if enum_equal(command, MGMT_COMMAND.STOP):
+                    self._handle_stop()
+                    return
+                elif enum_equal(command, MGMT_COMMAND.PAUSE):
+                    self._handle_pause()
+                else:
+                    self._send_pm_cmd(MGMT_COMMAND.RUN)
+                    for ptos_recv_port in self.process_to_service:
+                        channel_actions.append((ptos_recv_port,
+                                                lambda: 'resp'))
+            elif action == 'resp':
+                resps = self._get_pm_resp()
+                for resp in resps:
+                    if enum_equal(resp,
+                                  AsyncPyRuntimeService.PMResponse.REQ_PAUSE):
+                        self.req_pause = True
+                    if enum_equal(resp,
+                                  AsyncPyRuntimeService.PMResponse.REQ_STOP):
+                        self.req_stop = True
+                    if enum_equal(resp,
+                                  AsyncPyRuntimeService.PMResponse.STATUS_ERROR
+                                  ):
+                        self._error = True
+                if self.req_stop:
+                    self.service_to_runtime.send(MGMT_RESPONSE.REQ_STOP)
+                if self.req_pause:
+                    self.service_to_runtime.send(MGMT_RESPONSE.REQ_PAUSE)
+                if self._error:
+                    self.service_to_runtime.send(MGMT_RESPONSE.ERROR)
+            else:
+                self.service_to_runtime.send(MGMT_RESPONSE.ERROR)
+                raise ValueError(f"Wrong type of channel action : {action}")
+            channel_actions.append((self.runtime_to_service, lambda: 'cmd'))
+
+
+class NxSdkRuntimeService(NcRuntimeService):
+    """The NxSdkRuntimeService uses NxCore to coordinate
+    communication and execution on Loihi within a SyncDomain.
+
+    Parameters
+    ----------
+    protocol: ty.Type[AbstractSyncProtocol]
+        Communication protocol used by NxSdkRuntimeService
+    loihi_version: LoihiVersion
+        Version of the Loihi chip to use, N2 or N3
+    loglevel: int
+        Log level to use for logging
+    """
+
+    def __init__(self,
+                 protocol: ty.Type[AbstractSyncProtocol],
+                 loihi_version: LoihiVersion = LoihiVersion.N3,
+                 loglevel: int = logging.WARNING):
+        self.log = logging.getLogger(__name__)
+        self.log.setLevel(loglevel)
+        super(NxSdkRuntimeService, self).__init__(
+            protocol=protocol
+        )
+        self.board: NxBoard = None
+        self.num_steps = 0
+
+        try:
+            if loihi_version == LoihiVersion.N3:
+                from nxsdk.arch.n3b.n3board import N3Board
+                # TODO: Set the board initialization parameters dynamically
+                self.board = N3Board(1, 1, [2], [[5, 5]])
+            elif loihi_version == LoihiVersion.N2:
+                from nxsdk.arch.n2a.n2board import N2Board  # noqa F401
+                self.board = N2Board(1, 1, [2], [[5, 5]])
+            else:
+                raise ValueError('Unsupported Loihi version ' +
+                                 'used in board selection')
+        except ImportError:
+            class NxBoard():
+                pass
+            self.board = NxBoard()
+
+        self.log.debug("NxSdkRuntimeService is initialized")
+
+    def run(self):
+        """Retrieves commands from the runtime. STOP and PAUSE commands are
+        relayed to NxCore. Otherwise the number of time steps is received as a
+        RUN command; RUN is then relayed to NxCore with the number of time
+        steps. The loop ends when the STOP command is received from the runtime.
+ """ + + self.log.debug("NxSdkRuntime is running") + selector = CspSelector() + channel_actions = [(self.runtime_to_service, lambda: 'cmd')] + + while True: + action = selector.select(*channel_actions) + if action == 'cmd': + command = self.runtime_to_service.recv() + self.log.debug("Recieved command: " + str(command[0])) + if enum_equal(command, MGMT_COMMAND.STOP): + self.board.stop() + + self.service_to_runtime.send(MGMT_RESPONSE.TERMINATED) + self.join() + return + elif enum_equal(command, MGMT_COMMAND.PAUSE): + self.board.pause() + + self.service_to_runtime.send(MGMT_RESPONSE.PAUSED) + break + # If message recieved from Runtime is greater than zero + # it is the num_steps for a run, use num_steps to start + # a run + elif command[0] > 0: + self.log.debug("Command: " + str(command[0]) + + " > 0, setting num_steps and running") + self.num_steps = command + self.board.run(numSteps=self.num_steps, aSync=False) + + self.service_to_runtime.send(MGMT_RESPONSE.DONE) + else: + self.service_to_runtime.send(MGMT_RESPONSE.ERROR) + return diff --git a/tests/lava/magma/compiler/test_compiler.py b/tests/lava/magma/compiler/test_compiler.py index 46a0e4cf7..78496b3b0 100644 --- a/tests/lava/magma/compiler/test_compiler.py +++ b/tests/lava/magma/compiler/test_compiler.py @@ -1,6 +1,7 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import logging import unittest from lava.magma.compiler.channels.interfaces import ChannelType @@ -55,6 +56,7 @@ def __init__(self, **kwargs): class MockRuntimeService: + __name__ = "MockRuntimeService" pass @@ -125,8 +127,12 @@ def run(self): # A minimal RunConfig that will select SubProcModel if there is one class RunCfg(RunConfig): - def __init__(self, custom_sync_domains=None, select_sub_proc_model=False): - super().__init__(custom_sync_domains=custom_sync_domains) + def __init__(self, + loglevel: int = logging.WARNING, + custom_sync_domains=None, + select_sub_proc_model=False): + super().__init__(custom_sync_domains=custom_sync_domains, + loglevel=loglevel) self.select_sub_proc_model = select_sub_proc_model def select(self, proc, proc_models): @@ -489,7 +495,10 @@ def test_create_sync_domain(self): # Processes not assigned to a custom sync domain will get assigned # automatically to a default sync domain created for each unique sync # protocol - sd = Compiler._create_sync_domains(proc_map, run_cfg, []) + c = Compiler() + sd = c._create_sync_domains(proc_map, run_cfg, + [], + log=c.log) # We expect 5 sync domains: The 2 custom ones and 2 default ones # created for p3 and p5 and 1 AsyncDomain for processes not @@ -522,7 +531,10 @@ def test_create_sync_domains_run_config_without_sync_domain(self): # In this case, only default SyncDomains will be chosen based on the # implemented SyncProtocols of each ProcessModel c = Compiler() - sd = c._create_sync_domains(proc_map, run_cfg, []) + sd = c._create_sync_domains(proc_map, + run_cfg, + [], + log=c.log) self.assertEqual(len(sd[0]), 2) self.assertEqual(sd[0][0].protocol.__class__, ProtocolA) @@ -542,7 +554,10 @@ def test_create_sync_domains_proc_assigned_to_multiple_domains(self): c = Compiler() with self.assertRaises(AssertionError): - c._create_sync_domains(proc_map, run_cfg, []) + c._create_sync_domains(proc_map, + run_cfg, + [], + log=c.log) def test_create_sync_domains_proc_assigned_to_incompatible_domain(self): """Checks that a process can only be assigned to a sync domain with a @@ -561,7 +576,10 @@ def 
test_create_sync_domains_proc_assigned_to_incompatible_domain(self): # In this case, sync domain creation will fail c = Compiler() with self.assertRaises(AssertionError): - c._create_sync_domains(proc_map, run_cfg, []) + c._create_sync_domains(proc_map, + run_cfg, + [], + log=c.log) def test_create_sync_domain_non_unique_domain_names(self): """Checks that sync domain names must be unique.""" @@ -579,7 +597,10 @@ def test_create_sync_domain_non_unique_domain_names(self): # This does not compile because domain names must be unique c = Compiler() with self.assertRaises(AssertionError): - c._create_sync_domains(proc_map, run_cfg, []) + c._create_sync_domains(proc_map, + run_cfg, + [], + log=c.log) def test_create_node_cfgs(self): """Checks creation of NodeConfigs. @@ -595,7 +616,7 @@ def test_create_node_cfgs(self): # This creates the naive NodeConfig for now: c = Compiler() - ncfgs = c._create_node_cfgs(proc_map) + ncfgs = c._create_node_cfgs(proc_map, log=c.log) # It will be a single NodeCfg of type HeadNode containing all processes from lava.magma.core.resources import HeadNode @@ -769,7 +790,7 @@ def test_create_py_exec_vars(self): # First we need to compile a NodeConfig c = Compiler() - node_cfgs = c._create_node_cfgs(proc_map) + node_cfgs = c._create_node_cfgs(proc_map, log=c.log) # Creating exec_vars adds any ExecVars to each NodeConfig c._create_exec_vars(node_cfgs, proc_map, { diff --git a/tests/lava/magma/core/process/test_lif_dense_lif.py b/tests/lava/magma/core/process/test_lif_dense_lif.py index 879918023..b13a791a2 100644 --- a/tests/lava/magma/core/process/test_lif_dense_lif.py +++ b/tests/lava/magma/core/process/test_lif_dense_lif.py @@ -13,8 +13,8 @@ class SimpleRunConfig(RunConfig): def __init__(self, **kwargs): sync_domains = kwargs.pop("sync_domains") - super().__init__(custom_sync_domains=sync_domains) self.model = None + super().__init__(custom_sync_domains=sync_domains) if "model" in kwargs: self.model = kwargs.pop("model") diff --git a/tests/lava/magma/runtime/test_exception_handling.py b/tests/lava/magma/runtime/test_exception_handling.py index a90c7bc14..495645b9a 100644 --- a/tests/lava/magma/runtime/test_exception_handling.py +++ b/tests/lava/magma/runtime/test_exception_handling.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import logging import unittest from lava.magma.core.decorator import implements, requires, tag @@ -19,21 +20,24 @@ # A minimal process with an OutPort class P1(AbstractProcess): def __init__(self, **kwargs): - super().__init__(**kwargs) + super().__init__( + loglevel=logging.CRITICAL, **kwargs) self.out = OutPort(shape=(2,)) # A minimal process with an InPort class P2(AbstractProcess): def __init__(self, **kwargs): - super().__init__(**kwargs) + super().__init__( + loglevel=logging.CRITICAL, **kwargs) self.inp = InPort(shape=(2,)) # A minimal process with an InPort class P3(AbstractProcess): def __init__(self, **kwargs): - super().__init__(**kwargs) + super().__init__( + loglevel=logging.CRITICAL, **kwargs) self.inp = InPort(shape=(2,)) @@ -82,12 +86,16 @@ def test_one_pm(self): # Create an instance of P1 proc = P1() + run_steps = RunSteps(num_steps=1) + run_cfg = Loihi1SimCfg( + loglevel=logging.CRITICAL) + # Run the network for 1 time step -> no exception - proc.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + proc.run(condition=run_steps, run_cfg=run_cfg) # Run the network for another time step -> expect exception with self.assertRaises(RuntimeError) as context: - 
proc.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + proc.run(condition=run_steps, run_cfg=run_cfg) exception = context.exception self.assertEqual(RuntimeError, type(exception)) @@ -102,15 +110,19 @@ def test_two_pm(self): sender = P1() recv = P2() + run_steps = RunSteps(num_steps=1) + run_cfg = Loihi1SimCfg( + loglevel=logging.CRITICAL) + # Connect sender with receiver sender.out.connect(recv.inp) # Run the network for 1 time step -> no exception - sender.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + sender.run(condition=run_steps, run_cfg=run_cfg) # Run the network for another time step -> expect exception with self.assertRaises(RuntimeError) as context: - sender.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + sender.run(condition=run_steps, run_cfg=run_cfg) exception = context.exception self.assertEqual(RuntimeError, type(exception)) @@ -126,15 +138,19 @@ def test_three_pm(self): recv1 = P2() recv2 = P3() + run_steps = RunSteps(num_steps=1) + run_cfg = Loihi1SimCfg( + loglevel=logging.CRITICAL) + # Connect sender with receiver sender.out.connect([recv1.inp, recv2.inp]) # Run the network for 1 time step -> no exception - sender.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + sender.run(condition=run_steps, run_cfg=run_cfg) # Run the network for another time step -> expect exception with self.assertRaises(RuntimeError) as context: - sender.run(condition=RunSteps(num_steps=1), run_cfg=Loihi1SimCfg()) + sender.run(condition=run_steps, run_cfg=run_cfg) exception = context.exception self.assertEqual(RuntimeError, type(exception)) diff --git a/tests/lava/magma/runtime/test_get_set_var.py b/tests/lava/magma/runtime/test_get_set_var.py index 897e243bb..f1fe92b02 100644 --- a/tests/lava/magma/runtime/test_get_set_var.py +++ b/tests/lava/magma/runtime/test_get_set_var.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ - import numpy as np import unittest diff --git a/tests/lava/magma/runtime/test_nxsdkruntimeservice_loihi.py b/tests/lava/magma/runtime/test_nxsdkruntimeservice_loihi.py new file mode 100644 index 000000000..106125d6f --- /dev/null +++ b/tests/lava/magma/runtime/test_nxsdkruntimeservice_loihi.py @@ -0,0 +1,71 @@ +import unittest + +from tests.lava.test_utils.utils import Utils + +from lava.magma.core.decorator import implements, requires +from lava.magma.core.model.nc.type import LavaNcType +from lava.magma.core.process.process import AbstractProcess +from lava.magma.core.process.variable import Var +from lava.magma.core.resources import Loihi2NeuroCore +from lava.magma.core.run_conditions import RunSteps +from lava.magma.core.run_configs import RunConfig +from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol +from lava.magma.core.model.nc.model import NcProcessModel + + +class SimpleProcess(AbstractProcess): + def __init__(self, **kwargs): + super().__init__(**kwargs) + shape = kwargs["shape"] + self.u = Var(shape=shape, init=0) + self.v = Var(shape=shape, init=0) + + +class SimpleRunConfig(RunConfig): + def __init__(self, **kwargs): + super().__init__() + self.model = None + if "model" in kwargs: + self.model = kwargs.pop("model") + + def select(self, process, proc_models): + return proc_models[0] + + +@implements(proc=SimpleProcess, protocol=LoihiProtocol) +@requires(Loihi2NeuroCore) +class SimpleProcessModel(NcProcessModel): + u = LavaNcType(int, int) + v = LavaNcType(int, int) + + def post_guard(self): + return False + + 
def pre_guard(self): + return False + + def lrn_guard(self): + return False + + +class TestProcessLoihi2(unittest.TestCase): + # Run Loihi Tests using command below: + # + # "SLURM=1 LOIHI_GEN=N3B3 BOARD=ncl-og-05 PARTITION=oheogulch + # RUN_LOIHI_TESTS=1 python -m unittest + # tests/lava/magma/runtime/test_nxsdkruntimeservice_loihi.py" + + run_loihi_tests: bool = Utils.get_bool_env_setting("RUN_LOIHI_TESTS") + + @unittest.skipUnless(run_loihi_tests, + "runtime_to_runtimeservice_to_nxcore_to_loihi") + def test_nxsdkruntimeservice_loihi(self): + process = SimpleProcess(shape=(2, 2)) + run_config = SimpleRunConfig(sync_domains=[]) + process.run(condition=RunSteps(num_steps=10), run_cfg=run_config) + process.run(condition=RunSteps(num_steps=5), run_cfg=run_config) + process.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/lava/magma/runtime/test_ref_var_ports.py b/tests/lava/magma/runtime/test_ref_var_ports.py index b60c15de5..68bb8fce5 100644 --- a/tests/lava/magma/runtime/test_ref_var_ports.py +++ b/tests/lava/magma/runtime/test_ref_var_ports.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ - import numpy as np import unittest diff --git a/tests/lava/magma/runtime/test_runtime.py b/tests/lava/magma/runtime/test_runtime.py index 63d5aad85..5f5fa16a6 100644 --- a/tests/lava/magma/runtime/test_runtime.py +++ b/tests/lava/magma/runtime/test_runtime.py @@ -3,7 +3,7 @@ from lava.magma.compiler.executable import Executable from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.core.resources import HeadNode +from lava.magma.core.resources import HeadNode, Loihi2System from lava.magma.compiler.node import Node, NodeConfig from lava.magma.runtime.runtime import Runtime @@ -16,16 +16,16 @@ def test_runtime_creation(self): runtime: Runtime = Runtime(exe=exe, message_infrastructure_type=mp) expected_type: ty.Type = Runtime - assert isinstance( - runtime, expected_type - ), f"Expected type {expected_type} doesn't match {(type(runtime))}" + self.assertIsInstance( + runtime, expected_type, + f"Expected type {expected_type} doesn't match {(type(runtime))}") def test_executable_node_config_assertion(self): """Tests runtime constructions with expected constraints""" exec: Executable = Executable() runtime1: Runtime = Runtime(exec, ActorType.MultiProcessing) - with self.assertRaises(AssertionError): + with self.assertRaises(IndexError): runtime1.initialize() node: Node = Node(HeadNode, []) @@ -33,20 +33,28 @@ def test_executable_node_config_assertion(self): runtime2: Runtime = Runtime(exec, ActorType.MultiProcessing) runtime2.initialize() expected_type: ty.Type = Runtime - assert isinstance( - runtime2, expected_type - ), f"Expected type {expected_type} doesn't match {(type(runtime2))}" + self.assertIsInstance( + runtime2, expected_type, + f"Expected type {expected_type} doesn't match {(type(runtime2))}") runtime2.stop() - exec.node_configs[0].append(node) - runtime3: Runtime = Runtime(exec, ActorType.MultiProcessing) + exec1: Executable = Executable() + node1: Node = Node(Loihi2System, []) + exec1.node_configs.append(NodeConfig([node1])) + runtime3: Runtime = Runtime(exec1, ActorType.MultiProcessing) with self.assertRaises(AssertionError): - runtime3.initialize() + runtime3.initialize(0) exec.node_configs.append(NodeConfig([node])) runtime4: Runtime = Runtime(exec, ActorType.MultiProcessing) - with self.assertRaises(AssertionError): - runtime4.initialize() + 
runtime4.initialize(0) + self.assertEqual(len(runtime4._executable.node_configs), 2, + "Expected node_configs length to be 2") + node2: Node = Node(Loihi2System, []) + exec.node_configs[0].append(node2) + self.assertEqual(len(runtime4._executable.node_configs[0]), 2, + "Expected node_configs[0] node_config length to be 2") + runtime4.stop() if __name__ == "__main__": diff --git a/tests/lava/magma/runtime/test_runtime_service.py b/tests/lava/magma/runtime/test_runtime_service.py index 73e650fbe..b049aa6b7 100644 --- a/tests/lava/magma/runtime/test_runtime_service.py +++ b/tests/lava/magma/runtime/test_runtime_service.py @@ -1,14 +1,21 @@ +import random import unittest from multiprocessing.managers import SharedMemoryManager import numpy as np +from tests.lava.test_utils.utils import Utils + from lava.magma.compiler.channels.pypychannel import PyPyChannel from lava.magma.core.decorator import implements from lava.magma.core.model.py.model import AbstractPyProcessModel from lava.magma.core.process.process import AbstractProcess from lava.magma.core.sync.protocol import AbstractSyncProtocol -from lava.magma.runtime.runtime_service import PyRuntimeService +from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol +from lava.magma.runtime.runtime_services.runtime_service import ( + PyRuntimeService, + NxSdkRuntimeService +) class MockInterface: @@ -86,5 +93,117 @@ def test_runtime_service_start_run(self): smm.shutdown() +class NxSdkTestRuntimeService(NxSdkRuntimeService): + def run(self): + self.board.run(numSteps=self.num_steps, aSync=False) + + def stop(self): + self.board.stop() + + def pause(self): + self.board.pause() + + def test_setup(self): + self.nxCore = self.board.nxChips[0].nxCores[0] + self.axon_map = self.nxCore.axonMap + + def test_idx(self, test_case: unittest.TestCase): + value = random.getrandbits(15) + # Setting the value of idx as value + self.axon_map[0].idx = value + self.axon_map.push(0) + # Checking the value of idx + self.axon_map.fetch(0) + test_case.assertEqual(self.axon_map[0].idx, value) + + value = random.getrandbits(15) + # Setting the value of idx as value + self.axon_map[1].idx = value + self.axon_map.push(1) + # Checking the value of idx + self.axon_map.fetch(1) + test_case.assertEqual(self.axon_map[1].idx, value) + + def test_len(self, test_case: unittest.TestCase): + value = random.getrandbits(13) + # Setting the value of len as value + self.axon_map[0].len = value + self.axon_map.push(0) + # Checking the value of len + self.axon_map.fetch(0) + test_case.assertEqual(self.axon_map[0].len, value) + + value = random.getrandbits(13) + # Setting the value of len as value + self.axon_map[1].len = value + self.axon_map.push(1) + # Checking the value of len + self.axon_map.fetch(1) + test_case.assertEqual(self.axon_map[1].len, value) + + def test_data(self, test_case: unittest.TestCase): + value = random.getrandbits(36) + # Setting the value of data as value + self.axon_map[0].data = value + self.axon_map.push(0) + # Checking the value of data + self.axon_map.fetch(0) + test_case.assertEqual(self.axon_map[0].data, value) + + value = random.getrandbits(36) + # Setting the value of data as value + self.axon_map[1].data = value + self.axon_map.push(1) + # Checking the value of data + self.axon_map.fetch(1) + test_case.assertEqual(self.axon_map[1].data, value) + + +class TestNxSdkRuntimeService(unittest.TestCase): + # Run Loihi Tests using example below: + # + # "SLURM=1 LOIHI_GEN=N3B3 BOARD=ncl-og-05 PARTITION=oheogulch + # RUN_LOIHI_TESTS=1 python -m 
unittest
+    #  tests/lava/magma/runtime/test_runtime_service.py"
+
+    run_loihi_tests: bool = Utils.get_bool_env_setting("RUN_LOIHI_TESTS")
+
+    def test_runtime_service_construction(self):
+        p = LoihiProtocol()
+        rs = NxSdkTestRuntimeService(protocol=p)
+        self.assertEqual(rs.protocol, p)
+        self.assertEqual(rs.service_to_runtime, None)
+        self.assertEqual(rs.runtime_to_service, None)
+
+    @unittest.skipUnless(run_loihi_tests, "runtimeservice_to_nxcore_to_loihi")
+    def test_runtime_service_loihi_start_run(self):
+        p = LoihiProtocol()
+        rs = NxSdkTestRuntimeService(protocol=p)
+
+        smm = SharedMemoryManager()
+        smm.start()
+        runtime_to_service = create_channel(smm, name="runtime_to_service")
+        service_to_runtime = create_channel(smm, name="service_to_runtime")
+        runtime_to_service.dst_port.start()
+        service_to_runtime.src_port.start()
+
+        rs.num_steps = 10
+
+        rs.runtime_to_service = runtime_to_service.src_port
+        rs.service_to_runtime = service_to_runtime.dst_port
+
+        rs.join()
+
+        rs.test_setup()
+        rs.test_idx(self)
+        rs.test_len(self)
+        rs.test_data(self)
+
+        rs.run()
+        rs.stop()
+
+        smm.shutdown()
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/tests/lava/proc/io/test_dataloader.py b/tests/lava/proc/io/test_dataloader.py
index c0c8b6742..f339c8c72 100644
--- a/tests/lava/proc/io/test_dataloader.py
+++ b/tests/lava/proc/io/test_dataloader.py
@@ -1,7 +1,6 @@
 # Copyright (C) 2021 Intel Corporation
 # SPDX-License-Identifier: BSD-3-Clause
 # See: https://spdx.org/licenses/
-
 from typing import List, Tuple
 import unittest
 import numpy as np
diff --git a/tests/lava/test_utils/__init__.py b/tests/lava/test_utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/lava/test_utils/utils.py b/tests/lava/test_utils/utils.py
new file mode 100644
index 000000000..c268a38fb
--- /dev/null
+++ b/tests/lava/test_utils/utils.py
@@ -0,0 +1,22 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+import os
+
+
+class Utils():
+    """Utility class containing testing helper
+    code that can be reused between tests.
+    """
+
+    @staticmethod
+    def get_bool_env_setting(env_var: str):
+        """Get an environment variable and return
+        True if the variable is set to "1", else return
+        False.
+        """
+        env_test_setting = os.environ.get(env_var)
+        test_setting = False
+        if env_test_setting == "1":
+            test_setting = True
+        return test_setting
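
The new tests above gate Loihi-dependent cases on the RUN_LOIHI_TESTS environment variable via Utils.get_bool_env_setting, read once while the test class body executes and passed to unittest.skipUnless. The following minimal sketch is not part of the patch; the test class and method names are hypothetical, and it only assumes that the tests/lava/test_utils package introduced by this change is importable. It shows how a downstream test module could reuse the same gating pattern.

# Hypothetical example, not part of this change set.
import unittest

from tests.lava.test_utils.utils import Utils


class TestLoihiGatedExample(unittest.TestCase):
    # Read the flag once while the class body executes so that the
    # skipUnless decorator below sees its final value.
    run_loihi_tests: bool = Utils.get_bool_env_setting("RUN_LOIHI_TESTS")

    @unittest.skipUnless(run_loihi_tests,
                         "requires Loihi hardware; set RUN_LOIHI_TESTS=1")
    def test_runs_only_when_loihi_is_available(self):
        # Hardware-dependent assertions would go here; on machines where
        # RUN_LOIHI_TESTS is not set to "1" the test is reported as skipped.
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()

Like the tests in this patch, the sketch keeps the environment check outside the test body, so skipped runs never touch NxSDK or the board.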