import time
import cv2
import numpy as np
import tensorflow as tf
from skimage.measure import label, regionprops
from tensorflow.keras.models import load_model
from .camera import Camera
from .fileio import BufferedFileWriter, FileWriter
try: # Py ≥ 3.9
from importlib.resources import files
except ImportError: # Py ≤ 3.8
from importlib_resources import files
class Meye:
    """
    Pupil segmentation and basic shape extraction using a pre-trained neural network.

    This class loads a Keras/TensorFlow segmentation model and exposes a :meth:`predict`
    method that:

    1. converts the input frame to grayscale (if needed)
    2. resizes it to the model input size (hardcoded to 128x128 in this implementation)
    3. runs inference to obtain a pupil mask
    4. optionally performs morphological post-processing to isolate the pupil region
    5. optionally fits an ellipse to estimate major/minor diameters and orientation

    Attributes
    ----------
    model_path : str or pathlib.Path
        Path to the Keras model file used for inference.
    model : tensorflow.keras.Model
        Loaded Keras model.
    requiredFrameSize : tuple[int, int]
        Expected model input frame size (height, width) derived from the model input.
        Note: this implementation still resizes to 128x128 in :meth:`predict`.
    centroid : tuple[float, float]
        Centroid (row, col) of the largest detected pupil region after post-processing.
        Set to ``(np.nan, np.nan)`` when no pupil is found or before the first predict.
    pupil_size : float
        Number of non-zero pixels in the resized pupil mask (in the original image size).
    major_diameter : float
        Major axis length from an ellipse fit (in pixels), if available.
    minor_diameter : float
        Minor axis length from an ellipse fit (in pixels), if available.
    orientation : float
        Ellipse orientation angle (degrees), if available.

    Notes
    -----
    - This class prints GPU availability on initialization (no logging).
    - The model is assumed to return two outputs (``mask, info``). Only ``mask`` is used.
    - Coordinate conventions:

      - centroid from :func:`skimage.measure.regionprops` is (row, col).
      - the recorders write centroid as (x=col, y=row) by swapping indices.
    """

    def __init__(self, model=None):
        """
        Initialize the detector and load the segmentation model.

        Parameters
        ----------
        model : str or pathlib.Path or None, optional
            Path to a ``.h5`` Keras model. If ``None``, loads the bundled
            ``meye-2022-01-24.h5`` model from the ``meyelens.models`` package.
        """
        print("Num GPUs Available: ", len(tf.config.list_physical_devices("GPU")))
        if model is None:
            self.model_path = files("meyelens.models").joinpath("meye-2022-01-24.h5")
        else:
            self.model_path = model
        self.model = load_model(self.model_path)
        # Derive required input size from the model's first input tensor.
        shape = self.model.input_shape
        if isinstance(shape, list):
            shape = shape[0]
        self.requiredFrameSize = tuple(shape[1:3])
        # Public outputs (updated on every predict call).
        # centroid is initialized as a (row, col) pair so that consumers
        # indexing centroid[0]/centroid[1] do not fail before the first predict.
        self.centroid = (np.nan, np.nan)
        self.pupil_size = np.nan
        self.major_diameter = np.nan
        self.minor_diameter = np.nan
        self.orientation = np.nan

    def predict(self, img, post_proc: bool = True, morph: bool = True, fill_ellipse: bool = False):
        """
        Predict a pupil mask and centroid from an input image.

        Parameters
        ----------
        img : numpy.ndarray
            Input frame, grayscale (H, W) or BGR (H, W, 3).
        post_proc : bool, optional
            If ``True``, apply :meth:`morphProcessing` to binarize, keep the
            largest connected component, and perform morphological closing.
            If ``False``, the raw network output is used and centroid is set to (0, 0).
        morph : bool, optional
            If ``True``, attempt ellipse fitting on the mask to estimate
            major/minor diameters and orientation.
        fill_ellipse : bool, optional
            If ``True``, replace the mask with a filled ellipse fitted to the contour
            (useful for smoothing irregular segmentations).

        Returns
        -------
        pupil_resized : numpy.ndarray
            Processed pupil mask resized back to the original image size.
            Pixel values are 0/255 when post-processing is enabled.
        centroid : tuple[float, float]
            Centroid of the detected pupil region in (row, col) format.

        Notes
        -----
        - The model input is normalized to [0, 1] and shaped as (1, H, W, 1).
        - This implementation resizes inputs to 128x128 regardless of
          :attr:`requiredFrameSize`.
        """
        if len(img.shape) == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Resize to model input size used by this implementation.
        crop = cv2.resize(img, (128, 128))
        networkInput = crop.astype(np.float32) / 255.0
        networkInput = networkInput[None, :, :, None]
        mask, info = self.model(networkInput)  # 'info' is unused but kept for model compatibility
        # Convert the TF tensor to a plain numpy array so OpenCV/skimage can consume it.
        pupil = np.asarray(mask)[0, :, :, 0]
        if post_proc:
            pupil, centroid = self.morphProcessing(pupil)
        else:
            centroid = (0, 0)
        # Reset geometric features so stale values from a previous frame never
        # survive a frame where fitting fails or is skipped.
        self.major_diameter = np.nan
        self.minor_diameter = np.nan
        self.orientation = np.nan
        # Optional ellipse fitting to estimate geometric features.
        if morph:
            # findContours requires a single-channel uint8 image; the raw network
            # output used when post_proc=False is float, so binarize it first.
            if pupil.dtype != np.uint8:
                contour_src = ((pupil > 0.5) * 255).astype(np.uint8)
            else:
                contour_src = pupil
            contours, _ = cv2.findContours(contour_src, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if contours:
                contour = max(contours, key=cv2.contourArea)
                # OpenCV needs at least 5 points to fit an ellipse.
                if len(contour) >= 5:
                    try:
                        center, axes, angle = cv2.fitEllipse(contour)
                        self.major_diameter = max(axes)
                        self.minor_diameter = min(axes)
                        self.orientation = angle
                    except cv2.error as e:
                        # Degenerate contours can still make fitEllipse fail;
                        # leave the geometric features as NaN in that case.
                        print(e)
            if fill_ellipse:
                pupil = self.fit_ellipse_and_fill(contour_src)
        # Resize mask back to the original frame size.
        pupil_resized = cv2.resize(pupil, (img.shape[1], img.shape[0]))
        # Update public state.
        self.pupil_size = np.sum(pupil_resized > 0)
        self.centroid = centroid
        return pupil_resized, centroid

    def morphProcessing(self, sourceImg, thr: float = 0.8):
        """
        Post-process the raw model output to isolate the pupil region.

        Steps
        -----
        1. Threshold the model output at ``thr``
        2. Label connected components
        3. Keep only the largest component
        4. Apply morphological closing with an elliptical kernel

        Parameters
        ----------
        sourceImg : numpy.ndarray
            Raw model output mask (float array in [0, 1]).
        thr : float, optional
            Threshold used to binarize the mask.

        Returns
        -------
        morph : numpy.ndarray
            Post-processed binary mask as uint8 with values 0 or 255.
        centroid : tuple[float, float]
            Centroid of the largest component in (row, col) format.
            Returns ``(np.nan, np.nan)`` if no component is found.
        """
        binarized = sourceImg > thr
        label_img = label(binarized)
        regions = regionprops(label_img)
        if len(regions) == 0:
            morph = np.zeros(sourceImg.shape, dtype="uint8")
            centroid = (np.nan, np.nan)
            return morph, centroid
        # Keep only the largest connected component.
        regions.sort(key=lambda x: x.area, reverse=True)
        centroid = regions[0].centroid
        for rg in regions[1:]:
            label_img[rg.coords[:, 0], rg.coords[:, 1]] = 0
        label_img[label_img != 0] = 1
        biggest_region = (label_img * 255).astype(np.uint8)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (13, 13))
        morph = cv2.morphologyEx(biggest_region, cv2.MORPH_CLOSE, kernel)
        return morph, centroid

    @staticmethod
    def fit_ellipse_and_fill(mask):
        """
        Fit an ellipse to the pupil mask and return a filled ellipse mask.

        Parameters
        ----------
        mask : numpy.ndarray
            Binary mask (uint8) typically with values 0/255.

        Returns
        -------
        numpy.ndarray
            New mask where the pupil is represented by a filled ellipse
            (0/255). If ellipse fitting is not possible, returns the input mask.

        Notes
        -----
        - Uses the convex hull of the largest contour to stabilize ellipse fitting.
        - OpenCV requires at least 5 points to fit an ellipse.
        """
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            return mask
        largest_contour = max(contours, key=cv2.contourArea)
        hull = cv2.convexHull(largest_contour)
        if len(hull) >= 5:
            ellipse = cv2.fitEllipse(hull)
            (x, y), (major_axis, minor_axis), angle = ellipse
            filled_image = np.zeros_like(mask, dtype=np.uint8)
            cv2.ellipse(
                filled_image,
                (int(x), int(y)),
                (int(major_axis / 2), int(minor_axis / 2)),
                angle,
                0,
                360,
                255,
                thickness=-1,
            )
            return filled_image
        return mask

    @staticmethod
    def overlay_roi(mask, roi, ratios=(0.7, 0.3)):
        """
        Overlay a pupil mask over a region of interest (ROI) image.

        Parameters
        ----------
        mask : numpy.ndarray
            Binary mask to overlay (expected 0/255).
        roi : numpy.ndarray
            Base image (typically BGR) to overlay on.
        ratios : tuple[float, float], optional
            Blending ratios for (roi, mask_color) passed to
            :func:`cv2.addWeighted`.

        Returns
        -------
        numpy.ndarray
            Blended image for visualization.
        """
        overlay = roi.copy()
        mask_colored = Meye.mask2color(mask, 1)
        overlay = cv2.addWeighted(overlay, ratios[0], mask_colored, ratios[1], 0)
        return overlay

    @staticmethod
    def mask2color(mask, channel: int = 1):
        """
        Convert a single-channel mask into a 3-channel color image.

        Parameters
        ----------
        mask : numpy.ndarray
            2D mask.
        channel : int, optional
            Channel to place the mask in:

            - 0: red
            - 1: green
            - 2: blue

        Returns
        -------
        numpy.ndarray
            3-channel image with the mask in the selected channel.
        """
        if channel == 0:
            out = cv2.merge([mask, np.zeros_like(mask), np.zeros_like(mask)])
        elif channel == 1:
            out = cv2.merge([np.zeros_like(mask), mask, np.zeros_like(mask)])
        else:
            out = cv2.merge([np.zeros_like(mask), np.zeros_like(mask), mask])
        return out

    def preview(self, cam):
        """
        Run a real-time preview loop showing pupil segmentation overlay.

        Parameters
        ----------
        cam : Camera
            Camera instance providing frames via :meth:`Camera.get_frame`.

        Returns
        -------
        None

        Notes
        -----
        Press "q" in the preview window to exit.
        """
        print('### MEYE ### Press "q" to exit preview.')
        while True:
            frame = cam.get_frame()
            predicted = self.predict(frame)[0]
            overlay = self.overlay_roi(predicted, frame)
            cam.show(overlay, name="Preview")
            if cam.wait_key("q"):
                break
        cv2.destroyWindow("Preview")
class MeyeRecorder:
    """
    Synchronous frame-by-frame recorder using :class:`Meye` and :class:`FileWriter`.

    This recorder captures frames from a :class:`~.camera.Camera`, runs pupil detection,
    and writes one line per frame to a semicolon-separated text file.

    The output columns are:

    - time (seconds since start)
    - x, y (centroid coordinates; written as col, row)
    - pupil (mask area in pixels)
    - major_diameter, minor_diameter, orientation (ellipse fit; may be NaN)
    - trg1 ... trg9 (user-defined trigger values)

    Parameters
    ----------
    cam_ind : int, optional
        Camera index passed to :class:`~.camera.Camera`.
    model : str or pathlib.Path or None, optional
        Path to the Keras model file. If ``None``, the packaged model is used.
    show_preview : bool, optional
        If ``True``, display an overlay window during recording.
    filename : str, optional
        Base filename used by :class:`FileWriter` (timestamp is added automatically).
    folder_path : str, optional
        Output folder where the text file is created.
    sep : str, optional
        Column separator used in the output file.

    Notes
    -----
    This recorder writes synchronously to disk at each :meth:`save_frame` call.
    """

    def __init__(
        self,
        cam_ind=0,
        model=None,
        show_preview=False,
        filename="meye",
        folder_path="Data",
        sep=";",
    ):
        self.cam = Camera(cam_ind)
        self.filename = filename
        self.show_preview = show_preview
        self.meye = Meye(model=model)
        self.folder_path = folder_path
        self.sep = sep
        self.writer = None
        self.frame = None
        self.predicted = None
        # Initialized here so save_frame() can safely detect a recorder that
        # was never started instead of raising AttributeError.
        self.time_start = None
        self.running = False
        # Trigger placeholders (kept for external code that may read these attributes).
        self.trg1 = 0
        self.trg2 = 0
        self.trg3 = 0
        self.trg4 = 0
        self.trg5 = 0
        self.trg6 = 0
        self.trg7 = 0
        self.trg8 = 0
        self.trg9 = 0

    def start(self) -> None:
        """
        Start recording and initialize the output file.

        Returns
        -------
        None
        """
        print("### MEYE RECORDER ### Start.")
        self.running = True
        self.writer = FileWriter(path_to_file=self.folder_path, filename=self.filename, sep=self.sep)
        self.writer.write_sv(
            [
                "time",
                "x",
                "y",
                "pupil",
                "major_diameter",
                "minor_diameter",
                "orientation",
                "trg1",
                "trg2",
                "trg3",
                "trg4",
                "trg5",
                "trg6",
                "trg7",
                "trg8",
                "trg9",
            ]
        )
        self.time_start = time.time()

    def preview(self) -> None:
        """
        Open a live preview window (segmentation overlay).

        Returns
        -------
        None
        """
        self.meye.preview(self.cam)

    def save_frame(
        self,
        trg1=0,
        trg2=0,
        trg3=0,
        trg4=0,
        trg5=0,
        trg6=0,
        trg7=0,
        trg8=0,
        trg9=0,
    ) -> None:
        """
        Capture one frame, run pupil detection, and append a row to the output file.

        Parameters
        ----------
        trg1, trg2, trg3, trg4, trg5, trg6, trg7, trg8, trg9 : int, optional
            Trigger values to save alongside the measurements.

        Returns
        -------
        None
        """
        if self.time_start is None or self.writer is None:
            print("### MEYE RECORDER ### Recording not started!")
            return
        self.frame = self.cam.get_frame()
        self.predicted = self.meye.predict(self.frame)[0]
        timestamp = time.time() - self.time_start
        # Note: centroid is (row, col); here we store x=col, y=row.
        self.writer.write_sv(
            [
                timestamp,
                self.meye.centroid[1],
                self.meye.centroid[0],
                self.meye.pupil_size,
                self.meye.major_diameter,
                self.meye.minor_diameter,
                self.meye.orientation,
                trg1,
                trg2,
                trg3,
                trg4,
                trg5,
                trg6,
                trg7,
                trg8,
                trg9,
            ]
        )
        if self.show_preview:
            overlay = self.meye.overlay_roi(self.predicted, self.frame)
            self.cam.show(overlay, name="Recording")

    def get_data(self):
        """
        Capture one frame and return instantaneous pupil metrics.

        Returns
        -------
        dict
            Dictionary containing:

            - ``centroid``: (x, y) as (col, row)
            - ``size``: pupil mask area in pixels
            - ``major_diameter``: ellipse major axis in pixels (or NaN)
            - ``minor_diameter``: ellipse minor axis in pixels (or NaN)
            - ``orientation``: ellipse angle (degrees) (or NaN)
        """
        self.frame = self.cam.get_frame()
        self.predicted = self.meye.predict(self.frame)[0]
        data = {
            "centroid": (self.meye.centroid[1], self.meye.centroid[0]),
            "size": self.meye.pupil_size,
            "major_diameter": self.meye.major_diameter,
            "minor_diameter": self.meye.minor_diameter,
            "orientation": self.meye.orientation,
        }
        return data

    def stop(self) -> None:
        """
        Stop recording and close the output file.

        Safe to call even if :meth:`start` was never invoked.

        Returns
        -------
        None
        """
        self.running = False
        if self.writer is not None:
            self.writer.close()
            print("### MEYE RECORDER ### File Closed.")

    def close(self) -> None:
        """
        Release the camera resource.

        Returns
        -------
        None
        """
        print("### MEYE RECORDER ### Camera Closed.")
        self.cam.close()

    def close_all(self) -> None:
        """
        Stop recording and release all resources (file + camera).

        Returns
        -------
        None
        """
        print("### MEYE RECORDER ### Closed.")
        self.stop()
        self.close()
class MeyeAsyncRecorder:
    """
    Asynchronous frame-by-frame recorder using :class:`Meye` and :class:`BufferedFileWriter`.

    This recorder is similar to :class:`MeyeRecorder`, but data rows are queued in memory and
    written to disk by a background thread, reducing I/O latency inside tight loops.

    Parameters
    ----------
    cam_ind : int, optional
        Camera index passed to :class:`~.camera.Camera`.
    model : str or pathlib.Path or None, optional
        Path to the Keras model file. If ``None``, the packaged model is used.
    show_preview : bool, optional
        If ``True``, display an overlay window during recording.
    path_to_file : str, optional
        Output folder where the text file is created.
    filename : str, optional
        Base filename used by :class:`BufferedFileWriter` (timestamp is added automatically).
    buffer_size : int, optional
        Queue size for :class:`BufferedFileWriter`. When full, new rows are discarded and
        a warning is printed by the writer (no logging).
    sep : str, optional
        Column separator used in the output file.
    cam_crop : list[int] or tuple[int, int, int, int] or None, optional
        Crop passed to :class:`~.camera.Camera` (implementation-dependent).

    Notes
    -----
    You must call :meth:`stop` (or :meth:`close_all`) to flush remaining queued data.
    """

    def __init__(
        self,
        cam_ind=0,
        model=None,
        show_preview=False,
        path_to_file="Data",
        filename="meye",
        buffer_size=100,
        sep=";",
        cam_crop=None,
    ):
        self.cam = Camera(cam_ind, crop=cam_crop)
        self.show_preview = show_preview
        self.meye = Meye(model=model)
        self.frame = None
        self.predicted = None
        self.filename = filename
        self.path_to_file = path_to_file
        self.buffer_size = buffer_size
        self.sep = sep
        self.writer = None
        self.time_start = None

    def start(self, metadata=None) -> None:
        """
        Start recording and initialize the asynchronous output writer.

        Parameters
        ----------
        metadata : dict or None, optional
            Metadata passed to :class:`BufferedFileWriter` and written as comment
            lines at the top of the file.

        Returns
        -------
        None
        """
        self.writer = BufferedFileWriter(
            self.path_to_file,
            filename=self.filename,
            buffer_size=self.buffer_size,
            headers=[
                "time",
                "x",
                "y",
                "pupil",
                "major_diameter",
                "minor_diameter",
                "orientation",
                "trg1",
                "trg2",
                "trg3",
                "trg4",
                "trg5",
                "trg6",
                "trg7",
                "trg8",
                "trg9",
            ],
            metadata=metadata,
            sep=self.sep,
        )
        self.time_start = time.time()
        print("### MEYE ASYNC RECORDER ### Start.")

    def preview(self) -> None:
        """
        Open a live preview window (segmentation overlay).

        Returns
        -------
        None
        """
        self.meye.preview(self.cam)

    def save_frame(
        self,
        trg1=0,
        trg2=0,
        trg3=0,
        trg4=0,
        trg5=0,
        trg6=0,
        trg7=0,
        trg8=0,
        trg9=0,
    ) -> None:
        """
        Capture one frame, run pupil detection, and queue a row for disk writing.

        Parameters
        ----------
        trg1, trg2, trg3, trg4, trg5, trg6, trg7, trg8, trg9 : int, optional
            Trigger values to save alongside the measurements.

        Returns
        -------
        None
        """
        # Guard against use before start(), mirroring MeyeRecorder.save_frame:
        # without this, writer is None and write_sv would raise AttributeError.
        if self.time_start is None or self.writer is None:
            print("### MEYE ASYNC RECORDER ### Recording not started!")
            return
        self.frame = self.cam.get_frame()
        self.predicted = self.meye.predict(self.frame)[0]
        timestamp = time.time() - self.time_start
        # Note: centroid is (row, col); here we store x=col, y=row.
        self.writer.write_sv(
            [
                timestamp,
                self.meye.centroid[1],
                self.meye.centroid[0],
                self.meye.pupil_size,
                self.meye.major_diameter,
                self.meye.minor_diameter,
                self.meye.orientation,
                trg1,
                trg2,
                trg3,
                trg4,
                trg5,
                trg6,
                trg7,
                trg8,
                trg9,
            ]
        )
        if self.show_preview:
            overlay = self.meye.overlay_roi(self.predicted, self.frame)
            self.cam.show(overlay, name="Recording")

    def get_data(self):
        """
        Capture one frame and return instantaneous pupil metrics.

        Returns
        -------
        dict
            Dictionary containing:

            - ``centroid``: (x, y) as (col, row)
            - ``size``: pupil mask area in pixels
            - ``major_diameter``: ellipse major axis in pixels (or NaN)
            - ``minor_diameter``: ellipse minor axis in pixels (or NaN)
            - ``orientation``: ellipse angle (degrees) (or NaN)
        """
        self.frame = self.cam.get_frame()
        self.predicted = self.meye.predict(self.frame)[0]
        data = {
            "centroid": (self.meye.centroid[1], self.meye.centroid[0]),
            "size": self.meye.pupil_size,
            "major_diameter": self.meye.major_diameter,
            "minor_diameter": self.meye.minor_diameter,
            "orientation": self.meye.orientation,
        }
        return data

    def stop(self) -> None:
        """
        Stop recording and close the output file (flushes queued data).

        Safe to call even if :meth:`start` was never invoked.

        Returns
        -------
        None
        """
        if self.writer is not None:
            self.writer.close()
            print("### MEYE ASYNC RECORDER ### File Closed.")

    def close(self) -> None:
        """
        Release the camera resource.

        Returns
        -------
        None
        """
        print("### MEYE ASYNC RECORDER ### Camera Closed.")
        self.cam.close()

    def close_all(self) -> None:
        """
        Stop recording and release all resources (writer + camera).

        Returns
        -------
        None
        """
        print("### MEYE ASYNC RECORDER ### Closed.")
        self.stop()
        self.close()
# if __name__ == '__main__':
# from camera import Camera
# cam = Camera(1, crop=[100, 220, 250, 270])
# meye = Meye()
# meye.preview(cam)