====== 11. BLE ======
===== Cíle cvičení =====
* Vytvoření BLE periferie a komunikace se zařízením
* Přístup ke GATT charakteristikám
* Propojení se stávajícím systémem
===== Podklady =====
* https://docs.micropython.org/en/latest/library/bluetooth.html
* https://github.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython
* https://github.com/micropython/micropython/tree/master/examples/bluetooth
* Bleak (Python BLE knihovna) - https://github.com/hbldh/bleak/tree/develop/examples
* https://www.bluetooth.com/specifications/assigned-numbers/
===== Vytvoření BLE periferie =====
Pro vytvoření BLE periférie z RPi Pico pouřijeme již hotové kódy, zejména pak moduly [[https://raw.githubusercontent.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython/main/ble_advertising.py|ble_advertising.py]] a [[https://github.com/makeuseofcode/Raspberry-Pi-Pico-W-WH-Bluetooth-Example-in-MicroPython/blob/main/ble_simple_peripheral.py|ble_simple_peripheral.py]]. Oba soubory nahrajte do paměti RPi Pico tak, aby bylo možné je použít.
Následující kód přečte teplotu interního teploměru RP2040 a pošle hodnotu zařízení v roli central.
from machine import Pin, ADC
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
import time
# Create a Bluetooth Low Energy (BLE) object
ble = bluetooth.BLE()
# Create an instance of the BLESimplePeripheral class with the BLE object
sp = BLESimplePeripheral(ble)
adc = ADC(4)
while True:
if sp.is_connected(): # Check if a BLE connection is established
# Read the value from the internal temperature sensor
adc_value = adc.read_u16()
# Convert ADC value to voltage
voltage = adc_value * (3.3 / 65535.0)
# Temperature calculation based on sensor characteristics
temperature = 27 - (voltage - 0.706) / 0.001721
# Transmit the temperature value over BLE
temperature_data = str(temperature).encode()
sp.send(temperature_data)
time.sleep(1)
Pro otestování funkcionality lze využít BLE klient na telefonu. Na cvičení je velmi vhodné zařízení pojmenovat unikátním názvem v rámci skupiny pro jednodušší identifikaci.
Pokud váš klient umí zapisovat do BLE periférie, lze vyzkoušet následující příklad, který mění stav LED na RPi PI:
from machine import Pin
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
# Create a Bluetooth Low Energy (BLE) object
ble = bluetooth.BLE()
# Create an instance of the BLESimplePeripheral class with the BLE object
sp = BLESimplePeripheral(ble)
led = Pin("LED", Pin.OUT)
led_state = 0
# Define a callback function to handle received data
def on_rx(data):
print("Data received: ", data) # Print the received data
global led_state # Access the global variable led_state
if data == b'toggle\r\n': # Check if the received data is "toggle"
led.value(not led_state) # Toggle the LED state (on/off)
led_state = 1 - led_state # Update the LED state
# Start an infinite loop
while True:
if sp.is_connected(): # Check if a BLE connection is established
sp.on_write(on_rx) # Set the callback function for data reception
Funkcionalita tohoto kódu závisí na tom, jak BLE centrála do zařízení zapisuje (celá zpráva najednou nebo jednotlivé byty/znaky). Je možné, že bude třeba kód upravit.
==== BLE centrála s modulem Bleak ====
Abychom mohli s BLE zařízením komunikovat, je třeba znát jeho fyzickou adresu. Tu lze zjistit přímo na zařízení:
import bluetooth
def mac2str(mac):
return ':'.join([f"{b:02X}" for b in mac])
ble = bluetooth.BLE()
ble.active(True)
type,mac = ble.config('mac')
print(mac2str(mac))
Pokud bychom chtěli vědět o všech zařízeních okolo, je možné spustit na PC scanner, který používá knihovnu Bleak.
import asyncio
from bleak import discover
async def main():
devices = await discover()
for d in devices:
print(d)
if __name__ == "__main__":
asyncio.run(main())
==== Dostupné služby ====
Pokud známe adresu, následující kód vrátí seznam služeb dostupných přes GATT databázi:
import sys
import asyncio
import platform
from bleak import BleakClient
async def main(address: str):
async with BleakClient(address) as client:
svcs = await client.get_services()
print("Services:")
for service in svcs:
print(service)
if __name__ == "__main__":
asyncio.run(main(sys.argv[1]))
Argumentem programu je adresa zařízení:
$ python get_services.py D8:3A:DD:5D:EC:0A
Services:
00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile
00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile
6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service
==== Service explorer ====
Pro využití dat nabízených prostřednictvím BLE rozhraní je třeba znát ''UUID'' jednotlivých služeb nebo charakteristik (data z konkrétních senzorů).
import sys
import platform
import asyncio
import logging
from bleak import BleakClient
logger = logging.getLogger(__name__)
ADDRESS = (
"84:FD:27:E6:5A:B9"
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)
async def main(address):
async with BleakClient(address) as client:
logger.info(f"Connected: {client.is_connected}")
for service in client.services:
logger.info(f"[Service] {service}")
for char in service.characteristics:
if "read" in char.properties:
try:
value = bytes(await client.read_gatt_char(char.uuid))
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
except Exception as e:
logger.error(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
)
else:
value = None
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
for descriptor in char.descriptors:
try:
value = bytes(
await client.read_gatt_descriptor(descriptor.handle)
)
logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
except Exception as e:
logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS))
$ python get_characteristics.py
INFO:bleak.backends.winrt.client:Services resolved for BleakClientWinRT (D8:3A:DD:5D:EC:0A)
INFO:__main__:Connected: True
INFO:__main__:[Service] 00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile
INFO:__main__: [Characteristic] 00002a00-0000-1000-8000-00805f9b34fb (Handle: 2): (read), Value: b'MPY BTSTACK'
INFO:__main__:[Service] 00001801-0000-1000-8000-00805f9b34fb (Handle: 4): Generic Attribute Profile
INFO:__main__: [Characteristic] 00002a05-0000-1000-8000-00805f9b34fb (Handle: 5): (read), Value: b''
INFO:__main__:[Service] 6e400001-b5a3-f393-e0a9-e50e24dcca9e (Handle: 7): Nordic UART Service
INFO:__main__: [Characteristic] 6e400003-b5a3-f393-e0a9-e50e24dcca9e (Handle: 8): (read,notify), Value: b''
INFO:__main__: [Descriptor] 00002902-0000-1000-8000-00805f9b34fb (Handle: 10): Client Characteristic Configuration) | Value: b'\x00\x00'
INFO:__main__: [Characteristic] 6e400002-b5a3-f393-e0a9-e50e24dcca9e (Handle: 11): (write-without-response,write), Value: None
Asynchronní čtení data konkrétní charakteristiky pak může vypadat např. takto:
import asyncio
import threading
from bleak import BleakClient
BTAddr = "D8:3A:DD:5D:EC:0A"
char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
AppRunning = True
Newest = None
def notification_handler(sender, data):
global Newest
Newest = data
async def Loop():
global AppRunning, Newest
if Newest == None:
return
data = Newest
print(data)
async def main():
print(f"Connecting: ", BTAddr, char_uuid)
global AppRunning
async with BleakClient(BTAddr) as client:
print(f"Connected: {client.is_connected}")
await client.start_notify(char_uuid, notification_handler)
while AppRunning:
try:
await asyncio.sleep(10)
await Loop()
except KeyboardInterrupt:
break
await client.stop_notify(char_uuid)
if __name__ == "__main__":
asyncio.run(main())