pupil_blink_timeline.png) with blink-rate, pupil mean, and variability over blocks
pupil_blink_block_summary.csv)
pupil_blink_early_vs_late.csv)
blinks.csv and pupil_positions.csv. Optionally, gaze_positions.csv is used to anchor recording start time.
blinks.csv exported from Pupil Labspupil_positions.csv exported from Pupil Labsgaze_positions.csvpandasnumpymatplotlib
blinks.csv and pupil_positions.csv in your working folder (and optionally gaze_positions.csv).blinks.csv: start_timestamp, durationconfidence (filter with --confidence-threshold)pupil_positions.csv:pupil_timestampdiameter_3d (preferred) or diameterconfidence, methodgaze_positions.csv:gaze_timestamp to anchor recording start.
30 s)pupil_blink_timeline.pngpupil_blink_block_summary.csvpupil_blink_early_vs_late.csvoutputs folder by default.
blinks.csv and pupil_positions.csv in the same folder as the script and run the code
"""
@author: Fjorda
"""
from __future__ import annotations
import argparse
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_BLINKS = SCRIPT_DIR / "blinks.csv"
DEFAULT_PUPIL = SCRIPT_DIR / "pupil_positions.csv"
DEFAULT_GAZE = SCRIPT_DIR / "gaze_positions.csv"
DEFAULT_OUT = SCRIPT_DIR / "outputs"
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Tutorial 9: pupil + blink timeline")
p.add_argument("--blinks", type=Path, default=DEFAULT_BLINKS, help="Path to blinks.csv")
p.add_argument("--pupil", type=Path, default=DEFAULT_PUPIL, help="Path to pupil_positions.csv")
p.add_argument("--gaze", type=Path, default=DEFAULT_GAZE, help="Path to gaze_positions.csv (for start anchor)")
p.add_argument("--out-dir", type=Path, default=DEFAULT_OUT)
p.add_argument("--label", type=str, default="Session")
p.add_argument("--confidence-threshold", type=float, default=0.6)
p.add_argument("--block-sec", type=float, default=30.0, help="Timeline block width in seconds")
p.add_argument("--pupil-smooth-sec", type=float, default=0.4, help="Rolling median window for pupil smoothing")
p.add_argument("--recording-start-ts", type=float, default=None, help="Optional absolute recording start timestamp")
return p.parse_args()
def _resolve_existing(path: Path, name: str) -> Path:
p = path.expanduser().resolve()
if not p.is_file():
raise FileNotFoundError(f"{name} not found: {p}")
return p
def load_blinks(path: Path, conf_thr: float) -> pd.DataFrame:
df = pd.read_csv(path)
need = {"start_timestamp", "duration"}
miss = need - set(df.columns)
if miss:
raise ValueError(f"Missing blink columns: {sorted(miss)}")
if "confidence" in df.columns:
df = df[df["confidence"] >= conf_thr].copy()
df = df.dropna(subset=["start_timestamp", "duration"]).copy()
df["start_timestamp"] = pd.to_numeric(df["start_timestamp"], errors="coerce")
df["duration"] = pd.to_numeric(df["duration"], errors="coerce")
df = df.dropna(subset=["start_timestamp", "duration"]).copy()
if len(df) and float(np.nanmedian(df["duration"].to_numpy(dtype=float))) > 10.0:
df["duration"] = df["duration"] / 1000.0
df = df[df["duration"] > 0].copy()
df = df.sort_values("start_timestamp").reset_index(drop=True)
return df
def load_pupil(path: Path, conf_thr: float) -> pd.DataFrame:
df = pd.read_csv(path)
if "pupil_timestamp" not in df.columns:
raise ValueError("Missing pupil_timestamp column in pupil_positions.csv")
value_col = "diameter_3d" if "diameter_3d" in df.columns else ("diameter" if "diameter" in df.columns else None)
if value_col is None:
raise ValueError("Missing pupil diameter column. Expected diameter_3d or diameter.")
if "confidence" in df.columns:
df = df[df["confidence"] >= conf_thr].copy()
if "method" in df.columns:
rank = df["method"].astype(str).str.contains("pye3d", case=False, na=False).astype(int)
df = df.assign(_rank=rank).sort_values(["pupil_timestamp", "_rank"], ascending=[True, False])
df = df.drop_duplicates(subset=["pupil_timestamp"], keep="first").drop(columns=["_rank"])
else:
df = df.drop_duplicates(subset=["pupil_timestamp"], keep="first")
df["pupil_timestamp"] = pd.to_numeric(df["pupil_timestamp"], errors="coerce")
df[value_col] = pd.to_numeric(df[value_col], errors="coerce")
df = df.dropna(subset=["pupil_timestamp", value_col]).copy()
df = df.sort_values("pupil_timestamp").reset_index(drop=True)
df = df.rename(columns={value_col: "pupil_value"})
return df
def _recording_start(gaze_path: Path, explicit: float | None, blink_df: pd.DataFrame, pupil_df: pd.DataFrame) -> float:
if explicit is not None:
return float(explicit)
if gaze_path.is_file():
g = pd.read_csv(gaze_path)
if "gaze_timestamp" in g.columns:
ts = pd.to_numeric(g["gaze_timestamp"], errors="coerce").dropna()
if len(ts):
return float(ts.min())
return float(min(blink_df["start_timestamp"].min(), pupil_df["pupil_timestamp"].min()))
def smooth_pupil(df: pd.DataFrame, window_sec: float) -> pd.DataFrame:
out = df.copy()
t = out["pupil_timestamp"].to_numpy(dtype=float)
if len(t) < 3:
out["pupil_smooth"] = out["pupil_value"]
return out
dt = float(np.median(np.diff(t)))
n = int(max(3, round(max(window_sec, 0.05) / max(dt, 1e-6))))
if n % 2 == 0:
n += 1
out["pupil_smooth"] = out["pupil_value"].rolling(window=n, center=True, min_periods=1).median()
return out
def build_block_table(blinks: pd.DataFrame, pupil: pd.DataFrame, t0: float, block_sec: float) -> pd.DataFrame:
b = blinks.copy()
p = pupil.copy()
b["t"] = b["start_timestamp"] - t0
p["t"] = p["pupil_timestamp"] - t0
b = b[b["t"] >= 0].copy()
p = p[p["t"] >= 0].copy()
if b.empty or p.empty:
raise ValueError("Not enough aligned data after timestamp anchoring.")
t_end = float(max(b["t"].max(), p["t"].max()))
edges = np.arange(0.0, t_end + block_sec, block_sec, dtype=float)
if len(edges) < 2:
edges = np.array([0.0, max(block_sec, 1.0)], dtype=float)
starts = b["t"].to_numpy(dtype=float)
ibis = np.diff(starts) if len(starts) >= 2 else np.array([], dtype=float)
ibi_times = starts[1:] if len(starts) >= 2 else np.array([], dtype=float)
rows: list[dict[str, float | int]] = []
for i in range(len(edges) - 1):
s, e = float(edges[i]), float(edges[i + 1])
bm = (b["t"] >= s) & (b["t"] < e if i < len(edges) - 2 else b["t"] <= e)
pm = (p["t"] >= s) & (p["t"] < e if i < len(edges) - 2 else p["t"] <= e)
im = (ibi_times >= s) & (ibi_times < e if i < len(edges) - 2 else ibi_times <= e)
n_blinks = int(np.sum(bm))
blink_rate = float(n_blinks / max(e - s, 1e-9) * 60.0)
ibi_med = float(np.median(ibis[im])) if np.any(im) else np.nan
ps = p.loc[pm, "pupil_smooth"].to_numpy(dtype=float)
pupil_mean = float(np.mean(ps)) if len(ps) else np.nan
pupil_std = float(np.std(ps)) if len(ps) else np.nan
rows.append(
{
"block_index": i + 1,
"block_start_s": s,
"block_end_s": e,
"blink_count": n_blinks,
"blink_rate_per_min": blink_rate,
"ibi_median_s": ibi_med,
"pupil_mean": pupil_mean,
"pupil_std": pupil_std,
}
)
return pd.DataFrame(rows)
def early_late_summary(blocks: pd.DataFrame) -> pd.DataFrame:
if len(blocks) < 3:
thirds = max(1, len(blocks) // 2)
else:
thirds = max(1, len(blocks) // 3)
early = blocks.iloc[:thirds]
late = blocks.iloc[-thirds:]
out = pd.DataFrame(
[
{
"segment": "early",
"n_blocks": int(len(early)),
"blink_rate_per_min": float(np.nanmean(early["blink_rate_per_min"])),
"ibi_median_s": float(np.nanmean(early["ibi_median_s"])),
"pupil_mean": float(np.nanmean(early["pupil_mean"])),
"pupil_std": float(np.nanmean(early["pupil_std"])),
},
{
"segment": "late",
"n_blocks": int(len(late)),
"blink_rate_per_min": float(np.nanmean(late["blink_rate_per_min"])),
"ibi_median_s": float(np.nanmean(late["ibi_median_s"])),
"pupil_mean": float(np.nanmean(late["pupil_mean"])),
"pupil_std": float(np.nanmean(late["pupil_std"])),
},
]
)
return out
def plot_timeline(blocks: pd.DataFrame, label: str, out_path: Path) -> None:
x = blocks["block_index"].to_numpy(dtype=float)
fig = plt.figure(figsize=(13.2, 8.2))
gs = fig.add_gridspec(3, 1, hspace=0.22)
ax1 = fig.add_subplot(gs[0, 0])
ax2 = fig.add_subplot(gs[1, 0])
ax3 = fig.add_subplot(gs[2, 0])
fig.subplots_adjust(left=0.08, right=0.98, bottom=0.08, top=0.93)
ax1.bar(x, blocks["blink_rate_per_min"], color="#7a0018", alpha=0.85, width=0.75, edgecolor="white", linewidth=0.3)
ax1.set_ylabel("Blink rate / min")
title = "Pupil + blink timeline"
if str(label).strip():
title = f"{title} ({label})"
ax1.set_title(title, fontsize=11)
ax1.grid(axis="y", alpha=0.22)
ax2.plot(x, blocks["pupil_mean"], color="#1f4e79", linewidth=1.8, marker="o", markersize=3.5, alpha=0.9)
ax2.set_ylabel("Pupil mean")
ax2.grid(axis="y", alpha=0.22)
ax3.plot(
x,
blocks["ibi_median_s"],
color="#6b8e23",
linewidth=1.8,
marker="o",
markersize=3.5,
alpha=0.9,
label="Median IBI",
)
ax3.set_ylabel("Median IBI (s)")
# Pupil variability has a much smaller numeric range than IBI.
# Plot on a secondary axis so its changes are visible.
ax3b = ax3.twinx()
ax3b.plot(
x,
blocks["pupil_std"],
color="#6a4c93",
linewidth=1.6,
marker="s",
markersize=3.2,
alpha=0.85,
label="Pupil std",
)
ax3b.set_ylabel("Pupil std")
ax3.set_xlabel("Block index")
# IBI is NaN in blocks with <2 blinks, which breaks line segments.
# Interpolate only for visualization so the trend remains readable.
ibi_vis = blocks["ibi_median_s"].interpolate(limit_direction="both")
ax3.lines[0].set_ydata(ibi_vis.to_numpy(dtype=float))
ax3.grid(axis="y", alpha=0.22)
h1, l1 = ax3.get_legend_handles_labels()
h2, l2 = ax3b.get_legend_handles_labels()
ax3.legend(h1 + h2, l1 + l2, loc="upper right", frameon=False, fontsize=8)
fig.savefig(out_path, dpi=220)
plt.close(fig)
def main() -> None:
args = parse_args()
blinks_path = _resolve_existing(args.blinks, "Blinks file")
pupil_path = _resolve_existing(args.pupil, "Pupil file")
gaze_path = args.gaze.expanduser().resolve()
out_dir = args.out_dir.expanduser().resolve()
out_dir.mkdir(parents=True, exist_ok=True)
blinks = load_blinks(blinks_path, conf_thr=args.confidence_threshold)
pupil = load_pupil(pupil_path, conf_thr=args.confidence_threshold)
if blinks.empty:
raise ValueError("No blink events after filtering.")
if pupil.empty:
raise ValueError("No pupil samples after filtering.")
t0 = _recording_start(gaze_path, args.recording_start_ts, blinks, pupil)
pupil = smooth_pupil(pupil, window_sec=args.pupil_smooth_sec)
blocks = build_block_table(blinks, pupil, t0=t0, block_sec=max(args.block_sec, 5.0))
seg = early_late_summary(blocks)
plot_timeline(blocks, label=args.label, out_path=out_dir / "pupil_blink_timeline.png")
blocks.to_csv(out_dir / "pupil_blink_block_summary.csv", index=False)
seg.to_csv(out_dir / "pupil_blink_early_vs_late.csv", index=False)
print(f"Outputs saved to: {out_dir}")
print(f"Blinks used: {blinks_path}")
print(f"Pupil used: {pupil_path}")
print(f"Gaze start anchor: {gaze_path if gaze_path.is_file() else 'not found; fallback used'}")
if __name__ == "__main__":
main()