"""Sklearn-style TD-CCP estimator for dynamic discrete choice models.
This module provides a TDCCP class with a scikit-learn style API that wraps
the underlying TDCCPEstimator from econirl.estimation.td_ccp. It accepts pandas
DataFrames with column names instead of the low-level Panel API.
TD-CCP (Temporal Difference CCP) is the neural extension of CCP. Instead of
matrix inversion, it trains neural EV component networks via semi-gradient
TD learning. The approach combines CCP estimation with approximate value
iteration (AVI), decomposing the expected value function into per-feature
components each learned by a separate MLP.
The killer feature of TD-CCP is the ``ev_features_`` attribute after fitting,
which shows how much of the continuation value comes from each structural
feature -- providing interpretable decomposition of forward-looking behavior.
Example:
>>> from econirl.estimators import TDCCP
>>> import pandas as pd
>>>
>>> # Load bus replacement data
>>> df = pd.read_csv("zurcher_bus.csv")
>>>
>>> # Create estimator and fit
>>> model = TDCCP(n_states=90, discount=0.9999, utility="linear_cost")
>>> model.fit(data=df, state="mileage_bin", action="replaced", id="bus_id")
>>>
>>> # Access results sklearn-style
>>> print(model.params_) # {"theta_c": 0.001, "RC": 9.35}
>>> print(model.summary())
>>>
>>> # Interpretable EV decomposition
>>> print(model.ev_features_) # shape (n_states, n_features)
>>>
>>> # Predict choice probabilities
>>> proba = model.predict_proba(states=np.array([0, 10, 50]))
"""
from __future__ import annotations
from typing import Literal
import jax.numpy as jnp
import numpy as np
import pandas as pd
from scipy.stats import norm as scipy_norm
from econirl.core.reward_spec import RewardSpec
from econirl.core.types import DDCProblem, Panel, Trajectory, TrajectoryPanel
from econirl.estimation.td_ccp import TDCCPConfig, TDCCPEstimator
from econirl.preferences.linear import LinearUtility
from econirl.transitions import TransitionEstimator
[docs]
class TDCCP:
"""Sklearn-style TD-CCP estimator for dynamic discrete choice models.
TD-CCP (Temporal Difference CCP) estimates utility parameters using
neural network-based approximate value iteration combined with CCP
decomposition. Instead of matrix inversion (as in standard CCP), it
trains per-feature EV component networks via semi-gradient TD learning,
then uses the learned components in a partial MLE for structural
parameters.
This is particularly useful for large state spaces where matrix-based
CCP methods are computationally infeasible.
Parameters
----------
n_states : int, default=90
Number of discrete states (e.g., mileage bins).
n_actions : int, default=2
Number of discrete actions (e.g., keep/replace).
discount : float, default=0.9999
Time discount factor (beta).
utility : str or RewardSpec, default="linear_cost"
Utility specification. Pass ``"linear_cost"`` for the classic Rust
bus model (``u = -theta_c * s * (1-a) - RC * a``), or a
``RewardSpec`` for custom features.
se_method : str, default="asymptotic"
Method for computing standard errors. Options: "robust", "asymptotic".
hidden_dim : int, default=64
Number of hidden units per layer in the EV component networks.
num_hidden_layers : int, default=2
Number of hidden layers in the EV component networks.
avi_iterations : int, default=20
Number of approximate value iteration rounds.
epochs_per_avi : int, default=30
Number of SGD epochs per AVI iteration.
learning_rate : float, default=1e-3
Learning rate for neural network training.
batch_size : int, default=8192
Mini-batch size for SGD training.
n_policy_iterations : int, default=3
Number of NPL-style policy iterations.
verbose : bool, default=False
Whether to print progress messages during estimation.
Attributes
----------
params_ : dict
Estimated parameters after fitting. Keys are parameter names
(e.g., "theta_c", "RC") and values are point estimates.
se_ : dict
Standard errors for each parameter.
coef_ : numpy.ndarray
Coefficients as a numpy array (sklearn convention).
log_likelihood_ : float
Maximized log-likelihood value.
pvalues_ : dict
P-values for each parameter (from Wald t-test).
policy_ : numpy.ndarray
Estimated choice probabilities P(a|s) of shape (n_states, n_actions).
value_ : numpy.ndarray
Estimated value function V(s) of shape (n_states,).
ev_features_ : numpy.ndarray or None
Per-feature EV component values of shape (n_states, n_features).
Shows how much of the continuation value comes from each structural
feature. Available after fitting if the underlying estimator
includes them in metadata.
converged_ : bool
Whether the optimization converged.
reward_spec_ : RewardSpec
The reward specification used for estimation.
Examples
--------
>>> from econirl.estimators import TDCCP
>>> import pandas as pd
>>>
>>> df = pd.DataFrame({
... "bus_id": [0, 0, 1, 1],
... "mileage": [10, 20, 15, 30],
... "replaced": [0, 0, 0, 1],
... })
>>>
>>> model = TDCCP(n_states=90, hidden_dim=64, avi_iterations=20)
>>> model.fit(df, state="mileage", action="replaced", id="bus_id")
>>> print(model.params_)
"""
[docs]
def __init__(
self,
n_states: int = 90,
n_actions: int = 2,
discount: float = 0.9999,
utility: str | RewardSpec = "linear_cost",
se_method: Literal["robust", "asymptotic"] = "asymptotic",
# Method selection (new: Adusumilli-Eckardt 2025)
method: Literal["semigradient", "neural"] = "semigradient",
cross_fitting: bool = True,
robust_se: bool = True,
# Semi-gradient specific
basis_dim: int = 8,
basis_type: Literal["polynomial", "encoded", "tabular"] = "polynomial",
basis_include_rewards: bool = False,
basis_ridge: float = 1e-8,
basis_pinv_rcond: float | None = None,
# Neural AVI specific
hidden_dim: int = 64,
num_hidden_layers: int = 2,
avi_iterations: int = 20,
epochs_per_avi: int = 30,
learning_rate: float = 1e-3,
batch_size: int = 8192,
# CCP estimation
ccp_method: Literal["frequency", "logit"] = "frequency",
# NPL iteration (not in paper, optional)
n_policy_iterations: int = 1,
verbose: bool = False,
):
"""Initialize the TD-CCP estimator.
Parameters
----------
n_states : int, default=90
Number of discrete states.
n_actions : int, default=2
Number of discrete actions.
discount : float, default=0.9999
Time discount factor (beta).
utility : str or RewardSpec, default="linear_cost"
Utility specification to use.
se_method : str, default="asymptotic"
Method for computing standard errors.
method : str, default="semigradient"
TD method: "semigradient" (fast closed-form, eq 3.5) or
"neural" (AVI with neural networks, Algorithm 1).
cross_fitting : bool, default=True
Use 2-fold cross-fitting (Algorithm 2) for valid inference.
robust_se : bool, default=True
Compute locally robust standard errors (Section 4).
basis_dim : int, default=8
Number of polynomial basis functions for semi-gradient method.
basis_type : str, default="polynomial"
Semi-gradient basis: "polynomial", "encoded", or "tabular".
basis_include_rewards : bool, default=False
Include reward features in the encoded semi-gradient basis.
basis_ridge : float, default=1e-8
Ridge stabilization for the semi-gradient normal equation.
basis_pinv_rcond : float, optional
Pseudoinverse cutoff for nearly singular semi-gradient bases.
hidden_dim : int, default=64
Hidden units per layer in EV component networks.
num_hidden_layers : int, default=2
Number of hidden layers in EV component networks.
avi_iterations : int, default=20
Number of approximate value iteration rounds.
epochs_per_avi : int, default=30
Number of SGD epochs per AVI iteration.
learning_rate : float, default=1e-3
Learning rate for neural network training.
batch_size : int, default=8192
Mini-batch size for SGD training.
ccp_method : str, default="frequency"
CCP estimation: "frequency" or "logit".
n_policy_iterations : int, default=1
Number of NPL-style policy iterations. Paper uses 1.
verbose : bool, default=False
Whether to print progress messages.
"""
self.n_states = n_states
self.n_actions = n_actions
self.discount = discount
self.utility = utility
self.se_method = se_method
self.method = method
self.cross_fitting = cross_fitting
self.robust_se = robust_se
self.basis_dim = basis_dim
self.basis_type = basis_type
self.basis_include_rewards = basis_include_rewards
self.basis_ridge = basis_ridge
self.basis_pinv_rcond = basis_pinv_rcond
self.hidden_dim = hidden_dim
self.num_hidden_layers = num_hidden_layers
self.avi_iterations = avi_iterations
self.epochs_per_avi = epochs_per_avi
self.learning_rate = learning_rate
self.batch_size = batch_size
self.ccp_method = ccp_method
self.n_policy_iterations = n_policy_iterations
self.verbose = verbose
# Fitted attributes (set after fit())
self.params_: dict[str, float] | None = None
self.se_: dict[str, float] | None = None
self.pvalues_: dict[str, float] | None = None
self.coef_: np.ndarray | None = None
self.log_likelihood_: float | None = None
self.value_: np.ndarray | None = None
self.policy_: np.ndarray | None = None
self.transitions_: np.ndarray | None = None
self.converged_: bool | None = None
self.reward_spec_: RewardSpec | None = None
self.ev_features_: np.ndarray | None = None
# Internal storage
self._result = None
self._panel = None
self._utility_fn = None
self._problem = None
[docs]
def fit(
self,
data: pd.DataFrame | Panel | TrajectoryPanel,
state: str | None = None,
action: str | None = None,
id: str | None = None,
transitions: np.ndarray | None = None,
reward: RewardSpec | None = None,
) -> "TDCCP":
"""Fit the TD-CCP estimator to data.
Parameters
----------
data : pandas.DataFrame or Panel or TrajectoryPanel
Panel data with observations. When a DataFrame is passed,
``state``, ``action``, and ``id`` column names are required.
When a Panel/TrajectoryPanel is passed, column names are ignored.
state : str, optional
Column name for the state variable (required for DataFrame input).
action : str, optional
Column name for the action variable (required for DataFrame input).
id : str, optional
Column name for the individual identifier (required for DataFrame
input).
transitions : numpy.ndarray, optional
Pre-estimated transition matrix of shape (n_states, n_states).
If None, transitions are estimated from the data.
reward : RewardSpec, optional
Reward/utility specification. If provided, overrides the
``utility`` parameter passed at construction time.
Returns
-------
self : TDCCP
Returns self for method chaining.
"""
# Resolve reward spec: explicit argument > constructor parameter
reward_spec = reward if reward is not None else self.utility
# --- Handle data: DataFrame or Panel/TrajectoryPanel ---
if isinstance(data, pd.DataFrame):
if state is None or action is None or id is None:
raise ValueError(
"state, action, and id column names are required "
"when data is a DataFrame"
)
self._panel = TrajectoryPanel.from_dataframe(
data, state=state, action=action, id=id
)
elif isinstance(data, (Panel, TrajectoryPanel)):
self._panel = data
else:
raise TypeError(
f"data must be a DataFrame, Panel, or TrajectoryPanel, "
f"got {type(data)}"
)
# --- Handle reward: RewardSpec or string ---
if isinstance(reward_spec, RewardSpec):
self.reward_spec_ = reward_spec
self._utility_fn = reward_spec.to_linear_utility()
elif reward_spec == "linear_cost":
self._utility_fn = self._create_utility()
# Also create RewardSpec from the utility for consistency
self.reward_spec_ = RewardSpec(
self._utility_fn.feature_matrix,
self._utility_fn.parameter_names,
)
else:
raise ValueError(f"Unknown reward/utility specification: {reward_spec}")
# Estimate transitions if not provided
if transitions is None:
trans_estimator = TransitionEstimator(
n_states=self.n_states,
max_increase=2,
)
trans_estimator.fit(self._panel)
self.transitions_ = trans_estimator.matrix_
else:
self.transitions_ = np.asarray(transitions)
# Build full transition matrices (for both actions)
transition_tensor = self._build_transition_tensor(self.transitions_)
# Create problem specification with state encoder for neural network
self._problem = DDCProblem(
num_states=self.n_states,
num_actions=self.n_actions,
discount_factor=self.discount,
scale_parameter=1.0,
state_dim=1,
state_encoder=lambda s: jnp.expand_dims(jnp.asarray(s, dtype=jnp.float32) / max(self.n_states - 1, 1), axis=-1),
)
# Create the underlying TD-CCP estimator with all config options
config = TDCCPConfig(
method=self.method,
basis_dim=self.basis_dim,
basis_type=self.basis_type,
basis_include_rewards=self.basis_include_rewards,
basis_ridge=self.basis_ridge,
basis_pinv_rcond=self.basis_pinv_rcond,
hidden_dim=self.hidden_dim,
num_hidden_layers=self.num_hidden_layers,
avi_iterations=self.avi_iterations,
epochs_per_avi=self.epochs_per_avi,
learning_rate=self.learning_rate,
batch_size=self.batch_size,
ccp_method=self.ccp_method,
cross_fitting=self.cross_fitting,
robust_se=self.robust_se,
n_policy_iterations=self.n_policy_iterations,
compute_se=True,
verbose=self.verbose,
)
estimator = TDCCPEstimator(config=config, se_method=self.se_method)
# Run estimation
self._result = estimator.estimate(
panel=self._panel,
utility=self._utility_fn,
problem=self._problem,
transitions=transition_tensor,
)
# Extract results
self._extract_results()
return self
def _build_transition_tensor(self, keep_transitions: np.ndarray) -> jnp.ndarray:
"""Build full transition tensor for both actions."""
n = self.n_states
transitions = np.zeros((self.n_actions, n, n), dtype=np.float32)
transitions[0] = np.asarray(keep_transitions, dtype=np.float32)
for s in range(n):
transitions[1, s, :] = transitions[0, 0, :]
return jnp.array(transitions)
def _create_utility(self) -> LinearUtility:
"""Create utility function for estimation."""
if self.utility != "linear_cost":
raise ValueError(f"Unknown utility specification: {self.utility}")
n = self.n_states
features = jnp.zeros((n, self.n_actions, 2))
mileage = jnp.arange(n, dtype=jnp.float32)
features = features.at[:, 0, 0].set(-mileage)
features = features.at[:, 1, 1].set(-1.0)
return LinearUtility(
feature_matrix=features,
parameter_names=["theta_c", "RC"],
)
def _extract_results(self) -> None:
"""Extract results from estimation into sklearn-style attributes."""
if self._result is None:
return
# Parameter estimates
params = np.asarray(self._result.parameters)
param_names = self._result.parameter_names
self.params_ = {name: float(val) for name, val in zip(param_names, params)}
self.coef_ = params.copy()
# Standard errors
se = np.asarray(self._result.standard_errors)
self.se_ = {name: float(val) for name, val in zip(param_names, se)}
# Other attributes
self.log_likelihood_ = float(self._result.log_likelihood)
self.converged_ = bool(self._result.converged)
if self._result.value_function is not None:
self.value_ = np.asarray(self._result.value_function)
# Policy matrix
if self._result.policy is not None:
self.policy_ = np.asarray(self._result.policy)
# p-values from t-statistics
if self.se_ is not None:
pvalues: dict[str, float] = {}
for name in self.params_:
se_val = self.se_[name]
if se_val and se_val > 0:
t_stat = self.params_[name] / se_val
pvalues[name] = float(
2 * (1 - scipy_norm.cdf(abs(t_stat)))
)
else:
pvalues[name] = float("nan")
self.pvalues_ = pvalues
# TD-CCP specific: EV feature components
if self._result.metadata:
ev = self._result.metadata.get("ev_features")
if ev is not None:
self.ev_features_ = np.asarray(ev)
@property
def reward_matrix_(self) -> np.ndarray | None:
"""Structural reward matrix R(s,a) of shape (n_states, n_actions).
Computes the utility matrix from the fitted parameters and the
feature specification. Returns None if the model has not been fitted.
"""
if self.params_ is None or self._utility_fn is None or self._result is None:
return None
param_names = self._result.parameter_names
param_vector = jnp.array(
[self.params_[name] for name in param_names],
dtype=jnp.float32,
)
utility_matrix = self._utility_fn.compute(param_vector)
return np.asarray(utility_matrix)
[docs]
def summary(self) -> str:
"""Generate a formatted summary of estimation results.
Returns
-------
str
Human-readable summary of the estimation.
"""
if self._result is None:
return "TD-CCP: Not fitted yet. Call fit() first."
return self._result.summary()
[docs]
def conf_int(self, alpha: float = 0.05) -> dict:
"""Compute confidence intervals for parameters.
Parameters
----------
alpha : float, default=0.05
Significance level. Returns (1 - alpha) confidence intervals.
Returns
-------
dict
``{param_name: (lower, upper)}`` confidence intervals.
Raises
------
RuntimeError
If the model has not been fitted yet.
"""
if self.params_ is None or self.se_ is None:
raise RuntimeError("Model not fitted. Call fit() first.")
z = scipy_norm.ppf(1 - alpha / 2)
intervals: dict[str, tuple[float, float]] = {}
for name in self.params_:
est = self.params_[name]
se = self.se_[name]
intervals[name] = (est - z * se, est + z * se)
return intervals
[docs]
def predict_proba(self, states: np.ndarray) -> np.ndarray:
"""Predict choice probabilities for given states.
Parameters
----------
states : numpy.ndarray
Array of state indices.
Returns
-------
numpy.ndarray
Choice probabilities of shape (len(states), n_actions).
Each row sums to 1.
"""
if self._result is None:
raise RuntimeError("Model not fitted. Call fit() first.")
states = np.asarray(states, dtype=np.int64)
# Get policy (choice probabilities) from result
policy = np.asarray(self._result.policy)
# Index into the policy for the requested states
proba = policy[states]
return proba
def __repr__(self) -> str:
if self.params_ is not None:
return (
f"TDCCP(n_states={self.n_states}, n_actions={self.n_actions}, "
f"discount={self.discount}, fitted=True)"
)
return (
f"TDCCP(n_states={self.n_states}, n_actions={self.n_actions}, "
f"discount={self.discount}, fitted=False)"
)