Search
The 10 DOF IMU module is connected to the RPi via the I2C bus. Select the appropriate pins (e.g., “SDA” on pin “0” and “SCL” on pin “1”) and connect the module, selecting the “VSYS” pin for power. Download the modules mpu9250.py, mpu6500.py and ak8963.py modules from github. Save the downloaded modules to the RPi memory. The following code reads data from all available sensors.
import utime from machine import I2C, Pin from mpu9250 import MPU9250 i2c = I2C(0, scl=Pin(1), sda=Pin(0)) sensor = MPU9250(i2c) print("MPU9250 id: " + hex(sensor.whoami)) while True: print(sensor.acceleration) print(sensor.gyro) print(sensor.magnetic) print(sensor.temperature) utime.sleep_ms(1000)
For low-level access directly to the sensor registers, you can use this code:
# mpu9250.py import machine import time class MPU9250: def __init__(self, spi, cs_pin=5): self.spi = spi self.cs = machine.Pin(cs_pin, machine.Pin.OUT) self.cs.value(1) self.init_sensor() def write_reg(self, reg, val): self.cs.value(0) self.spi.write(bytearray([reg & 0x7F, val])) self.cs.value(1) def read_reg(self, reg, nbytes=1): self.cs.value(0) buf = bytearray(nbytes+1) buf[0] = reg | 0x80 self.spi.write(buf[:1]) self.spi.readinto(buf[1:]) self.cs.value(1) return buf[1:] def init_sensor(self): # Reset device self.write_reg(0x6B, 0x80) time.sleep_ms(100) # Wake up and set default config self.write_reg(0x6B, 0x00) # Accel config 2g self.write_reg(0x1C, 0x00) # Gyro config 250 dps self.write_reg(0x1B, 0x00) def read_accel(self): data = self.read_reg(0x3B, 6) x = int.from_bytes(data[0:2], 'big', signed=True) y = int.from_bytes(data[2:4], 'big', signed=True) z = int.from_bytes(data[4:6], 'big', signed=True) return x, y, z
This code is just for inspiration, it will be changed soon :-}
# main.py import machine import time import _thread import ulab from ulab import numpy as np from mpu9250 import MPU9250 # =============================== # Inicializace SPI a MPU9250 # =============================== spi = machine.SPI(0, baudrate=1000000, polarity=0, phase=0) mpu = MPU9250(spi) # =============================== # Shared buffer # =============================== SAMPLES = 128 accel_buf = [[0,0,0] for _ in range(SAMPLES)] lock = _thread.allocate_lock() write_idx = 0 # =============================== # Core1 – DSP # =============================== def core1_task(): global write_idx last_idx = 0 h = np.ones(5)/5 # jednoduchý low-pass threshold = 8000 while True: lock.acquire() if write_idx != last_idx: if write_idx > last_idx: new_samples = accel_buf[last_idx:write_idx] else: new_samples = accel_buf[last_idx:] + accel_buf[:write_idx] last_idx = write_idx lock.release() data = np.array(new_samples) filtered_x = np.convolve(data[:,0], h, mode='same') filtered_y = np.convolve(data[:,1], h, mode='same') filtered_z = np.convolve(data[:,2], h, mode='same') if np.max(np.abs(filtered_x)) > threshold or \ np.max(np.abs(filtered_y)) > threshold or \ np.max(np.abs(filtered_z)) > threshold: print("Impact detected!") # Send data to PC (USB) print("X:", filtered_x[-1], "Y:", filtered_y[-1], "Z:", filtered_z[-1]) else: lock.release() time.sleep_ms(1) # Start Core1 _thread.start_new_thread(core1_task, ()) # =============================== # Core0 – data acquistion # =============================== while True: x, y, z = mpu.read_accel() lock.acquire() accel_buf[write_idx] = [x, y, z] write_idx = (write_idx + 1) % SAMPLES lock.release() time.sleep_us(500) # ~2 kHz
# plot_accel.py import serial import matplotlib.pyplot as plt ser = serial.Serial('/dev/cu.usbmodem1101', 115200, timeout=1) plt.ion() fig, ax = plt.subplots() x_data, y_data, z_data = [], [], [] line_x, = ax.plot(x_data, label='X') line_y, = ax.plot(y_data, label='Y') line_z, = ax.plot(z_data, label='Z') ax.legend() while True: try: line = ser.readline().decode().strip() if line.startswith("X:"): parts = line.split() x_data.append(float(parts[1])) y_data.append(float(parts[3])) z_data.append(float(parts[5])) if len(x_data) > 200: x_data.pop(0) y_data.pop(0) z_data.pop(0) line_x.set_ydata(x_data) line_y.set_ydata(y_data) line_z.set_ydata(z_data) line_x.set_xdata(range(len(x_data))) line_y.set_xdata(range(len(y_data))) line_z.set_xdata(range(len(z_data))) ax.relim() ax.autoscale_view() plt.pause(0.01) except KeyboardInterrupt: break