diff --git a/notebooks/temporal_analysis_user_guide.ipynb b/notebooks/temporal_analysis_user_guide.ipynb new file mode 100644 index 00000000..1219958c --- /dev/null +++ b/notebooks/temporal_analysis_user_guide.ipynb @@ -0,0 +1,971 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "# Setup" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "Use the steady state frames, used in the paper describing the methods. Also uses roughly the same style in the plots." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "## Plot in separated windows\n", + "# %matplotlib qt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "steady_states = {\n", + " \"uo-050-180-180\": [211, 800],\n", + " \"uo-060-180-180\": [243, 771],\n", + " \"uo-070-180-180\": [203, 1113],\n", + " \"uo-100-180-180\": [200, 790],\n", + " \"uo-145-180-180\": [300, 1097],\n", + " \"uo-180-180-070\": [500, 1399],\n", + " \"uo-180-180-095\": [400, 1350],\n", + " \"uo-180-180-120\": [300, 1099],\n", + " \"uo-180-180-180\": [400, 1284],\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "## Load trajectories" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import pathlib\n", + "\n", + "from pedpy import TrajectoryData, TrajectoryUnit, load_trajectory\n", + "from pedpy.column_identifier import *\n", + "\n", + "folder = pathlib.Path(\"demo-data/uni-directional\")\n", + "trajectories = {}\n", + "trajectories_in_steady_state = {}\n", + "for file in folder.glob(\"uo*.txt\"):\n", + " trajectory = load_trajectory(\n", + " trajectory_file=file,\n", + " default_frame_rate=16.0,\n", + " default_unit=TrajectoryUnit.CENTIMETER,\n", + " )\n", + " trajectory_in_steady_state = TrajectoryData(\n", + " data=trajectory.data[\n", + " trajectory.data.frame.between(\n", + " steady_states[file.stem][0], steady_states[file.stem][1]\n", + " )\n", + " ],\n", + " frame_rate=trajectory.frame_rate,\n", + " )\n", + "\n", + " trajectories[file.stem] = trajectory\n", + " trajectories_in_steady_state[file.stem] = trajectory_in_steady_state" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "## Define geometry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "from shapely import Polygon\n", + "\n", + "from pedpy import WalkableArea\n", + "\n", + "walkable_area = WalkableArea(\n", + " Polygon(\n", + " [\n", + " (2.8, -6.5),\n", + " (2.8, -4),\n", + " (1.8, -4),\n", + " (1.8, 4),\n", + " (2.8, 4),\n", + " (2.8, 8),\n", + " (-1, 8),\n", + " (-1, 4),\n", + " (0, 4),\n", + " (0, -4),\n", + " (-1, -4),\n", + " (-1, -6.5),\n", + " ]\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "## Define measurement areas" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "from pedpy import MeasurementArea, MeasurementLine\n", + "\n", + "measurement_area = MeasurementArea([(0, -2.5), (0, 2), (1.8, 2), (1.8, -2.5)])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "## Plot trajectories, geometry, and measurement areas/lines" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# import matplotlib.pyplot as plt\n", + "\n", + "# from pedpy import plot_measurement_setup\n", + "\n", + "# fig, axs = plt.subplots(3, int(len(trajectories) / 3), figsize=(10, 30))\n", + "\n", + "# for (name, trajectory), ax in zip(\n", + "# trajectories.items(), axs.ravel(), strict=False\n", + "# ):\n", + "# ax.set_title(name)\n", + "\n", + "# ax = plot_measurement_setup(\n", + "# traj=trajectory,\n", + "# walkable_area=walkable_area,\n", + "# measurement_areas=[measurement_area],\n", + "# axes=ax,\n", + "# traj_width=0.2,\n", + "# traj_start_marker=\".\",\n", + "# traj_end_marker=\"x\",\n", + "# ma_color=\"g\",\n", + "# ma_line_color=\"g\",\n", + "# ma_alpha=0.2,\n", + "# ml_color=\"b\",\n", + "# )\n", + "# ax.set_aspect(\"equal\")\n", + "\n", + "# fig.tight_layout()\n", + "# plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "# Short-Time Fourier Transform (STFT)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "TrajectoryData_FT = trajectories[\"uo-180-180-070\"]\n", + "\n", + "# File names\n", + "# \"uo-050-180-180\"\n", + "# \"uo-060-180-180\"\n", + "# \"uo-070-180-180\"\n", + "# \"uo-100-180-180\"\n", + "# \"uo-145-180-180\"\n", + "# \"uo-180-180-070\"\n", + "# \"uo-180-180-095\"\n", + "# \"uo-180-180-120\"\n", + "# \"uo-180-180-180\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1) Compute the STFT for the sway of one participant: Computation on raw trajectory data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# # Mockup data for testing\n", + "# import pandas as pd\n", + "# import numpy as np\n", + "# from shapely.geometry import Point\n", + "\n", + "# # Define the number of frames\n", + "# num_frames = 400\n", + "\n", + "# # Create the DataFrame\n", + "# data = []\n", + "# for id_val in [1, 2]: # Two unique IDs\n", + "# for frame in range(num_frames+500*id_val):\n", + "# x = np.sin(0.2*np.pi*frame / TrajectoryData_FT.frame_rate)+np.sin(4*np.pi*frame / TrajectoryData_FT.frame_rate)+4 # Compute x as cos(frame/16) +4\n", + "# y = 2*frame / TrajectoryData_FT.frame_rate # Compute y as 1.5*(frame/16)\n", + "# point = Point(x, y) # Create Shapely Point\n", + "\n", + "# data.append([id_val, frame, x, y, point])\n", + "\n", + "# # Convert to DataFrame\n", + "# mock_data = pd.DataFrame(data, columns=[\"id\", \"frame\", \"x\", \"y\", \"point\"])\n", + "\n", + "\n", + "# TrajectoryData_FT= TrajectoryData(\n", + "# data=mock_data,\n", + "# frame_rate=TrajectoryData_FT.frame_rate,\n", + "# )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pedpy import compute_STFT\n", + "\n", + "# Group the data by 'id' and apply the STFT function\n", + "stft_results = TrajectoryData_FT.data.groupby(\"id\")[X_COL].apply(\n", + " lambda x: compute_STFT(\n", + " x,\n", + " TrajectoryData_FT.frame_rate,\n", + " segments_length=5 * TrajectoryData_FT.frame_rate,\n", + " overlap_length=2 * TrajectoryData_FT.frame_rate,\n", + " zeros_padded=10 * TrajectoryData_FT.frame_rate,\n", + " )\n", + ")\n", + "\n", + "# Reset index to makes \"id\" a column instead of an index.\n", + "stft_results = stft_results.reset_index()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Normalisation of the results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "# Find the maximum Time value and its associated id\n", + "max_time_row = stft_results.loc[stft_results[\"Time\"].idxmax()]\n", + "max_time = max_time_row[\"Time\"]\n", + "max_time_id = max_time_row[\"id\"]\n", + "\n", + "\n", + "# print(f\"Max Time: {max_time}, Associated ID: {max_time_id}\")\n", + "\n", + "# Determine time resolution of the series for the ID with max Time\n", + "max_time_series = stft_results[stft_results[\"id\"] == max_time_id]\n", + "time_resolution = np.diff(np.sort(max_time_series[\"Time\"].unique())).min()\n", + "\n", + "# print(f\"Time resolution for ID {max_time_id}: {time_resolution}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Resample all time series and interpolate missing values\n", + "import pandas as pd\n", + "from scipy.interpolate import interp1d\n", + "\n", + "# Create a new uniform time grid for resampling\n", + "new_time_grid = np.arange(0, max_time + time_resolution, time_resolution)\n", + "\n", + "# loop over all IDs and frequencies\n", + "resampled_data = []\n", + "for id_val, group in stft_results.groupby(\"id\"):\n", + " frequencies = group[\"Frequency\"].unique() # Get unique frequencies\n", + "\n", + " for freq in frequencies:\n", + " freq_group = group[group[\"Frequency\"] == freq]\n", + "\n", + " old_time = freq_group[\"Time\"].values\n", + " magnitude = freq_group[\"Magnitude\"].values\n", + " phase = freq_group[\"Phase\"].values\n", + "\n", + " # Interpolation functions\n", + " interp_magnitude = interp1d(\n", + " old_time, magnitude, kind=\"linear\", fill_value=\"extrapolate\"\n", + " )\n", + " interp_phase = interp1d(\n", + " old_time, phase, kind=\"linear\", fill_value=\"extrapolate\"\n", + " )\n", + "\n", + " # Compute new values at the uniform time grid\n", + " new_magnitude = interp_magnitude(new_time_grid)\n", + " new_phase = interp_phase(new_time_grid)\n", + "\n", + " # Pad missing values (last valid value)\n", + " last_magnitude = 0 # magnitude[-1]\n", + " last_phase = 0 # phase[-1]\n", + "\n", + " new_magnitude[len(old_time) :] = last_magnitude\n", + " new_phase[len(old_time) :] = last_phase\n", + "\n", + " # Store resampled data\n", + " for t, mag, ph in zip(new_time_grid, new_magnitude, new_phase):\n", + " resampled_data.append([id_val, freq, t, mag, ph])\n", + "\n", + "# Create a new DataFrame\n", + "STFT_results__resampled = pd.DataFrame(\n", + " resampled_data, columns=[\"id\", \"Frequency\", \"Time\", \"Magnitude\", \"Phase\"]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute the Mean STFT across all participants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Compute mean Magnitude and Phase for each (Frequency, Time) pair\n", + "mean_stft = STFT_results__resampled.groupby(\n", + " [\"Frequency\", \"Time\"], as_index=False\n", + ").mean()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Plot the STFT Magnitude Spectrum (provide a plotting function)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "\n", + "# Create a new figure\n", + "plt.figure(figsize=(10, 6))\n", + "\n", + "# Reshape data into 2D arrays for plotting\n", + "mag_matrix = mean_stft.pivot(\n", + " index=\"Frequency\", columns=\"Time\", values=\"Magnitude\"\n", + ").values\n", + "plt.pcolormesh(\n", + " np.array(mean_stft[\"Time\"].unique()),\n", + " np.array(mean_stft[\"Frequency\"].unique()),\n", + " mag_matrix,\n", + " shading=\"gouraud\",\n", + ")\n", + "plt.colorbar(label=\"Magnitude\")\n", + "plt.ylabel(\"Frequency [Hz]\")\n", + "plt.ylim(0, 6)\n", + "plt.xlabel(\"Time [sec]\")\n", + "plt.title(\"STFT Magnitude Spectrum\")\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2) Compute the STFT for the sway of one participant: Computation after feature extraction" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here, we are going to extract the feature of the sway of each participant by substracting a filtered trajectory to the raw trajectory.\n", + "We are then going to project this feature on the referential of the filtered trajectory to deal with trajectory changes.\n", + "Finally, we will boost the signal using a simple coefficient to increase visibility." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from scipy.ndimage import uniform_filter1d\n", + "\n", + "## Smooth out swaying usin a gausian filtering of velocity\n", + "window_size = int(TrajectoryData_FT.frame_rate / 2)\n", + "\n", + "\n", + "# Function to apply moving average filter to a group\n", + "def apply_moving_average_filter(group):\n", + " group[\"x_filtered\"] = uniform_filter1d(\n", + " group[X_COL], size=window_size, mode=\"nearest\"\n", + " )\n", + " group[\"y_filtered\"] = uniform_filter1d(\n", + " group[Y_COL], size=window_size, mode=\"nearest\"\n", + " )\n", + " return group\n", + "\n", + "\n", + "# Apply the filter to each group\n", + "trajectory_data = (\n", + " TrajectoryData_FT.data.groupby(\"id\")\n", + " .apply(apply_moving_average_filter, include_groups=True)\n", + " .droplevel(0)\n", + ")\n", + "\n", + "## Remove the limit filtered data\n", + "# Compute the min and max frame for each id\n", + "min_frame = trajectory_data.groupby(\"id\")[\"frame\"].transform(\"min\")\n", + "max_frame = trajectory_data.groupby(\"id\")[\"frame\"].transform(\"max\")\n", + "\n", + "# Filter out the first and last N frames for each id\n", + "trajectory_data = trajectory_data[\n", + " ~(\n", + " (trajectory_data[\"frame\"] <= min_frame + (window_size - 1))\n", + " | (trajectory_data[\"frame\"] >= max_frame - (window_size - 1))\n", + " )\n", + "]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute fluctuation vector by substracting the filtered trajectory " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Compute fluctuation vector\n", + "trajectory_data[\"x_fluctuation\"] = (\n", + " trajectory_data[X_COL] - trajectory_data[\"x_filtered\"]\n", + ")\n", + "trajectory_data[\"y_fluctuation\"] = (\n", + " trajectory_data[Y_COL] - trajectory_data[\"y_filtered\"]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### project fluctuation on the manifold orthogonal to filtered trajectory" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Compute directional vector (difference between consecutive filtered points)\n", + "opperation_df = trajectory_data.copy()\n", + "opperation_df[\"dx_filtered\"] = opperation_df.groupby(\"id\")[\"x_filtered\"].diff()\n", + "opperation_df[\"dy_filtered\"] = opperation_df.groupby(\"id\")[\"y_filtered\"].diff()\n", + "\n", + "# Fill NaN values (first row per \"id\" where diff() results in NaN)\n", + "opperation_df[\"dx_filtered\"] = opperation_df.groupby(\"id\")[\n", + " \"dx_filtered\"\n", + "].fillna(method=\"bfill\")\n", + "opperation_df[\"dy_filtered\"] = opperation_df.groupby(\"id\")[\n", + " \"dy_filtered\"\n", + "].fillna(method=\"bfill\")\n", + "\n", + "\n", + "# Normalize the directional vector (to get unit vector)\n", + "norm = np.sqrt(\n", + " opperation_df[\"dx_filtered\"] ** 2 + opperation_df[\"dy_filtered\"] ** 2\n", + ")\n", + "opperation_df[\"dx_filtered_unit\"] = opperation_df[\"dx_filtered\"] / norm\n", + "opperation_df[\"dy_filtered_unit\"] = opperation_df[\"dy_filtered\"] / norm\n", + "\n", + "# Compute the orthogonal unit vector\n", + "opperation_df[\"dx_ortho\"] = -opperation_df[\"dy_filtered_unit\"]\n", + "opperation_df[\"dy_ortho\"] = opperation_df[\"dx_filtered_unit\"]\n", + "\n", + "# Compute scalar product (dot product) with the orthogonal vector\n", + "trajectory_data[\"projected_fluctuation\"] = (\n", + " opperation_df[\"x_fluctuation\"] * opperation_df[\"dx_ortho\"]\n", + " + opperation_df[\"y_fluctuation\"] * opperation_df[\"dy_ortho\"]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Computation of the STFT" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Group the data by 'id' and apply the STFT function\n", + "\n", + "stft_results = trajectory_data.groupby(\"id\")[\"projected_fluctuation\"].apply(\n", + " lambda x: compute_STFT(\n", + " x,\n", + " TrajectoryData_FT.frame_rate,\n", + " segments_length=5 * TrajectoryData_FT.frame_rate,\n", + " overlap_length=3 * TrajectoryData_FT.frame_rate,\n", + " zeros_padded=20 * TrajectoryData_FT.frame_rate,\n", + " )\n", + ")\n", + "\n", + "# Reset index to makes \"id\" a column instead of an index.\n", + "stft_results = stft_results.reset_index()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Normalisation of the results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Find the maximum Time value and its associated id\n", + "max_time_row = stft_results.loc[stft_results[\"Time\"].idxmax()]\n", + "max_time = max_time_row[\"Time\"]\n", + "max_time_id = max_time_row[\"id\"]\n", + "\n", + "\n", + "# print(f\"Max Time: {max_time}, Associated ID: {max_time_id}\")\n", + "\n", + "# Determine time resolution of the series for the ID with max Time\n", + "max_time_series = stft_results[stft_results[\"id\"] == max_time_id]\n", + "time_resolution = np.diff(np.sort(max_time_series[\"Time\"].unique())).min()\n", + "\n", + "# print(f\"Time resolution for ID {max_time_id}: {time_resolution}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Resample all time series and interpolate missing values\n", + "from scipy.interpolate import interp1d\n", + "\n", + "# Create a new uniform time grid for resampling\n", + "new_time_grid = np.arange(0, max_time + time_resolution, time_resolution)\n", + "\n", + "# loop over all IDs and frequencies\n", + "resampled_data = []\n", + "for id_val, group in stft_results.groupby(\"id\"):\n", + " frequencies = group[\"Frequency\"].unique() # Get unique frequencies\n", + "\n", + " for freq in frequencies:\n", + " freq_group = group[group[\"Frequency\"] == freq]\n", + "\n", + " old_time = freq_group[\"Time\"].values\n", + " magnitude = freq_group[\"Magnitude\"].values\n", + " phase = freq_group[\"Phase\"].values\n", + "\n", + " # Interpolation functions\n", + " interp_magnitude = interp1d(\n", + " old_time, magnitude, kind=\"linear\", fill_value=\"extrapolate\"\n", + " )\n", + " interp_phase = interp1d(\n", + " old_time, phase, kind=\"linear\", fill_value=\"extrapolate\"\n", + " )\n", + "\n", + " # Compute new values at the uniform time grid\n", + " new_magnitude = interp_magnitude(new_time_grid)\n", + " new_phase = interp_phase(new_time_grid)\n", + "\n", + " # Pad missing values (last valid value)\n", + " last_magnitude = 0 # magnitude[-1]\n", + " last_phase = 0 # phase[-1]\n", + "\n", + " new_magnitude[len(old_time) :] = last_magnitude\n", + " new_phase[len(old_time) :] = last_phase\n", + "\n", + " # Store resampled data\n", + " for t, mag, ph in zip(new_time_grid, new_magnitude, new_phase):\n", + " resampled_data.append([id_val, freq, t, mag, ph])\n", + "\n", + "# Create a new DataFrame\n", + "STFT_results__resampled = pd.DataFrame(\n", + " resampled_data, columns=[\"id\", \"Frequency\", \"Time\", \"Magnitude\", \"Phase\"]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute the Mean STFT across all participants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Compute mean Magnitude and Phase for each (Frequency, Time) pair\n", + "mean_stft = STFT_results__resampled.groupby(\n", + " [\"Frequency\", \"Time\"], as_index=False\n", + ").mean()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Plot the STFT Magnitude Spectrum " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "# Create a new figure\n", + "plt.figure(figsize=(10, 6))\n", + "\n", + "# Reshape data into 2D arrays for plotting\n", + "mag_matrix = mean_stft.pivot(\n", + " index=\"Frequency\", columns=\"Time\", values=\"Magnitude\"\n", + ").values\n", + "plt.pcolormesh(\n", + " np.array(mean_stft[\"Time\"].unique()),\n", + " np.array(mean_stft[\"Frequency\"].unique()),\n", + " mag_matrix,\n", + " shading=\"gouraud\",\n", + ")\n", + "plt.colorbar(label=\"Magnitude\")\n", + "plt.ylabel(\"Frequency [Hz]\")\n", + "plt.ylim(0, 4)\n", + "plt.xlabel(\"Time [sec]\")\n", + "plt.title(\"STFT Magnitude Spectrum\")\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Welch Spectral Distribution" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1) Distribution for a single individual" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a figure of a single trajectory and the extracted sway feature\n", + "fig, axs = plt.subplots(1, 3, figsize=(10, 8))\n", + "\n", + "single_tajectory_data = trajectory_data[trajectory_data[\"id\"] == 1]\n", + "\n", + "\n", + "# Plot x_filtered\n", + "axs[0].plot(\n", + " single_tajectory_data[\"x_filtered\"],\n", + " single_tajectory_data[\"y_filtered\"],\n", + " label=\"filtered trajectory\",\n", + ")\n", + "axs[0].plot(\n", + " single_tajectory_data[\"x\"],\n", + " single_tajectory_data[\"y\"],\n", + " label=\"original trajectory\",\n", + ")\n", + "axs[0].set_xlabel(\"x\")\n", + "axs[0].set_xlabel(\"y\")\n", + "axs[0].legend()\n", + "axs[0].set_aspect(\"equal\")\n", + "\n", + "# Plot x_fluctuation\n", + "# Create a color map\n", + "cmap = plt.get_cmap(\"viridis\")\n", + "# Map the frame values to RGB values\n", + "colors = cmap(\n", + " np.array(single_tajectory_data[\"frame\"])\n", + " / single_tajectory_data[\"frame\"].max()\n", + ")\n", + "# Create a line plot\n", + "axs[1].scatter(\n", + " single_tajectory_data[\"x_fluctuation\"],\n", + " single_tajectory_data[\"y_fluctuation\"],\n", + " color=colors,\n", + ")\n", + "axs[1].set_xlabel(\"y_fluctuation\")\n", + "axs[1].set_ylabel(\"x_fluctuation\")\n", + "\n", + "\n", + "# Plot norm_fluctuation\n", + "axs[2].scatter(\n", + " single_tajectory_data[\"projected_fluctuation\"],\n", + " single_tajectory_data[\"frame\"],\n", + " color=colors,\n", + ")\n", + "axs[2].set_xlabel(\"projected fluctuation\")\n", + "axs[2].set_ylabel(\"frame\")\n", + "\n", + "\n", + "# Add a colorbar for frame values\n", + "import matplotlib.cm as cm\n", + "import matplotlib.colors as mcolors\n", + "\n", + "norm = mcolors.Normalize(vmin=single_tajectory_data[\"frame\"].max(), vmax=0)\n", + "cmap = plt.get_cmap(\"viridis\")\n", + "sm = cm.ScalarMappable(cmap=cmap, norm=norm)\n", + "cbar = plt.colorbar(sm, ax=axs[2])\n", + "cbar.set_label(\"Frame Progression\")\n", + "\n", + "# Adjust layout and display the plot\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Computation of Welch Spectral Distribution\n", + "from pedpy import compute_welch_spectral_distribution\n", + "\n", + "welch_distribution_result = compute_welch_spectral_distribution(\n", + " single_tajectory_data[\"projected_fluctuation\"],\n", + " TrajectoryData_FT.frame_rate,\n", + " segments_length=5 * TrajectoryData_FT.frame_rate,\n", + " overlap_length=4 * TrajectoryData_FT.frame_rate,\n", + " zeros_padded=50 * TrajectoryData_FT.frame_rate,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Plot the Welch Spectral Distribution\n", + "plt.figure(figsize=(10, 6))\n", + "plt.plot(\n", + " welch_distribution_result[\"Frequency\"], welch_distribution_result[\"Power\"]\n", + ")\n", + "plt.xlabel(\"Frequency [Hz]\")\n", + "plt.ylabel(\"Power Spectral Density $[m^2/Hz]$\")\n", + "plt.title(\"Welch Spectral Distribution\")\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2) Distribution for all participants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Claculation of the Welch Spectral Distribution for all participants\n", + "welch_results = trajectory_data.groupby(\"id\")[\"projected_fluctuation\"].apply(\n", + " lambda x: compute_welch_spectral_distribution(\n", + " x,\n", + " TrajectoryData_FT.frame_rate,\n", + " segments_length=TrajectoryData_FT.frame_rate * 5,\n", + " overlap_length=TrajectoryData_FT.frame_rate * 2,\n", + " zeros_padded=TrajectoryData_FT.frame_rate * 50,\n", + " )\n", + ")\n", + "\n", + "# Reset index to makes \"id\" a column instead of an index.\n", + "welch_results = welch_results.reset_index()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Plot Welch Spectral Distribution\n", + "\n", + "# First calculate the lengths for each ID\n", + "lengths = trajectory_data.groupby(\"id\")[\"projected_fluctuation\"].size()\n", + "\n", + "# Normalize lengths to use as colormap values\n", + "norm_lengths = (lengths) / (lengths.max())\n", + "\n", + "# Create the plot\n", + "plt.figure(figsize=(10, 6))\n", + "colormap = plt.cm.rainbow\n", + "\n", + "for id in welch_results[\"id\"].unique():\n", + " mask = welch_results[\"id\"] == id\n", + " color = colormap(norm_lengths[id]) # Get color based on normalized length\n", + " plt.plot(\n", + " welch_results[mask][\"Frequency\"],\n", + " welch_results[mask][\"Power\"],\n", + " color=color,\n", + " alpha=0.5,\n", + " )\n", + "\n", + "plt.title(\"Participant's Power Spectral Density \")\n", + "plt.xlabel(\"Frequency [Hz]\")\n", + "plt.ylabel(\"Power Spectral Density $[m^2/Hz]$\")\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Plot Nornalized Welch Spectral Distribution\n", + "\n", + "for id in welch_results[\"id\"].unique():\n", + " mask = welch_results[\"id\"] == id\n", + " color = colormap(norm_lengths[id]) # Get color based on normalized length\n", + " plt.plot(\n", + " welch_results[mask][\"Frequency\"],\n", + " welch_results[mask][\"Power\"]\n", + " / welch_results[mask][\n", + " \"Power\"\n", + " ].sum(), # Normalize each PSD by the total power. This way we have relative distribution of power over frequencies, rather than absolute values, allowing direct comparison.\n", + " color=color,\n", + " alpha=0.5,\n", + " )\n", + "\n", + "plt.title(\"Participant's Power Spectral Density \")\n", + "plt.xlabel(\"Frequency [Hz]\")\n", + "plt.ylabel(\"Normalized Power Spectral Density $[1/Hz]$\")\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Comparing Sway Frequency and Walking Speed " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pedpy-venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/pedpy/__init__.py b/pedpy/__init__.py index cba5c3d8..c688322f 100644 --- a/pedpy/__init__.py +++ b/pedpy/__init__.py @@ -124,6 +124,10 @@ compute_species, compute_voronoi_speed, ) +from .methods.temporal_analysis import ( + compute_STFT, + compute_welch_spectral_distribution, +) from .plotting.plotting import ( PEDPY_BLUE, PEDPY_GREEN, @@ -192,6 +196,7 @@ "is_species_valid", "is_trajectory_valid", "compute_pair_distribution_function", + "compute_STFT", "DensityMethod", "SpeedMethod", "compute_density_profile", diff --git a/pedpy/methods/temporal_analysis.py b/pedpy/methods/temporal_analysis.py new file mode 100644 index 00000000..5b4ae4ee --- /dev/null +++ b/pedpy/methods/temporal_analysis.py @@ -0,0 +1,179 @@ +"""Module containing functions to compute temporal analysis methods. + +For example: Short-Time Fourier Transform (STFT) and Welch's method for spectral analysis. +""" + +# import warnings +# from typing import Tuple + +# import numpy as np +# import numpy.typing as npt +# import pandas as pd +# from scipy.spatial.distance import cdist + +import numpy as np +import pandas as pd +from scipy.signal import stft, welch + + +def compute_STFT( + signal_series: str, + frame_rate: int, + segments_length: int = None, + overlap_length: int = None, + zeros_padded: int = None, + window: str = "hann", +) -> pd.DataFrame: + """Computes the Short-Time Fourier Transform (STFT) of a signal. + + This function calculates the time-frequency representation of a signal + using the Short-Time Fourier Transform (STFT). The STFT provides + information about the frequency content of the signal over time by + computing the Fourier Transform within a sliding window. + + The output consists of the magnitude and phase of the STFT, allowing + for both amplitude and phase analysis. + + .. math:: + STFT\{x[n]\}(m, k) = \sum_{n=-\infty}^{\infty} x[n] w[n - m] e^{-j 2 \pi k n / N} + + where :math:`x[n]` is the discrete-time signal, :math:`w[n]` is the + window function, :math:`m` is the time index, :math:`k` is the + frequency index, and :math:`N` is the number of FFT points. + + Args: + signal_series (pd.Series): A pandas Series containing data values mesured with a constant time interval. + # frame_index_series (pd.Series): the pandas Series containing the time frame index associated + # with each signal value. + frame_rate (int): The frame rate of the signal data. The frame rate + have to remain constant thought the whole dataset. + segments_length (int, optional): Length of each segment for the STFT window. + Defaults to one teenth of signal_series length. + overlap_length (int, optional): Number of overlapping points between + segments. Defaults to None (half of `segments_length` is used). + zeros_padded (int, optional): Number of FFT points. Defaults to None + (same as `segments_length`). + window (str or tuple, optional): The window function to apply before + computing the STFT. Defaults is `'hann'` (corresponds to a `'hann'` + window). Other options are `'hamming'`, `'bartlett'`, `'blackman'`, + `'boxcar'`, `'triang'`, etc. + Returns: + pd.DataFrame: A DataFrame containing the following columns: + - `"Frequency"`: The frequency bins of the STFT. + - `"Time"`: The time bins corresponding to the STFT computation. + - `"Magnitude"`: The absolute magnitude of the STFT at each + time-frequency point. + - `"Phase"`: The phase of the STFT at each time-frequency point. + """ + + # Set default segments_length if None + if segments_length is None: + segments_length = frame_rate * 5 + + # Set default overlap_length if None + if overlap_length is None: + overlap_length = segments_length // 2 + + # Set default zeros_padded it None: + if zeros_padded is None: + zeros_padded = 2 * segments_length + + # Extract signal data and compute STFT + f, t, Zxx = stft( + signal_series.values, + fs=frame_rate, + nperseg=segments_length, + noverlap=overlap_length, + nfft=zeros_padded, + window=window, + ) + + # Convert STFT result into a Pandas DataFrame + stft_df = pd.DataFrame( + { + "Frequency": np.repeat( + f, len(t) + ), # Repeat each frequency value for each time step + "Time": np.tile(t, len(f)), # Tile time values across frequencies + "Magnitude": np.abs( + Zxx + ).flatten(), # Flatten the STFT magnitude values + "Phase": np.angle(Zxx).flatten(), # Flatten the phase values + } + ) + + return stft_df + + +def compute_welch_spectral_distribution( + signal_series: str, + frame_rate: int, + segments_length: int = None, + overlap_length: int = None, + zeros_padded: int = None, + window: str = "hann", +) -> pd.DataFrame: + """Computes the spectral distribution of given temporal series. + + some decription + + .. math:: + some equation, + + Args: + traj_data: TrajectoryData, an object containing the trajectories. + column_series: tuple, an array-like object containing the temporal + series for which the spectral distribution is to be computed using fft mothod from ...library... + windowing_method: str, the method to be used for windowing the + temporal series. Can be "square", "hanning", or "hamming". #and more + lower_bound: int, the lower bound of the frequency range for the + spectral distribution. + upper_bound: int, the upper bound of the frequency range for the + spectral distribution. + zerro_padding: int, the number of zeros to be added to the end of the + temporal series for zero-padding. + temporal_averaging_window: float, the length of the window to be used + for temporal averaging. + Returns: + pd.DataFrame: A pandas DataFrame containing the frequency bins and the + corresponding values of the spectral distribution. + """ + + # Set default segments_length if None + if segments_length is None: + segments_length = len(signal_series) // 3 + + # Set default overlap_length if None + if overlap_length is None: + overlap_length = segments_length // 2 + + # Set default zeros_padded it None: + if zeros_padded is None: + zeros_padded = 2 * segments_length + + # Extract signal data and compute STFT + f, Pxx = welch( + signal_series.values, + fs=frame_rate, + nperseg=segments_length, + noverlap=overlap_length, + nfft=zeros_padded, + window=window, + ) + + # Convert result into a Pandas DataFrame + welch_df = pd.DataFrame( + { + "Frequency": f, # frequency values + "Power": Pxx, # Power spectral density + } + ) + + return welch_df + + +# based on: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html#scipy.signal.welch + + +# Also interesting: STFFT to analysi temporal evolution of the spectum +# based on: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.html#scipy.signal.ShortTimeFFT