Warning
This page is located in archive. Go to the latest version of this course pages. Go the latest version of this page.

9. BLE

Cíle cvičení

  • Přístup k datům v GATT databázi
  • Interpretace dat

Podklady

Komunikace s BLE zařízením

Abychom mohli efektivně komunikovat se zařízením, v našem případě zejména číst z GATT databáze, je v první řadě třeba, aby zařízení mělo aktivní BLE a nabízelo v jeho rámci data. V případě desky Thunderboard je možné využít třeba deafultní aplikaci, která je ke stažení zde.

Identifikace BLE zařízení

Abychom mohli s BLE zařízením komunikovat, je třeba znát jeho fyzickou adresu. Protože odezva BLE rozhraní může být delší, je třeba použít asynchronní přístup pomocí knihovny asyncio.

import asyncio
from bleak import discover
 
async def main():
    devices = await discover()
    for d in devices:
        print(d)
 
if __name__ == "__main__":
    asyncio.run(main())

Dostupné služby

Pokud známe adresu, následující kód vrátí seznam služeb dostupných přes GATT databázi:

import sys
import asyncio
import platform
 
from bleak import BleakClient
 
async def main(address: str):
    async with BleakClient(address) as client:
        svcs = await client.get_services()
        print("Services:")
        for service in svcs:
            print(service)
 
if __name__ == "__main__":
    asyncio.run(main(sys.argv[1]))

Argumentem programu je adresa zařízení:

$ python get_services.py 84:FD:27:E6:5C:E6
Services:
00001801-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Attribute Profile
00001800-0000-1000-8000-00805f9b34fb (Handle: 9): Generic Access Profile
0000180a-0000-1000-8000-00805f9b34fb (Handle: 14): Device Information
00001815-0000-1000-8000-00805f9b34fb (Handle: 27): Automation IO
0000180f-0000-1000-8000-00805f9b34fb (Handle: 37): Battery Service
ec61a454-ed00-a5e8-b8f9-de9ec026ec51 (Handle: 41): Unknown
efd658ae-c400-ef33-76e7-91b00019103b (Handle: 44): Unknown
f598dbc5-2f00-4ec5-9936-b3d1aa4f957f (Handle: 49): Unknown
a4e649f4-4be5-11e5-885d-feff819cdc9f (Handle: 58): Unknown
0000181a-0000-1000-8000-00805f9b34fb (Handle: 68): Environmental Sensing
fcb89c40-c600-59f3-7dc3-5ece444a401b (Handle: 81): Unknown
1d14d6ee-fd63-4fa1-bfa4-8f47b42119f0 (Handle: 84): Unknown

Service explorer

Pro využití dat nabízených prostřednictvím BLE rozhraní je třeba znát UUID jednotlivých služeb nebo charakteristik (data z konkrétních senzorů). V případě IMU jsou potřebná UUID následovná:

Service:         UUID="a4e649f4-4be5-11e5-885d-feff819cdc9f"
Charakteristiky:
Acceleration     UUID="c4c1f6e2-4be5-11e5-885d-feff819cdc9f"
Orientation      UUID="b7c4b694-bee3-45dd-ba9f-f3b5e994f49a"
Control Point    UUID="71e30b8c-4131-4703-b0a0-b0bbba75856b"

Jednotlivé charakteristiky vrací 6 bytů dat, které reprezentují tři čísla typu short.

import sys
import platform
import asyncio
import logging
 
from bleak import BleakClient
 
logger = logging.getLogger(__name__)
 
ADDRESS = (
    "84:FD:27:E6:5A:B9"
    if platform.system() != "Darwin"
    else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)
 
 
async def main(address):
    async with BleakClient(address) as client:
        logger.info(f"Connected: {client.is_connected}")
 
        for service in client.services:
            logger.info(f"[Service] {service}")
            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = bytes(await client.read_gatt_char(char.uuid))
                        logger.info(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                        )
                    except Exception as e:
                        logger.error(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
                        )
 
                else:
                    value = None
                    logger.info(
                        f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                    )
 
                for descriptor in char.descriptors:
                    try:
                        value = bytes(
                            await client.read_gatt_descriptor(descriptor.handle)
                        )
                        logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
                    except Exception as e:
                        logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")
 
 
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS))

import asyncio
import threading
 
from bleak import BleakClient
from ctypes import Structure, c_short
 
import matplotlib.pyplot as plt
from matplotlib.patches import Arrow
from math import sin, cos, pi, radians
 
BTAddr = "84:FD:27:E6:5A:D6"
char_uuid = "b7c4b694-bee3-45dd-ba9f-f3b5e994f49a"
 
class DataPoint(Structure):
    _pack_ = 1
    _fields_ = [
        ("RX", c_short),
        ("RY", c_short),
        ("RZ", c_short)
    ]
 
AppRunning = True
 
plt.ion()
fig, ax = plt.subplots(1, 3, subplot_kw={'projection': 'polar'})
fig.tight_layout()
 
for a in ax:
    a.set_theta_zero_location('N')
    a.set_yticklabels([])
    a.scatter([0], [1])
 
Newest = None
 
def notification_handler(sender, data):
    global Newest
    Newest = data
 
async def Loop():
    global AppRunning, Newest, ax
 
    if Newest == None:
        return
 
    data = Newest
    Point = DataPoint.from_buffer(data)
 
    print(data.hex(), Point.RX/100, Point.RY/100, Point.RZ/100)
 
    angleX = radians(Point.RX/100)
    angleY = radians(Point.RY/100)
    angleZ = radians(Point.RZ/100)
 
    for a in ax:
        a.clear()
        a.set_theta_zero_location('N')
        a.set_yticklabels([])
 
    ax[0].scatter([angleX], [1])
    ax[0].title.set_text('X')
    ax[1].scatter([angleY], [1])
    ax[1].title.set_text('Y')
    ax[2].scatter([angleZ], [1])
    ax[2].title.set_text('Z')
 
    try:
        fig.canvas.draw()
        fig.canvas.flush_events()
    except Exception as e:
        print(e)
        AppRunning = False
 
async def main():
    print(f"Connecting: ", BTAddr, char_uuid)
    global AppRunning
    async with BleakClient(BTAddr) as client:
        print(f"Connected: {client.is_connected}")
        await client.start_notify(char_uuid, notification_handler)
        while AppRunning:
            try:
                await asyncio.sleep(0)
                await Loop()
            except KeyboardInterrupt:
                break
        await client.stop_notify(char_uuid)
 
if __name__ == "__main__":
    asyncio.run(main())

courses/b0b37nsi/tutorials/09.txt · Last modified: 2023/04/27 15:22 by viteks