4. IoT zařízení - komunikace (ČT 12:45, 14:30)

Bluetooth Low Energy (BLE)

Využijte mobilní aplikaci LightBlue pro vyzkoušení komunikace s BLE zařízeními:

BLE Scanner

Pro skenování okolních zařízení můžeme využít Python script s knihovnou Bleak

import asyncio
from bleak import BleakScanner
 
async def main():
    devices = await BleakScanner.discover()
    for d in devices:
        print(d)
 
asyncio.run(main())

Service Scanner

Pro výpis služeb lze využít následující skript. Je potřeba znát adresu zařízení.

import asyncio
from bleak import BleakClient
 
TARGET_ADDR = "" # TARGET DEVICE ADDRESS
 
async def main(address):
    async with BleakClient(address) as client:
        for service in client.services:
            print(f"Service: {service.uuid} | {service.description}:")
            for char in service.characteristics:
                props = ','.join(char.properties)
                print(f"  Characteristic: {char.uuid} | {char.description} | Properties: {props}")
 
asyncio.run(main(TARGET_ADDR))

RPi Pico jako BLE Peripherial Device

Lze využít low-level skriptů na hlavní stránce cvičení. Pro zjednodušení práce však využijeme knihovnu aioble.

Pro snadnou instalaci knihovnen na RPi nejprve nainstalujte modul mpremote:

pip install mpremote

Poté knihovnu aioble můžete nainstalovat pomocí (RPi můsí být připojené do PC a neobsazené).

mpremote mip install aioble

V případě problémů lze ručně překopírovat knihovnu do kořenového adresáře RPi pica.

Vytvoření BLE Peripherial zařízení

Následující skript nakonfiguruje BLE na RPi a pojmenuje ho PICO_NSI. Central zařízení se na něj mohou připojit a odpojit.

import asyncio
import bluetooth
import aioble
 
async def peripheral_task():
    while True:
        conn = await aioble.advertise(250000,name="PICO_NSI")
        print("Device connected")
 
        try:
            await conn.disconnected()
 
        finally:
            print("Device disconnected")
 
async def main():
    await asyncio.gather(
        peripheral_task(),
    )
 
asyncio.run(main())

Tvorba služeb a charakteristik

Každá sluba a charakteristika má svoje 16bitové (definované standardem) nebo 128bitové (vlastní) identifikační ID. Následující blok kódu vytvoří a zaregistruje službu s read a write charakteristikou.

SERVICE_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000000")
service = aioble.Service(SERVICE_UUID)
 
WRITE_CHAR_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000001")
READ_CHAR_UUID = bluetooth.UUID("00000000-00FF-00FF-00FF-000000000002")
write_char = aioble.Characteristic(service, WRITE_CHAR_UUID, write=True)
read_char = aioble.Characteristic(service, READ_CHAR_UUID, read=True)
 
aioble.register_services(service)

Službu je pak potřeba doplnit do advertise funkce:

conn = await aioble.advertise(250000,name="PICO_NSI", services=[SERVICE_UUID])

Práce s daty v charakteristikách

Následující funkce pracují s hodnotami v charakterisikách. První funkce nastaví hodnotu 0xFF na read charakteristiku každou sekundu (vhodné by bylo nahradit toto například výčtem dat ze senzoru). Druhá funkce čeká na přijatá data a vypíše je na stdout.

#funkce obsluhující read charakteristiku
async def read_task():
    read_char.write(bytes([0xFF])) # lze nahradit např. výčtem ze senzoru
    await asyncio.sleep(1)
 
#funkce čekající na data write charakteristiky
async def write_task():
    while True:
        await write_char.written()
        data = write_char.read()
        print(data)

Po zahrnutí těchto funckí je potřeba ještě aktualizovat definici mainu:

async def main():
    await asyncio.gather(
        peripheral_task(),
        read_task(),
        write_task(),
    )

Nordic UART Service

Na BLE existuje služba Nordic UART Service, která emuluje UART přes BLE. Základní implementace loopback UARTu je implementovaná níže:

import asyncio
import bluetooth
import aioble
 
# Nordic UART UUIDs
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
UART_RX_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
UART_TX_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")
 
# Create service
uart_service = aioble.Service(UART_SERVICE_UUID)
 
# RX: client writes to device
uart_rx = aioble.Characteristic(
    uart_service,
    UART_RX_UUID,
    write=True,
)
 
# TX: device notifies client
uart_tx = aioble.Characteristic(
    uart_service,
    UART_TX_UUID,
    notify=True,
)
 
aioble.register_services(uart_service)
 
 
async def uart_task():
 
    while True:
 
        conn = await aioble.advertise(
            250000,
            name="pico-uart",
            services=[UART_SERVICE_UUID],
        )
 
        print("Client connected")
 
        try:
            while True:
 
                # Wait for incoming UART data
                await uart_rx.written()
                data = uart_rx.read()
 
                print("RX:", data)
 
                # Echo back to client
                uart_tx.write(data)
                await uart_tx.notify(conn)
 
        except Exception:
            print("Client disconnected")
 
 
asyncio.run(uart_task())

Komunikace se zařízením na PC

Pro výčet dat z charakterisiky lze využít následující skript:

import asyncio
from bleak import BleakClient
 
# Replace with your device address and characteristic UUID
TARGET_ADDR = ""
CHAR_UUID = "00000000-00FF-00FF-00FF-000000000002"
 
async def main(address):
    async with BleakClient(address) as client:
        print(f"Connected to {address}")
 
        try:
            value = await client.read_gatt_char(CHAR_UUID)
            print(value)
 
        except Exception as e:
            print(f"Error reading characteristic: {e}")
 
asyncio.run(main(TARGET_ADDR))

Zapsat data do charakteristiky lze následujícím způsobem (skript zapíše 0xFF):

import asyncio
from bleak import BleakClient
 
# Replace with your device address and characteristic UUID
TARGET_ADDR = ""
CHAR_UUID = "00000000-00FF-00FF-00FF-000000000001"
 
async def main(address):
    async with BleakClient(address) as client:
        print(f"Connected to {address}")
 
        try:
            await client.write_gatt_char(CHAR_UUID, bytes([0xFF]), response=True) # Write with response (ACK) or not
            print(f"Wrote data to {CHAR_UUID}")
        except Exception as e:
            print(f"Error writing characteristic: {e}")
 
asyncio.run(main(TARGET_ADDR))

courses/b0b37nsi/addons/extra/04_zourefil.txt · Last modified: 2026/03/23 09:59 by zourefil