====== 9. BLE ====== ===== Cíle cvičení ===== * Přístup k datům v GATT databázi * Interpretace dat ===== Podklady ===== * BLE sniffer (WireShark) - https://bleak.readthedocs.io/en/latest/troubleshooting.html#windows-10 * Bleak (Python BLE knihovna) - https://github.com/hbldh/bleak/tree/develop/examples ===== Komunikace s BLE zařízením ===== Abychom mohli efektivně komunikovat se zařízením, v našem případě zejména číst z GATT databáze, je v první řadě třeba, aby zařízení mělo aktivní BLE a nabízelo v jeho rámci data. V případě desky Thunderboard je možné využít třeba deafultní aplikaci, která je ke stažení [[courses:b0b37nsi:hw:01|zde]]. ==== Identifikace BLE zařízení ==== Abychom mohli s BLE zařízením komunikovat, je třeba znát jeho fyzickou adresu. Protože odezva BLE rozhraní může být delší, je třeba použít asynchronní přístup pomocí knihovny [[https://docs.python.org/3/library/asyncio.html|asyncio]]. import asyncio from bleak import discover async def main(): devices = await discover() for d in devices: print(d) if __name__ == "__main__": asyncio.run(main()) ==== Dostupné služby ==== Pokud známe adresu, následující kód vrátí seznam služeb dostupných přes GATT databázi: import sys import asyncio import platform from bleak import BleakClient async def main(address: str): async with BleakClient(address) as client: svcs = await client.get_services() print("Services:") for service in svcs: print(service) if __name__ == "__main__": asyncio.run(main(sys.argv[1])) Argumentem programu je adresa zařízení: $ python get_services.py 84:FD:27:E6:5C:E6 Services: 00001801-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Attribute Profile 00001800-0000-1000-8000-00805f9b34fb (Handle: 9): Generic Access Profile 0000180a-0000-1000-8000-00805f9b34fb (Handle: 14): Device Information 00001815-0000-1000-8000-00805f9b34fb (Handle: 27): Automation IO 0000180f-0000-1000-8000-00805f9b34fb (Handle: 37): Battery Service ec61a454-ed00-a5e8-b8f9-de9ec026ec51 (Handle: 41): Unknown efd658ae-c400-ef33-76e7-91b00019103b (Handle: 44): Unknown f598dbc5-2f00-4ec5-9936-b3d1aa4f957f (Handle: 49): Unknown a4e649f4-4be5-11e5-885d-feff819cdc9f (Handle: 58): Unknown 0000181a-0000-1000-8000-00805f9b34fb (Handle: 68): Environmental Sensing fcb89c40-c600-59f3-7dc3-5ece444a401b (Handle: 81): Unknown 1d14d6ee-fd63-4fa1-bfa4-8f47b42119f0 (Handle: 84): Unknown ==== Service explorer ==== Pro využití dat nabízených prostřednictvím BLE rozhraní je třeba znát ''UUID'' jednotlivých služeb nebo charakteristik (data z konkrétních senzorů). V případě IMU jsou potřebná ''UUID'' následovná: Service: UUID="a4e649f4-4be5-11e5-885d-feff819cdc9f" Charakteristiky: Acceleration UUID="c4c1f6e2-4be5-11e5-885d-feff819cdc9f" Orientation UUID="b7c4b694-bee3-45dd-ba9f-f3b5e994f49a" Control Point UUID="71e30b8c-4131-4703-b0a0-b0bbba75856b" Jednotlivé charakteristiky vrací 6 bytů dat, které reprezentují tři čísla typu short. import sys import platform import asyncio import logging from bleak import BleakClient logger = logging.getLogger(__name__) ADDRESS = ( "84:FD:27:E6:5A:B9" if platform.system() != "Darwin" else "B9EA5233-37EF-4DD6-87A8-2A875E821C46" ) async def main(address): async with BleakClient(address) as client: logger.info(f"Connected: {client.is_connected}") for service in client.services: logger.info(f"[Service] {service}") for char in service.characteristics: if "read" in char.properties: try: value = bytes(await client.read_gatt_char(char.uuid)) logger.info( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}" ) except Exception as e: logger.error( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}" ) else: value = None logger.info( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}" ) for descriptor in char.descriptors: try: value = bytes( await client.read_gatt_descriptor(descriptor.handle) ) logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}") except Exception as e: logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS)) import asyncio import threading from bleak import BleakClient from ctypes import Structure, c_short import matplotlib.pyplot as plt from matplotlib.patches import Arrow from math import sin, cos, pi, radians BTAddr = "84:FD:27:E6:5A:D6" char_uuid = "b7c4b694-bee3-45dd-ba9f-f3b5e994f49a" class DataPoint(Structure): _pack_ = 1 _fields_ = [ ("RX", c_short), ("RY", c_short), ("RZ", c_short) ] AppRunning = True plt.ion() fig, ax = plt.subplots(1, 3, subplot_kw={'projection': 'polar'}) fig.tight_layout() for a in ax: a.set_theta_zero_location('N') a.set_yticklabels([]) a.scatter([0], [1]) Newest = None def notification_handler(sender, data): global Newest Newest = data async def Loop(): global AppRunning, Newest, ax if Newest == None: return data = Newest Point = DataPoint.from_buffer(data) print(data.hex(), Point.RX/100, Point.RY/100, Point.RZ/100) angleX = radians(Point.RX/100) angleY = radians(Point.RY/100) angleZ = radians(Point.RZ/100) for a in ax: a.clear() a.set_theta_zero_location('N') a.set_yticklabels([]) ax[0].scatter([angleX], [1]) ax[0].title.set_text('X') ax[1].scatter([angleY], [1]) ax[1].title.set_text('Y') ax[2].scatter([angleZ], [1]) ax[2].title.set_text('Z') try: fig.canvas.draw() fig.canvas.flush_events() except Exception as e: print(e) AppRunning = False async def main(): print(f"Connecting: ", BTAddr, char_uuid) global AppRunning async with BleakClient(BTAddr) as client: print(f"Connected: {client.is_connected}") await client.start_notify(char_uuid, notification_handler) while AppRunning: try: await asyncio.sleep(0) await Loop() except KeyboardInterrupt: break await client.stop_notify(char_uuid) if __name__ == "__main__": asyncio.run(main())