Source code for InfluxDBms.influxdb_tools

import pandas as pd
import urllib3
import yaml
import matplotlib.pyplot as plt
import pandas as pd
import argparse
from influxdb_client import InfluxDBClient
from datetime import datetime
from datetime import datetime, timezone, timedelta
from dateutil.parser import parse as parse_date

# Disable urllib3's InsecureRequestWarning (the SSL warnings); the InfluxDB
# client in this module is created with verify_ssl=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class InfluxHelper:
    """
    Helper class for handling datetime parsing and default timestamp generation.
    """

    def __init__(self):
        """
        Initialize the InfluxHelper instance.

        Currently no internal state, but structure allows future extensibility.
        """

    @staticmethod
    def _iso_utc(moment):
        """
        Render a UTC-aware datetime as ISO 8601 text with a ``Z`` suffix.

        ``isoformat()`` on a UTC-aware datetime already ends in ``+00:00``;
        that offset is replaced by ``Z`` instead of appending ``Z`` after it,
        which would yield an invalid stamp such as ``...+00:00Z``.

        :param moment: Timezone-aware datetime expressed in UTC.
        :type moment: datetime
        :return: ISO 8601 string ending in ``Z``.
        :rtype: str
        """
        return moment.isoformat().replace("+00:00", "Z")

    def parse_datetime(self, value):
        """
        Convert a string into a datetime object.

        :param value: String representation of a date/time.
        :type value: str
        :return: Parsed datetime object.
        :rtype: datetime
        :raises argparse.ArgumentTypeError: If the date format is invalid.
        """
        try:
            return parse_date(value)
        except (ValueError, OverflowError) as e:
            # dateutil signals unparseable input with ValueError (ParserError)
            # and out-of-range dates with OverflowError; report both the same way.
            raise argparse.ArgumentTypeError(f"Invalid date: {value}. Error: {e}") from e

    def get_default_dates(self):
        """
        Get default start and end datetimes in ISO 8601 format.

        The start is the current UTC time and the end is 30 minutes later.

        :return: Tuple of (start, end) time strings, each ending in ``Z``.
        :rtype: tuple[str, str]
        """
        now = datetime.now(timezone.utc)
        later = now + timedelta(minutes=30)
        return self._iso_utc(now), self._iso_utc(later)
class cInfluxDB:
    """
    Wrapper around ``InfluxDBClient`` that runs Flux queries against the bucket
    named in a YAML configuration file and returns the results as DataFrames.

    NOTE: ``qtok``, ``pie``, ``metrics`` and ``window_size`` are interpolated
    verbatim into the Flux query text; callers must pass trusted values.
    """

    # Metrics queried when the caller does not supply a list.
    _DEFAULT_METRICS = ('Ax', 'Ay', 'Az', 'Gx', 'Gy', 'Gz', 'Mx', 'My', 'Mz')

    def __init__(self, config_path: str, timeout: int = 500_000):
        """
        Initializes the connection to InfluxDB using a YAML configuration file.

        The file must contain an ``influxdb`` mapping with the keys ``bucket``,
        ``org``, ``token`` and ``url``.

        :param config_path: Path to the YAML configuration file.
        :type config_path: str
        :param timeout: Connection timeout in milliseconds.
        :type timeout: int
        """
        # Load the configuration from the YAML file
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)

        # Extract the necessary values
        settings = config['influxdb']
        self.bucket = settings['bucket']
        self.org = settings['org']
        self.token = settings['token']
        self.url = settings['url']

        # Initialise the InfluxDB client.
        # NOTE(security): TLS certificate verification is disabled here.
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org,
                                     verify_ssl=False, timeout=timeout)

        # The measurement is the part of the bucket name before the first '/'
        # (split() returns the whole name when there is no '/').
        self.measurement = self.bucket.split("/")[0]

    @staticmethod
    def _to_flux_time(moment: datetime) -> str:
        """
        Format a datetime as the ``YYYY-MM-DDTHH:MM:SSZ`` literal used in the queries.

        Timezone-aware values are converted to UTC first so the ``Z`` suffix is
        truthful; naive values are formatted as they are (treated as UTC).

        :param moment: Datetime to format.
        :type moment: datetime
        :return: Timestamp string with a ``Z`` suffix.
        :rtype: str
        """
        if moment.tzinfo is not None and moment.utcoffset() is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

    def _fetch_frame(self, query: str, error_prefix: str) -> pd.DataFrame:
        """
        Run a Flux query and collect every record's values into a DataFrame.

        The internal ``result`` and ``table`` columns are removed when present.
        On failure the error is printed with ``error_prefix`` and re-raised.

        :param query: Flux query text.
        :type query: str
        :param error_prefix: Text printed before the error message on failure.
        :type error_prefix: str
        :return: One row per record; empty (no columns) when nothing matched.
        :rtype: pd.DataFrame
        """
        try:
            tables = self.client.query_api().query(org=self.org, query=query)
        except Exception as e:
            print(f"{error_prefix}: {str(e)}")
            raise

        rows = [record.values for table in tables for record in table.records]
        # errors='ignore' keeps an empty result from raising KeyError.
        return pd.DataFrame(rows).drop(columns=['result', 'table'], errors='ignore')

    def query_data(self, from_date: datetime, to_date: datetime, qtok: str, pie: str,
                   metrics=None) -> pd.DataFrame:
        """
        Query data in InfluxDB, pivoting the results to get the metrics in columns.

        :param from_date: Start date (sent as 'YYYY-MM-DDTHH:MM:SSZ').
        :type from_date: datetime
        :param to_date: End date (sent as 'YYYY-MM-DDTHH:MM:SSZ').
        :type to_date: datetime
        :param qtok: CodeID
        :type qtok: str
        :param pie: Left or Right foot ('Right', 'Left')
        :type pie: str
        :param metrics: List of metrics to query (default: predefined set)
        :type metrics: list[str], optional
        :return: DataFrame with the metrics pivoted on columns, ordered by _time
            descending. When nothing matches, an empty DataFrame with the
            expected columns is returned.
        :rtype: pd.DataFrame
        """
        from_date_str = self._to_flux_time(from_date)
        to_date_str = self._to_flux_time(to_date)

        # Default metrics
        metrics = list(self._DEFAULT_METRICS) if metrics is None else list(metrics)

        # Building both clauses from one field list keeps the Flux valid even
        # when ``metrics`` is empty.
        fields = [*metrics, "lat", "lng"]
        fields_filter = ' or '.join(f'r._field == "{field}"' for field in fields)
        keep_columns = ', '.join(f'"{column}"' for column in ["_time", *fields])

        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {from_date_str}, stop: {to_date_str})
          |> filter(fn: (r) => r._measurement == "{self.measurement}")
          |> filter(fn: (r) => {fields_filter})
          |> filter(fn: (r) => r["CodeID"] == "{qtok}" and r["type"] == "SCKS" and r["Foot"] == "{pie}")
          |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
          |> keep(columns: [{keep_columns}])
        '''

        frame = self._fetch_frame(query, "Error in the query")
        if frame.empty:
            # No records: there is no "_time" column to sort by.
            return pd.DataFrame(columns=["_time", *fields])
        return frame.sort_values(by="_time", ascending=False).reset_index(drop=True)

    def query_with_aggregate_window(self, from_date: datetime, to_date: datetime,
                                    window_size: str = "20ms", qtok: str = None,
                                    pie: str = None, metrics=None) -> pd.DataFrame:
        """
        Query data in InfluxDB with aggregateWindow, pivoting the results to get
        metrics as columns.

        :param from_date: Start datetime (sent as 'YYYY-MM-DDTHH:MM:SSZ').
        :type from_date: datetime
        :param to_date: End datetime (sent as 'YYYY-MM-DDTHH:MM:SSZ').
        :type to_date: datetime
        :param window_size: Aggregation window size (default: '20ms').
        :type window_size: str
        :param qtok: CodeID (required).
        :type qtok: str
        :param pie: Left or Right foot ('Right', 'Left') (required).
        :type pie: str
        :param metrics: List of metrics to query (default: predefined set).
        :type metrics: list[str], optional
        :return: DataFrame with metrics as columns, ordered by _time descending.
            ``_time`` and every requested metric are always present as columns,
            even when no data matched.
        :rtype: pd.DataFrame
        :raises ValueError: If ``qtok`` or ``pie`` is missing or empty.
        """
        if not qtok or not pie:
            raise ValueError("Los argumentos 'qtok' y 'pie' son obligatorios para esta consulta.")

        from_date_str = self._to_flux_time(from_date)
        to_date_str = self._to_flux_time(to_date)

        # Default metrics (list() also accepts tuples and other iterables)
        metrics = list(self._DEFAULT_METRICS) if metrics is None else list(metrics)

        fields = [*metrics, "Latitude", "Longitude"]
        fields_filter = ' or '.join(f'r._field == "{field}"' for field in fields)
        keep_columns = ', '.join(f'"{column}"' for column in ["_time", *fields, "lat", "lng"])

        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: time(v: "{from_date_str}"), stop: time(v: "{to_date_str}"))
          |> filter(fn: (r) => r._measurement == "{self.measurement}")
          |> filter(fn: (r) => {fields_filter})
          |> filter(fn: (r) => r["CodeID"] == "{qtok}" and r["type"] == "SCKS" and r["Foot"] == "{pie}")
          |> group(columns: ["_field"])
          |> aggregateWindow(every: {window_size}, fn: last, createEmpty: false)
          |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
          |> keep(columns: [{keep_columns}])
        '''

        frame = self._fetch_frame(query, "Error en la consulta")

        # Make sure that all the metrics are in the DataFrame
        for col in ["_time", *metrics]:
            if col not in frame:
                frame[col] = None

        return frame.sort_values(by="_time", ascending=False).reset_index(drop=True)

    def debug_fields(self):
        """
        Prints all available field names (_field) in the bucket to check what
        can be queried.

        Errors are printed rather than raised.
        """
        query = f'''
        import "influxdata/influxdb/schema"
        schema.fieldKeys(bucket: "{self.bucket}")
        '''
        try:
            result = self.client.query_api().query(org=self.org, query=query)
            print("Available fields (_field) in the bucket:")
            for table in result:
                for record in table.records:
                    print(f"- {record.get_value()}")
        except Exception as e:
            print(f"Error al listar los campos: {e}")

    def show_raw_sample(self, from_date, to_date, qtok, pie):
        """
        Executes a sample query on InfluxDB and prints the records that match
        the filtering criteria, limited to 5 records per result table.

        This method is useful for debugging or quickly inspecting raw data from
        the database. Errors are printed rather than raised.

        :param from_date: Start date of the query (inclusive).
        :type from_date: datetime.datetime
        :param to_date: End date of the query (exclusive, as in Flux ``range``).
        :type to_date: datetime.datetime
        :param qtok: The CodeID to filter the data by.
        :type qtok: str
        :param pie: Indicates which foot's data to query ("Left" or "Right").
        :type pie: str
        :return: None. The results are printed directly to stdout.
        :rtype: None
        """
        try:
            query = f'''
            from(bucket: "{self.bucket}")
              |> range(start: {self._to_flux_time(from_date)}, stop: {self._to_flux_time(to_date)})
              |> filter(fn: (r) => r._measurement == "{self.measurement}")
              |> filter(fn: (r) => r["CodeID"] == "{qtok}" and r["Foot"] == "{pie}" and r["type"] == "SCKS")
              |> limit(n: 5)
            '''
            result = self.client.query_api().query(org=self.org, query=query)
            for table in result:
                for record in table.records:
                    print(record.values)
        except Exception as e:
            print(f"Error executing the sample query: {e}")

    def close(self) -> None:
        """
        Closes the connection to the InfluxDB client.

        :return: None
        :rtype: None
        """
        self.client.close()