Source code for msGeom.data_preprocessor


import numpy as np
import pandas as pd
import yaml
from scipy.interpolate import CubicSpline
from pyproj import Proj

class DataPreprocessor:
    """
    Load a YAML configuration and Excel sensor logs, resample the logs onto
    a uniform time grid and unpack them into arrays for further processing.
    """

    def __init__(self, config_path):
        """
        Store the configuration path and load the configuration immediately.

        :param config_path: Path to the YAML configuration file.
        :type config_path: str
        """
        self.config_path = config_path
        self.config = self.load_config()

    def load_config(self):
        """
        Load the YAML configuration file found at ``self.config_path``.

        :return: Parsed configuration data (expected to be a mapping).
        :rtype: dict
        """
        # Explicit encoding so the result does not depend on the platform locale.
        with open(self.config_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def load_data(self, file_path):
        """
        Load Excel data from the specified path.

        :param file_path: Path to the Excel file.
        :type file_path: str
        :return: DataFrame containing raw data.
        :rtype: pd.DataFrame
        """
        return pd.read_excel(file_path)

    def resample_to_40hz(self, df, time_col='_time', freq_hz=40, gap_threshold_ms=200):
        """
        Resample data to the target frequency, handling session gaps.

        Rows are ordered by time and split into sessions wherever two
        consecutive samples are more than ``gap_threshold_ms`` apart. Every
        session is resampled on its own: each data column is interpolated
        with a cubic spline onto a regular grid running from the first to the
        last timestamp of the session. A column with fewer than four usable
        samples in a session is filled with NaN, and since rows containing
        any NaN are dropped at the end, such a session disappears from the
        result.

        The input DataFrame is not modified.

        :param df: Raw DataFrame.
        :type df: pd.DataFrame
        :param time_col: Name of the time column.
        :type time_col: str
        :param freq_hz: Target frequency in Hz (must be positive).
        :type freq_hz: int or float
        :param gap_threshold_ms: Gap, in milliseconds, above which a new session starts.
        :type gap_threshold_ms: int
        :return: Interpolated DataFrame with ``time_col``, the data columns
            (alphabetically ordered) and a ``session`` column.
        :rtype: pd.DataFrame
        :raises ValueError: If ``freq_hz`` is not positive.
        """
        if freq_hz <= 0:
            raise ValueError(f"freq_hz must be positive, got {freq_hz!r}")

        # Work on a copy so the caller's frame keeps its original time column.
        df = df.copy()
        df[time_col] = pd.to_datetime(df[time_col])
        # Stable sort keeps the input order of rows sharing a timestamp.
        df = df.sort_values(time_col, kind='stable').reset_index(drop=True)
        df['delta'] = df[time_col].diff().dt.total_seconds() * 1000
        df['session'] = (df['delta'] > gap_threshold_ms).cumsum()

        # Nanosecond step: a whole-millisecond step would distort every rate
        # that does not divide 1000 (e.g. 30 Hz would become 33 ms).
        step = pd.Timedelta(int(round(1e9 / freq_hz)), unit='ns')
        value_cols = df.columns.difference([time_col, 'delta', 'session'])

        interpolated = []
        for session_id, group in df.groupby('session'):
            group = group.set_index(time_col).sort_index(kind='stable')
            new_index = pd.date_range(start=group.index[0], end=group.index[-1], freq=step)
            df_interp = pd.DataFrame(index=new_index)

            for col in value_cols:
                clean = group[col].dropna()
                # CubicSpline needs strictly increasing x: keep the first
                # sample of any repeated timestamp.
                clean = clean[~clean.index.duplicated(keep='first')]
                if len(clean) >= 4:
                    origin = clean.index[0]
                    t = (clean.index - origin).total_seconds().to_numpy()
                    cs = CubicSpline(t, clean.to_numpy())
                    t_new = (new_index - origin).total_seconds().to_numpy()
                    df_interp[col] = cs(t_new)
                else:
                    df_interp[col] = np.nan

            df_interp = df_interp.reset_index().rename(columns={'index': time_col})
            df_interp['session'] = session_id
            interpolated.append(df_interp)

        if not interpolated:
            # Empty input: pd.concat would raise on an empty list.
            return pd.DataFrame(columns=[time_col, *value_cols, 'session'])

        return pd.concat(interpolated, ignore_index=True).dropna()

    def preprocess_data(self, df):
        """
        Extract sensor arrays and timing information from the DataFrame.

        Side effect: a ``time`` column (seconds elapsed since the first row)
        is added to ``df``; ``compute_positions`` reads that column.

        :param df: DataFrame with a datetime ``_time`` column and the IMU
            columns ``Gx, Gy, Gz, Ax, Ay, Az, Mx, My, Mz``.
        :type df: pd.DataFrame
        :return: Tuple containing:

            - time (np.ndarray): Time vector in seconds.
            - sample_rate (float): Inverse of the mean time step, in Hz.
            - gyr (np.ndarray): Gyroscope data (Nx3), input multiplied by
              pi/180 (degrees to radians).
            - acc (np.ndarray): Accelerometer data (Nx3), passed through
              unchanged (assumed to already be in m/s² - confirm).
            - mag (np.ndarray): Magnetometer data (Nx3), input multiplied by
              100 (presumably Gauss to µT - confirm sensor units).
        :rtype: tuple[np.ndarray, float, np.ndarray, np.ndarray, np.ndarray]
        """
        df['time'] = (df['_time'] - df['_time'].iloc[0]).dt.total_seconds()
        time = df['time'].to_numpy()

        # NOTE(review): the mean step includes gaps between sessions, so the
        # rate is underestimated when the frame spans several sessions.
        sample_period = np.mean(np.diff(time))
        sample_rate = 1.0 / sample_period

        gyr = df[['Gx', 'Gy', 'Gz']].to_numpy() * np.pi / 180
        acc = df[['Ax', 'Ay', 'Az']].to_numpy()
        mag = df[['Mx', 'My', 'Mz']].to_numpy() * 100
        return time, sample_rate, gyr, acc, mag

    def compute_positions(self, df, config):
        """
        Convert GPS coordinates to local Cartesian positions using the
        projection described in the configuration.

        Rows missing ``lat``, ``lng`` or ``time`` are discarded. Projected
        coordinates are expressed relative to the first remaining row, which
        therefore maps to ``(0, 0)``. At least one complete row is required.

        :param df: DataFrame with 'lat', 'lng' and 'time' columns.
        :type df: pd.DataFrame
        :param config: Configuration dictionary whose 'Location' section holds
            the projection parameters 'proj', 'zone', 'ellps' and 'south'.
        :type config: dict
        :return: Tuple of the DataFrame with valid GPS entries, the GPS
            position array (Nx2) and the final GPS position.
        :rtype: tuple[pd.DataFrame, np.ndarray, np.ndarray]
        :raises KeyError: If required projection parameters are missing in config['Location'].
        """
        location_cfg = config["Location"]
        proj = Proj(
            proj=location_cfg["proj"],
            zone=location_cfg["zone"],
            ellps=location_cfg["ellps"],
            south=location_cfg["south"],
        )

        df_gps = df[['lat', 'lng', 'time']].dropna().reset_index(drop=True)
        lat = df_gps['lat'].to_numpy()
        lng = df_gps['lng'].to_numpy()

        # The projection is called with longitude first, then latitude.
        x, y = proj(lng, lat)
        gps_pos = np.stack((x - x[0], y - y[0]), axis=1)
        return df_gps, gps_pos, gps_pos[-1]