====== 9. BLE ======
===== Cíle cvičení =====
* Přístup k datům v GATT databázi
* Interpretace dat
===== Podklady =====
* BLE sniffer (WireShark) - https://bleak.readthedocs.io/en/latest/troubleshooting.html#windows-10
* Bleak (Python BLE knihovna) - https://github.com/hbldh/bleak/tree/develop/examples
===== Komunikace s BLE zařízením =====
Abychom mohli efektivně komunikovat se zařízením, v našem případě zejména číst z GATT databáze, je v první řadě třeba, aby zařízení mělo aktivní BLE a nabízelo v jeho rámci data. V případě desky Thunderboard je možné využít třeba deafultní aplikaci, která je ke stažení [[courses:b0b37nsi:hw:01|zde]].
==== Identifikace BLE zařízení ====
Abychom mohli s BLE zařízením komunikovat, je třeba znát jeho fyzickou adresu. Protože odezva BLE rozhraní může být delší, je třeba použít asynchronní přístup pomocí knihovny [[https://docs.python.org/3/library/asyncio.html|asyncio]].
import asyncio
from bleak import discover
async def main():
devices = await discover()
for d in devices:
print(d)
if __name__ == "__main__":
asyncio.run(main())
==== Dostupné služby ====
Pokud známe adresu, následující kód vrátí seznam služeb dostupných přes GATT databázi:
import sys
import asyncio
import platform
from bleak import BleakClient
async def main(address: str):
async with BleakClient(address) as client:
svcs = await client.get_services()
print("Services:")
for service in svcs:
print(service)
if __name__ == "__main__":
asyncio.run(main(sys.argv[1]))
Argumentem programu je adresa zařízení:
$ python get_services.py 84:FD:27:E6:5C:E6
Services:
00001801-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Attribute Profile
00001800-0000-1000-8000-00805f9b34fb (Handle: 9): Generic Access Profile
0000180a-0000-1000-8000-00805f9b34fb (Handle: 14): Device Information
00001815-0000-1000-8000-00805f9b34fb (Handle: 27): Automation IO
0000180f-0000-1000-8000-00805f9b34fb (Handle: 37): Battery Service
ec61a454-ed00-a5e8-b8f9-de9ec026ec51 (Handle: 41): Unknown
efd658ae-c400-ef33-76e7-91b00019103b (Handle: 44): Unknown
f598dbc5-2f00-4ec5-9936-b3d1aa4f957f (Handle: 49): Unknown
a4e649f4-4be5-11e5-885d-feff819cdc9f (Handle: 58): Unknown
0000181a-0000-1000-8000-00805f9b34fb (Handle: 68): Environmental Sensing
fcb89c40-c600-59f3-7dc3-5ece444a401b (Handle: 81): Unknown
1d14d6ee-fd63-4fa1-bfa4-8f47b42119f0 (Handle: 84): Unknown
==== Service explorer ====
Pro využití dat nabízených prostřednictvím BLE rozhraní je třeba znát ''UUID'' jednotlivých služeb nebo charakteristik (data z konkrétních senzorů). V případě IMU jsou potřebná ''UUID'' následovná:
Service: UUID="a4e649f4-4be5-11e5-885d-feff819cdc9f"
Charakteristiky:
Acceleration UUID="c4c1f6e2-4be5-11e5-885d-feff819cdc9f"
Orientation UUID="b7c4b694-bee3-45dd-ba9f-f3b5e994f49a"
Control Point UUID="71e30b8c-4131-4703-b0a0-b0bbba75856b"
Jednotlivé charakteristiky vrací 6 bytů dat, které reprezentují tři čísla typu short.
import sys
import platform
import asyncio
import logging
from bleak import BleakClient
logger = logging.getLogger(__name__)
ADDRESS = (
"84:FD:27:E6:5A:B9"
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)
async def main(address):
async with BleakClient(address) as client:
logger.info(f"Connected: {client.is_connected}")
for service in client.services:
logger.info(f"[Service] {service}")
for char in service.characteristics:
if "read" in char.properties:
try:
value = bytes(await client.read_gatt_char(char.uuid))
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
except Exception as e:
logger.error(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
)
else:
value = None
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
for descriptor in char.descriptors:
try:
value = bytes(
await client.read_gatt_descriptor(descriptor.handle)
)
logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
except Exception as e:
logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS))
import asyncio
import threading
from bleak import BleakClient
from ctypes import Structure, c_short
import matplotlib.pyplot as plt
from matplotlib.patches import Arrow
from math import sin, cos, pi, radians
BTAddr = "84:FD:27:E6:5A:D6"
char_uuid = "b7c4b694-bee3-45dd-ba9f-f3b5e994f49a"
class DataPoint(Structure):
_pack_ = 1
_fields_ = [
("RX", c_short),
("RY", c_short),
("RZ", c_short)
]
AppRunning = True
plt.ion()
fig, ax = plt.subplots(1, 3, subplot_kw={'projection': 'polar'})
fig.tight_layout()
for a in ax:
a.set_theta_zero_location('N')
a.set_yticklabels([])
a.scatter([0], [1])
Newest = None
def notification_handler(sender, data):
global Newest
Newest = data
async def Loop():
global AppRunning, Newest, ax
if Newest == None:
return
data = Newest
Point = DataPoint.from_buffer(data)
print(data.hex(), Point.RX/100, Point.RY/100, Point.RZ/100)
angleX = radians(Point.RX/100)
angleY = radians(Point.RY/100)
angleZ = radians(Point.RZ/100)
for a in ax:
a.clear()
a.set_theta_zero_location('N')
a.set_yticklabels([])
ax[0].scatter([angleX], [1])
ax[0].title.set_text('X')
ax[1].scatter([angleY], [1])
ax[1].title.set_text('Y')
ax[2].scatter([angleZ], [1])
ax[2].title.set_text('Z')
try:
fig.canvas.draw()
fig.canvas.flush_events()
except Exception as e:
print(e)
AppRunning = False
async def main():
print(f"Connecting: ", BTAddr, char_uuid)
global AppRunning
async with BleakClient(BTAddr) as client:
print(f"Connected: {client.is_connected}")
await client.start_notify(char_uuid, notification_handler)
while AppRunning:
try:
await asyncio.sleep(0)
await Loop()
except KeyboardInterrupt:
break
await client.stop_notify(char_uuid)
if __name__ == "__main__":
asyncio.run(main())