Source code for econirl.estimators.max_margin_irl

"""Sklearn-style Max Margin IRL estimator.

This module provides a MaxMarginIRL class with a scikit-learn style API that wraps
the underlying MaxMarginIRLEstimator from econirl.contrib.max_margin_irl. It
implements the Abbeel & Ng (2004) apprenticeship learning algorithm.

Example:
    >>> from econirl.estimators import MaxMarginIRL
    >>> import pandas as pd
    >>>
    >>> # Load expert demonstration data
    >>> df = pd.read_csv("expert_demos.csv")
    >>>
    >>> # Create estimator and fit
    >>> model = MaxMarginIRL(n_states=90, n_actions=2, discount=0.99)
    >>> model.fit(data=df, state="state", action="action", id="agent_id")
    >>>
    >>> # Access results sklearn-style
    >>> print(model.params_)        # {"feature_0": 0.5, "feature_1": -0.3, ...}
    >>> print(model.reward_)        # R(s) array for each state
    >>> print(model.summary())
    >>>
    >>> # Predict choice probabilities
    >>> proba = model.predict_proba(states=np.array([0, 10, 50]))
"""

from __future__ import annotations

from typing import Literal

import numpy as np
import pandas as pd
from scipy.stats import norm as scipy_norm

from econirl.core.types import DDCProblem, Panel, Trajectory
from econirl.contrib.max_margin_irl import MaxMarginIRLEstimator
from econirl.preferences.reward import LinearReward
from econirl.transitions import TransitionEstimator


[docs] class MaxMarginIRL: """Sklearn-style Max Margin IRL estimator (Abbeel & Ng 2004). Maximum Margin Inverse Reinforcement Learning finds reward weights that make the expert policy better than any other policy by a margin. It uses an iterative constraint generation approach to solve a quadratic program. This is useful when you have expert demonstrations and want to recover the underlying reward function that explains the behavior. Parameters ---------- n_states : int, default=90 Number of discrete states. n_actions : int, default=2 Number of discrete actions. discount : float, default=0.99 Time discount factor (gamma). n_features : int, default=None Number of reward features. If None, uses n_states (one-hot encoding). features : numpy.ndarray, optional State feature matrix of shape (n_states, n_features). If None, uses identity matrix (one-hot state features). feature_names : list[str], optional Names for each feature. If None, uses "feature_0", "feature_1", etc. max_iterations : int, default=50 Maximum constraint generation iterations. margin_tol : float, default=1e-4 Convergence tolerance on margin improvement. se_method : str, default="asymptotic" Method for computing standard errors. verbose : bool, default=False Whether to print progress messages during estimation. Attributes ---------- params_ : dict Estimated reward weights after fitting. Keys are feature names and values are point estimates. se_ : dict Standard errors for each parameter. coef_ : numpy.ndarray Coefficients as a numpy array (sklearn convention). reward_ : numpy.ndarray Recovered reward R(s) for each state, shape (n_states,). margin_ : float Achieved margin between expert and best alternative policy. value_function_ : numpy.ndarray Estimated value function V(s) for each state. transitions_ : numpy.ndarray Transition probability matrix (n_states x n_states). converged_ : bool Whether the optimization converged. Examples -------- >>> from econirl.estimators import MaxMarginIRL >>> import pandas as pd >>> >>> df = pd.DataFrame({ ... "agent_id": [0, 0, 1, 1], ... "state": [10, 20, 15, 30], ... "action": [0, 0, 0, 1], ... }) >>> >>> model = MaxMarginIRL(n_states=90, n_actions=2) >>> model.fit(df, state="state", action="action", id="agent_id") >>> print(model.reward_) # Recovered reward for each state References ---------- Abbeel, P., & Ng, A. Y. (2004). "Apprenticeship learning via inverse reinforcement learning." In Proceedings of ICML. """
[docs] def __init__( self, n_states: int = 90, n_actions: int = 2, discount: float = 0.99, n_features: int | None = None, features: np.ndarray | None = None, feature_names: list[str] | None = None, max_iterations: int = 50, margin_tol: float = 1e-4, se_method: Literal["asymptotic"] = "asymptotic", verbose: bool = False, ): """Initialize the MaxMarginIRL estimator. Parameters ---------- n_states : int, default=90 Number of discrete states. n_actions : int, default=2 Number of discrete actions. discount : float, default=0.99 Time discount factor. n_features : int, optional Number of reward features. If None, defaults to n_states. features : numpy.ndarray, optional State feature matrix (n_states, n_features). If None, uses identity. feature_names : list[str], optional Names for each feature. max_iterations : int, default=50 Maximum constraint generation iterations. margin_tol : float, default=1e-4 Convergence tolerance on margin. se_method : str, default="asymptotic" Method for computing standard errors. verbose : bool, default=False Whether to print progress messages. """ self.n_states = n_states self.n_actions = n_actions self.discount = discount self.n_features = n_features if n_features is not None else n_states self.features = features self.feature_names = feature_names self.max_iterations = max_iterations self.margin_tol = margin_tol self.se_method = se_method self.verbose = verbose # Fitted attributes (set after fit()) self.params_: dict[str, float] | None = None self.se_: dict[str, float] | None = None self.pvalues_: dict[str, float] | None = None self.coef_: np.ndarray | None = None self.reward_: np.ndarray | None = None self.margin_: float | None = None self.value_function_: np.ndarray | None = None self.policy_: np.ndarray | None = None self.transitions_: np.ndarray | None = None self.converged_: bool | None = None # Internal storage self._result = None self._panel = None self._reward_fn = None self._problem = None
[docs] def fit( self, data: pd.DataFrame, state: str, action: str, id: str, transitions: np.ndarray | None = None, ) -> "MaxMarginIRL": """Fit the MaxMarginIRL estimator to expert demonstration data. Parameters ---------- data : pandas.DataFrame Panel data with expert demonstrations. Must contain columns for state, action, and individual id. state : str Column name for the state variable. action : str Column name for the action variable. id : str Column name for the individual/agent identifier. transitions : numpy.ndarray, optional Pre-estimated transition matrix of shape (n_states, n_states). If None, transitions are estimated from the data. Returns ------- self : MaxMarginIRL Returns self for method chaining. """ # Convert DataFrame to Panel self._panel = self._dataframe_to_panel(data, state, action, id) # Estimate transitions if not provided if transitions is None: trans_estimator = TransitionEstimator( n_states=self.n_states, max_increase=2, ) trans_estimator.fit(self._panel) self.transitions_ = trans_estimator.matrix_ else: self.transitions_ = np.asarray(transitions) # Build full transition matrices (for both actions) transition_tensor = self._build_transition_tensor(self.transitions_) # Create problem specification self._problem = DDCProblem( num_states=self.n_states, num_actions=self.n_actions, discount_factor=self.discount, scale_parameter=1.0, ) # Create reward function with state features self._reward_fn = self._create_reward() # Create the underlying MaxMarginIRL estimator estimator = MaxMarginIRLEstimator( se_method=self.se_method, max_iterations=self.max_iterations, margin_tol=self.margin_tol, verbose=self.verbose, ) # Run estimation self._result = estimator.estimate( panel=self._panel, utility=self._reward_fn, # LinearReward is a UtilityFunction problem=self._problem, transitions=transition_tensor, ) # Extract results self._extract_results() return self
def _dataframe_to_panel( self, data: pd.DataFrame, state: str, action: str, id: str, ) -> Panel: """Convert pandas DataFrame to Panel object. Parameters ---------- data : pandas.DataFrame Input data. state : str Column name for state. action : str Column name for action. id : str Column name for individual id. Returns ------- Panel Panel object suitable for estimation. """ trajectories = [] # Group by individual for ind_id, group in data.groupby(id, sort=True): # Sort by any time/period column if available, otherwise keep order sorted_group = group.sort_index() states = sorted_group[state].values.astype(np.int64) actions = sorted_group[action].values.astype(np.int64) # Compute next states next_states = np.zeros_like(states) next_states[:-1] = states[1:] # For the last observation, apply transition logic based on action if len(states) > 0: last_state = states[-1] last_action = actions[-1] if last_action == 1: # Replace -> reset to 0 next_states[-1] = 0 else: # Keep -> stay at same state (conservative) next_states[-1] = min(last_state + 1, self.n_states - 1) traj = Trajectory( states=np.array(states, dtype=np.int64), actions=np.array(actions, dtype=np.int64), next_states=np.array(next_states, dtype=np.int64), individual_id=ind_id, ) trajectories.append(traj) return Panel(trajectories=trajectories) def _build_transition_tensor(self, keep_transitions: np.ndarray) -> np.ndarray: """Build full transition tensor for both actions. Parameters ---------- keep_transitions : numpy.ndarray Transition matrix for action=0 (keep), shape (n_states, n_states). Returns ------- numpy.ndarray Transition tensor of shape (n_actions, n_states, n_states). """ n = self.n_states transitions = np.zeros((self.n_actions, n, n), dtype=np.float32) # Action 0 (keep): use provided transitions transitions[0] = np.array(keep_transitions, dtype=np.float32) # Action 1 (replace): reset to state 0, then transition # After replacement, start at state 0 and apply the same transition for s in range(n): # After replacement from any state, we transition as if from state 0 transitions[1, s, :] = transitions[0, 0, :] return transitions def _create_reward(self) -> LinearReward: """Create reward function for estimation. Returns ------- LinearReward Reward function with appropriate state features. """ # Build feature matrix if self.features is not None: # Use provided features feature_matrix = np.array(self.features, dtype=np.float32) if feature_matrix.shape[0] != self.n_states: raise ValueError( f"features must have {self.n_states} rows, " f"got {feature_matrix.shape[0]}" ) n_features = feature_matrix.shape[1] else: # Use identity (one-hot state encoding) if n_features == n_states # Otherwise use random features (for dimensionality reduction) if self.n_features == self.n_states: feature_matrix = np.eye(self.n_states, dtype=np.float32) else: # Use simple state-dependent features # e.g., polynomial features of normalized state feature_matrix = np.zeros( (self.n_states, self.n_features), dtype=np.float32 ) states_normalized = np.arange( self.n_states, dtype=np.float32 ) / (self.n_states - 1) for k in range(self.n_features): # Polynomial features: s^0, s^1, s^2, ... feature_matrix[:, k] = states_normalized ** k n_features = self.n_features # Create parameter names if self.feature_names is not None: if len(self.feature_names) != n_features: raise ValueError( f"feature_names must have {n_features} elements, " f"got {len(self.feature_names)}" ) param_names = list(self.feature_names) else: param_names = [f"feature_{i}" for i in range(n_features)] return LinearReward( state_features=feature_matrix, parameter_names=param_names, n_actions=self.n_actions, ) def _extract_results(self) -> None: """Extract results from estimation into sklearn-style attributes.""" if self._result is None: return # Parameter estimates (reward weights) params = np.asarray(self._result.parameters) param_names = self._result.parameter_names self.params_ = {name: float(val) for name, val in zip(param_names, params)} self.coef_ = params.copy() # Standard errors se = np.asarray(self._result.standard_errors) self.se_ = {name: float(val) for name, val in zip(param_names, se)} # Compute reward for each state: R(s) = theta * phi(s) theta = self._result.parameters reward_matrix = self._reward_fn.compute(theta) # (n_states, n_actions) self.reward_ = np.asarray(reward_matrix[:, 0]) # Same for all actions # Other attributes self.converged_ = bool(self._result.converged) # Get margin from metadata if available if self._result.metadata and "margin" in self._result.metadata: self.margin_ = float(self._result.metadata["margin"]) if self._result.value_function is not None: self.value_function_ = np.asarray(self._result.value_function) if self._result.policy is not None: self.policy_ = np.asarray(self._result.policy) # p-values from t-statistics if self.se_ is not None and self.params_ is not None: pvalues: dict[str, float] = {} for name in self.params_: se_val = self.se_[name] if se_val and se_val > 0: t_stat = self.params_[name] / se_val pvalues[name] = float( 2 * (1 - scipy_norm.cdf(abs(t_stat))) ) else: pvalues[name] = float("nan") self.pvalues_ = pvalues @property def value_(self) -> np.ndarray | None: """Value function V(s) of shape (n_states,).""" return self.value_function_ @property def reward_matrix_(self) -> np.ndarray | None: """Reward matrix R(s,a) of shape (n_states, n_actions). MaxMarginIRL learns a state-only reward R(s). This property broadcasts it to all actions so that the shape matches the protocol requirement. """ if self.reward_ is None: return None return np.tile(self.reward_[:, np.newaxis], (1, self.n_actions))
[docs] def conf_int(self, alpha: float = 0.05) -> dict: """Compute confidence intervals for parameters. Parameters ---------- alpha : float, default=0.05 Significance level. Returns (1 - alpha) confidence intervals. Returns ------- dict ``{param_name: (lower, upper)}`` confidence intervals. Raises ------ RuntimeError If the model has not been fitted yet. """ if self.params_ is None or self.se_ is None: raise RuntimeError("Model not fitted. Call fit() first.") z = scipy_norm.ppf(1 - alpha / 2) intervals: dict[str, tuple[float, float]] = {} for name in self.params_: est = self.params_[name] se = self.se_[name] intervals[name] = (est - z * se, est + z * se) return intervals
[docs] def summary(self) -> str: """Generate a formatted summary of estimation results. Returns ------- str Human-readable summary of the estimation. """ if self._result is None: return "MaxMarginIRL: Not fitted yet. Call fit() first." return self._result.summary()
[docs] def predict_proba(self, states: np.ndarray) -> np.ndarray: """Predict choice probabilities for given states. Parameters ---------- states : numpy.ndarray Array of state indices. Returns ------- numpy.ndarray Choice probabilities of shape (len(states), n_actions). Each row sums to 1. """ if self._result is None: raise RuntimeError("Model not fitted. Call fit() first.") states = np.asarray(states, dtype=np.int64) # Get policy (choice probabilities) from result policy = np.asarray(self._result.policy) # Index into the policy for the requested states proba = policy[states] return proba
def __repr__(self) -> str: if self.params_ is not None: return ( f"MaxMarginIRL(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, fitted=True)" ) return ( f"MaxMarginIRL(n_states={self.n_states}, n_actions={self.n_actions}, " f"discount={self.discount}, fitted=False)" )