====== 11. BLE ====== ===== Cíle cvičení ===== * Vytvoření BLE periferie a komunikace se zařízením * Přístup ke GATT charakteristikám * Propojení se stávajícím systémem ===== Podklady ===== * https://docs.micropython.org/en/latest/library/bluetooth.html * https://github.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython * https://github.com/micropython/micropython/tree/master/examples/bluetooth * Bleak (Python BLE knihovna) - https://github.com/hbldh/bleak/tree/develop/examples * https://www.bluetooth.com/specifications/assigned-numbers/ ===== Vytvoření BLE periferie ===== Pro vytvoření BLE periférie z RPi Pico pouřijeme již hotové kódy, zejména pak moduly [[https://raw.githubusercontent.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython/main/ble_advertising.py|ble_advertising.py]] a [[https://github.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython/blob/main/ble_simple_peripheral.py|ble_simple_peripheral.py]]. Oba soubory nahrajte do paměti RPi Pico tak, aby bylo možné je použít. Následující kód přečte teplotu interního teploměru RP2040 a pošle hodnotu zařízení v roli central. from machine import Pin, ADC import bluetooth from ble_simple_peripheral import BLESimplePeripheral import time # Create a Bluetooth Low Energy (BLE) object ble = bluetooth.BLE() # Create an instance of the BLESimplePeripheral class with the BLE object sp = BLESimplePeripheral(ble) adc = ADC(4) while True: if sp.is_connected(): # Check if a BLE connection is established # Read the value from the internal temperature sensor adc_value = adc.read_u16() # Convert ADC value to voltage voltage = adc_value * (3.3 / 65535.0) # Temperature calculation based on sensor characteristics temperature = 27 - (voltage - 0.706) / 0.001721 # Transmit the temperature value over BLE temperature_data = str(temperature).encode() sp.send(temperature_data) time.sleep(1) Pro otestování funkcionality lze využít BLE klient na telefonu. Na cvičení je velmi vhodné zařízení pojmenovat unikátním názvem v rámci skupiny pro jednodušší identifikaci. Pokud váš klient umí zapisovat do BLE periférie, lze vyzkoušet následující příklad, který mění stav LED na RPi PI: from machine import Pin import bluetooth from ble_simple_peripheral import BLESimplePeripheral # Create a Bluetooth Low Energy (BLE) object ble = bluetooth.BLE() # Create an instance of the BLESimplePeripheral class with the BLE object sp = BLESimplePeripheral(ble) led = Pin("LED", Pin.OUT) led_state = 0 # Define a callback function to handle received data def on_rx(data): print("Data received: ", data) # Print the received data global led_state # Access the global variable led_state if data == b'toggle\r\n': # Check if the received data is "toggle" led.value(not led_state) # Toggle the LED state (on/off) led_state = 1 - led_state # Update the LED state # Start an infinite loop while True: if sp.is_connected(): # Check if a BLE connection is established sp.on_write(on_rx) # Set the callback function for data reception Funkcionalita tohoto kódu závisí na tom, jak BLE centrála do zařízení zapisuje (celá zpráva najednou nebo jednotlivé byty/znaky). Je možné, že bude třeba kód upravit. ==== BLE centrála s modulem Bleak ==== Abychom mohli s BLE zařízením komunikovat, je třeba znát jeho fyzickou adresu. Tu lze zjistit přímo na zařízení: import bluetooth def mac2str(mac): return ':'.join([f"{b:02X}" for b in mac]) ble = bluetooth.BLE() ble.active(True) type,mac = ble.config('mac') print(mac2str(mac)) Pokud bychom chtěli vědět o všech zařízeních okolo, je možné spustit na PC scanner, který používá knihovnu Bleak. import asyncio from bleak import discover async def main(): devices = await discover() for d in devices: print(d) if __name__ == "__main__": asyncio.run(main()) ==== Dostupné služby ==== Pokud známe adresu, následující kód vrátí seznam služeb dostupných přes GATT databázi: import sys import asyncio import platform from bleak import BleakClient async def main(address: str): async with BleakClient(address) as client: svcs = await client.get_services() print("Services:") for service in svcs: print(service) if __name__ == "__main__": asyncio.run(main(sys.argv[1])) Argumentem programu je adresa zařízení: $ python get_services.py D8:3A:DD:5D:EC:0A Services: 00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile 00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile 6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service ==== Service explorer ==== Pro využití dat nabízených prostřednictvím BLE rozhraní je třeba znát ''UUID'' jednotlivých služeb nebo charakteristik (data z konkrétních senzorů). import sys import platform import asyncio import logging from bleak import BleakClient logger = logging.getLogger(__name__) ADDRESS = ( "84:FD:27:E6:5A:B9" if platform.system() != "Darwin" else "B9EA5233-37EF-4DD6-87A8-2A875E821C46" ) async def main(address): async with BleakClient(address) as client: logger.info(f"Connected: {client.is_connected}") for service in client.services: logger.info(f"[Service] {service}") for char in service.characteristics: if "read" in char.properties: try: value = bytes(await client.read_gatt_char(char.uuid)) logger.info( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}" ) except Exception as e: logger.error( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}" ) else: value = None logger.info( f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}" ) for descriptor in char.descriptors: try: value = bytes( await client.read_gatt_descriptor(descriptor.handle) ) logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}") except Exception as e: logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS)) $ python get_characteristics.py INFO:bleak.backends.winrt.client:Services resolved for BleakClientWinRT (D8:3A:DD:5D:EC:0A) INFO:__main__:Connected: True INFO:__main__:[Service] 00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile INFO:__main__: [Characteristic] 00002a00-0000-1000-8000-00805f9b34fb (Handle: 2): (read), Value: b'MPY BTSTACK' INFO:__main__:[Service] 00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile INFO:__main__: [Characteristic] 00002a05-0000-1000-8000-00805f9b34fb (Handle: 5): (read), Value: b'' INFO:__main__:[Service] 6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service INFO:__main__: [Characteristic] 6e400003-b5a3-f393-e0a9-e50e24dcca9e (Handle: 8): (read,notify), Value: b'' INFO:__main__: [Descriptor] 00002902-0000-1000-8000-00805f9b34fb (Handle: 10): Client Characteristic Configuration) | Value: b'\x00\x00' INFO:__main__: [Characteristic] 6e400002-b5a3-f393-e0a9-e50e24dcca9e (Handle: 11): (write-without-response,write), Value: None Asynchronní čtení data konkrétní charakteristiky pak může vypadat např. takto: import asyncio import threading from bleak import BleakClient BTAddr = "D8:3A:DD:5D:EC:0A" char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" AppRunning = True Newest = None def notification_handler(sender, data): global Newest Newest = data async def Loop(): global AppRunning, Newest if Newest == None: return data = Newest print(data) async def main(): print(f"Connecting: ", BTAddr, char_uuid) global AppRunning async with BleakClient(BTAddr) as client: print(f"Connected: {client.is_connected}") await client.start_notify(char_uuid, notification_handler) while AppRunning: try: await asyncio.sleep(10) await Loop() except KeyboardInterrupt: break await client.stop_notify(char_uuid) if __name__ == "__main__": asyncio.run(main())