Search
Pro skenování okolních zařízení můžeme využít Python script s knihovnou Bleak
import asyncio from bleak import BleakScanner async def main(): devices = await BleakScanner.discover() for d in devices: print(d) asyncio.run(main())
Pro výpis služeb lze využít následující skript. Je potřeba znát adresu zařízení.
import asyncio from bleak import BleakClient TARGET_ADDR = "" # TARGET DEVICE ADDRESS async def main(address): async with BleakClient(address) as client: for service in client.services: print(f"Service: {service.uuid} | {service.description}:") for char in service.characteristics: props = ','.join(char.properties) print(f" Characteristic: {char.uuid} | {char.description} | Properties: {props}") asyncio.run(main(TARGET_ADDR))
Lze využít low-level skriptů na hlavní stránce cvičení. Pro zjednodušení práce však využijeme knihovnu aioble.
Pro snadnou instalaci knihovnen na RPi nejprve nainstalujte modul mpremote:
mpremote
pip install mpremote
Poté knihovnu aioble můžete nainstalovat pomocí (RPi můsí být připojené do PC a neobsazené).
aioble
mpremote mip install aioble
V případě problémů lze ručně překopírovat knihovnu do kořenového adresáře RPi pica.
Následující skript nakonfiguruje BLE na RPi a pojmenuje ho PICO_NSI. Central zařízení se na něj mohou připojit a odpojit.
PICO_NSI
import asyncio import bluetooth import aioble async def peripheral_task(): while True: conn = await aioble.advertise(250000,name="PICO_NSI") print("Device connected") try: await conn.disconnected() finally: print("Device disconnected") async def main(): await asyncio.gather( peripheral_task(), ) asyncio.run(main())
Každá sluba a charakteristika má svoje 16bitové (definované standardem) nebo 128bitové (vlastní) identifikační ID. Následující blok kódu vytvoří a zaregistruje službu s read a write charakteristikou.
read
write
SERVICE_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000000") service = aioble.Service(SERVICE_UUID) WRITE_CHAR_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000001") READ_CHAR_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000002") write_char = aioble.Characteristic(service, WRITE_CHAR_UUID, write=True) read_char = aioble.Characteristic(service, READ_CHAR_UUID, read=True) aioble.register_services(service)
Službu je pak potřeba doplnit do advertise funkce:
advertise
conn = await aioble.advertise(250000,name="PICO_NSI", services=[SERVICE_UUID])
Následující funkce pracují s hodnotami v charakterisikách. První funkce nastaví hodnotu 0xFF na read charakteristiku každou sekundu (vhodné by bylo nahradit toto například výčtem dat ze senzoru). Druhá funkce čeká na přijatá data a vypíše je na stdout.
0xFF
#funkce obsluhující read charakteristiku async def read_task(): read_char.write(bytes([0xFF])) # lze nahradit např. výčtem ze senzoru await asyncio.sleep(1) #funkce čekající na data write charakteristiky async def write_task(): while True: await write_char.written() data = write_char.read() print(data)
Po zahrnutí těchto funckí je potřeba ještě aktualizovat definici mainu:
async def main(): await asyncio.gather( peripheral_task(), read_task(), write_task(), )
Na BLE existuje služba Nordic UART Service, která emuluje UART přes BLE. Základní implementace loopback UARTu je implementovaná níže:
Nordic UART Service
import asyncio import bluetooth import aioble # Nordic UART UUIDs UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") UART_RX_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") UART_TX_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # Create service uart_service = aioble.Service(UART_SERVICE_UUID) # RX: client writes to device uart_rx = aioble.Characteristic( uart_service, UART_RX_UUID, write=True, ) # TX: device notifies client uart_tx = aioble.Characteristic( uart_service, UART_TX_UUID, notify=True, ) aioble.register_services(uart_service) async def uart_task(): while True: conn = await aioble.advertise( 250000, name="pico-uart", services=[UART_SERVICE_UUID], ) print("Client connected") try: while True: # Wait for incoming UART data await uart_rx.written() data = uart_rx.read() print("RX:", data) # Echo back to client uart_tx.write(data) await uart_tx.notify(conn) except Exception: print("Client disconnected") asyncio.run(uart_task())
Pro výčet dat z charakterisiky lze využít následující skript:
import asyncio from bleak import BleakClient # Replace with your device address and characteristic UUID TARGET_ADDR = "" CHAR_UUID = "00000000-00FF-00FF-00FF-000000000002" async def main(address): async with BleakClient(address) as client: print(f"Connected to {address}") try: value = await client.read_gatt_char(CHAR_UUID) print(value) except Exception as e: print(f"Error reading characteristic: {e}") asyncio.run(main(TARGET_ADDR))
Zapsat data do charakteristiky lze následujícím způsobem (skript zapíše 0xFF):
import asyncio from bleak import BleakClient # Replace with your device address and characteristic UUID TARGET_ADDR = "" CHAR_UUID = "00000000-00FF-00FF-00FF-000000000001" async def main(address): async with BleakClient(address) as client: print(f"Connected to {address}") try: await client.write_gatt_char(CHAR_UUID, bytes([0xFF]), response=True) # Write with response (ACK) or not print(f"Wrote data to {CHAR_UUID}") except Exception as e: print(f"Error writing characteristic: {e}") asyncio.run(main(TARGET_ADDR))