Skip to content

Commit

Permalink
Use BT characteristic for monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
abmantis authored and newAM committed Aug 23, 2023
1 parent 1030451 commit 21f1525
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
34 changes: 33 additions & 1 deletion idasen/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from bleak import BleakClient
from bleak import BleakScanner
from bleak import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from typing import Any
from typing import Any, Awaitable, Callable
from typing import MutableMapping
from typing import Optional
from typing import Tuple
Expand Down Expand Up @@ -117,6 +118,37 @@ async def _connect(self):
)
time.sleep(0.3 * i)

async def monitor(self, callback: Callable[[float], Awaitable[None]]):
output_service_uuid = "99fa0020-338a-1024-8a49-009c0215f78a"
output_char_uuid = "99fa0021-338a-1024-8a49-009c0215f78a"

previous_height = 0.0

async def output_listener(char: BleakGATTCharacteristic, data: bytearray):
height = _bytes_to_meters(data)
self._logger.info(f"Got data: {height}m")

nonlocal previous_height
if abs(height - previous_height) < 0.001:
return
previous_height = height
await callback(height)

for service in self._client.services:
if service.uuid != output_service_uuid:
continue

chr_output = service.get_characteristic(output_char_uuid)
if chr_output is None:
self._logger.error("No output characteristic found")
return

self._logger.debug("Starting notify")
await self._client.start_notify(chr_output, output_listener)
return

self._logger.error("Output service not found")

@property
def is_connected(self) -> bool:
"""
Expand Down
13 changes: 7 additions & 6 deletions idasen/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ async def init(args: argparse.Namespace) -> int:
async def monitor(args: argparse.Namespace) -> None:
try:
async with IdasenDesk(args.mac_address, exit_on_fail=True) as desk:
previous_height = 0.0

async def printer(height: float):
print(f"{height:.3f} meters", flush=True)

await desk.monitor(printer)
while True:
height = await desk.get_height()
if abs(height - previous_height) > 0.001:
print(f"{height:.3f} meters", flush=True)
previous_height = height
except KeyboardInterrupt:
await asyncio.sleep(1000000)
except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
pass


Expand Down
25 changes: 23 additions & 2 deletions tests/test_idasen.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ def event_loop() -> Generator[AbstractEventLoop, None, None]:
desk_mac: str = "AA:AA:AA:AA:AA:AA"


class MockCharacteristic:
"""Mocks a GATT Characteristic"""

@property
def uuid(self):
return "99fa0020-338a-1024-8a49-009c0215f78a"

def get_characteristic(self, uuid):
return ""


class MockBleakClient:
"""Mocks the bleak client for unit testing."""

Expand All @@ -42,8 +53,8 @@ async def __aexit__(self, *args, **kwargs):
return

async def start_notify(self, uuid: str, callback: Callable):
callback(uuid, bytearray([0x00, 0x00, 0x00, 0x00]))
callback(None, bytearray([0x00, 0x00, 0x00, 0x00]))
await callback(uuid, bytearray([0x00, 0x00, 0x00, 0x00]))
await callback(None, bytearray([0x10, 0x00, 0x00, 0x00]))

async def write_gatt_char(
self, uuid: str, command: bytearray, response: bool = False
Expand All @@ -66,6 +77,10 @@ async def read_gatt_char(self, uuid: str) -> bytearray:
def address(self) -> str:
return desk_mac

@property
def services(self):
return [MockCharacteristic()]


@pytest.fixture(scope="session")
async def desk(event_loop: AbstractEventLoop) -> AsyncGenerator[IdasenDesk, None]:
Expand Down Expand Up @@ -104,6 +119,12 @@ async def test_get_height(desk: IdasenDesk):
assert isinstance(height, float)


async def test_monitor(desk: IdasenDesk):
monitor_callback = mock.AsyncMock()
await desk.monitor(monitor_callback)
monitor_callback.assert_has_calls([mock.call(0.62), mock.call(0.6216)])


@pytest.mark.parametrize("target", [0.0, 2.0])
async def test_move_to_target_raises(desk: IdasenDesk, target: float):
with pytest.raises(ValueError):
Expand Down

0 comments on commit 21f1525

Please sign in to comment.