Source code for econirl.estimators.nfxp

"""Sklearn-style NFXP estimator for dynamic discrete choice models.

This module provides an NFXP class with a scikit-learn style API that wraps
the underlying NFXPEstimator from econirl.estimation.nfxp. It accepts pandas
DataFrames with column names instead of the low-level Panel API.

Example:
    >>> from econirl.estimators import NFXP
    >>> import pandas as pd
    >>>
    >>> # Load bus replacement data
    >>> df = pd.read_csv("zurcher_bus.csv")
    >>>
    >>> # Create estimator and fit
    >>> model = NFXP(n_states=90, discount=0.9999, utility="linear_cost")
    >>> model.fit(data=df, state="mileage_bin", action="replaced", id="bus_id")
    >>>
    >>> # Access results sklearn-style
    >>> print(model.params_)        # {"theta_c": 0.001, "RC": 9.35}
    >>> print(model.coef_)          # numpy array [0.001, 9.35]
    >>> print(model.log_likelihood_)
    >>> print(model.summary())
    >>>
    >>> # Predict choice probabilities
    >>> proba = model.predict_proba(states=np.array([0, 10, 50]))
    >>>
    >>> # Simulate new data under estimated policy
    >>> sim_df = model.simulate(n_agents=100, n_periods=50, seed=42)
    >>>
    >>> # Counterfactual analysis: what if RC was higher?
    >>> cf_result = model.counterfactual(RC=15.0)
    >>> print(cf_result.policy)  # New policy under higher replacement cost
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Literal

import numpy as np
import pandas as pd
from scipy.stats import norm as scipy_norm

from econirl.core.bellman import SoftBellmanOperator
from econirl.core.reward_spec import RewardSpec
from econirl.core.solvers import value_iteration
from econirl.core.types import DDCProblem, Panel, Trajectory, TrajectoryPanel
from econirl.estimation.nfxp import NFXPEstimator
from econirl.preferences.linear import LinearUtility
from econirl.transitions import TransitionEstimator


@dataclass
class CounterfactualResult:
    """Result of a counterfactual policy analysis.

    Contains the value function and policy computed under alternative
    parameter values, enabling "what if" analysis after estimation.

    Attributes:
        params: Dictionary of parameter values used in the counterfactual.
                Contains both the modified parameters and the original
                estimated values for unchanged parameters.
        value_function: Value function V(s) under the counterfactual parameters,
                       shape (n_states,).
        policy: Choice probabilities P(a|s) under the counterfactual parameters,
               shape (n_states, n_actions). Each row sums to 1.

    Example:
        >>> # After fitting NFXP estimator
        >>> cf = estimator.counterfactual(RC=15.0)
        >>> print(f"Under RC=15.0, P(replace|state=50) = {cf.policy[50, 1]:.3f}")
    """

    params: dict[str, float]
    value_function: np.ndarray
    policy: np.ndarray


[docs] class NFXP: """Sklearn-style NFXP estimator for dynamic discrete choice models. The Nested Fixed Point (NFXP) algorithm estimates utility parameters by nesting the solution of the Bellman equation within likelihood maximization. This is the classic approach from Rust (1987, 1988). Parameters ---------- n_states : int, default=90 Number of discrete states (e.g., mileage bins). n_actions : int, default=2 Number of discrete actions (e.g., keep/replace). discount : float, default=0.9999 Time discount factor (beta). utility : str or RewardSpec, default="linear_cost" Utility specification. Pass ``"linear_cost"`` for the classic Rust bus model (``u = -theta_c * s * (1-a) - RC * a``), or a ``RewardSpec`` for custom features. se_method : str, default="robust" Method for computing standard errors. Options: "robust", "asymptotic". verbose : bool, default=False Whether to print progress messages during estimation. Attributes ---------- params_ : dict Estimated parameters after fitting. Keys are parameter names (e.g., "theta_c", "RC") and values are point estimates. se_ : dict Standard errors for each parameter. coef_ : numpy.ndarray Coefficients as a numpy array (sklearn convention). log_likelihood_ : float Maximized log-likelihood value. pvalues_ : dict P-values for each parameter (from Wald t-test). policy_ : numpy.ndarray Estimated choice probabilities P(a|s) of shape (n_states, n_actions). value_ : numpy.ndarray Estimated value function V(s) of shape (n_states,). value_function_ : numpy.ndarray Alias for ``value_`` (backward compatibility). transitions_ : numpy.ndarray Transition probability matrix (n_states x n_states). converged_ : bool Whether the optimization converged. reward_spec_ : RewardSpec The reward specification used for estimation. Examples -------- >>> from econirl.estimators import NFXP >>> import pandas as pd >>> >>> df = pd.DataFrame({ ... "bus_id": [0, 0, 1, 1], ... "mileage": [10, 20, 15, 30], ... "replaced": [0, 0, 0, 1], ... }) >>> >>> model = NFXP(n_states=90) >>> model.fit(df, state="mileage", action="replaced", id="bus_id") >>> print(model.params_) """
[docs] def __init__( self, n_states: int = 90, n_actions: int = 2, discount: float = 0.9999, utility: str | RewardSpec = "linear_cost", se_method: Literal["robust", "asymptotic"] = "robust", verbose: bool = False, ): """Initialize the NFXP estimator. Parameters ---------- n_states : int, default=90 Number of discrete states. n_actions : int, default=2 Number of discrete actions. discount : float, default=0.9999 Time discount factor (beta). utility : str or RewardSpec, default="linear_cost" Utility specification to use. Pass ``"linear_cost"`` for the classic Rust bus model, or a ``RewardSpec`` for custom features. se_method : str, default="robust" Method for computing standard errors. verbose : bool, default=False Whether to print progress messages. """ self.n_states = n_states self.n_actions = n_actions self.discount = discount self.utility = utility self.se_method = se_method self.verbose = verbose # Fitted attributes (set after fit()) self.params_: dict[str, float] | None = None self.se_: dict[str, float] | None = None self.pvalues_: dict[str, float] | None = None self.coef_: np.ndarray | None = None self.log_likelihood_: float | None = None self.value_function_: np.ndarray | None = None self.policy_: np.ndarray | None = None self.value_: np.ndarray | None = None self.transitions_: np.ndarray | None = None self.converged_: bool | None = None self.reward_spec_: RewardSpec | None = None # Internal storage self._result = None self._panel = None self._utility_fn = None self._problem = None
[docs] def fit( self, data: pd.DataFrame | Panel | TrajectoryPanel, state: str | None = None, action: str | None = None, id: str | None = None, transitions: np.ndarray | None = None, reward: RewardSpec | None = None, ) -> "NFXP": """Fit the NFXP estimator to data. Parameters ---------- data : pandas.DataFrame or Panel or TrajectoryPanel Panel data with observations. When a DataFrame is passed, ``state``, ``action``, and ``id`` column names are required. When a Panel/TrajectoryPanel is passed, column names are ignored. state : str, optional Column name for the state variable (required for DataFrame input). action : str, optional Column name for the action variable (required for DataFrame input). id : str, optional Column name for the individual identifier (required for DataFrame input). transitions : numpy.ndarray, optional Pre-estimated transition matrix of shape (n_states, n_states). If None, transitions are estimated from the data. reward : RewardSpec, optional Reward/utility specification. If provided, overrides the ``utility`` parameter passed at construction time. Returns ------- self : NFXP Returns self for method chaining. """ # Resolve reward spec: explicit argument > constructor parameter reward_spec = reward if reward is not None else self.utility # --- Handle data: DataFrame or Panel/TrajectoryPanel --- if isinstance(data, pd.DataFrame): if state is None or action is None or id is None: raise ValueError( "state, action, and id column names are required " "when data is a DataFrame" ) self._panel = TrajectoryPanel.from_dataframe( data, state=state, action=action, id=id ) elif isinstance(data, (Panel, TrajectoryPanel)): self._panel = data else: raise TypeError( f"data must be a DataFrame, Panel, or TrajectoryPanel, " f"got {type(data)}" ) # --- Handle reward: RewardSpec or string --- if isinstance(reward_spec, RewardSpec): self.reward_spec_ = reward_spec self._utility_fn = reward_spec.to_linear_utility() elif reward_spec == "linear_cost": self._utility_fn = self._create_utility() # Also create RewardSpec from the utility for consistency self.reward_spec_ = RewardSpec( self._utility_fn.feature_matrix, self._utility_fn.parameter_names, ) else: raise ValueError(f"Unknown reward/utility specification: {reward_spec}") # Estimate transitions if not provided if transitions is None: trans_estimator = TransitionEstimator( n_states=self.n_states, max_increase=2, ) trans_estimator.fit(self._panel) self.transitions_ = trans_estimator.matrix_ else: self.transitions_ = np.asarray(transitions) # Build full transition matrices (for both actions) # Action 0 (keep): use estimated transitions # Action 1 (replace): reset to state 0, then apply transition transition_tensor = self._build_transition_tensor(self.transitions_) # Create problem specification self._problem = DDCProblem( num_states=self.n_states, num_actions=self.n_actions, discount_factor=self.discount, scale_parameter=1.0, ) # Create the underlying NFXP estimator estimator = NFXPEstimator( se_method=self.se_method, verbose=self.verbose, ) # Run estimation self._result = estimator.estimate( panel=self._panel, utility=self._utility_fn, problem=self._problem, transitions=transition_tensor, ) # Extract results self._extract_results() return self
def _dataframe_to_panel( self, data: pd.DataFrame, state: str, action: str, id: str, ) -> Panel: """Convert pandas DataFrame to Panel object. Parameters ---------- data : pandas.DataFrame Input data. state : str Column name for state. action : str Column name for action. id : str Column name for individual id. Returns ------- Panel Panel object suitable for estimation. """ trajectories = [] # Group by individual for ind_id, group in data.groupby(id, sort=True): # Sort by any time/period column if available, otherwise keep order sorted_group = group.sort_index() states = sorted_group[state].values.astype(np.int64) actions = sorted_group[action].values.astype(np.int64) # Compute next states # For the last observation, we need to handle it specially # We'll use the "wrap around" convention or just use the current state next_states = np.zeros_like(states) next_states[:-1] = states[1:] # For the last observation, apply transition logic based on action if len(states) > 0: last_state = states[-1] last_action = actions[-1] if last_action == 1: # Replace -> reset to 0 next_states[-1] = 0 else: # Keep -> stay at same state (conservative) next_states[-1] = min(last_state + 1, self.n_states - 1) traj = Trajectory( states=np.array(states, dtype=np.int64), actions=np.array(actions, dtype=np.int64), next_states=np.array(next_states, dtype=np.int64), individual_id=ind_id, ) trajectories.append(traj) return Panel(trajectories=trajectories) def _build_transition_tensor(self, keep_transitions: np.ndarray) -> np.ndarray: """Build full transition tensor for both actions. Parameters ---------- keep_transitions : numpy.ndarray Transition matrix for action=0 (keep), shape (n_states, n_states). Returns ------- numpy.ndarray Transition tensor of shape (n_actions, n_states, n_states). """ n = self.n_states transitions = np.zeros((self.n_actions, n, n), dtype=np.float32) # Action 0 (keep): use provided transitions transitions[0] = np.array(keep_transitions, dtype=np.float32) # Action 1 (replace): reset to state 0, then transition # After replacement, start at state 0 and apply the same transition # The row for state 0 in keep_transitions tells us where we go from 0 for s in range(n): # After replacement from any state, we transition as if from state 0 transitions[1, s, :] = transitions[0, 0, :] return transitions def _create_utility(self) -> LinearUtility: """Create utility function for estimation. Returns ------- LinearUtility Utility function with appropriate features. """ if self.utility != "linear_cost": raise ValueError(f"Unknown utility specification: {self.utility}") # Build feature matrix for linear cost utility # U(s, keep) = -theta_c * s # U(s, replace) = -RC n = self.n_states features = np.zeros((n, self.n_actions, 2), dtype=np.float32) mileage = np.arange(n, dtype=np.float32) # Keep action (a=0): feature = [-s, 0] features[:, 0, 0] = -mileage features[:, 0, 1] = 0.0 # Replace action (a=1): feature = [0, -1] features[:, 1, 0] = 0.0 features[:, 1, 1] = -1.0 return LinearUtility( feature_matrix=features, parameter_names=["theta_c", "RC"], ) def _extract_results(self) -> None: """Extract results from estimation into sklearn-style attributes.""" if self._result is None: return # Parameter estimates params = np.asarray(self._result.parameters) param_names = self._result.parameter_names self.params_ = {name: float(val) for name, val in zip(param_names, params)} self.coef_ = params.copy() # Standard errors se = np.asarray(self._result.standard_errors) self.se_ = {name: float(val) for name, val in zip(param_names, se)} # Other attributes self.log_likelihood_ = float(self._result.log_likelihood) self.converged_ = bool(self._result.converged) if self._result.value_function is not None: self.value_function_ = np.asarray(self._result.value_function) self.value_ = self.value_function_ # Policy matrix if self._result.policy is not None: self.policy_ = np.asarray(self._result.policy) # p-values from t-statistics if self.se_ is not None: pvalues: dict[str, float] = {} for name in self.params_: se_val = self.se_[name] if se_val and se_val > 0: t_stat = self.params_[name] / se_val pvalues[name] = float( 2 * (1 - scipy_norm.cdf(abs(t_stat))) ) else: pvalues[name] = float("nan") self.pvalues_ = pvalues @property def reward_matrix_(self) -> np.ndarray | None: """Structural reward matrix R(s,a) of shape (n_states, n_actions). Computes the utility matrix from the fitted parameters and the feature specification. Returns None if the model has not been fitted. """ if self.params_ is None or self._utility_fn is None or self._result is None: return None param_names = self._result.parameter_names param_vector = np.array( [self.params_[name] for name in param_names], dtype=np.float32, ) utility_matrix = self._utility_fn.compute(param_vector) return np.asarray(utility_matrix)
[docs] def summary(self) -> str: """Generate a formatted summary of estimation results. Returns ------- str Human-readable summary of the estimation. """ if self._result is None: return "NFXP: Not fitted yet. Call fit() first." return self._result.summary()
[docs] def conf_int(self, alpha: float = 0.05) -> dict: """Compute confidence intervals for parameters. Parameters ---------- alpha : float, default=0.05 Significance level. Returns (1 - alpha) confidence intervals. Returns ------- dict ``{param_name: (lower, upper)}`` confidence intervals. Raises ------ RuntimeError If the model has not been fitted yet. """ if self.params_ is None or self.se_ is None: raise RuntimeError("Model not fitted. Call fit() first.") z = scipy_norm.ppf(1 - alpha / 2) intervals: dict[str, tuple[float, float]] = {} for name in self.params_: est = self.params_[name] se = self.se_[name] intervals[name] = (est - z * se, est + z * se) return intervals
[docs] def predict_proba(self, states: np.ndarray) -> np.ndarray: """Predict choice probabilities for given states. Parameters ---------- states : numpy.ndarray Array of state indices. Returns ------- numpy.ndarray Choice probabilities of shape (len(states), n_actions). Each row sums to 1. """ if self._result is None: raise RuntimeError("Model not fitted. Call fit() first.") states = np.asarray(states, dtype=np.int64) # Get policy (choice probabilities) from result policy = np.asarray(self._result.policy) # Index into the policy for the requested states proba = policy[states] return proba
[docs] def simulate( self, n_agents: int, n_periods: int, seed: int | None = None, ) -> pd.DataFrame: """Simulate choices under the estimated policy. Generates synthetic data by simulating agents making decisions according to the fitted model. Each agent starts at state 0 and evolves according to the estimated transition probabilities and choice probabilities. Parameters ---------- n_agents : int Number of agents to simulate. n_periods : int Number of time periods per agent. seed : int, optional Random seed for reproducibility. Returns ------- pandas.DataFrame DataFrame with columns: - agent_id: Identifier for each agent (0 to n_agents-1) - period: Time period (0 to n_periods-1) - state: State at the beginning of the period - action: Action taken (sampled from estimated policy) Raises ------ RuntimeError If the model has not been fitted yet. Examples -------- >>> model = NFXP(n_states=90) >>> model.fit(data, state="mileage", action="replaced", id="bus_id") >>> sim_data = model.simulate(n_agents=100, n_periods=50, seed=42) >>> print(sim_data.head()) """ if self._result is None: raise RuntimeError("Model not fitted. Call fit() first.") # Set random seed if seed is not None: np.random.seed(seed) # Get the policy (choice probabilities) policy = np.asarray(self._result.policy) # shape: (n_states, n_actions) # Get transition probabilities (for action 0 = keep) transitions = self.transitions_ # shape: (n_states, n_states) # Storage for results data = [] for agent_id in range(n_agents): state = 0 # All agents start at state 0 for period in range(n_periods): # Sample action from policy action_probs = policy[state] action = np.random.choice(self.n_actions, p=action_probs) # Record observation data.append({ "agent_id": agent_id, "period": period, "state": state, "action": action, }) # Transition to next state if action == 1: # Replace: reset to state 0, then transition state = 0 trans_probs = transitions[0] else: # Keep: use transition from current state trans_probs = transitions[state] state = np.random.choice(self.n_states, p=trans_probs) return pd.DataFrame(data)
[docs] def counterfactual(self, **param_changes) -> CounterfactualResult: """Compute outcomes under different parameter values. Performs counterfactual analysis by solving the dynamic programming problem under alternative parameter values. This enables "what if" questions like "what would the policy be if RC was 15 instead of 10?" Parameters ---------- **param_changes : float Keyword arguments specifying parameter changes. Keys must be valid parameter names (e.g., "theta_c", "RC"). Values are the counterfactual parameter values. Returns ------- CounterfactualResult Object containing: - params: Dictionary of all parameter values used - value_function: V(s) under new parameters - policy: P(a|s) under new parameters Raises ------ RuntimeError If the model has not been fitted yet. ValueError If an unknown parameter name is provided. Examples -------- >>> model = NFXP(n_states=90) >>> model.fit(data, state="mileage", action="replaced", id="bus_id") >>> >>> # What if replacement cost was higher? >>> cf = model.counterfactual(RC=15.0) >>> print(f"Original RC: {model.params_['RC']:.2f}") >>> print(f"Counterfactual RC: {cf.params['RC']:.2f}") >>> print(f"P(replace|state=50) changes from " ... f"{model.predict_proba(np.array([50]))[0,1]:.3f} to " ... f"{cf.policy[50,1]:.3f}") >>> >>> # Multiple parameter changes >>> cf2 = model.counterfactual(RC=15.0, theta_c=0.05) """ if self._result is None: raise RuntimeError("Model not fitted. Call fit() first.") # Check for invalid parameter names valid_params = set(self.params_.keys()) for param_name in param_changes: if param_name not in valid_params: raise ValueError( f"Unknown parameter '{param_name}'. " f"Valid parameters are: {sorted(valid_params)}" ) # Build counterfactual parameter dictionary cf_params = self.params_.copy() cf_params.update(param_changes) # Build parameter vector in correct order param_names = self._result.parameter_names param_vector = np.array( [cf_params[name] for name in param_names], dtype=np.float32, ) # Compute utility matrix with new parameters utility_matrix = self._utility_fn.compute(param_vector) # Build transition tensor transition_tensor = self._build_transition_tensor(self.transitions_) # Create Bellman operator operator = SoftBellmanOperator( problem=self._problem, transitions=transition_tensor, ) # Solve for new value function and policy result = value_iteration(operator, utility_matrix) return CounterfactualResult( params=cf_params, value_function=np.asarray(result.V), policy=np.asarray(result.policy), )
def __repr__(self) -> str: if self.params_ is not None: return ( f"NFXP(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, fitted=True)" ) return ( f"NFXP(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, fitted=False)" )