====== Lab 10 ======
The 10 DOF IMU module is connected to the RPi via the I2C bus. Select the appropriate pins (e.g., "SDA" on pin "0" and "SCL" on pin "1") and connect the module, selecting the "VSYS" pin for power. Download the modules [[https://github.com/tuupola/micropython-mpu9250/blob/master/mpu9250.py|mpu9250.py]], [[https://github.com/tuupola/micropython-mpu9250/blob/master/mpu6500.py|mpu6500.py]] and [[https://github.com/tuupola/micropython-mpu9250/blob/master/ak8963.py|ak8963.py]] modules from [[https://github.com/tuupola/micropython-mpu9250|github]]. Save the downloaded modules to the RPi memory. The following code reads data from all available sensors.
import utime
from machine import I2C, Pin
from mpu9250 import MPU9250
i2c = I2C(0, scl=Pin(1), sda=Pin(0))
sensor = MPU9250(i2c)
print("MPU9250 id: " + hex(sensor.whoami))
while True:
print(sensor.acceleration)
print(sensor.gyro)
print(sensor.magnetic)
print(sensor.temperature)
utime.sleep_ms(1000)
For low-level access directly to the sensor registers, you can use this code:
# mpu9250.py
import machine
import time
class MPU9250:
def __init__(self, spi, cs_pin=5):
self.spi = spi
self.cs = machine.Pin(cs_pin, machine.Pin.OUT)
self.cs.value(1)
self.init_sensor()
def write_reg(self, reg, val):
self.cs.value(0)
self.spi.write(bytearray([reg & 0x7F, val]))
self.cs.value(1)
def read_reg(self, reg, nbytes=1):
self.cs.value(0)
buf = bytearray(nbytes+1)
buf[0] = reg | 0x80
self.spi.write(buf[:1])
self.spi.readinto(buf[1:])
self.cs.value(1)
return buf[1:]
def init_sensor(self):
# Reset device
self.write_reg(0x6B, 0x80)
time.sleep_ms(100)
# Wake up and set default config
self.write_reg(0x6B, 0x00)
# Accel config 2g
self.write_reg(0x1C, 0x00)
# Gyro config 250 dps
self.write_reg(0x1B, 0x00)
def read_accel(self):
data = self.read_reg(0x3B, 6)
x = int.from_bytes(data[0:2], 'big', signed=True)
y = int.from_bytes(data[2:4], 'big', signed=True)
z = int.from_bytes(data[4:6], 'big', signed=True)
return x, y, z
===== Multicore processing =====
This code is just for inspiration, it will be changed soon :-}
# main.py
import machine
import time
import _thread
import ulab
from ulab import numpy as np
from mpu9250 import MPU9250
# ===============================
# Inicializace SPI a MPU9250
# ===============================
spi = machine.SPI(0, baudrate=1000000, polarity=0, phase=0)
mpu = MPU9250(spi)
# ===============================
# Shared buffer
# ===============================
SAMPLES = 128
accel_buf = [[0,0,0] for _ in range(SAMPLES)]
lock = _thread.allocate_lock()
write_idx = 0
# ===============================
# Core1 – DSP
# ===============================
def core1_task():
global write_idx
last_idx = 0
h = np.ones(5)/5 # jednoduchý low-pass
threshold = 8000
while True:
lock.acquire()
if write_idx != last_idx:
if write_idx > last_idx:
new_samples = accel_buf[last_idx:write_idx]
else:
new_samples = accel_buf[last_idx:] + accel_buf[:write_idx]
last_idx = write_idx
lock.release()
data = np.array(new_samples)
filtered_x = np.convolve(data[:,0], h, mode='same')
filtered_y = np.convolve(data[:,1], h, mode='same')
filtered_z = np.convolve(data[:,2], h, mode='same')
if np.max(np.abs(filtered_x)) > threshold or \
np.max(np.abs(filtered_y)) > threshold or \
np.max(np.abs(filtered_z)) > threshold:
print("Impact detected!")
# Send data to PC (USB)
print("X:", filtered_x[-1], "Y:", filtered_y[-1], "Z:", filtered_z[-1])
else:
lock.release()
time.sleep_ms(1)
# Start Core1
_thread.start_new_thread(core1_task, ())
# ===============================
# Core0 – data acquistion
# ===============================
while True:
x, y, z = mpu.read_accel()
lock.acquire()
accel_buf[write_idx] = [x, y, z]
write_idx = (write_idx + 1) % SAMPLES
lock.release()
time.sleep_us(500) # ~2 kHz
===== Realtime plotting =====
# plot_accel.py
import serial
import matplotlib.pyplot as plt
ser = serial.Serial('/dev/cu.usbmodem1101', 115200, timeout=1)
plt.ion()
fig, ax = plt.subplots()
x_data, y_data, z_data = [], [], []
line_x, = ax.plot(x_data, label='X')
line_y, = ax.plot(y_data, label='Y')
line_z, = ax.plot(z_data, label='Z')
ax.legend()
while True:
try:
line = ser.readline().decode().strip()
if line.startswith("X:"):
parts = line.split()
x_data.append(float(parts[1]))
y_data.append(float(parts[3]))
z_data.append(float(parts[5]))
if len(x_data) > 200:
x_data.pop(0)
y_data.pop(0)
z_data.pop(0)
line_x.set_ydata(x_data)
line_y.set_ydata(y_data)
line_z.set_ydata(z_data)
line_x.set_xdata(range(len(x_data)))
line_y.set_xdata(range(len(y_data)))
line_z.set_xdata(range(len(z_data)))
ax.relim()
ax.autoscale_view()
plt.pause(0.01)
except KeyboardInterrupt:
break