Pupil + Blink Fatigue/Engagement Timeline in Python (Pupil Labs)

This guide shows how to combine blink and pupil signals into a block-wise session timeline.

By the end, you will generate:
- A figure (pupil_blink_timeline.png) with blink-rate, pupil mean, and variability over blocks
- A block summary table (pupil_blink_block_summary.csv)
- An early-vs-late summary table (pupil_blink_early_vs_late.csv)

Note. This workflow uses blinks.csv and pupil_positions.csv. Optionally, gaze_positions.csv is used to anchor recording start time.

Download blinks.csv

Download pupil_positions.csv

Download gaze_positions.csv

Download block summary table

Requirements

- Anaconda Python Development Environment
- blinks.csv exported from Pupil Labs
- pupil_positions.csv exported from Pupil Labs
- optional: gaze_positions.csv
- Python packages:
  - pandas
  - numpy
  - matplotlib

Setup

1. Install Anaconda if needed.
2. Open Spyder, Jupyter Notebook, or VS Code.
3. Put blinks.csv and pupil_positions.csv in your working folder (and optionally gaze_positions.csv).
4. Save the script in the same folder (or pass explicit paths with command-line options).

To use Spyder, install Anaconda, run it, and launch Spyder. If you see Install instead of Launch for Spyder, install Spyder first. Create a new file in Spyder and save it in the same directory as your data when you run the examples below.

Step 1 Data

We start with blink and pupil exports from Pupil Labs.

Important variables from blinks.csv:
- start_timestamp, duration
- optional: confidence (filter with --confidence-threshold)

Important variables from pupil_positions.csv:
- pupil_timestamp
- diameter_3d (preferred) or diameter
- optional: confidence, method

Optional from gaze_positions.csv:
- gaze_timestamp to anchor recording start.

Step 2 Build block-wise timeline metrics

In this step, you:
1. Load and filter blink and pupil data by confidence
2. Smooth pupil signal with a rolling median window
3. Split the session into fixed blocks (default 30 s)
4. Compute per-block blink rate, median IBI, pupil mean, and pupil variability
5. Compute early-vs-late summary values

The final plot gives a compact timeline view of changes across the session.

Step 3 Generate outputs

1. Write pupil_blink_timeline.png
2. Write pupil_blink_block_summary.csv
3. Write pupil_blink_early_vs_late.csv

Outputs are saved to your outputs folder by default.

Step 4 Run the Script

Put blinks.csv and pupil_positions.csv in the same folder as the script and run the code

Step 5 Code

Use the script file below. It computes block-wise blink and pupil metrics, then plots them in a multi-panel timeline.
                                        
                                            """
                                            @author: Fjorda
                                            """

                                            from __future__ import annotations

                                            import argparse
                                            from pathlib import Path

                                            import matplotlib.pyplot as plt
                                            import numpy as np
                                            import pandas as pd

                                            SCRIPT_DIR = Path(__file__).resolve().parent
                                            DEFAULT_BLINKS = SCRIPT_DIR / "blinks.csv"
                                            DEFAULT_PUPIL = SCRIPT_DIR / "pupil_positions.csv"
                                            DEFAULT_GAZE = SCRIPT_DIR / "gaze_positions.csv"
                                            DEFAULT_OUT = SCRIPT_DIR / "outputs"


                                            def parse_args() -> argparse.Namespace:
                                                p = argparse.ArgumentParser(description="Tutorial 9: pupil + blink timeline")
                                                p.add_argument("--blinks", type=Path, default=DEFAULT_BLINKS, help="Path to blinks.csv")
                                                p.add_argument("--pupil", type=Path, default=DEFAULT_PUPIL, help="Path to pupil_positions.csv")
                                                p.add_argument("--gaze", type=Path, default=DEFAULT_GAZE, help="Path to gaze_positions.csv (for start anchor)")
                                                p.add_argument("--out-dir", type=Path, default=DEFAULT_OUT)
                                                p.add_argument("--label", type=str, default="Session")
                                                p.add_argument("--confidence-threshold", type=float, default=0.6)
                                                p.add_argument("--block-sec", type=float, default=30.0, help="Timeline block width in seconds")
                                                p.add_argument("--pupil-smooth-sec", type=float, default=0.4, help="Rolling median window for pupil smoothing")
                                                p.add_argument("--recording-start-ts", type=float, default=None, help="Optional absolute recording start timestamp")
                                                return p.parse_args()


                                            def _resolve_existing(path: Path, name: str) -> Path:
                                                p = path.expanduser().resolve()
                                                if not p.is_file():
                                                    raise FileNotFoundError(f"{name} not found: {p}")
                                                return p


                                            def load_blinks(path: Path, conf_thr: float) -> pd.DataFrame:
                                                df = pd.read_csv(path)
                                                need = {"start_timestamp", "duration"}
                                                miss = need - set(df.columns)
                                                if miss:
                                                    raise ValueError(f"Missing blink columns: {sorted(miss)}")
                                                if "confidence" in df.columns:
                                                    df = df[df["confidence"] >= conf_thr].copy()
                                                df = df.dropna(subset=["start_timestamp", "duration"]).copy()
                                                df["start_timestamp"] = pd.to_numeric(df["start_timestamp"], errors="coerce")
                                                df["duration"] = pd.to_numeric(df["duration"], errors="coerce")
                                                df = df.dropna(subset=["start_timestamp", "duration"]).copy()
                                                if len(df) and float(np.nanmedian(df["duration"].to_numpy(dtype=float))) > 10.0:
                                                    df["duration"] = df["duration"] / 1000.0
                                                df = df[df["duration"] > 0].copy()
                                                df = df.sort_values("start_timestamp").reset_index(drop=True)
                                                return df


                                            def load_pupil(path: Path, conf_thr: float) -> pd.DataFrame:
                                                df = pd.read_csv(path)
                                                if "pupil_timestamp" not in df.columns:
                                                    raise ValueError("Missing pupil_timestamp column in pupil_positions.csv")

                                                value_col = "diameter_3d" if "diameter_3d" in df.columns else ("diameter" if "diameter" in df.columns else None)
                                                if value_col is None:
                                                    raise ValueError("Missing pupil diameter column. Expected diameter_3d or diameter.")

                                                if "confidence" in df.columns:
                                                    df = df[df["confidence"] >= conf_thr].copy()

                                                if "method" in df.columns:
                                                    rank = df["method"].astype(str).str.contains("pye3d", case=False, na=False).astype(int)
                                                    df = df.assign(_rank=rank).sort_values(["pupil_timestamp", "_rank"], ascending=[True, False])
                                                    df = df.drop_duplicates(subset=["pupil_timestamp"], keep="first").drop(columns=["_rank"])
                                                else:
                                                    df = df.drop_duplicates(subset=["pupil_timestamp"], keep="first")

                                                df["pupil_timestamp"] = pd.to_numeric(df["pupil_timestamp"], errors="coerce")
                                                df[value_col] = pd.to_numeric(df[value_col], errors="coerce")
                                                df = df.dropna(subset=["pupil_timestamp", value_col]).copy()
                                                df = df.sort_values("pupil_timestamp").reset_index(drop=True)
                                                df = df.rename(columns={value_col: "pupil_value"})
                                                return df


                                            def _recording_start(gaze_path: Path, explicit: float | None, blink_df: pd.DataFrame, pupil_df: pd.DataFrame) -> float:
                                                if explicit is not None:
                                                    return float(explicit)
                                                if gaze_path.is_file():
                                                    g = pd.read_csv(gaze_path)
                                                    if "gaze_timestamp" in g.columns:
                                                        ts = pd.to_numeric(g["gaze_timestamp"], errors="coerce").dropna()
                                                        if len(ts):
                                                            return float(ts.min())
                                                return float(min(blink_df["start_timestamp"].min(), pupil_df["pupil_timestamp"].min()))


                                            def smooth_pupil(df: pd.DataFrame, window_sec: float) -> pd.DataFrame:
                                                out = df.copy()
                                                t = out["pupil_timestamp"].to_numpy(dtype=float)
                                                if len(t) < 3:
                                                    out["pupil_smooth"] = out["pupil_value"]
                                                    return out
                                                dt = float(np.median(np.diff(t)))
                                                n = int(max(3, round(max(window_sec, 0.05) / max(dt, 1e-6))))
                                                if n % 2 == 0:
                                                    n += 1
                                                out["pupil_smooth"] = out["pupil_value"].rolling(window=n, center=True, min_periods=1).median()
                                                return out


                                            def build_block_table(blinks: pd.DataFrame, pupil: pd.DataFrame, t0: float, block_sec: float) -> pd.DataFrame:
                                                b = blinks.copy()
                                                p = pupil.copy()
                                                b["t"] = b["start_timestamp"] - t0
                                                p["t"] = p["pupil_timestamp"] - t0
                                                b = b[b["t"] >= 0].copy()
                                                p = p[p["t"] >= 0].copy()
                                                if b.empty or p.empty:
                                                    raise ValueError("Not enough aligned data after timestamp anchoring.")

                                                t_end = float(max(b["t"].max(), p["t"].max()))
                                                edges = np.arange(0.0, t_end + block_sec, block_sec, dtype=float)
                                                if len(edges) < 2:
                                                    edges = np.array([0.0, max(block_sec, 1.0)], dtype=float)

                                                starts = b["t"].to_numpy(dtype=float)
                                                ibis = np.diff(starts) if len(starts) >= 2 else np.array([], dtype=float)
                                                ibi_times = starts[1:] if len(starts) >= 2 else np.array([], dtype=float)

                                                rows: list[dict[str, float | int]] = []
                                                for i in range(len(edges) - 1):
                                                    s, e = float(edges[i]), float(edges[i + 1])
                                                    bm = (b["t"] >= s) & (b["t"] < e if i < len(edges) - 2 else b["t"] <= e)
                                                    pm = (p["t"] >= s) & (p["t"] < e if i < len(edges) - 2 else p["t"] <= e)
                                                    im = (ibi_times >= s) & (ibi_times < e if i < len(edges) - 2 else ibi_times <= e)

                                                    n_blinks = int(np.sum(bm))
                                                    blink_rate = float(n_blinks / max(e - s, 1e-9) * 60.0)
                                                    ibi_med = float(np.median(ibis[im])) if np.any(im) else np.nan

                                                    ps = p.loc[pm, "pupil_smooth"].to_numpy(dtype=float)
                                                    pupil_mean = float(np.mean(ps)) if len(ps) else np.nan
                                                    pupil_std = float(np.std(ps)) if len(ps) else np.nan

                                                    rows.append(
                                                        {
                                                            "block_index": i + 1,
                                                            "block_start_s": s,
                                                            "block_end_s": e,
                                                            "blink_count": n_blinks,
                                                            "blink_rate_per_min": blink_rate,
                                                            "ibi_median_s": ibi_med,
                                                            "pupil_mean": pupil_mean,
                                                            "pupil_std": pupil_std,
                                                        }
                                                    )
                                                return pd.DataFrame(rows)


                                            def early_late_summary(blocks: pd.DataFrame) -> pd.DataFrame:
                                                if len(blocks) < 3:
                                                    thirds = max(1, len(blocks) // 2)
                                                else:
                                                    thirds = max(1, len(blocks) // 3)
                                                early = blocks.iloc[:thirds]
                                                late = blocks.iloc[-thirds:]
                                                out = pd.DataFrame(
                                                    [
                                                        {
                                                            "segment": "early",
                                                            "n_blocks": int(len(early)),
                                                            "blink_rate_per_min": float(np.nanmean(early["blink_rate_per_min"])),
                                                            "ibi_median_s": float(np.nanmean(early["ibi_median_s"])),
                                                            "pupil_mean": float(np.nanmean(early["pupil_mean"])),
                                                            "pupil_std": float(np.nanmean(early["pupil_std"])),
                                                        },
                                                        {
                                                            "segment": "late",
                                                            "n_blocks": int(len(late)),
                                                            "blink_rate_per_min": float(np.nanmean(late["blink_rate_per_min"])),
                                                            "ibi_median_s": float(np.nanmean(late["ibi_median_s"])),
                                                            "pupil_mean": float(np.nanmean(late["pupil_mean"])),
                                                            "pupil_std": float(np.nanmean(late["pupil_std"])),
                                                        },
                                                    ]
                                                )
                                                return out


                                            def plot_timeline(blocks: pd.DataFrame, label: str, out_path: Path) -> None:
                                                x = blocks["block_index"].to_numpy(dtype=float)

                                                fig = plt.figure(figsize=(13.2, 8.2))
                                                gs = fig.add_gridspec(3, 1, hspace=0.22)
                                                ax1 = fig.add_subplot(gs[0, 0])
                                                ax2 = fig.add_subplot(gs[1, 0])
                                                ax3 = fig.add_subplot(gs[2, 0])
                                                fig.subplots_adjust(left=0.08, right=0.98, bottom=0.08, top=0.93)

                                                ax1.bar(x, blocks["blink_rate_per_min"], color="#7a0018", alpha=0.85, width=0.75, edgecolor="white", linewidth=0.3)
                                                ax1.set_ylabel("Blink rate / min")
                                                title = "Pupil + blink timeline"
                                                if str(label).strip():
                                                    title = f"{title} ({label})"
                                                ax1.set_title(title, fontsize=11)
                                                ax1.grid(axis="y", alpha=0.22)

                                                ax2.plot(x, blocks["pupil_mean"], color="#1f4e79", linewidth=1.8, marker="o", markersize=3.5, alpha=0.9)
                                                ax2.set_ylabel("Pupil mean")
                                                ax2.grid(axis="y", alpha=0.22)

                                                ax3.plot(
                                                    x,
                                                    blocks["ibi_median_s"],
                                                    color="#6b8e23",
                                                    linewidth=1.8,
                                                    marker="o",
                                                    markersize=3.5,
                                                    alpha=0.9,
                                                    label="Median IBI",
                                                )
                                                ax3.set_ylabel("Median IBI (s)")

                                                # Pupil variability has a much smaller numeric range than IBI.
                                                # Plot on a secondary axis so its changes are visible.
                                                ax3b = ax3.twinx()
                                                ax3b.plot(
                                                    x,
                                                    blocks["pupil_std"],
                                                    color="#6a4c93",
                                                    linewidth=1.6,
                                                    marker="s",
                                                    markersize=3.2,
                                                    alpha=0.85,
                                                    label="Pupil std",
                                                )
                                                ax3b.set_ylabel("Pupil std")
                                                ax3.set_xlabel("Block index")
                                                # IBI is NaN in blocks with <2 blinks, which breaks line segments.
                                                # Interpolate only for visualization so the trend remains readable.
                                                ibi_vis = blocks["ibi_median_s"].interpolate(limit_direction="both")
                                                ax3.lines[0].set_ydata(ibi_vis.to_numpy(dtype=float))
                                                ax3.grid(axis="y", alpha=0.22)
                                                h1, l1 = ax3.get_legend_handles_labels()
                                                h2, l2 = ax3b.get_legend_handles_labels()
                                                ax3.legend(h1 + h2, l1 + l2, loc="upper right", frameon=False, fontsize=8)

                                                fig.savefig(out_path, dpi=220)
                                                plt.close(fig)


                                            def main() -> None:
                                                args = parse_args()
                                                blinks_path = _resolve_existing(args.blinks, "Blinks file")
                                                pupil_path = _resolve_existing(args.pupil, "Pupil file")
                                                gaze_path = args.gaze.expanduser().resolve()

                                                out_dir = args.out_dir.expanduser().resolve()
                                                out_dir.mkdir(parents=True, exist_ok=True)

                                                blinks = load_blinks(blinks_path, conf_thr=args.confidence_threshold)
                                                pupil = load_pupil(pupil_path, conf_thr=args.confidence_threshold)
                                                if blinks.empty:
                                                    raise ValueError("No blink events after filtering.")
                                                if pupil.empty:
                                                    raise ValueError("No pupil samples after filtering.")

                                                t0 = _recording_start(gaze_path, args.recording_start_ts, blinks, pupil)
                                                pupil = smooth_pupil(pupil, window_sec=args.pupil_smooth_sec)
                                                blocks = build_block_table(blinks, pupil, t0=t0, block_sec=max(args.block_sec, 5.0))
                                                seg = early_late_summary(blocks)

                                                plot_timeline(blocks, label=args.label, out_path=out_dir / "pupil_blink_timeline.png")
                                                blocks.to_csv(out_dir / "pupil_blink_block_summary.csv", index=False)
                                                seg.to_csv(out_dir / "pupil_blink_early_vs_late.csv", index=False)

                                                print(f"Outputs saved to: {out_dir}")
                                                print(f"Blinks used: {blinks_path}")
                                                print(f"Pupil used: {pupil_path}")
                                                print(f"Gaze start anchor: {gaze_path if gaze_path.is_file() else 'not found; fallback used'}")


                                            if __name__ == "__main__":
                                                main()


                                            
                                        
                                    
After you run the script, you will obtain the outputs listed in Step 3. The preview below uses the same asset path as on this site.

Conclusions

  • Block-wise blink rate and IBI summarize how blink behavior changes over time.
  • Pupil mean and variability add complementary information about session dynamics.
  • Early-vs-late summaries provide a quick, interpretable comparison for single-session analysis.