import numpy as np
from scipy import signal
from ahrs.filters import Madgwick, Mahony
from ahrs.common.orientation import q_conj, q_rot, axang2quat
[docs]
class IMUProcessor:
"""
Class for processing IMU data from gyroscope, accelerometer, and magnetometer sensors.
Provides methods for:
- Estimating the gravity vector
- Detecting motion and stationary periods
- Estimating position using sensor fusion algorithms (Madgwick or Mahony)
- Applying ZUPT/ZUPH corrections to reduce drift
"""
def __init__(self, sample_rate, sample_period):
self.sample_period = sample_period
self.sample_rate = sample_rate
[docs]
def estimate_gravity_vector(self, acc, alpha = 0.9):
"""
Estimate normalized gravity vector from accelerometer data using exponential moving average.
:param acc: Acceleration data array with shape (N, 3).
:type acc: np.ndarray
:param alpha: Smoothing coefficient (0 < alpha < 1).
:type alpha: float
:return: Normalized gravity vectors.
:rtype: np.ndarray
:raises ValueError: If input shape is invalid or alpha is out of bounds.
"""
if acc.ndim != 2 or acc.shape[1] != 3:
raise ValueError("Input array must have shape (N, 3) with columns [Ax, Ay, Az].")
if not (0 < alpha < 1):
raise ValueError("Parameter alpha must be in the range (0, 1).")
gravity = np.zeros_like(acc)
gravity[0] = acc[0]
for i in range(1, len(acc)):
gravity[i] = alpha * gravity[i - 1] + (1 - alpha) * acc[i]
# Normalización segura
norm = np.linalg.norm(gravity, axis=1, keepdims=True)
norm[norm == 0] = 1.0 # Evita división por cero
gravity_normalized = gravity / norm
return gravity_normalized
[docs]
def detect_stationary(self, acc, sample_rate):
"""
Detect stationary periods from accelerometer data using filtering and thresholding.
:param acc: Acceleration data array with shape (N, 3).
:type acc: np.ndarray
:param sample_rate: Sampling rate of the accelerometer signal in Hz.
:type sample_rate: float
:return: A tuple containing stationary: Boolean array indicating stationary (True) or moving (False) states.
:rtype: tuple[np.ndarray]
:raises ValueError: If input shape is invalid or sample_rate is non-positive.
"""
acc_mag = np.linalg.norm(acc, axis=1)
acc_mag_clipped = np.clip(acc_mag, 0, 20)
b, a = signal.butter(1, 0.01 / (sample_rate / 2), 'highpass')
acc_hp = signal.filtfilt(b, a, acc_mag_clipped)
b, a = signal.butter(1, 5.0 / (sample_rate / 2), 'lowpass')
acc_lp = signal.filtfilt(b, a, np.abs(acc_hp))
threshold = np.percentile(acc_lp, 15)
stationary = acc_lp < threshold
return stationary
# Agregación de ZUPH
[docs]
def detect_no_rotation(self, gyr, threshold = 0.05, duration_samples = 5):
"""
Detect periods where there is negligible angular velocity on Z-axis (i.e., no yaw rotation).
:param gyr: Gyroscope data array of shape (N, 3), in rad/s.
:type gyr: np.ndarray
:param threshold: Threshold for Z-axis angular velocity (in rad/s) to define no rotation.
:type threshold: float
:param duration_samples: Minimum number of consecutive samples below the threshold to validate no rotation.
:type duration_samples: int
:return: Boolean array of shape (N,), where True indicates no rotation around the Z-axis.
:rtype: np.ndarray
"""
gz = np.abs(gyr[:, 2]) # Only Z axis
mask = gz < threshold
stable = np.copy(mask)
for i in range(len(mask)):
if not mask[i]:
continue
if i + duration_samples <= len(mask) and np.all(mask[i:i + duration_samples]):
stable[i:i + duration_samples] = True
return stable
[docs]
def estimate_position_generic(self, method, use_mag, gyr, acc, mag, time, sample_rate, stationary):
"""
Estimate the 2D position from IMU sensor data using either the Madgwick or Mahony filter.
This method computes orientation quaternions using an AHRS filter, rotates the
acceleration to the earth frame, removes gravity, integrates to velocity and
position, and compensates for drift during stationary periods.
:param method: Algorithm to use ('madgwick' or 'mahony').
:type method: str
:param use_mag: Whether to use magnetometer data (True for MARG, False for IMU-only).
:type use_mag: bool
:param gyr: Gyroscope data array of shape (N, 3), in rad/s.
:type gyr: np.ndarray
:param acc: Accelerometer data array of shape (N, 3), in m/s^2.
:type acc: np.ndarray
:param mag: Magnetometer data array of shape (N, 3), in arbitrary units.
:type mag: np.ndarray
:param time: Time vector of shape (N,), in seconds.
:type time: np.ndarray
:param sample_rate: Sampling frequency in Hz.
:type sample_rate: float
:param stationary: Boolean array of shape (N,) indicating stationary periods.
:type stationary: np.ndarray
:return: Estimated position array of shape (N, 3).
:rtype: np.ndarray
:raises ValueError: If the method is not recognized.
"""
if method == "madgwick":
base_gain = 0.02
filter_ = Madgwick(frequency=sample_rate, gain=base_gain)
q = axang2quat([0, 0, 1], np.deg2rad(45))
elif method == "mahony":
base_kp = 1.5
filter_ = Mahony(Kp=base_kp, Ki=0.01, frequency=sample_rate)
q = np.array([1.0, 0.0, 0.0, 0.0])
acc_init = np.median(acc[time <= time[0] + 2], axis=0)
for _ in range(2000):
q = filter_.updateIMU(q, gyr=np.zeros(3), acc=acc_init)
else:
raise ValueError("Unknown method. Use 'madgwick' or 'mahony'.")
quats = np.zeros((len(time), 4))
no_rotation = self.detect_no_rotation(gyr)
no_motion = stationary & no_rotation
for t in range(len(time)):
if method == "madgwick":
filter_.gain = 0.001 if no_motion[t] else base_gain
q_new = filter_.updateMARG(q, gyr=gyr[t], acc=acc[t], mag=mag[t]) if use_mag else filter_.updateIMU(q, gyr=gyr[t], acc=acc[t])
else:
filter_.Kp = base_kp * 0.1 if no_motion[t] else (base_kp if stationary[t] else 0.0)
q_new = filter_.updateMARG(q, gyr=gyr[t], acc=acc[t], mag=mag[t]) if use_mag else filter_.updateIMU(q, gyr=gyr[t], acc=acc[t])
if q_new is not None:
q = q_new
quats[t] = q
gravity = self.estimate_gravity_vector(acc, 0.95)
acc_earth = np.array([q_rot(q_conj(qt), a) for qt, a in zip(quats, acc)])
acc_earth -= gravity
acc_earth *= 9.81
vel = np.zeros_like(acc_earth)
for t in range(1, len(vel)):
vel[t] = vel[t - 1] + acc_earth[t] * (self.sample_period)
if stationary[t]:
vel[t] = 0
drift = np.zeros_like(vel)
starts = np.where(np.diff(stationary.astype(int)) == -1)[0] + 1
ends = np.where(np.diff(stationary.astype(int)) == 1)[0] + 1
for s, e in zip(starts, ends):
drift_rate = vel[e - 1] / (e - s)
drift[s:e] = np.outer(np.arange(e - s), drift_rate)
vel -= drift
pos = np.zeros_like(vel)
for t in range(1, len(pos)):
pos[t] = pos[t - 1] + vel[t] * self.sample_period
return pos
[docs]
def estimate_position_madwick(self, time, gyr, acc, mag, stationary):
"""
Estimate orientation, linear acceleration, velocity, and position from sensor data using a Madgwick filter
and zero-velocity updates.
Applies orientation estimation with adaptive gain based on motion state (ZUPH),
and velocity correction during stationary periods (ZUPT).
:param time: Array of time stamps.
:type time: np.ndarray
:param gyr: Gyroscope data array with shape (N, 3), in rad/s.
:type gyr: np.ndarray
:param acc: Accelerometer data array with shape (N, 3), in m/s².
:type acc: np.ndarray
:param mag: Magnetometer data array with shape (N, 3), in µT.
:type mag: np.ndarray
:param stationary: Boolean array indicating stationary states (True for stationary).
:type stationary: np.ndarray
:return: Tuple (quats, acc_earth, vel, pos) where:
- quats: Quaternion orientation estimates.
- acc_earth: Linear acceleration in earth frame (gravity removed).
- vel: Estimated velocity with ZUPT correction.
- pos: Estimated position.
:rtype: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
:raises ValueError: If input shapes are inconsistent.
"""
madgwick = Madgwick(frequency=self.sample_rate, gain=0.02)
low_gain = 0.001
q = np.array([1.0, 0.0, 0.0, 0.0])
quats = np.zeros((len(time), 4))
quats[0] = q
no_rotation = self.detect_no_rotation(gyr)
no_motion = stationary & no_rotation
for t in range(1, len(time)):
madgwick.gain = low_gain if no_motion[t] else 0.02
q = madgwick.updateMARG(q, gyr=gyr[t], acc=acc[t], mag=mag[t])
quats[t] = q
gravity = self.estimate_gravity_vector(acc, 0.95)
acc_earth = np.array([q_rot(q_conj(qt), a) for qt, a in zip(quats, acc)])
acc_earth -= gravity
acc_earth *= 9.81
vel = np.zeros_like(acc_earth)
for t in range(1, len(vel)):
vel[t] = vel[t - 1] + acc_earth[t] * self.sample_period
if stationary[t] and not stationary[t - 1]:
vel[t] = 0
vel_drift = np.zeros_like(vel)
starts = np.where(np.diff(stationary.astype(int)) == -1)[0] + 1
ends = np.where(np.diff(stationary.astype(int)) == 1)[0] + 1
for s, e in zip(starts, ends):
if e > s:
drift_rate = vel[e - 1] / (e - s)
vel_drift[s:e] = np.outer(np.arange(e - s), drift_rate)
vel -= vel_drift
pos = np.zeros_like(vel)
for t in range(1, len(pos)):
pos[t] = pos[t - 1] + vel[t] * self.sample_period
return quats, acc_earth, vel, pos