# Source code for econirl.datasets.ngsim

"""
NGSIM US-101 Vehicle Trajectory Dataset for Lane-Change IRL.

This module provides the NGSIM US-101 highway vehicle trajectory dataset,
modeling lane-change decisions as a dynamic discrete choice problem.

Each vehicle's trajectory is treated as a panel observation where:
- State: (lane, speed_bin) — which lane and how fast
- Action: lane change decision (left, stay, right)
- Transitions: empirical lane/speed dynamics

Reference:
    FHWA (2006). "Next Generation Simulation (NGSIM) Vehicle Trajectories
    and Supporting Data." U.S. Department of Transportation.
    https://datahub.transportation.gov/
"""

from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd


# --- State space -----------------------------------------------------------
# Only the five mainline lanes (lane IDs 1-5) are modelled; lanes 6-8 are
# ramps and are dropped by the loader.
LANE_NAMES = [
    "Lane 1 (leftmost)",
    "Lane 2",
    "Lane 3 (center)",
    "Lane 4",
    "Lane 5 (rightmost)",
]
N_LANES = len(LANE_NAMES)

# Speed is discretized into fixed-width bins measured in ft/s
# (5 ft/s is roughly 3.4 mph).
SPEED_BIN_WIDTH = 5.0
N_SPEED_BINS = 10

# --- Action space ----------------------------------------------------------
# Integer codes for the lane-change decision; ACTION_NAMES is indexed by them.
ACTION_LEFT, ACTION_STAY, ACTION_RIGHT = range(3)
ACTION_NAMES = ["Lane Left", "Stay", "Lane Right"]
N_ACTIONS = len(ACTION_NAMES)


def _ngsim_frame_to_panel(result: pd.DataFrame):
    """Convert the flat transition table into an econirl ``Panel``.

    One ``Trajectory`` is built per vehicle, in order of first appearance in
    ``result``, with that vehicle's rows ordered by ``period``.
    """
    # Imported lazily so the plain-DataFrame path does not need these packages.
    from econirl.core.types import Panel, Trajectory
    import jax.numpy as jnp

    trajectories = []
    # A single groupby pass replaces a boolean filter over the whole table for
    # every vehicle; sort=False keeps the first-appearance order of vehicles.
    for vid, vdata in result.groupby("vehicle_id", sort=False):
        vdata = vdata.sort_values("period")
        trajectories.append(
            Trajectory(
                states=jnp.array(vdata["state"].values, dtype=jnp.int32),
                actions=jnp.array(vdata["action"].values, dtype=jnp.int32),
                next_states=jnp.array(vdata["next_state"].values, dtype=jnp.int32),
                individual_id=int(vid),
            )
        )
    return Panel(trajectories=trajectories)


def load_ngsim(
    as_panel: bool = False,
    n_speed_bins: int = N_SPEED_BINS,
    subsample_frames: int = 10,
    min_frames: int = 50,
    max_vehicles: Optional[int] = None,
) -> pd.DataFrame:
    """
    Load the NGSIM US-101 dataset as a lane-change discrete choice problem.

    Each retained row is one transition of one vehicle: the current
    (lane, speed_bin) state, the lane-change action taken, and the state
    observed at that vehicle's next retained frame.

    Args:
        as_panel: If True, return a Panel object for econirl estimators
            instead of a DataFrame.
        n_speed_bins: Number of speed bins (default 10). Bins are
            SPEED_BIN_WIDTH ft/s wide; speeds beyond the last bin are clipped
            into it and negative speeds into bin 0.
        subsample_frames: Keep every Nth frame of each vehicle (default 10).
            Values <= 1 disable subsampling.
        min_frames: Minimum number of retained transitions a vehicle must
            have to be kept (default 50).
        max_vehicles: If set, keep only the first this-many vehicles
            (for faster testing).

    Returns:
        DataFrame with columns:
            vehicle_id, period, state, action, next_state,
            lane, speed_bin, v_vel, space_headway, lane_change
        where ``period`` is the 0-based row index within each vehicle.
        When ``as_panel`` is True a ``Panel`` of per-vehicle trajectories is
        returned instead.

    Raises:
        ValueError: If ``n_speed_bins`` is smaller than 1.
        FileNotFoundError: If the raw trajectory CSV is not present.
    """
    if n_speed_bins < 1:
        # With no bins the clip below would yield negative state indices.
        raise ValueError(f"n_speed_bins must be >= 1, got {n_speed_bins}")

    data_path = (
        Path(__file__).parent.parent.parent.parent
        / "data" / "raw" / "ngsim" / "us101_trajectories.csv"
    )
    if not data_path.exists():
        raise FileNotFoundError(
            f"NGSIM data not found at {data_path}. "
            "Download from: https://datahub.transportation.gov/resource/8ect-6jqj.csv"
        )

    # Load only the needed columns, with compact dtypes, for memory efficiency.
    # Every column is given an explicit numeric dtype, so no object-typed
    # column can come out of read_csv and no quote-stripping pass is needed.
    column_dtypes = {
        "vehicle_id": "int32",
        "frame_id": "int32",
        "v_vel": "float32",
        "v_acc": "float32",
        "lane_id": "int8",
        "space_headway": "float32",
        "time_headway": "float32",
    }
    df = pd.read_csv(data_path, usecols=list(column_dtypes), dtype=column_dtypes)

    # Filter to mainline lanes only (1-5)
    df = df[df["lane_id"].between(1, N_LANES)].copy()

    # Sort by vehicle and frame
    df = df.sort_values(["vehicle_id", "frame_id"]).reset_index(drop=True)

    # Subsample: keep rows 0, N, 2N, ... of each vehicle. A cumcount mask picks
    # the same rows as a per-group iloc[::N] without calling Python per group.
    if subsample_frames > 1:
        position_in_vehicle = df.groupby("vehicle_id").cumcount()
        df = df[position_in_vehicle % subsample_frames == 0].reset_index(drop=True)

    # Discretize speed
    df["speed_bin"] = np.clip(
        (df["v_vel"] / SPEED_BIN_WIDTH).astype(int), 0, n_speed_bins - 1
    )

    # Compute lane (0-indexed)
    df["lane"] = (df["lane_id"] - 1).astype(int)

    # Compute state: lane * n_speed_bins + speed_bin
    df["state"] = df["lane"] * n_speed_bins + df["speed_bin"]

    # Detect lane changes (action) from each vehicle's next retained frame.
    # The last row of every vehicle gets NaN here.
    by_vehicle = df.groupby("vehicle_id")
    df["next_lane"] = by_vehicle["lane"].shift(-1)
    df["next_speed_bin"] = by_vehicle["speed_bin"].shift(-1)
    df["lane_change"] = df["next_lane"] - df["lane"]

    # Map lane change to action
    df["action"] = ACTION_STAY  # default
    df.loc[df["lane_change"] == -1, "action"] = ACTION_LEFT
    df.loc[df["lane_change"] == 1, "action"] = ACTION_RIGHT

    # Drop multi-lane changes (rare, noisy). NaN compares False, so each
    # vehicle's last row (no successor) is removed here as well.
    df = df[df["lane_change"].abs() <= 1].copy()

    # Compute next_state
    df["next_state"] = df["next_lane"] * n_speed_bins + df["next_speed_bin"]

    # Drop any remaining rows without a successor before casting to int
    df = df.dropna(subset=["next_lane", "next_speed_bin"]).copy()
    df["next_state"] = df["next_state"].astype(int)
    df["action"] = df["action"].astype(int)

    # Filter vehicles with enough retained transitions
    vehicle_counts = df["vehicle_id"].value_counts()
    valid_vehicles = vehicle_counts[vehicle_counts >= min_frames].index
    df = df[df["vehicle_id"].isin(valid_vehicles)].copy()

    if max_vehicles is not None:
        selected = df["vehicle_id"].unique()[:max_vehicles]
        df = df[df["vehicle_id"].isin(selected)].copy()

    # Add period (time index within vehicle)
    df["period"] = df.groupby("vehicle_id").cumcount()

    # Select output columns
    output_columns = [
        "vehicle_id", "period", "state", "action", "next_state",
        "lane", "speed_bin", "v_vel", "space_headway", "lane_change",
    ]
    result = df[output_columns].copy().reset_index(drop=True)

    if as_panel:
        return _ngsim_frame_to_panel(result)

    return result
def get_ngsim_info() -> dict:
    """Return a dictionary of descriptive metadata for the NGSIM US-101 dataset.

    The dictionary holds provenance strings, the sizes of the state and
    action spaces implied by the module constants, and the human-readable
    lane and action labels.
    """
    provenance = {
        "name": "NGSIM US-101 Vehicle Trajectories",
        "description": "Lane-change decisions on US-101 freeway, Los Angeles",
        "source": "FHWA Next Generation Simulation",
        "url": "https://datahub.transportation.gov/resource/8ect-6jqj",
    }

    state_count = N_LANES * N_SPEED_BINS  # 50 with the module defaults
    dimensions = {
        "n_states": state_count,
        "n_actions": N_ACTIONS,  # 3
        "n_vehicles": 2848,
        "n_observations": "~4.8M raw frames (480K at 1Hz)",
    }

    labels = {
        "state_description": "(lane, speed_bin)",
        "action_description": "lane change: left / stay / right",
        "lane_names": LANE_NAMES,
        "action_names": ACTION_NAMES,
    }

    # Merge in this order so the key order matches a single literal dict.
    return {**provenance, **dimensions, **labels}