Skip to content

Commit

Permalink
Merge pull request #20 from pupil-labs/calibration_trunc
Browse files Browse the repository at this point in the history
truncate superfluous leading dim from calibration data
  • Loading branch information
dourvaris committed Aug 19, 2024
2 parents 3567822 + ec7aa5e commit f51201a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 72 deletions.
2 changes: 1 addition & 1 deletion examples/basic_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
print("Recording Info:")
print(f"\tStart time (ns): {recording.start_ts_ns}")
print(f"\tWearer : {recording.wearer['name']}")
print(f"\tDevice serial : {recording.serial}")
print(f"\tDevice serial : {recording.device_serial}")
print(f"\tGaze samples : {len(recording.gaze)}")
print("")

Expand Down
93 changes: 46 additions & 47 deletions src/pupil_labs/neon_recording/calib.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,48 @@
import pathlib
from dataclasses import dataclass

import numpy as np

from . import structlog

log = structlog.get_logger(__name__)


@dataclass
class Calibration:
camera_matrix: np.ndarray
distortion_coefficients: np.ndarray
extrinsics_affine_matrix: np.ndarray


def _parse_calib_bin(rec_dir: pathlib.Path):
log.debug("NeonRecording: Loading calibration.bin data")

calib_raw_data: bytes = b""
try:
with open(rec_dir / "calibration.bin", "rb") as f:
calib_raw_data = f.read()
except Exception as e:
log.exception(f"Unexpected error loading calibration.bin: {e}")
raise

log.debug("NeonRecording: Parsing calibration data")

return np.frombuffer(
calib_raw_data,
np.dtype(
[
("version", "u1"),
("serial", "6a"),
("scene_camera_matrix", "(3,3)d"),
("scene_distortion_coefficients", "8d"),
("scene_extrinsics_affine_matrix", "(4,4)d"),
("right_camera_matrix", "(3,3)d"),
("right_distortion_coefficients", "8d"),
("right_extrinsics_affine_matrix", "(4,4)d"),
("left_camera_matrix", "(3,3)d"),
("left_distortion_coefficients", "8d"),
("left_extrinsics_affine_matrix", "(4,4)d"),
("crc", "u4"),
]
),
import typing as T
import numpy.typing as npt


class Calibration(T.NamedTuple):
dtype = np.dtype(
[
("version", "u1"),
("serial", "6a"),
("scene_camera_matrix", "(3,3)d"),
("scene_distortion_coefficients", "8d"),
("scene_extrinsics_affine_matrix", "(4,4)d"),
("right_camera_matrix", "(3,3)d"),
("right_distortion_coefficients", "8d"),
("right_extrinsics_affine_matrix", "(4,4)d"),
("left_camera_matrix", "(3,3)d"),
("left_distortion_coefficients", "8d"),
("left_extrinsics_affine_matrix", "(4,4)d"),
("crc", "u4"),
],
)

version: int
serial: str
scene_camera_matrix: npt.NDArray[np.float64]
scene_distortion_coefficients: npt.NDArray[np.float64]
scene_extrinsics_affine_matrix: npt.NDArray[np.float64]
right_camera_matrix: npt.NDArray[np.float64]
right_distortion_coefficients: npt.NDArray[np.float64]
right_extrinsics_affine_matrix: npt.NDArray[np.float64]
left_camera_matrix: npt.NDArray[np.float64]
left_distortion_coefficients: npt.NDArray[np.float64]
left_extrinsics_affine_matrix: npt.NDArray[np.float64]
crc: int

def __getitem__(self, key):
if isinstance(key, str):
return getattr(self, key)
return self[key]

@classmethod
def from_buffer(cls, buffer: bytes):
return cls(*np.frombuffer(buffer, cls)[0])

@classmethod
def from_file(cls, path: str):
return cls(*np.fromfile(path, cls)[0])
29 changes: 5 additions & 24 deletions src/pupil_labs/neon_recording/neon_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Union

from . import structlog
from .calib import Calibration, _parse_calib_bin
from .calib import Calibration
from .stream.gaze_stream import GazeStream
from .stream.event_stream import EventStream
from .stream.imu import IMUStream
Expand All @@ -23,10 +23,8 @@ class NeonRecording:
* `start_ts_ns` (int): Start timestamp in nanoseconds
* `start_ts` (float): Start timestamp in seconds
* `wearer` (dict): Wearer information containing uuid and name
* `serial` (int): Serial number of the device
* `scene_camera_calibration` (Calibration): Scene camera calibration data
* `right_eye_camera_calibration` (Calibration): Right eye camera calibration data
* `left_eye_camera_calibration` (Calibration): Left eye camera calibration data
* `calibration` (Calibration): Camera calibration data
* `device_serial` (str): Serial number of the device
* `streams` (dict): data streams of the recording
"""

Expand Down Expand Up @@ -63,25 +61,8 @@ def __init__(self, rec_dir_in: Union[pathlib.Path, str]):
self.wearer["name"] = wearer_data["name"]

log.debug("NeonRecording: Loading calibration data")
self._calib = _parse_calib_bin(self._rec_dir)

self.calib_version = str(self._calib["version"])
self.serial = int(self._calib["serial"][0])
self.scene_camera_calibration = Calibration(
self._calib["scene_camera_matrix"],
self._calib["scene_distortion_coefficients"],
self._calib["scene_extrinsics_affine_matrix"],
)
self.right_eye_camera_calibration = Calibration(
self._calib["right_camera_matrix"],
self._calib["right_distortion_coefficients"],
self._calib["right_extrinsics_affine_matrix"],
)
self.left_eye_camera_calibration = Calibration(
self._calib["left_camera_matrix"],
self._calib["left_distortion_coefficients"],
self._calib["left_extrinsics_affine_matrix"],
)
self.calibration = Calibration.from_file(self._rec_dir / "calibration.bin")
self.device_serial = self.calibration.serial.decode()

self.streams = {
"audio": None,
Expand Down

0 comments on commit f51201a

Please sign in to comment.