Source code for econirl.estimators.ccp

"""Sklearn-style CCP estimator for dynamic discrete choice models.

This module provides a CCP class with a scikit-learn style API that wraps
the underlying CCPEstimator from econirl.estimation.ccp. It implements the
Hotz-Miller (1993) approach and NPL extension (Aguirregabiria-Mira 2002).

The CCP estimator has the same interface as NFXP, allowing users to easily
switch between estimation methods.

Example:
    >>> from econirl.estimators import CCP
    >>> import pandas as pd
    >>>
    >>> # Load bus replacement data
    >>> df = pd.read_csv("zurcher_bus.csv")
    >>>
    >>> # Create estimator and fit (Hotz-Miller by default)
    >>> model = CCP(n_states=90, discount=0.9999)
    >>> model.fit(data=df, state="mileage_bin", action="replaced", id="bus_id")
    >>>
    >>> # Access results sklearn-style
    >>> print(model.params_)        # {"theta_c": 0.001, "RC": 9.35}
    >>> print(model.summary())
    >>>
    >>> # Use NPL (iterative refinement)
    >>> model_npl = CCP(n_states=90, num_policy_iterations=10)
    >>> model_npl.fit(data=df, state="mileage_bin", action="replaced", id="bus_id")
"""

from __future__ import annotations

from typing import Literal

import numpy as np
import pandas as pd

from econirl.core.reward_spec import RewardSpec
from econirl.core.types import DDCProblem, Panel, TrajectoryPanel
from econirl.estimation.ccp import CCPEstimator
from econirl.estimators.nfxp import NFXP
from econirl.transitions import TransitionEstimator


[docs] class CCP(NFXP): """Sklearn-style CCP estimator for dynamic discrete choice models. The CCP (Conditional Choice Probability) approach estimates utility parameters using the Hotz-Miller inversion theorem. This is typically faster than NFXP because it avoids the nested fixed-point computation. With num_policy_iterations=1, this is the classic Hotz-Miller estimator. With num_policy_iterations>1, this becomes the NPL (Nested Pseudo Likelihood) algorithm that iterates to the MLE. Parameters ---------- n_states : int, default=90 Number of discrete states (e.g., mileage bins). n_actions : int, default=2 Number of discrete actions (e.g., keep/replace). discount : float, default=0.9999 Time discount factor (beta). utility : str or RewardSpec, default="linear_cost" Utility specification. Pass ``"linear_cost"`` for the classic Rust bus model (``u = -theta_c * s * (1-a) - RC * a``), or a ``RewardSpec`` for custom features. se_method : str, default="robust" Method for computing standard errors. Options: "robust", "asymptotic". verbose : bool, default=False Whether to print progress messages during estimation. num_policy_iterations : int, default=1 Number of policy iterations. K=1 is Hotz-Miller, K>1 is NPL. Set to -1 for convergence-based stopping. Attributes ---------- params_ : dict Estimated parameters after fitting. Keys are parameter names (e.g., "theta_c", "RC") and values are point estimates. se_ : dict Standard errors for each parameter. coef_ : numpy.ndarray Coefficients as a numpy array (sklearn convention). log_likelihood_ : float Maximized log-likelihood value. pvalues_ : dict P-values for each parameter (from Wald t-test). policy_ : numpy.ndarray Estimated choice probabilities P(a|s) of shape (n_states, n_actions). value_ : numpy.ndarray Estimated value function V(s) of shape (n_states,). value_function_ : numpy.ndarray Alias for ``value_`` (backward compatibility). transitions_ : numpy.ndarray Transition probability matrix (n_states x n_states). converged_ : bool Whether the optimization converged. reward_spec_ : RewardSpec The reward specification used for estimation. Examples -------- >>> from econirl.estimators import CCP >>> import pandas as pd >>> >>> df = pd.DataFrame({ ... "bus_id": [0, 0, 1, 1], ... "mileage": [10, 20, 15, 30], ... "replaced": [0, 0, 0, 1], ... }) >>> >>> # Hotz-Miller (fast, one-step) >>> model = CCP(n_states=90) >>> model.fit(df, state="mileage", action="replaced", id="bus_id") >>> >>> # NPL (iterates towards MLE) >>> model_npl = CCP(n_states=90, num_policy_iterations=10) >>> model_npl.fit(df, state="mileage", action="replaced", id="bus_id") References ---------- Hotz, V.J. and Miller, R.A. (1993). "Conditional Choice Probabilities and the Estimation of Dynamic Models." RES 60(3), 497-529. Aguirregabiria, V. and Mira, P. (2002). "Swapping the Nested Fixed Point Algorithm." Econometrica 70(4), 1519-1543. """
[docs] def __init__( self, n_states: int = 90, n_actions: int = 2, discount: float = 0.9999, utility: str | RewardSpec = "linear_cost", se_method: Literal["robust", "asymptotic"] = "robust", verbose: bool = False, num_policy_iterations: int = 1, ): """Initialize the CCP estimator. Parameters ---------- n_states : int, default=90 Number of discrete states. n_actions : int, default=2 Number of discrete actions. discount : float, default=0.9999 Time discount factor (beta). utility : str or RewardSpec, default="linear_cost" Utility specification to use. Pass ``"linear_cost"`` for the classic Rust bus model, or a ``RewardSpec`` for custom features. se_method : str, default="robust" Method for computing standard errors. verbose : bool, default=False Whether to print progress messages. num_policy_iterations : int, default=1 Number of NPL iterations (K=1 is Hotz-Miller). """ # Initialize parent with shared parameters super().__init__( n_states=n_states, n_actions=n_actions, discount=discount, utility=utility, se_method=se_method, verbose=verbose, ) # CCP-specific parameter self.num_policy_iterations = num_policy_iterations
[docs] def fit( self, data: pd.DataFrame | Panel | TrajectoryPanel, state: str | None = None, action: str | None = None, id: str | None = None, transitions: np.ndarray | None = None, reward: RewardSpec | None = None, ) -> "CCP": """Fit the CCP estimator to data. Parameters ---------- data : pandas.DataFrame or Panel or TrajectoryPanel Panel data with observations. When a DataFrame is passed, ``state``, ``action``, and ``id`` column names are required. When a Panel/TrajectoryPanel is passed, column names are ignored. state : str, optional Column name for the state variable (required for DataFrame input). action : str, optional Column name for the action variable (required for DataFrame input). id : str, optional Column name for the individual identifier (required for DataFrame input). transitions : numpy.ndarray, optional Pre-estimated transition matrix of shape (n_states, n_states). If None, transitions are estimated from the data. reward : RewardSpec, optional Reward/utility specification. If provided, overrides the ``utility`` parameter passed at construction time. Returns ------- self : CCP Returns self for method chaining. """ # Resolve reward spec: explicit argument > constructor parameter reward_spec = reward if reward is not None else self.utility # --- Handle data: DataFrame or Panel/TrajectoryPanel --- if isinstance(data, pd.DataFrame): if state is None or action is None or id is None: raise ValueError( "state, action, and id column names are required " "when data is a DataFrame" ) self._panel = TrajectoryPanel.from_dataframe( data, state=state, action=action, id=id ) elif isinstance(data, (Panel, TrajectoryPanel)): self._panel = data else: raise TypeError( f"data must be a DataFrame, Panel, or TrajectoryPanel, " f"got {type(data)}" ) # --- Handle reward: RewardSpec or string --- if isinstance(reward_spec, RewardSpec): self.reward_spec_ = reward_spec self._utility_fn = reward_spec.to_linear_utility() elif reward_spec == "linear_cost": self._utility_fn = self._create_utility() # Also create RewardSpec from the utility for consistency self.reward_spec_ = RewardSpec( self._utility_fn.feature_matrix, self._utility_fn.parameter_names, ) else: raise ValueError(f"Unknown reward/utility specification: {reward_spec}") # Estimate transitions if not provided if transitions is None: trans_estimator = TransitionEstimator( n_states=self.n_states, max_increase=2, ) trans_estimator.fit(self._panel) self.transitions_ = trans_estimator.matrix_ else: self.transitions_ = np.asarray(transitions) # Build full transition matrices (for both actions) transition_tensor = self._build_transition_tensor(self.transitions_) # Create problem specification self._problem = DDCProblem( num_states=self.n_states, num_actions=self.n_actions, discount_factor=self.discount, scale_parameter=1.0, ) # Create the underlying CCP estimator (this is the key difference from NFXP) estimator = CCPEstimator( num_policy_iterations=self.num_policy_iterations, se_method=self.se_method, verbose=self.verbose, ) # Run estimation self._result = estimator.estimate( panel=self._panel, utility=self._utility_fn, problem=self._problem, transitions=transition_tensor, ) # Extract results self._extract_results() return self
[docs] def summary(self) -> str: """Generate a formatted summary of estimation results. Returns ------- str Human-readable summary of the estimation. """ if self._result is None: return "CCP: Not fitted yet. Call fit() first." return self._result.summary()
def __repr__(self) -> str: if self.params_ is not None: return ( f"CCP(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, num_policy_iterations={self.num_policy_iterations}, " f"fitted=True)" ) return ( f"CCP(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, num_policy_iterations={self.num_policy_iterations}, " f"fitted=False)" )