Source code for meyelens.gaze

import os
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import PolynomialFeatures, StandardScaler


[docs] class ScreenPositions: """ Generate a shuffled sequence of calibration target positions on a screen. The sequence always includes 5 fixed points: - center - top-left, top-right - bottom-left, bottom-right Optionally, additional random points can be added within the screen bounds. Parameters ---------- screen_width : float Screen width in degrees (or in any coordinate unit you consistently use). screen_height : float Screen height in degrees (or in any coordinate unit you consistently use). random_points : bool, optional If ``True``, add random positions in addition to the fixed ones. num_points : int, optional Total number of points in the returned sequence (including the 5 fixed points). If smaller than 5, only the 5 fixed points will be used. Notes ----- - Positions are shuffled once at initialization. - Coordinates are returned as (x, y) where the origin is the center. """ def __init__(self, screen_width, screen_height, random_points: bool = False, num_points: int = 5): self.screen_width_deg = screen_width self.screen_height_deg = screen_height self.random_points = random_points self.num_points = num_points self.positions = self._generate_positions() self.current_index = -1 def _generate_positions(self): """ Build and shuffle the list of target positions. Returns ------- list[tuple[float, float]] List of (x, y) target positions. """ fixed_points = [ (0, 0), # center (-self.screen_width_deg / 2, self.screen_height_deg / 2), # top-left (self.screen_width_deg / 2, self.screen_height_deg / 2), # top-right (-self.screen_width_deg / 2, -self.screen_height_deg / 2), # bottom-left (self.screen_width_deg / 2, -self.screen_height_deg / 2), # bottom-right ] # Decide how many random points to generate (never negative). n_random = max(0, self.num_points - len(fixed_points)) if self.random_points and n_random > 0: random_points = [ ( np.random.uniform(-self.screen_width_deg / 2, self.screen_width_deg / 2), np.random.uniform(-self.screen_height_deg / 2, self.screen_height_deg / 2), ) for _ in range(n_random) ] else: random_points = [] positions = fixed_points + random_points np.random.shuffle(positions) return positions
[docs] def next(self): """ Return the next target position in the shuffled sequence. Returns ------- tuple[float, float] or None Next (x, y) position, or ``None`` when the sequence is exhausted. """ self.current_index += 1 if self.current_index < len(self.positions): return self.positions[self.current_index] return None
[docs] class GazeData: """ Load, preprocess, and visualize gaze calibration recordings. This class expects recordings in a folder as ``*.txt`` (or any extension) with semicolon-separated columns, including at least: - ``x``, ``y``: gaze coordinates - ``trg1``: target identifier (used for plotting groups) - ``trg2``, ``trg3``: screen target coordinates (x, y) associated with each sample By default, if no folder is provided, data is stored under: ``~/Documents/GazeData`` Notes ----- - Missing ``x``/``y`` samples are linearly interpolated in both directions. - File discovery is based on a filename pattern used by your acquisition pipeline: the second dash-separated token must equal ``track_cal.txt``. """ def __init__(self, folder=None): """ Parameters ---------- folder : str or pathlib.Path or None, optional Folder containing gaze data files. If ``None``, uses ``~/Documents/GazeData`` (creating it if needed). """ if folder is None: documents = Path.home() / "Documents" documents.mkdir(parents=True, exist_ok=True) folder = documents / "GazeData" folder = Path(folder) folder.mkdir(parents=True, exist_ok=True) self.data_folder = folder # Discover recordings following the naming convention: # <prefix>-track_cal.txt-<suffix> ... OR ... <prefix>-track_cal.txt recs = os.listdir(folder) self.recs = [r for r in recs if len(r.split("-")) > 1 and r.split("-")[1] == "track_cal.txt"]
[docs] def list(self) -> None: """ Print the available recordings and their indices. Returns ------- None """ for i, fn in enumerate(self.recs): print(i, fn)
[docs] def get_last(self): """ Convenience method to load the most recent recording in the list. Returns ------- tuple[numpy.ndarray, numpy.ndarray] ``(gaze_points, screen_positions)`` where each has shape (n_samples, 2). Notes ----- This simply calls :meth:`get` with ``i=-1`` (last element in list). """ return self.get(-1)
[docs] def get(self, i: int = -1): """ Load a specific recording by index. Parameters ---------- i : int, optional Recording index (supports negative indexing). Returns ------- tuple[numpy.ndarray, numpy.ndarray] ``(gaze_points, screen_positions)`` where each has shape (n_samples, 2). Raises ------ IndexError If ``i`` is out of range for the available recordings. """ print(f"### GAZEDATA ### Loading rec: {self.recs[i]}") pupil = pd.read_csv(os.path.join(self.data_folder, self.recs[i]), sep=";") # Interpolate missing gaze samples. pupil[["x", "y"]] = pupil[["x", "y"]].interpolate( method="linear", limit_direction="both", inplace=False, ) screen_positions = pupil[["trg2", "trg3"]].values gaze_points = pupil[["x", "y"]].values return gaze_points, screen_positions
[docs] def get_all(self): """ Load and concatenate all recordings. Returns ------- tuple[numpy.ndarray, numpy.ndarray] ``(all_gaze_points, all_screen_positions)`` concatenated across recordings. Each has shape (n_total_samples, 2). Notes ----- Uses the same interpolation strategy as :meth:`get`. """ all_screen_positions = [] all_gaze_points = [] for r in self.recs: pupil = pd.read_csv(os.path.join(self.data_folder, r), sep=";") pupil[["x", "y"]] = pupil[["x", "y"]].interpolate( method="linear", limit_direction="both", inplace=False, ) all_screen_positions.append(pupil[["trg2", "trg3"]].values) all_gaze_points.append(pupil[["x", "y"]].values) return np.vstack(all_gaze_points), np.vstack(all_screen_positions)
[docs] def plot(self, i: int = -1, skip_samples: int = 20) -> None: """ Plot gaze traces grouped by target identifier for a given recording. Parameters ---------- i : int, optional Recording index (supports negative indexing). skip_samples : int, optional Number of samples to skip at the beginning of each target segment. This is useful to ignore the initial transient after a target switch. Returns ------- None Notes ----- - Uses ``trg1`` to group samples by target ID. - Plots ``y`` on the x-axis and ``x`` on the y-axis (matching your current convention), then inverts the y-axis. """ pupil = pd.read_csv(os.path.join(self.data_folder, self.recs[i]), sep=";") fig, ax = plt.subplots() colors = plt.cm.tab10.colors for j, pos in enumerate(pupil["trg1"].unique()): data = pupil[pupil["trg1"] == pos].iloc[skip_samples:] ax.plot( data["y"], data["x"], color=colors[j % len(colors)], marker="o", label=f"Target {pos}", ) plt.gca().invert_yaxis() plt.legend() plt.show()
[docs] class GazeModelPoly: """ Polynomial regression gaze calibration model. This model learns a mapping from raw gaze coordinates (e.g., pupil/eye coordinates) to screen target coordinates using polynomial feature expansion and linear regression. Workflow -------- 1. Call :meth:`train` with paired ``(gaze_points, screen_positions)``. 2. Call :meth:`predict` to map new gaze points to calibrated screen positions. 3. Optionally call :meth:`save` / :meth:`load` to persist the trained model. Notes ----- - Both inputs and targets are standardized using :class:`sklearn.preprocessing.StandardScaler`. - Training metrics (MSE and R²) are printed to stdout (no logging). """ def __init__(self): """ Initialize an untrained model with feature/target scalers. Attributes ---------- scaler_features : sklearn.preprocessing.StandardScaler Scaler fitted on gaze features. scaler_target : sklearn.preprocessing.StandardScaler Scaler fitted on screen target coordinates. poly_features : sklearn.preprocessing.PolynomialFeatures or None Polynomial feature expander created during training. calibration_model : sklearn.linear_model.LinearRegression or None Linear regression model fitted during training. """ self.scaler_features = StandardScaler() self.scaler_target = StandardScaler() self.poly_features = None self.calibration_model = None
[docs] def train(self, gaze_points, screen_positions, degree: int = 2) -> None: """ Train the calibration model. Parameters ---------- gaze_points : numpy.ndarray Raw gaze points of shape (n_samples, 2). screen_positions : numpy.ndarray Screen target positions of shape (n_samples, 2). degree : int, optional Polynomial degree used by :class:`sklearn.preprocessing.PolynomialFeatures`. Returns ------- None Notes ----- Training is performed in standardized space (both X and y). Reported MSE and R² are therefore in standardized units. """ # Standardize targets and features. screen_positions_std = self.scaler_target.fit_transform(screen_positions) gaze_points_std = self.scaler_features.fit_transform(gaze_points) # Polynomial expansion then linear regression. self.poly_features = PolynomialFeatures(degree=degree) transformed_gaze = self.poly_features.fit_transform(gaze_points_std) self.calibration_model = LinearRegression() self.calibration_model.fit(transformed_gaze, screen_positions_std) # Quick training-set diagnostics. calibration_predictions = self.calibration_model.predict(transformed_gaze) calibration_mse = mean_squared_error(screen_positions_std, calibration_predictions) calibration_r2 = r2_score(screen_positions_std, calibration_predictions) print(f"Calibration Model - MSE: {calibration_mse:.4f}, R2: {calibration_r2:.4f}")
[docs] def predict(self, new_eye_position): """ Predict calibrated screen coordinates for new gaze points. Parameters ---------- new_eye_position : numpy.ndarray New gaze points of shape (n_samples, 2). Returns ------- numpy.ndarray Predicted screen positions of shape (n_samples, 2), returned in the original (inverse-transformed) coordinate space. Raises ------ ValueError If the model has not been trained (or loaded) yet. """ if self.calibration_model is None or self.poly_features is None: raise ValueError("The model has not been trained yet.") normalized_input = self.scaler_features.transform(new_eye_position) transformed_input = self.poly_features.transform(normalized_input) calibrated_prediction_std = self.calibration_model.predict(transformed_input) return self.scaler_target.inverse_transform(calibrated_prediction_std)
[docs] def save(self, model_path: str | None = None) -> None: """ Save the trained model and preprocessing objects to disk using joblib. Parameters ---------- model_path : str or None, optional Output path. If ``None``, defaults to ``gaze_models/gazemodel_poly.pkl`` and creates the ``gaze_models`` folder if needed. Returns ------- None Notes ----- The saved bundle includes: - ``calibration_model`` - ``scaler_features`` - ``scaler_target`` - ``poly_features`` """ if model_path is None: os.makedirs("gaze_models", exist_ok=True) model_path = "gaze_models/gazemodel_poly.pkl" joblib.dump( { "calibration_model": self.calibration_model, "scaler_features": self.scaler_features, "scaler_target": self.scaler_target, "poly_features": self.poly_features, }, model_path, )
[docs] @staticmethod def load(model_path: str | None = None): """ Load a saved model bundle from disk. Parameters ---------- model_path : str or None, optional Path to the saved ``.pkl``. If ``None``, defaults to ``gaze_models/gazemodel_poly.pkl``. Returns ------- GazeModelPoly An instance populated with the loaded model and preprocessing objects. """ if model_path is None: model_path = "gaze_models/gazemodel_poly.pkl" data = joblib.load(model_path) instance = GazeModelPoly() instance.calibration_model = data["calibration_model"] instance.scaler_features = data["scaler_features"] instance.scaler_target = data["scaler_target"] instance.poly_features = data["poly_features"] return instance
if __name__ == "__main__": # Minimal example: print the generated calibration positions. poss = ScreenPositions(10, 10) while True: point = poss.next() if point: print(point) else: break