Tax-Analyzer-Framework

iur.py
Login

File taf/iur.py from the latest check-in


"""
IncomeUnderReporting class implements an adjust method that replaces
income amounts at or above a threshold (typically from survey data)
with adjustment amounts (typically derived from tax data).

The adjustment techniques used here are similar to those discussed by
Nora Lustig, "The 'Missing Rich' in Household Surveys: Causes and
Correction Approaches" (Commitment to Equity (CEQ) Working Paper
Series 75, Tulane University, Department of Economics, November 2019).
https://repec.tulane.edu/RePEc/ceq/ceq75.pdf

The adjustment amounts can be drawn either from actual data or from
synthetic (Pareto distributed) data generated by using either the
synthetic_adjustment_values_mean method or the
synthetic_adjustment_values_infer method.  Both the mean and infer
methods assume the under-reported incomes (above the assumed
threshold) have a Pareto distribution, which is represented here using a
truncated dPlN distribution that is documented in the following paper:
W. Reed and M. Jorgensen, "The Double Pareto-Lognormal Distribution – A
New Parametric Model for Size Distributions" (2003).
https://www.math.uvic.ca/faculty/reed/dPlN.3.pdf

The only differences between the mean and infer methods are the
assumptions made.  The infer method assumes a bound (lower than the
threshold) and makes the assumptions that income values at or above
this bound have a Pareto distribution and that the values in the
[bound,threshold) range are not under reported.  On the other hand,
the mean method makes the assumption that the true mean value of
incomes above the threshold is known and that the distribution of
incomes at or above the threshold is Pareto.  Note that the infer
method is similar to the method used in a recent OECD working paper:
Nicolas Ruiz and Nicolas Woloszko, "What do household surveys suggest
about the top 1% incomes and inequality in OECD countries?"
OECD Economics Department Working Papers No. 1265 (Jan 2016).
https://doi.org/10.1787/5jrs556f36zt-en

The adjustment of income values at or above a specified threshold is
not dissimilar to imputing missing data values in that the adjustment
is stochastic: that is, the adjusted values will differ when using
different seeds for the random number generator.  The adjustment is
not stochastic in the special case where a synthetic_adjustment_values
method is not called AND the number of non-synthetic adjustment values
(adjust method A argument) is exactly equal to the number of elements
at or above the income threshold in the adjust method X argument.

If the adjustment is stochastic, you can generate multiple adjustments
by creating multiple objects of the IncomeUnderReporting class that
have different seed values (just like with the MICE algorithm).

IMPORTANT NOTE:  The methods in this class assume that the survey
data have a uniform weight (that is, each sample observation has
the same weight), which means that a typical survey data sample
needs to be transformed from a variable-weight sample into a much
larger uniform-weight sample.  This tranformation can be done using
the uniform_weight_sample method of the SampleReWeighting class.

ALSO NOTE:  The adjust method in this class sorts the observations, so
the observations in the resulting adjusted sample will be in an order
that is different from the those in the X data supplied to the adjust
method.
"""

import time
import warnings
import numpy as np
from scipy import optimize


class IncomeUnderReporting:
    """
    IncomeUnderReporting class constructor.
    """
    def __init__(
            self,
            threshold,  # income threshold (incomes at or above are adjusted)
            seed  # integer in [1,999999999] range
    ):
        # process arguments
        assert isinstance(threshold, (int, float)), \
            'threshold must be a number'
        assert threshold > 0, \
            f'threshold={threshold} must be a positive number'
        self._threshold = threshold
        assert isinstance(seed, int), \
            'seed must be an integer'
        assert seed > 0, \
            f'seed={seed} must be positive'
        assert seed <= 999_999_999, \
            f'seed={seed} must be no greater than 999999999'
        self._rng_seed = seed

    # ------------------------------------------------------------------------
    # class methods when assuming mean above threshold:
    # ---------------------------------------------------------------

    def synthetic_adjustment_values_mean(
            self,
            num,   # number of adjustment values to generate
            upper_mean,  # mean of the synthetic adjustment values (that is,
                         # values above the threshold set in the class
                         # constructor
            verbose=False  # True ==> details on minimization
    ):
        """
        Returns np.ndarray containing adjustment values generated assuming
        a Pareto distribution of values at or above the threshold; the
        returned array is suitable for use as the A argument in the class
        adjust method (assuming num is no less than the number of X elements
        at or above the threshold).
        """
        # check arguments
        assert isinstance(num, int), 'num must be an integer'
        assert num > 0, f'num={num} must be positive'
        assert isinstance(upper_mean, (int, float)), \
            'upper_mean must be a number'
        assert upper_mean > self._threshold, \
            (f'upper_mean={upper_mean:.5e} must be greater than '
             f'threshold={self._threshold:.5e}')
        # specify fixed IncomeUnderReporting.func_mean params
        rng = np.random.default_rng(seed=self._rng_seed)
        expa = rng.exponential(size=num)
        del rng
        params = (expa, np.log(self._threshold), upper_mean,)
        # find alpha that implies target upper_mean
        start_time = time.time()
        res = optimize.minimize_scalar(IncomeUnderReporting.func_mean,
                                       args=params, method='brent')
        exectime = time.time() - start_time
        alpha = np.exp(res.x)
        if verbose:
            label = 'IUR.syn_adj_vals_mean:'
            print(f'{label} alpha at minimum =', alpha)
            print(f'{label} number of function evaluations =', res.nfev)
            print(f'{label} minimization execution time = {exectime:.3f} secs')
        assert res.success, \
            'synthetic minimization problem; set verbose=True for details'
        # compute adjustment values at minimizing xvar value for alpha
        adjval = self.adjustment_values(alpha, expa)
        return adjval

    @staticmethod
    def func_mean(log_alpha, *params):
        """
        Function of log_alpha that is minimized in the
        synthetic_adjustment_values_mean method so that
        so that the adjustment values have the target upper_mean.
        """
        # assign fixed parameter values including target statistic
        expa, log_threshold, target_upper_mean = params
        # use double Pareto log normal (dPlN) alpha parameter value to
        # generate synthetic income values with truncated dPlN distribution
        alpha = np.exp(log_alpha)
        log_income = log_threshold + expa/alpha
        adjval = np.exp(np.clip(log_income, -300, 300))
        # calculate function value and return
        return (np.mean(adjval) - target_upper_mean) ** 2

    # ------------------------------------------------------------------------

    # ------------------------------------------------------------------------
    # class methods when assuming Pareto between bound and threshold:
    # ---------------------------------------------------------------

    def synthetic_adjustment_values_infer(
            self,
            num,   # number of adjustment values to generate
            bound,  # Pareto parameter is inferred using data between this
                    # lower bound and the underreporting threshold set in
                    # the class constructor
            income,  # income observations: only values in [bound,threshold)
                     # range will be used in the calculations
            verbose=False  # True ==> details on minimization
    ):
        """
        Returns np.ndarray containing adjustment values generated assuming
        a Pareto distribution of values at or above the bound with values
        in the [bound,threshold) range being assumed to be not underreported;
        the returned array is suitable for use as the A argument in the class
        adjust method (assuming num is no less than the number of X elements
        at or above the threshold).  This method assumes values between the
        lower bound and the threshold are not underreported and have a
        Pareto distribution.
        """
        # check arguments
        assert isinstance(num, int), 'num must be an integer'
        assert num > 0, f'num={num} must be positive'
        assert isinstance(bound, (int, float)), 'bound must be a number'
        threshold = self._threshold
        assert bound < threshold, \
            f'bound={bound:.5e} must be less than threshold={threshold:.5e}'
        assert isinstance(income, np.ndarray), 'income must be an np.ndarray'
        assert income.ndim == 1, 'income must be a one-dimensional array'
        # estimate alpha
        alpha = IncomeUnderReporting.truncated_pareto_alpha(
            bound, threshold, income, verbose=verbose
        )
        # compute adjustment values using estimated alpha
        rng = np.random.default_rng(seed=self._rng_seed)
        expa = rng.exponential(size=num)
        del rng
        adjval = self.adjustment_values(alpha, expa)
        return adjval

    @staticmethod
    def truncated_pareto_alpha(
            lobound,  # lower bound of truncated Pareto income distribution
            hibound,  # higher bound of truncated Pareto income distribution
            income,  # income observations (only values in [lobound,hibound]
                     # range will be used in the calculations)
            verbose=False  # True ==> details on finding alpha
    ):
        """
        Return maximum-likelihood estimate of alpha for the lower- and
        upper-truncated Pareto distribution using equation (4) in
        I. Aban, M. Meerschaert, and A. Panorska,
        "Parameter Estimation for the Truncated Pareto Distribution"
        Journal of the American Statistical Association, March 2006,
        Vol. 101, No. 473, Theory and Methods.
        https://www.stt.msu.edu/~mcubed/TPareto.pdf
        """
        # check arguments
        assert isinstance(lobound, (int, float)), 'lobound must be a number'
        assert isinstance(hibound, (int, float)), 'hibound must be a number'
        assert isinstance(income, np.ndarray), 'income must be an np.ndarray'
        assert income.ndim == 1, 'income must be a one-dimensional array'
        # compute sum of log relative (to lobound) income values in range
        in_range = np.logical_and(lobound <= income, income <= hibound)
        inc = income[in_range]
        sumlogrelinc = np.log(inc / lobound).sum()
        # specify fixed IncomeUnderReporting.func_tpa params
        params = (inc.size, sumlogrelinc, lobound, hibound,)
        # find maximum-likelihood estimate of alpha
        start_time = time.time()
        warnings.simplefilter(action='ignore', category=RuntimeWarning)
        res = optimize.minimize_scalar(IncomeUnderReporting.func_tpa,
                                       args=params, method='brent')
        exectime = time.time() - start_time
        alpha = np.exp(res.x)
        if verbose:
            label = 'IUR.truncated_pareto_alpha:'
            print(f'{label} alpha at minimum = {alpha:.5f}')
            print(f'{label} minimized function value =', res.fun)
            print(f'{label} number of function evaluations =', res.nfev)
            print(f'{label} minimization execution time = {exectime:.3f} secs')
        assert res.success, \
            'synthetic minimization problem; set verbose=True for details'
        return alpha

    @staticmethod
    def func_tpa(log_alpha, *params):
        """
        Function of log_alpha that is minimized in the
        truncated_pareto_alpha method in order to obtain
        the maximum-likelihood estimate of alpha for the
        lower-truncated and upper-truncated Pareto distribution.
        See equation (4) in I. Aban, M. Meerschaert, and A. Panorska,
        "Parameter Estimation for the Truncated Pareto Distribution"
        Journal of the American Statistical Association, March 2006,
        Vol. 101, No. 473, Theory and Methods
        https://www.stt.msu.edu/~mcubed/TPareto.pdf
        """
        # assign fixed parameter values including target statistic
        num, sumlogrelx, lobound, hibound = params
        # calculate function value and return
        alpha = np.exp(log_alpha)
        ratio = lobound / hibound
        ratio2a = np.power(ratio, alpha)
        logratio = np.log(ratio)
        expr = num / alpha + num * ratio2a * logratio / (1 - ratio2a)
        return 1e6 * (expr - sumlogrelx) ** 2

    # ------------------------------------------------------------------------

    def adjustment_values(self, alpha, expa):
        """
        Returns np.ndarray containing synthetic adjustment values implied
        by alpha and expa (an array of exponential variates); called from
        both the synthetic_adjustment_values_mean and the
        synthetic_adjustment_values_infer methods.  The returned values
        are all at or above the threshold specified in the class
        constructor.
        """
        assert isinstance(alpha, float), 'alpha must be a float'
        assert alpha > 0.0, 'alpha must be positive'
        assert isinstance(expa, np.ndarray), 'expa is not a numpy array'
        # generate synthetic income values at or above the threshold
        # assuming a truncated double Pareto log normal (dPlN) distribution
        log_income = np.log(self._threshold) + expa/alpha
        adjval = np.exp(np.clip(log_income, -300, 300))
        return adjval

    @property
    def income_threshold(self):
        """
        Returns threshold value specified in class constructor.
        """
        return self._threshold

    @property
    def random_number_seed(self):
        """
        Returns seed value specified in class constructor.
        """
        return self._rng_seed

    # pylint: disable=invalid-name
    def adjust(
            self,
            idx,  # X array column index for income variable to be adjusted
            X,  # np.ndarray shape (J,M) with no np.nan values for idx variable
            A,  # np.ndarray shape (K,) with no np.nan values for idx variable
            id_idx=-1,  # X array column index for id variable
            id_offset=1_000_000,  # id_offset used when extra rows are added
            verbose=False
    ):
        """
        Returns np.ndarray with A values replacing X[:,idx] values that are
        at or above the income threshold specified in the class constructor.

        If K, the number of A values, is equal to the number of X[:,idx]
        values at or above the threshold, X[:,idx] values below the threshold
        are unchanged and the algorithm replaces the smallest X[:,idx] value
        at or above the threshold with the smallest A value, the next to the
        smallest X[:,idx] value at or above the threshold with the next to
        the smallest A value, ..., and replaces the largest X[:,idx] value
        with the largest A value.

        If K, the number of A values, is larger than the number of X[:,idx]
        values at or above the threshold, then extra X rows are added at or
        above the threshold using a nearest neighbor algorithm (which
        simplifies to the above algorithm when the number of A values is
        equal to the number of X[:,idx] values), and randomly-selected X
        rows below the threshold are removed to ensure that the number of
        rows in the returned array are equal to the number of rows in the
        specified X array.

        Note that the returned array is not in the same order as the
        specified X array.
        """
        # pylint: disable=too-many-arguments
        # pylint: disable=too-many-statements,too-many-locals
        if verbose:
            start_time = time.time()
        # check X argument
        assert isinstance(X, np.ndarray), 'X must be a np.ndarray'
        assert X.ndim == 2, 'X must have two dimensions'
        # check idx argument
        assert 0 <= idx < X.shape[1], \
            f'idx={idx} not in X row range [0,{X.shape[1]-1}]'
        assert not np.any(np.isnan(X[:, idx])), \
            'X[:,idx] must not contain np.nan values'
        # check A argument
        assert isinstance(A, np.ndarray), 'A must be a np.ndarray'
        assert A.ndim == 1, 'A must have one dimension'
        assert not np.any(np.isnan(A)), 'A must not contain np.nan values'
        # cross check number of X[:,idx] at or above threshold and number of A
        above = X[:, idx] >= self._threshold
        num_above = np.count_nonzero(above)
        assert num_above > 0, \
            (f'X[:,idx] must contain values at or '
             f'above threshold={self._threshold}')
        assert A.size >= num_above, \
            (f'number of A values [{A.size}] must be no less than '
             f'number of X[:,idx] values at or above threshold [{num_above}]')
        # extract X rows below threshold
        XB = X[~above].copy()
        # extract X rows at or above threshold and sort by idx variable
        XA = X[above].copy()
        XA = XA[np.argsort(XA[:, idx])]
        # sort A values
        AS = np.sort(A)
        if verbose:
            print(f'IUR.adjust: A.size = {A.size} and num_above = {num_above}')
        # handle large A.size situation if necessary
        if AS.size > num_above:
            #   Note: the basic approach in this situation has similarities
            #   to that used in T.Blanchet, I.Flores, M.Morgan, "The Weight
            #   of the Rich: Improving Surveys Using Tax Data" (WID.world
            #   WORKING PAPER SERIES N° 2018/12)
            # add rows at or above threshold to XA
            assert XA.shape[0] == num_above
            ratio = num_above / AS.size
            ratio_cumsum = np.cumsum(ratio * np.ones_like(AS))
            ratio_cumsum[-1] -= 1e-5  # adjust last cumsum down by tiny amount
            indexes = np.floor(ratio_cumsum).astype('int')
            assert indexes.min() >= 0, \
                (f'at least one XA_index={indexes.min()} < 0 '
                 f'when ratio={ratio:.9f}')
            assert indexes.max() < num_above, \
                (f'at least one XA_index={indexes.max()} >= {num_above} '
                 f'when ratio={ratio:.9f}')
            del ratio_cumsum
            idxval, idxcnt = np.unique(indexes, return_counts=True)
            del indexes
            num_added = 0
            for ival, icnt in zip(idxval, idxcnt):
                idoffset = 0
                for iteration in range(0, icnt-1):
                    if iteration == 0:
                        XAB = XA[ival].copy()
                    if id_idx >= 0:
                        idoffset += id_offset
                        XAB[id_idx] += idoffset
                    XA = np.append(XA, [XAB], axis=0)
                    num_added += 1
            del idxval
            del idxcnt
            if verbose:
                print(f'IUR:adjust: num_added = {num_added}')
            # sort augmented XA by idx variable
            XA = XA[np.argsort(XA[:, idx])]
            # drop rows below threshold from XB
            rng = np.random.default_rng(seed=self._rng_seed)
            drop = rng.choice(XB.shape[0], size=num_added, replace=False)
            del rng
            XB = np.delete(XB, drop, axis=0)
            del drop
        # replace XA[:,idx] values with AS values
        XA[:, idx] = AS
        # return new array containing both rows below and above the threshold
        XX = np.concatenate((XB, XA))
        assert XX.shape == X.shape, f'XX.shape={XX.shape} != X.shape={X.shape}'
        if verbose:
            exectime = time.time() - start_time
            print(f'IUR.adjust: execution_time = {exectime:.3f} secs')
        return XX