Source code for econirl.environments.rust_bus

"""Rust (1987) bus engine replacement environment.

This module implements the classic dynamic discrete choice model from
John Rust's 1987 Econometrica paper "Optimal Replacement of GMC Bus Engines."

The model:
- State: Discretized mileage (odometer reading in bins)
- Actions: Keep running (0) or Replace engine (1)
- Utility: Operating cost increases with mileage; replacement has fixed cost
- Transitions: Mileage increases stochastically; replacement resets to zero

This is the canonical example for DDC estimation methods and serves as
a benchmark for testing estimators.

Five cost function specifications are supported, following the ruspy
library (OpenSourceEconomics). The cost_type parameter selects the
functional form of the operating cost: linear, quadratic, cubic,
square root, or hyperbolic.

Reference:
    Rust, J. (1987). "Optimal Replacement of GMC Bus Engines: An Empirical
    Model of Harold Zurcher." Econometrica, 55(5), 999-1033.
"""

from __future__ import annotations

from typing import Literal

import jax.numpy as jnp
import numpy as np
from gymnasium import spaces

from econirl.environments.base import DDCEnvironment

# Valid cost function types
CostType = Literal["linear", "quadratic", "cubic", "sqrt", "hyperbolic"]
VALID_COST_TYPES: tuple[str, ...] = ("linear", "quadratic", "cubic", "sqrt", "hyperbolic")


[docs] class RustBusEnvironment(DDCEnvironment): """Rust (1987) bus engine replacement environment. Harold Zurcher's decision problem: Each period, observe bus mileage and decide whether to keep the current engine or replace it. State space: Mileage discretized into bins {0, 1, ..., num_mileage_bins - 1} Each bin represents approximately 5,000 miles. Action space: 0 = Keep: Continue with current engine 1 = Replace: Install new engine (mileage resets to 0) Utility specification (depends on cost_type): linear: U(s, keep) = -theta_1 * s quadratic: U(s, keep) = -theta_1 * s - theta_2 * s^2 cubic: U(s, keep) = -theta_1 * s - theta_2 * s^2 - theta_3 * s^3 sqrt: U(s, keep) = -theta_1 * sqrt(s) hyperbolic: U(s, keep) = -theta_1 / (N+1 - s) U(s, replace) = -replacement_cost + ε_replace where ε are i.i.d. Type I Extreme Value (Gumbel) shocks. Transition dynamics: If keep: mileage increases by {0, 1, 2} with probabilities (θ_0, θ_1, θ_2) If replace: mileage resets to 0 Example: >>> env = RustBusEnvironment( ... operating_cost=0.001, ... replacement_cost=3.0, ... discount_factor=0.9999, ... ) >>> obs, info = env.reset() >>> print(f"Initial mileage bin: {obs}") >>> env_quad = RustBusEnvironment( ... cost_type="quadratic", ... operating_cost_params=(0.001, 0.00001), ... replacement_cost=3.0, ... ) """ # Action constants for clarity KEEP = 0 REPLACE = 1
[docs] def __init__( self, operating_cost: float = 0.001, replacement_cost: float = 3.0, num_mileage_bins: int = 90, mileage_transition_probs: tuple[float, float, float] = (0.3919, 0.5953, 0.0128), discount_factor: float = 0.9999, scale_parameter: float = 1.0, seed: int | None = None, cost_type: CostType = "linear", operating_cost_params: tuple[float, ...] | None = None, ): """Initialize the Rust bus environment. Args: operating_cost: Cost per unit mileage for operating (θ_1 in Rust). Used as the single operating cost coefficient for linear, sqrt, and hyperbolic cost types. Ignored when operating_cost_params is provided for quadratic or cubic types. replacement_cost: Fixed cost of engine replacement (RC in Rust) num_mileage_bins: Number of mileage discretization bins (default 90) mileage_transition_probs: Probabilities of mileage increase (0, 1, or 2 bins) Must sum to 1. Default from Rust (1987) estimates. discount_factor: Time discount factor β scale_parameter: Logit scale parameter σ seed: Random seed for reproducibility cost_type: Functional form of operating cost. One of "linear", "quadratic", "cubic", "sqrt", "hyperbolic". operating_cost_params: Tuple of operating cost coefficients for multi-parameter cost types (quadratic, cubic). For quadratic, provide (theta_1, theta_2). For cubic, provide (theta_1, theta_2, theta_3). If None, uses operating_cost as the single coefficient. """ super().__init__( discount_factor=discount_factor, scale_parameter=scale_parameter, seed=seed, ) if cost_type not in VALID_COST_TYPES: raise ValueError( f"cost_type must be one of {VALID_COST_TYPES}, got {cost_type!r}" ) self._cost_type = cost_type # Determine operating cost coefficients based on cost_type if operating_cost_params is not None: self._operating_cost_params = tuple(operating_cost_params) else: self._operating_cost_params = (operating_cost,) expected_n_params = {"linear": 1, "quadratic": 2, "cubic": 3, "sqrt": 1, "hyperbolic": 1} n_expected = expected_n_params[cost_type] if len(self._operating_cost_params) != n_expected: raise ValueError( f"cost_type={cost_type!r} expects {n_expected} operating cost " f"parameter(s), got {len(self._operating_cost_params)}" ) # Keep operating_cost for backward compatibility (first coefficient) self._operating_cost = self._operating_cost_params[0] self._replacement_cost = replacement_cost self._num_mileage_bins = num_mileage_bins self._mileage_transition_probs = np.array(mileage_transition_probs) # Validate transition probabilities if len(self._mileage_transition_probs) != 3: raise ValueError("mileage_transition_probs must have exactly 3 elements") if not np.isclose(self._mileage_transition_probs.sum(), 1.0, atol=1e-4): raise ValueError("mileage_transition_probs must sum to 1") # Set up Gymnasium spaces self.observation_space = spaces.Discrete(num_mileage_bins) self.action_space = spaces.Discrete(2) # Keep or Replace # Pre-compute transition matrices and features self._transition_matrices = self._build_transition_matrices() self._feature_matrix = self._build_feature_matrix()
@property def num_states(self) -> int: return self._num_mileage_bins @property def num_actions(self) -> int: return 2 # Keep, Replace @property def transition_matrices(self) -> jnp.ndarray: return self._transition_matrices @property def feature_matrix(self) -> jnp.ndarray: return self._feature_matrix @property def cost_type(self) -> str: """Return the operating cost function type.""" return self._cost_type @property def true_parameters(self) -> dict[str, float]: params = {} names = self._operating_cost_param_names() for name, val in zip(names, self._operating_cost_params): params[name] = val params["replacement_cost"] = self._replacement_cost return params @property def parameter_names(self) -> list[str]: return self._operating_cost_param_names() + ["replacement_cost"] def _operating_cost_param_names(self) -> list[str]: """Return parameter names for the operating cost coefficients.""" if self._cost_type in ("linear", "sqrt", "hyperbolic"): return ["operating_cost"] elif self._cost_type == "quadratic": return ["operating_cost_1", "operating_cost_2"] else: # cubic return ["operating_cost_1", "operating_cost_2", "operating_cost_3"] @property def mileage_transition_probs(self) -> np.ndarray: """Return the mileage transition probabilities.""" return self._mileage_transition_probs.copy() def _build_transition_matrices(self) -> jnp.ndarray: """Build transition probability matrices P(s'|s,a). Returns: Tensor of shape (2, num_states, num_states) - transitions[0, s, s'] = P(s' | s, keep) - transitions[1, s, s'] = P(s' | s, replace) """ n = self._num_mileage_bins p = self._mileage_transition_probs # Build with numpy then convert (JAX arrays are immutable) transitions = np.zeros((2, n, n), dtype=np.float32) # Keep action: mileage increases by 0, 1, or 2 with given probabilities for s in range(n): for delta, prob in enumerate(p): next_s = min(s + delta, n - 1) # Cap at max mileage transitions[self.KEEP, s, next_s] += prob # Replace action: always transition to state 0, then increase # After replacement, mileage starts at 0 and increases by 0, 1, or 2 for delta, prob in enumerate(p): next_s = min(delta, n - 1) transitions[self.REPLACE, :, next_s] = prob return jnp.array(transitions) def _build_feature_matrix(self) -> jnp.ndarray: """Build feature matrix for utility computation. Features are structured so that U(s, a) = theta dot phi(s, a). The number of features depends on the cost_type. Replacement cost is always the last feature column. Returns: Tensor of shape (num_states, num_actions, num_features) """ n = self._num_mileage_bins n_params = len(self._operating_cost_params) + 1 # +1 for RC features = np.zeros((n, 2, n_params), dtype=np.float32) s = np.arange(n, dtype=np.float32) if self._cost_type == "linear": # U(s, keep) = -theta_1 * s features[:, self.KEEP, 0] = -s elif self._cost_type == "quadratic": # U(s, keep) = -theta_1 * s - theta_2 * s^2 features[:, self.KEEP, 0] = -s features[:, self.KEEP, 1] = -s ** 2 elif self._cost_type == "cubic": # U(s, keep) = -theta_1 * s - theta_2 * s^2 - theta_3 * s^3 features[:, self.KEEP, 0] = -s features[:, self.KEEP, 1] = -s ** 2 features[:, self.KEEP, 2] = -s ** 3 elif self._cost_type == "sqrt": # U(s, keep) = -theta_1 * sqrt(s) features[:, self.KEEP, 0] = -np.sqrt(s) elif self._cost_type == "hyperbolic": # U(s, keep) = -theta_1 / (N+1 - s) features[:, self.KEEP, 0] = -1.0 / (n + 1.0 - s) # Replace action: utility = -replacement_cost (last feature column) features[:, self.REPLACE, -1] = -1.0 return jnp.array(features) def _get_initial_state_distribution(self) -> np.ndarray: """Return initial state distribution (start at mileage 0).""" dist = np.zeros(self._num_mileage_bins) dist[0] = 1.0 return dist def _compute_flow_utility(self, state: int, action: int) -> float: """Compute flow utility for state-action pair.""" if action == self.REPLACE: return -self._replacement_cost # Keep action: cost depends on cost_type s = float(state) p = self._operating_cost_params if self._cost_type == "linear": return -p[0] * s elif self._cost_type == "quadratic": return -p[0] * s - p[1] * s ** 2 elif self._cost_type == "cubic": return -p[0] * s - p[1] * s ** 2 - p[2] * s ** 3 elif self._cost_type == "sqrt": return -p[0] * (s ** 0.5) else: # hyperbolic n = self._num_mileage_bins return -p[0] / (n + 1.0 - s) def _sample_next_state(self, state: int, action: int) -> int: """Sample next state given current state and action.""" if action == self.REPLACE: # Reset to 0, then increase base_state = 0 else: base_state = state # Sample mileage increase delta = self._np_random.choice(3, p=self._mileage_transition_probs) next_state = min(base_state + delta, self._num_mileage_bins - 1) return int(next_state)
[docs] def mileage_to_state(self, mileage: float, bin_size: float = 5000.0) -> int: """Convert actual mileage to state index. Args: mileage: Actual odometer reading bin_size: Size of each mileage bin (default 5000 miles) Returns: State index (capped at num_mileage_bins - 1) """ state = int(mileage / bin_size) return min(state, self._num_mileage_bins - 1)
[docs] def state_to_mileage(self, state: int, bin_size: float = 5000.0) -> float: """Convert state index to midpoint mileage. Args: state: State index bin_size: Size of each mileage bin Returns: Midpoint mileage for this bin """ return (state + 0.5) * bin_size
[docs] def describe(self) -> str: """Return a human-readable description of the environment.""" param_lines = [] for name, val in self.true_parameters.items(): param_lines.append(f" {name}: {val}") params_str = "\n".join(param_lines) return f"""Rust (1987) Bus Engine Replacement Environment =============================================== States: {self.num_states} mileage bins (0 to {self.num_states - 1}) Actions: Keep (0), Replace (1) Cost type: {self._cost_type} True Parameters: {params_str} Structural Parameters: Discount factor (β): {self._discount_factor} Scale parameter (σ): {self._scale_parameter} Mileage Transition Probabilities: P(+0 bins): {self._mileage_transition_probs[0]:.4f} P(+1 bin): {self._mileage_transition_probs[1]:.4f} P(+2 bins): {self._mileage_transition_probs[2]:.4f} """