Skip to content

Commit

Permalink
#4 o2 emulator
Browse files Browse the repository at this point in the history
  • Loading branch information
dentra committed Feb 13, 2024
1 parent bd92ed2 commit 24b80bc
Show file tree
Hide file tree
Showing 15 changed files with 1,539 additions and 0 deletions.
55 changes: 55 additions & 0 deletions tests/emu/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from emu import o2
from emu.o2_main import main

if len(sys.argv) < 2:
print("Usage: o2 port [port]")
exit(1)

# main(sys.argv[1:])
flags = o2.ErrorFlags(0xFFFF)
print(flags)

# cmd = o2.DevModeCommand(bytes(b"\x02"))
# print("%s" % cmd.flags)
# flags = o2.StateFlags(0)
# print(flags)
# flags = flags | o2.StateFlags.POWER
# print(flags)
# flags = flags | o2.StateFlags.HEAT
# print(flags)
# flags = flags & o2.StateFlags.POWER
# print(flags)

# print(flags & o2.StateFlags.POWER != 0)
# print(flags & o2.StateFlags.HEAT != 0)
# print(flags & o2.StateFlags.UNKNOWN2 != 0)


# print(o2.WorkModeFlags(0xFF))
# print(o2.DevModeFlags(0xFF))

# flag = o2.DevModeFlags(o2.DevModeFlags.USER)
# print(flag)
# flag = o2.set_flag(flag, o2.DevModeFlags.PAIR, True)
# print(flag)
# flag = o2.set_flag(flag, o2.DevModeFlags.PAIR, False)
# print(flag)
# flag = o2.set_flag(flag, o2.DevModeFlags.PAIR, True)
# print(flag)
# flag = o2.set_flag(flag, o2.DevModeFlags.PAIR, False)
# print(flag)

# # flag = flag | o2.DevModeFlags.USER
# # print(flag)

# buf = bytearray([1])
# buf.extend(bytes([2, 3, 4]))
# buf.append(5)
# print(type(buf), buf)
# buf = bytes(buf)
# print(type(buf), buf)
5 changes: 5 additions & 0 deletions tests/emu/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .command import CmdId, Command
from .device import Device
from .emu import Emu
from .exchange import Exchange, Protocol, SerialTransport, Transport
from .packet import Packet
93 changes: 93 additions & 0 deletions tests/emu/base/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import abc
import logging

_LOGGER = logging.getLogger()


class CmdId:
def __init__(self, code: int, name: str = None) -> None:
self._code = code
self._name = f"{name}[{code:02X}]" if name else f"COMMAND[{code:02X}]"

@property
def code(self) -> int:
return self._code

@property
def name(self) -> str:
return self._name

def __str__(self) -> str:
return self.name

def __eq__(self, __value: object) -> bool:
if __value is None:
return False
if isinstance(__value, CmdId):
return self.code == __value.code
if isinstance(__value, int):
return self.code == __value
if isinstance(__value, float):
return self.code == int(__value)
if isinstance(__value, str) and __value.endswith("]"):
return self.code == int(__value[-3:-1], 16)
return False

def __hash__(self) -> int:
return self.code.__hash__()


class Command(abc.ABC):
cmd_req: CmdId = None
cmd_rsp: CmdId = None
cmd_set: CmdId = None

def __init__(self, data: bytes = None) -> None:
if data:
self.upd(data)

def get(self) -> bytes:
return None

def set(self, data: bytes) -> bytes:
return None

def upd(self, data: bytes) -> None:
pass

def mk_req(self) -> tuple[CmdId, bytes]:
return None

def mk_rsp(self) -> tuple[CmdId, bytes]:
return None

def mk_set(self) -> tuple[CmdId, bytes]:
return None

@staticmethod
def _ctl_int(name: str, current: int, min: int, max: int, up: bool) -> int:
newval = current
if up:
newval = newval + 1
if newval > max:
newval = min
else:
newval = newval - 1
if newval < min:
newval = max
_LOGGER.info("%s %d -> %d", name, current, newval)
return newval

@staticmethod
def _ctl_bool(name: str, current: bool) -> bool:
newval = not current
_LOGGER.info("%s %d -> %d", name, current, newval)
return newval

def __repr__(self) -> str:
return (
f"{self.__class__.__name__}[{self.cmd_req},{self.cmd_rsp},{self.cmd_rsp}]"
)

def __str__(self) -> str:
return self.__repr__()
75 changes: 75 additions & 0 deletions tests/emu/base/device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import abc
import logging
import typing

from .command import CmdId, Command
from .packet import Packet

_LOGGER = logging.getLogger()


class Device(abc.ABC):

def __init__(self, commands: list[Command], disabled_log: list[str]):
self._commands = commands
self._disabled_log = disabled_log
_LOGGER.info("commands: %s", self._commands)

def _find(
self, cmd: CmdId, chk_fn: typing.Callable[[Command], CmdId]
) -> typing.Optional[Command]:
for sup in self._commands:
if cmd == chk_fn(sup):
return sup
return None

def _log(self, cmd: CmdId, data: bytes):
if cmd in self._disabled_log:
return
if data:
_LOGGER.info("%s: %s (%d)", cmd, data.hex(" "), len(data))
else:
_LOGGER.info(cmd)

@abc.abstractmethod
def pkt(self, cmd: CmdId, data: bytes) -> Packet:
pass

def req(self, pkt: Packet) -> Packet:
def rsp(cmd: Command, data: bytes) -> Packet:
# self._log(cmd.cmd_rsp, data)
pkt = self.pkt(cmd.cmd_rsp, data)
assert pkt
return pkt

req = self._find(pkt.cmd, lambda c: c.cmd_req)
if req:
return rsp(req, req.get())

set = self._find(pkt.cmd, lambda c: c.cmd_set)
if set:
return rsp(set, set.set(pkt.data))

_LOGGER.warning("Unsupported command %s: %s", pkt.cmd, pkt.log_data)
return None

def upd(self, pkt: Packet) -> bool:
lst: list[typing.Callable[[Command], CmdId]] = [
# lambda c: c.cmd_req,
lambda c: c.cmd_set,
lambda c: c.cmd_rsp,
]
for itm in lst:
res = self._find(pkt.cmd, itm)
if res:
res.upd(pkt.data)
# _LOGGER.info("Updating %s with %s", res, pkt)
return True
_LOGGER.warning("Can't find command for %s", pkt)
return False

def instance(self, cls):
for cmd in self._commands:
if isinstance(cmd, cls):
return cmd
return None
48 changes: 48 additions & 0 deletions tests/emu/base/emu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import logging
import queue
import threading
import time

from .device import Device
from .exchange import Exchange, Protocol
from .packet import Packet

_LOGGER = logging.getLogger()


class Emu:
def __init__(self, name: str, exchange: Protocol, device: Device) -> None:
self.device = device
self.alive = threading.Event()
self.reader = Exchange(name, exchange, self.alive)
self.thread: threading.Thread = None

def __del__(self):
self.stop()

def stop(self):
self.alive.set()

def start(self) -> bool:
if self.thread:
_LOGGER.warning("emu already running")
return False

def loop_task():
while not self.alive.is_set() and self.reader:
self.loop()
time.sleep(0.0005)

self.reader.start()
self.thread = threading.Thread(target=loop_task, daemon=True)
self.thread.start()

return True

def loop(self):
pkt = self.reader.read()
if pkt:
pkt = self.device.req(pkt)
if pkt:
self.reader.write(pkt)
Loading

0 comments on commit 24b80bc

Please sign in to comment.