diff --git a/examples/basic_usage.py b/examples/basic_usage.py index 30728a9..e2a915d 100644 --- a/examples/basic_usage.py +++ b/examples/basic_usage.py @@ -18,7 +18,7 @@ print("Recording Info:") print(f"\tStart time (ns): {recording.start_ts_ns}") print(f"\tWearer : {recording.wearer['name']}") -print(f"\tDevice serial : {recording.serial}") +print(f"\tDevice serial : {recording.device_serial}") print(f"\tGaze samples : {len(recording.gaze)}") print("") diff --git a/src/pupil_labs/neon_recording/calib.py b/src/pupil_labs/neon_recording/calib.py index 18ebda0..7f0ffa5 100644 --- a/src/pupil_labs/neon_recording/calib.py +++ b/src/pupil_labs/neon_recording/calib.py @@ -1,49 +1,48 @@ -import pathlib -from dataclasses import dataclass - import numpy as np - -from . import structlog - -log = structlog.get_logger(__name__) - - -@dataclass -class Calibration: - camera_matrix: np.ndarray - distortion_coefficients: np.ndarray - extrinsics_affine_matrix: np.ndarray - - -def _parse_calib_bin(rec_dir: pathlib.Path): - log.debug("NeonRecording: Loading calibration.bin data") - - calib_raw_data: bytes = b"" - try: - with open(rec_dir / "calibration.bin", "rb") as f: - calib_raw_data = f.read() - except Exception as e: - log.exception(f"Unexpected error loading calibration.bin: {e}") - raise - - log.debug("NeonRecording: Parsing calibration data") - - return np.frombuffer( - calib_raw_data, - np.dtype( - [ - ("version", "u1"), - ("serial", "6a"), - ("scene_camera_matrix", "(3,3)d"), - ("scene_distortion_coefficients", "8d"), - ("scene_extrinsics_affine_matrix", "(4,4)d"), - ("right_camera_matrix", "(3,3)d"), - ("right_distortion_coefficients", "8d"), - ("right_extrinsics_affine_matrix", "(4,4)d"), - ("left_camera_matrix", "(3,3)d"), - ("left_distortion_coefficients", "8d"), - ("left_extrinsics_affine_matrix", "(4,4)d"), - ("crc", "u4"), - ] - ), +import typing as T +import numpy.typing as npt + + +class Calibration(T.NamedTuple): + dtype = np.dtype( + [ + ("version", "u1"), + ("serial", "6a"), + ("scene_camera_matrix", "(3,3)d"), + ("scene_distortion_coefficients", "8d"), + ("scene_extrinsics_affine_matrix", "(4,4)d"), + ("right_camera_matrix", "(3,3)d"), + ("right_distortion_coefficients", "8d"), + ("right_extrinsics_affine_matrix", "(4,4)d"), + ("left_camera_matrix", "(3,3)d"), + ("left_distortion_coefficients", "8d"), + ("left_extrinsics_affine_matrix", "(4,4)d"), + ("crc", "u4"), + ], ) + + version: int + serial: str + scene_camera_matrix: npt.NDArray[np.float64] + scene_distortion_coefficients: npt.NDArray[np.float64] + scene_extrinsics_affine_matrix: npt.NDArray[np.float64] + right_camera_matrix: npt.NDArray[np.float64] + right_distortion_coefficients: npt.NDArray[np.float64] + right_extrinsics_affine_matrix: npt.NDArray[np.float64] + left_camera_matrix: npt.NDArray[np.float64] + left_distortion_coefficients: npt.NDArray[np.float64] + left_extrinsics_affine_matrix: npt.NDArray[np.float64] + crc: int + + def __getitem__(self, key): + if isinstance(key, str): + return getattr(self, key) + return self[key] + + @classmethod + def from_buffer(cls, buffer: bytes): + return cls(*np.frombuffer(buffer, cls)[0]) + + @classmethod + def from_file(cls, path: str): + return cls(*np.fromfile(path, cls)[0]) diff --git a/src/pupil_labs/neon_recording/neon_recording.py b/src/pupil_labs/neon_recording/neon_recording.py index e7b1c07..7360a29 100644 --- a/src/pupil_labs/neon_recording/neon_recording.py +++ b/src/pupil_labs/neon_recording/neon_recording.py @@ -3,7 +3,7 @@ from typing import Union from . import structlog -from .calib import Calibration, _parse_calib_bin +from .calib import Calibration from .stream.gaze_stream import GazeStream from .stream.event_stream import EventStream from .stream.imu import IMUStream @@ -23,10 +23,8 @@ class NeonRecording: * `start_ts_ns` (int): Start timestamp in nanoseconds * `start_ts` (float): Start timestamp in seconds * `wearer` (dict): Wearer information containing uuid and name - * `serial` (int): Serial number of the device - * `scene_camera_calibration` (Calibration): Scene camera calibration data - * `right_eye_camera_calibration` (Calibration): Right eye camera calibration data - * `left_eye_camera_calibration` (Calibration): Left eye camera calibration data + * `calibration` (Calibration): Camera calibration data + * `device_serial` (str): Serial number of the device * `streams` (dict): data streams of the recording """ @@ -63,25 +61,8 @@ def __init__(self, rec_dir_in: Union[pathlib.Path, str]): self.wearer["name"] = wearer_data["name"] log.debug("NeonRecording: Loading calibration data") - self._calib = _parse_calib_bin(self._rec_dir) - - self.calib_version = str(self._calib["version"]) - self.serial = int(self._calib["serial"][0]) - self.scene_camera_calibration = Calibration( - self._calib["scene_camera_matrix"], - self._calib["scene_distortion_coefficients"], - self._calib["scene_extrinsics_affine_matrix"], - ) - self.right_eye_camera_calibration = Calibration( - self._calib["right_camera_matrix"], - self._calib["right_distortion_coefficients"], - self._calib["right_extrinsics_affine_matrix"], - ) - self.left_eye_camera_calibration = Calibration( - self._calib["left_camera_matrix"], - self._calib["left_distortion_coefficients"], - self._calib["left_extrinsics_affine_matrix"], - ) + self.calibration = Calibration.from_file(self._rec_dir / "calibration.bin") + self.device_serial = self.calibration.serial.decode() self.streams = { "audio": None,