Skip to content

Commit

Permalink
Add passive_scan.py example
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanpotocnik committed Nov 24, 2022
1 parent 35a0784 commit 57a68f9
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions examples/passive_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Scanner using passive scanning mode
--------------
Example similar to detection_callback.py, but using passive scanning mode and showing how to iterate devices
Updated on 2022-11-24 by bojanpotocnik <info@bojanpotocnik.com>
"""
import argparse
import asyncio
import logging
from typing import Optional, List, Dict, Any, AsyncIterator, Tuple, Iterable

import bleak
from bleak import BleakScanner, BLEDevice, AdvertisementData

logger = logging.getLogger(__name__)


def _get_os_specific_scanning_params(
uuids: Optional[List[str]], rssi: Optional[int], macos_use_bdaddr: bool = False
) -> Dict[str, Any]:
def get_bluez_dbus_scanning_params() -> Dict[str, Any]:
from bleak.assigned_numbers import AdvertisementDataType
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
from bleak.backends.bluezdbus.scanner import (
BlueZScannerArgs,
BlueZDiscoveryFilters,
)

filters = BlueZDiscoveryFilters(
# UUIDs= Added below, because it cannot be None
# RSSI= Added below, because it cannot be None
Transport="le",
DuplicateData=True,
)
if uuids:
filters["UUIDs"] = uuids
if rssi:
filters["RSSI"] = rssi

# or_patterns ar required for BlueZ passive scanning
or_patterns = [
# General Discoverable (peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x02"),
# BR/EDR Not Supported (BLE peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x04"),
# General Discoverable, BR/EDR Not Supported (BLE peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x06"),
# General Discoverable, LE and BR/EDR Capable (Controller), LE and BR/EDR Capable (Host) (computers, phones)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x1A"),
]

return {"bluez": BlueZScannerArgs(filters=filters, or_patterns=or_patterns)}

def get_core_bluetooth_scanning_params() -> Dict[str, Any]:
from bleak.backends.corebluetooth.scanner import CBScannerArgs

return {"cb": CBScannerArgs(use_bdaddr=macos_use_bdaddr)}

return {
"BleakScannerBlueZDBus": get_bluez_dbus_scanning_params,
"BleakScannerCoreBluetooth": get_core_bluetooth_scanning_params,
# "BleakScannerP4Android": get_p4android_scanning_params,
# "BleakScannerWinRT": get_winrt_scanning_params,
}.get(bleak.get_platform_scanner_backend_type().__name__, lambda: {})()


async def scan(
uuids: Optional[Iterable[str]] = None,
rssi: Optional[int] = None,
macos_use_bdaddr: bool = False,
) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]:
devices = asyncio.Queue()

def scan_callback(bd: BLEDevice, ad: AdvertisementData):
devices.put_nowait((bd, ad))

async with BleakScanner(
detection_callback=scan_callback,
**_get_os_specific_scanning_params(uuids=uuids, rssi=rssi),
scanning_mode="passive"
):
while True:
try:
yield await devices.get()
except GeneratorExit:
break


async def main():
parser = argparse.ArgumentParser()

parser.add_argument(
"--services",
metavar="<uuid>",
nargs="*",
help="UUIDs of one or more services to filter for",
)

args = parser.parse_args()

async for device, adv_data in scan(
uuids=args.services
): # type: BLEDevice, AdvertisementData
logger.info("%s: %r", device.address, adv_data)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
)

try:
asyncio.run(main())
except KeyboardInterrupt:
pass

0 comments on commit 57a68f9

Please sign in to comment.