Source code for transform_data.stride_measurement

import numpy as np
import os
import argparse
import pandas as pd
from matplotlib import pyplot as plt
from tabulate import tabulate

from msGeom.data_preprocessor import DataPreprocessor
from msGeom.imu_processor import IMUProcessor
from msGeom.kalman_processor import KalmanProcessor
from msGeom.peak_detector import DetectPeaks
from msGeom.stride_processor import StrideProcessor
from msGeom.result_processor import ResultsProcessor
from msGeom.plot_processor import PlotProcessor


[docs] def parse_args(): """ Parse command-line arguments for the IMU data processing pipeline. :return: Parsed command-line arguments. :rtype: argparse.Namespace **Command-line Arguments:** - ``-f``, ``--file_path`` (str, required): Path to the input Excel file. - ``-v``, ``--verbose`` (int, optional): Verbosity level (default: 3). - ``-c``, ``--config`` (str, optional): Path to the YAML configuration file (default: .config.yaml). - ``-om``, ``--output_mode`` (str, optional): How to handle output plots. Options: "screen", "save", "both" (default: screen). - ``-o``, ``--output_dir`` (str, optional): Directory where output plots will be saved. - ``-e``, ``--export_excel`` (str, optional): Whether to export results to Excel. Options: "yes", "no" (default: yes). - ``-m``, ``--map_html`` (str, optional): Whether to generate an interactive HTML map. Options: "yes", "no" (default: no). """ parser = argparse.ArgumentParser(description="IMU data processing pipeline") parser.add_argument("-f", "--file_path", type=str, required=True, help="Path to the Excel file") parser.add_argument('-v', '--verbose', type=int, default=3, help='Verbosity level') parser.add_argument('-c', '--config', type=str, default='.config.yaml', help='Path to the configuration file') parser.add_argument('-om','--output_mode', choices=["screen", "save", "both"], default="screen", help="How to handle output plots") parser.add_argument('-o', '--output_dir', type=str, default=None, help='Directory to save output plots') parser.add_argument('-e','--export_excel', choices=["yes", "no"], default="yes", help="Export Excel with time, lat, lng, pos, vel, distance") parser.add_argument('-m','--map_html', choices=["yes", "no"], default="no", help="Generate interactive map with trajectories") return parser.parse_args()
[docs] def main(): """ Main execution function for the IMU + GPS gait analysis pipeline. This function performs the following steps: - Parses command-line arguments. - Loads and resamples sensor data from Excel. - Detects stationary periods. - Estimates orientation and position using Madgwick filter. - Fuses GPS and IMU data using a Kalman Filter. - Computes stride-level metrics and filters invalid steps. - Evaluates trajectory quality and segment-level metrics. - Optionally exports data to Excel and generates plots/maps. Dependencies include several processors from the ``msGeom`` package, including: - DataPreprocessor - IMUProcessor - KalmanProcessor - DetectPeaks - ResultsProcessor - StrideProcessor - PlotProcessor """ args = parse_args() if args.output_dir: os.makedirs(args.output_dir, exist_ok=True) sample_rate = 40 sample_period = 1 / sample_rate preprocessor = DataPreprocessor(args.config) imu_processor = IMUProcessor(sample_rate, sample_period) plotter = PlotProcessor() dp = DetectPeaks() rp = ResultsProcessor() sp = StrideProcessor(min_stride=0.2, max_stride=2.5, window_sec=1.5) file_path = args.file_path base_name = os.path.splitext(os.path.basename(file_path))[0] foot_label = "Left Foot" if "left" in base_name.lower() else "Right Foot" if "right" in base_name.lower() else base_name try: # Load and preprocess df = preprocessor.load_data(file_path) df_inter = preprocessor.resample_to_40hz(df) time, sample_rate, gyr, acc, mag = preprocessor.preprocess_data(df_inter) stationary = imu_processor.detect_stationary(acc, sample_rate) quats, acc_earth, vel, pos = imu_processor.estimate_position_madwick(time, gyr, acc, mag, stationary) df_gps, gps_pos, gps_final = preprocessor.compute_positions(df_inter, preprocessor.config) # Kalman fusion kp = KalmanProcessor(dt=sample_period, q=0.1, r=0.5) kp.initialize(gps_pos[0]) fused_trajectory = kp.run_filter_with_acc_and_gps(acc_earth, gps_pos) pos_kalman = pos.copy() pos_kalman[:, :2] = fused_trajectory df_inter["modG"] = np.linalg.norm(gyr, axis=1) df_inter["time"] = time df_imu = df_inter[["time", "Ax", "Ay", "Az", "Gx", "Gy", "Gz"]].copy() triplets_gyr = dp.detect_triplet_peaks(df_inter, column='modG') gps_lat = df_gps['lat'] if not df_gps.empty else np.full(len(pos_kalman), np.nan) gps_lng = df_gps['lng'] if not df_gps.empty else np.full(len(pos_kalman), np.nan) gps_lat = gps_lat[:len(time)] if len(gps_lat) >= len(time) else np.pad(gps_lat, (0, len(time) - len(gps_lat)), constant_values=np.nan) gps_lng = gps_lng[:len(time)] if len(gps_lng) >= len(time) else np.pad(gps_lng, (0, len(time) - len(gps_lng)), constant_values=np.nan) df_steps = rp.prepare_step_dataframe(time, gps_lat, gps_lng, pos_kalman, vel, df_inter) df_steps["minute"] = (df_steps["time"] // 60).astype(int) # Step filtering step_peaks = (triplets_gyr.loc[triplets_gyr['peak_type'] == 'main', 'orig_index'] .dropna().astype(int).values) step_peaks = np.sort(np.unique(step_peaks)) df_stride_stats, df_stride_raw = dp.compute_stride_stats_per_minute(df_steps, pos_kalman, step_peaks) df_stride_raw_clean = sp.clean_stride_data(df_stride_raw) df_stride_raw_clean["minute"] = (df_stride_raw_clean["time"] // 60).astype(int) df_stride_stats_clean = sp.recompute_stats_per_minute(df_stride_raw_clean) # Quality checks df_stats_checked = sp.check_distance_similarity(df_stride_stats_clean, gps_distance=430.04) df_stats_checked = sp.check_stride_length_range(df_stats_checked) df_steps_checked = sp.check_trajectory_smoothness(df_steps) alignment_mask, percent_ok = sp.check_spatial_alignment(pos_kalman[:, :2], gps_pos) df_eval = sp.evaluate_quality_segments(df_stats_checked, df_steps, gps_pos, imu_pos=pos_kalman[:, :2], gps_distance=430.04) df_stride_valid = df_stride_raw_clean[["time"]].copy() valid_times_set = set(df_stride_valid["time"]) df_stride_invalid = df_stride_raw[~df_stride_raw["time"].isin(valid_times_set)][["time"]].copy() kalman_gps_error, jump_indices = sp.compute_kalman_gps_error_and_jumps(time, pos_kalman, gps_pos) jump_times = time[jump_indices] # Verbose outputs and plots if args.verbose >= 2: print(f"\n{'*'*58} {foot_label} {'*'*59}\n") print(f"{'-'*130}") print(f"Processing file: {base_name}...") print(f"{'-'*130}\n") print("Quantitative Comparison:\n") total_gps_dist = np.sum(np.linalg.norm(np.diff(gps_pos, axis=0), axis=1)) print(f"- Total GPS distance: {total_gps_dist:.2f} m") rp.print_metrics("IMU", pos, gps_final) rp.print_metrics("Kalman", pos_kalman, gps_final) step_count = len(triplets_gyr[triplets_gyr['peak_type'] == 'main']) print(f"- Steps detected (modG): {step_count}") print("\nGPS jumps detected:\n") for idx in jump_indices: print(f"- Time: {time[idx]:.2f} s | Kalman-GPS Error: {kalman_gps_error[idx]:.2f} m") plotter.plot_macroscopic_comparision( pos, gps_pos, output_dir=args.output_dir if args.output_mode in ("save", "both") else None, title=f"Trajectory - {foot_label} (IMU vs GPS)", base_name=base_name + "_pre_kalman", traj_label="IMU" ) plotter.plot_macroscopic_comparision( pos_kalman, gps_pos, output_dir=args.output_dir if args.output_mode in ("save", "both") else None, title=f"Trajectory - {foot_label} (Kalman vs GPS)", base_name=base_name + "_post_kalman", traj_label="Kalman" ) dp.plot_peaks(df_inter, signal_column='modG', peak_df=triplets_gyr, signal_name='modG') sp.plot_gps_jumps(time, kalman_gps_error, jump_indices) # Extra details for verbose = 3 if args.verbose >= 3: print("\n\nAdditional Metrics and Debug Info (verbose 3):\n") print(f"- Total recorded time: {time[-1] - time[0]:.2f} s") print(f"- Total number of samples: {len(time)}") print(f"- Average step length (IMU): {np.mean(np.linalg.norm(np.diff(pos[:, :2], axis=0), axis=1)):.3f} m") print(f"- Average step length (Kalman): {np.mean(np.linalg.norm(np.diff(pos_kalman[:, :2], axis=0), axis=1)):.3f} m") print(f"- Average velocity (IMU): {np.mean(np.linalg.norm(vel, axis=1)):.3f} m/s") print(f"\n{'-'*130}") vel_magnitudes = np.linalg.norm(vel, axis=1) acc_magnitudes = np.linalg.norm(acc_earth, axis=1) print(f"\n{'*'*52} DYNAMICS CHECK {'*'*52}") print(f"- Max velocity (IMU): {np.max(vel_magnitudes):.2f} m/s") print(f"- Max acceleration (earth frame): {np.max(acc_magnitudes):.2f} m/s²") print(f"- % of samples with velocity > 3.0 m/s: {(vel_magnitudes > 3.0).sum() / len(vel_magnitudes) * 100:.1f}%") print(f"\n{'-'*130}") errors = np.linalg.norm(pos_kalman[:, :2] - gps_pos, axis=1) print(f"\n{'*'*52} SPATIAL ERROR ANALYSIS {'*'*52}") print(f"- Mean Kalman-GPS error: {np.mean(errors):.2f} m") print(f"- Median error: {np.median(errors):.2f} m") print(f"- Max error: {np.max(errors):.2f} m") print(f"- Std dev of error: {np.std(errors):.2f} m") print(f"- % of points with error < 5.0 m: {(errors < 5.0).sum() / len(errors) * 100:.1f}%") print(f"- % of points with error < 2.0 m: {(errors < 2.0).sum() / len(errors) * 100:.1f}%") print(f"\n{'-'*130}") if args.export_excel == "yes": rp.export_to_excel(df_steps, rp.get_output_path(f"{base_name}_stride.xlsx", args)) if args.map_html == "yes": map_out_path = os.path.join(args.output_dir or ".", f"{base_name}_map.html") resultados_mapa = {"IMU": pos[:, :2], "Kalman": pos_kalman[:, :2]} plotter.generate_map_with_estimates(df_gps, resultados_mapa, map_out_path, preprocessor.config) except Exception as e: print(f"Error processing {file_path}: {e}") if args.output_dir and args.output_mode in ("save", "both"): print(f"{'-'*80}") print(f"\nTrajectory plots saved to: {args.output_dir}") if args.output_mode in ("screen", "both"): plt.show()
if __name__ == "__main__": main()