Lab 10

The 10 DOF IMU module is connected to the RPi via the I2C bus. Select the appropriate pins (e.g., “SDA” on pin “0” and “SCL” on pin “1”) and connect the module, selecting the “VSYS” pin for power. Download the modules mpu9250.py, mpu6500.py and ak8963.py modules from github. Save the downloaded modules to the RPi memory. The following code reads data from all available sensors.

import utime
from machine import I2C, Pin
from mpu9250 import MPU9250
 
i2c = I2C(0, scl=Pin(1), sda=Pin(0))
sensor = MPU9250(i2c)
 
print("MPU9250 id: " + hex(sensor.whoami))
 
while True:
    print(sensor.acceleration)
    print(sensor.gyro)
    print(sensor.magnetic)
    print(sensor.temperature)
 
    utime.sleep_ms(1000)

For low-level access directly to the sensor registers, you can use this code:

# mpu9250.py
import machine
import time
 
class MPU9250:
    def __init__(self, spi, cs_pin=5):
        self.spi = spi
        self.cs = machine.Pin(cs_pin, machine.Pin.OUT)
        self.cs.value(1)
        self.init_sensor()
 
    def write_reg(self, reg, val):
        self.cs.value(0)
        self.spi.write(bytearray([reg & 0x7F, val]))
        self.cs.value(1)
 
    def read_reg(self, reg, nbytes=1):
        self.cs.value(0)
        buf = bytearray(nbytes+1)
        buf[0] = reg | 0x80
        self.spi.write(buf[:1])
        self.spi.readinto(buf[1:])
        self.cs.value(1)
        return buf[1:]
 
    def init_sensor(self):
        # Reset device
        self.write_reg(0x6B, 0x80)
        time.sleep_ms(100)
        # Wake up and set default config
        self.write_reg(0x6B, 0x00)
        # Accel config 2g
        self.write_reg(0x1C, 0x00)
        # Gyro config 250 dps
        self.write_reg(0x1B, 0x00)
 
    def read_accel(self):
        data = self.read_reg(0x3B, 6)
        x = int.from_bytes(data[0:2], 'big', signed=True)
        y = int.from_bytes(data[2:4], 'big', signed=True)
        z = int.from_bytes(data[4:6], 'big', signed=True)
        return x, y, z

Multicore processing

This code is just for inspiration, it will be changed soon :-}

# main.py
import machine
import time
import _thread
import ulab
from ulab import numpy as np
from mpu9250 import MPU9250
 
# ===============================
# Inicializace SPI a MPU9250
# ===============================
spi = machine.SPI(0, baudrate=1000000, polarity=0, phase=0)
mpu = MPU9250(spi)
 
# ===============================
# Shared buffer
# ===============================
SAMPLES = 128
accel_buf = [[0,0,0] for _ in range(SAMPLES)]
lock = _thread.allocate_lock()
write_idx = 0
 
# ===============================
# Core1 – DSP
# ===============================
def core1_task():
    global write_idx
    last_idx = 0
    h = np.ones(5)/5  # jednoduchý low-pass
    threshold = 8000
    while True:
        lock.acquire()
        if write_idx != last_idx:
            if write_idx > last_idx:
                new_samples = accel_buf[last_idx:write_idx]
            else:
                new_samples = accel_buf[last_idx:] + accel_buf[:write_idx]
            last_idx = write_idx
            lock.release()
 
            data = np.array(new_samples)
            filtered_x = np.convolve(data[:,0], h, mode='same')
            filtered_y = np.convolve(data[:,1], h, mode='same')
            filtered_z = np.convolve(data[:,2], h, mode='same')
 
            if np.max(np.abs(filtered_x)) > threshold or \
               np.max(np.abs(filtered_y)) > threshold or \
               np.max(np.abs(filtered_z)) > threshold:
                print("Impact detected!")
 
            # Send data to PC (USB)
            print("X:", filtered_x[-1], "Y:", filtered_y[-1], "Z:", filtered_z[-1])
        else:
            lock.release()
            time.sleep_ms(1)
 
# Start Core1
_thread.start_new_thread(core1_task, ())
 
# ===============================
# Core0 – data acquistion
# ===============================
while True:
    x, y, z = mpu.read_accel()
    lock.acquire()
    accel_buf[write_idx] = [x, y, z]
    write_idx = (write_idx + 1) % SAMPLES
    lock.release()
    time.sleep_us(500)  # ~2 kHz

Realtime plotting

# plot_accel.py
import serial
import matplotlib.pyplot as plt
 
ser = serial.Serial('/dev/cu.usbmodem1101', 115200, timeout=1)
 
plt.ion()
fig, ax = plt.subplots()
x_data, y_data, z_data = [], [], []
line_x, = ax.plot(x_data, label='X')
line_y, = ax.plot(y_data, label='Y')
line_z, = ax.plot(z_data, label='Z')
ax.legend()
 
while True:
    try:
        line = ser.readline().decode().strip()
        if line.startswith("X:"):
            parts = line.split()
            x_data.append(float(parts[1]))
            y_data.append(float(parts[3]))
            z_data.append(float(parts[5]))
 
            if len(x_data) > 200:
                x_data.pop(0)
                y_data.pop(0)
                z_data.pop(0)
 
            line_x.set_ydata(x_data)
            line_y.set_ydata(y_data)
            line_z.set_ydata(z_data)
            line_x.set_xdata(range(len(x_data)))
            line_y.set_xdata(range(len(y_data)))
            line_z.set_xdata(range(len(z_data)))
 
            ax.relim()
            ax.autoscale_view()
            plt.pause(0.01)
    except KeyboardInterrupt:
        break

courses/be2m37mam/labs/10.txt · Last modified: 2025/12/04 14:29 by viteks