Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix loading 200 Hz gaze data in Pupil Invisible recordings #2204

Merged
merged 3 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 217 additions & 1 deletion pupil_src/shared_modules/pupil_recording/update/invisible.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
import re
import shutil
import tempfile
import typing as T
from pathlib import Path

import av
import file_methods as fm
import methods as m
import numpy as np
import player_methods as pm
from version_utils import parse_version
from video_capture.utils import pi_gaze_items

from ..info import RecordingInfoFile
from ..info import recording_info_utils as utils
Expand Down Expand Up @@ -320,3 +321,218 @@ def _pi_raw_time_load(path):
@staticmethod
def _pi_raw_time_save(path, arr):
arr.tofile(str(path))


def pi_gaze_items(root_dir):
"""Yields one (location, timestamp, confidence) triplet for each gaze point

Pupil Invisible Companion records this information into three different sets of
files. Their names can be matched by the following regex patterns:
- `^gaze ps[0-9]+.raw$`
- `^gaze ps[0-9]+.time$`
- `^worn ps[0-9]+.raw$`

The worn data is a stream of values of either 0 or 255, indicating that the glasses
were (not) worn. Pupil Player maps these to gaze confidence values of 0.0 and 1.0
respectively.

Since all `*.time` files are converted to Pupil Player before this function is being
called, we match the `^gaze ps[0-9]+_timestamps.npy$` pattern on the recording files
instead. When looking for the location and worn data, the function just replaces the
necessary parts of the timestamp file names instead of performing separate regex
matches.

If the recording was successfully post-processed and downloaded from Pupil Cloud, it
will contain 200Hz-densified gaze data. This data replaces the real-time recorded
data by Pupil Invisible Companion and is stored in three files:
- `gaze_200hz.raw`
- `gaze_200hz.time` (or `gaze_200hz_timestamps.npy` if upgraded)
- `worn_200hz.raw`

The worn data is a special case as it was introduced at different points in time to
Pupil Invisible Companion and Pupil Cloud. In other words, it is possible that there
is no worn data, only real-time recorded worn data, or 200 Hz worn data. The latter
is preferred. If 200 Hz gaze data is only available with real-time recorded worn
data, the latter is interpolated to 200 Hz using a k-nearest-neighbour (k=1)
approach. If no worn data is available, or the numbers of worn samples and gaze
timestamps are not consistent, Pupil Player assumes a confidence value of 1.0 for
every gaze point.
"""
root_dir = Path(root_dir)
# This pattern will match any filename that:
# - starts with "gaze ps"
# - is followed by one or more digits
# - ends with "_timestamps.npy"
timestamps_realtime_pattern = r"^gaze ps[0-9]+_timestamps.npy$"
timestamps_realtime_paths = matched_files_by_name_pattern(
root_dir, timestamps_realtime_pattern
)
# Use 200hz data only if both gaze data and timestamps are available at 200hz
raw_200hz_path = _find_raw_200hz_path(root_dir)
timestamps_200hz_path = _find_timestamps_200hz_path(root_dir)
if raw_200hz_path and timestamps_200hz_path:
worn_200hz_path = _find_worn_200hz_path(root_dir)
yield from _pi_posthoc_200hz_gaze_items(
raw_200hz_path,
timestamps_200hz_path,
worn_200hz_path,
timestamps_realtime_paths,
)
else:
yield from _pi_realtime_recorded_gaze_items(timestamps_realtime_paths)


def _pi_posthoc_200hz_gaze_items(
raw_200hz_path, timestamps_200hz_path, worn_200hz_path, timestamps_realtime_paths
):
raw_data = _load_raw_data(raw_200hz_path)
timestamps = _load_timestamps_data(timestamps_200hz_path)

if worn_200hz_path is not None:
conf_data = _load_worn_data(worn_200hz_path)
else:
conf_data = _find_and_load_densified_worn_data(
timestamps, timestamps_realtime_paths
)

raw_data, timestamps = _equalize_length_if_necessary(raw_data, timestamps)
conf_data = _validated_conf_data(conf_data, timestamps)
yield from zip(raw_data, timestamps, conf_data)


def _pi_realtime_recorded_gaze_items(timestamps_realtime_paths):
for timestamps_path in timestamps_realtime_paths:
raw_data = _load_raw_data(_find_raw_path(timestamps_path))
timestamps = _load_timestamps_data(timestamps_path)
conf_data = _load_worn_data(_find_worn_path(timestamps_path))

raw_data, timestamps = _equalize_length_if_necessary(raw_data, timestamps)
conf_data = _validated_conf_data(conf_data, timestamps)
yield from zip(raw_data, timestamps, conf_data)


def _find_timestamps_200hz_path(root_dir: Path):
path = root_dir / "gaze_200hz_timestamps.npy"
if path.is_file():
return path
else:
return None


def _find_raw_200hz_path(root_dir: Path):
path = root_dir / "gaze_200hz.raw"
if path.is_file():
return path
else:
return None


def _find_worn_200hz_path(root_dir: Path):
path = root_dir / "worn_200hz.raw"
if path.is_file():
return path
else:
return None


def _find_raw_path(timestamps_path: Path):
name = timestamps_path.name.replace("_timestamps", "")
path = timestamps_path.with_name(name).with_suffix(".raw")
assert path.is_file(), f"The file does not exist at path: {path}"
return path


def _find_worn_path(timestamps_path: Path):
name = timestamps_path.name
name = name.replace("gaze", "worn")
name = name.replace("_timestamps", "")
path = timestamps_path.with_name(name).with_suffix(".raw")
if path.is_file():
return path
else:
return None


def _load_timestamps_data(path):
timestamps = np.load(str(path))
return timestamps


def _load_raw_data(path):
raw_data = np.fromfile(str(path), "<f4")
raw_data_dtype = raw_data.dtype
raw_data.shape = (-1, 2)
return np.asarray(raw_data, dtype=raw_data_dtype)


def _load_worn_data(path: Path):
if not (path and path.exists()):
return None

confidences = np.fromfile(str(path), "<u1") / 255.0
return np.clip(confidences, 0.0, 1.0)


def _find_and_load_densified_worn_data(
timestamps_200hz, timestamps_realtime_paths: T.List[Path]
):
if not timestamps_realtime_paths:
return None
# Load and densify confidence data when 200hz gaze is available, but only
# non-200hz confidence is available
conf_data, timestamps_realtime = _find_and_load_realtime_recorded_worn_data(
timestamps_realtime_paths
)
densification_idc = pm.find_closest(timestamps_realtime, timestamps_200hz)
return conf_data[densification_idc]


def _find_and_load_realtime_recorded_worn_data(timestamps_realtime_paths: T.List[Path]):
# assumes at least one path in `timestamps_realtime_paths`, otherwise np.concatenate
# will raise ValueError: need at least one array to concatenate
assert (
len(timestamps_realtime_paths) > 0
), "Requires at least one real-time recorded gaze timestamp path"
conf_all = []
ts_all = []
for timestamps_path in timestamps_realtime_paths:
ts = _load_timestamps_data(timestamps_path)
conf_data = _load_worn_data(_find_worn_path(timestamps_path))
conf_data = _validated_conf_data(conf_data, ts)
conf_all.append(conf_data)
ts_all.append(ts)
conf_all = np.concatenate(conf_all)
ts_all = np.concatenate(ts_all)
return conf_all, ts_all


def _equalize_length_if_necessary(raw_data, timestamps):
if len(raw_data) != len(timestamps):
logger.warning(
f"There is a mismatch between the number of raw data ({len(raw_data)}) "
f"and the number of timestamps ({len(timestamps)})!"
)
size = min(len(raw_data), len(timestamps))
raw_data = raw_data[:size]
timestamps = timestamps[:size]
return raw_data, timestamps


def _validated_conf_data(conf_data, timestamps):
if conf_data is not None and len(conf_data) != len(timestamps):
logger.warning(
"There is a mismatch between the number of confidence data "
f"({len(conf_data)}) and the number of timestamps ({len(timestamps)})! "
"Not using confidence data."
)
conf_data = None
if conf_data is None:
conf_data = np.ones(len(timestamps))
return conf_data


def matched_files_by_name_pattern(parent_dir: Path, name_pattern: str) -> T.List[Path]:
# Get all non-recursive directory contents
contents = filter(Path.is_file, parent_dir.iterdir())
# Filter content that matches the name by regex pattern
return sorted(c for c in contents if re.match(name_pattern, c.name) is not None)
Loading