Tax-Analyzer-Framework

test5x.py
Login

File taf/dptests/test5x.py from the latest check-in


"""
Test IncomeUnderReporting class in a situation where there are no non-survey
data available to adjust the under reporting of high incomes in survey data.

If we are willing to make three assumptions, we can adjust for the
income under reporting in the survey data.  Here are the assumptions:
(1) we are willing to assume an income threshold at or above which
     we think income is under reported;
(2) we are willing to assume that incomes between a lower bound (which is
    less than the threshold) and the threshold are correctly reported in
    the survey data;
(3) we are willing to assume that incomes at or above the lower bound have
    a Pareto distribution.

This approach has been used in a 2016 OECD working paper by Nicolas
Ruiz and Nicolas Woloszko.

But the ability of this approach to make plausible adjustments of income
under reporting is severely limited because there is no information to
guide the selection of the adjustment threshold or the lower bound.  And
by it nature, this approach cannot deal with high income people being
missing from the survey data.

This script illustrates one of these problems: the sensitivity of the
adjustment results to a deviation from assumption (3).  It does that
in the following way.  This script employs two different sets of
assumptions about the distribution of true higher incomes: (a) pure
Pareto, and (b) mixed Pareto and logNormal.  Under the (a) assumptions,
if we guess correctly about the bound and threshold, the approach produces
a fairly accurate estimate of the alpha Pareto parameter, and hence, of
the required under-reporting adjustment (as can be seen in the
test5xpure.exp results file, which is summarized below).  However, under
the (b) assumptions, the inference method systematically produces estimates
of alpha that are too low, and hence, produces income under-reporting
adjustments that are much too large (as can be seen in the test5xmixed.exp
results file, which is summarized below).

Here is a summary of these two sets of results for 100,000 observations
(with mean incomes expressed in thousands):
-------------------------------------------------------------------------------
                 PURE    MIXED
-------------------------------------------------------------------------------
    TRUE MEAN   116.9    115.2
OBSERVED MEAN    99.7     91.4  (same under reporting that increases above 200)
INFERRED ALPHA   1.52     0.83  (assuming the same 120 bound and 200 threshold)
ADJUSTED MEAN   116.2    811.7
-------------------------------------------------------------------------------
"""

import sys
import time
import argparse
import numpy as np
import pandas as pd
from taf import IncomeUnderReporting

# debugging parameters:
DESCRIBE_ONLY = False
CSV_WRITE = False
TIME_IUR = False

# dPlN income distribution parameters:
SEED_SD = 987654321
SIZE_SD = 100_000  # number of observations in survey data
THRESHOLD_SD = 200e3

# descriptive statistics parameters:
FRACTILES = [0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.99]

# IncomeUnderReporting class parameters:
SEED_IUR = 567891234
THRESHOLD_IUR = 200e3
BOUND_IUR = 120e3


def main(distribution):
    """
    High-level logic.
    """
    # pylint: disable=too-many-locals,too-many-statements

    # ----------------------------------------------------------------
    # (*) construct survey data set with no income under-reporting
    # ----------------------------------------------------------------
    # specify dPlN income distribution parameters
    if distribution == 'pure':
        # pure Pareto
        mean = 11.0
        sdev = 0.0
        alpha = 1.5
        beta = 2.0
    else:
        # mixed Pareto and logNormal
        mean = 10.5
        sdev = 1.0
        alpha = 1.5
        beta = 2.0
    # construct survey data set with complete information
    rng = np.random.default_rng(seed=SEED_SD)
    norm = rng.normal(size=SIZE_SD)
    expa = rng.exponential(size=SIZE_SD)
    expb = rng.exponential(size=SIZE_SD)
    del rng
    income = np.exp(mean + norm*sdev + expa/alpha - expb/beta).astype(int)
    tdf = pd.DataFrame({
        'inc': income,
    })
    df_columns = list(tdf)
    if CSV_WRITE:
        tdf.to_csv('test5x_true.csv', index=False)
    print('TRUE df:\n', tdf.describe(percentiles=FRACTILES))
    # ----------------------------------------------------------------
    # (*) specify income under-reporting in survey data
    # ----------------------------------------------------------------
    # construct survey data set with income under reporting
    sdf = tdf.copy()
    underreported = sdf.inc >= THRESHOLD_SD
    num_underreported = np.count_nonzero(underreported)
    ur_inc = np.clip((0.5 * sdf.inc + 0.5 * THRESHOLD_SD).astype(int),
                     THRESHOLD_SD, np.inf)
    sdf.loc[underreported, 'inc'] = ur_inc
    print('OBSV df:\n', sdf.describe(percentiles=FRACTILES))
    if CSV_WRITE:
        sdf.to_csv('test5x_obsv.csv', index=False)
    # ----------------------------------------------------------------
    # (*) adjust under-reported incomes using synthetic income data
    # ----------------------------------------------------------------
    # compute mean income in [BOUND_IUR,THRESHOLD_IUR) range
    inc = sdf.inc.to_numpy()
    in_range = np.logical_and(inc >= BOUND_IUR, inc < THRESHOLD_IUR)
    above_range = inc >= THRESHOLD_IUR
    print((f'IUR_log_income_range= [{np.log(BOUND_IUR):.3f},'
           f'{np.log(THRESHOLD_IUR):.3f})'))
    print(f'IUR_income_range= [{BOUND_IUR:.5e},{THRESHOLD_IUR:.5e})')
    count_in_range = inc[in_range].size
    print((f'percent_in_IUR_income_range= '
           f'{(100*count_in_range/SIZE_SD):.2f}'))
    count_above_range = inc[above_range].size
    print(('percent_above_IUR_income_range= '
           f'{(100*count_above_range/SIZE_SD):.2f}'))
    if DESCRIBE_ONLY:
        return 0
    # adjust survey income values at or above THRESHOLD_IUR using infer method
    time0 = time.time()
    iur = IncomeUnderReporting(THRESHOLD_IUR, SEED_IUR)
    adjval = iur.synthetic_adjustment_values_infer(num_underreported,
                                                   BOUND_IUR,
                                                   inc[in_range],
                                                   verbose=True)
    assert np.all(adjval >= THRESHOLD_IUR)
    adj = iur.adjust(df_columns.index('inc'), sdf.to_numpy(), adjval)
    if TIME_IUR:
        print(f'IUR_exec_time(secs)= {(time.time() - time0):.1f}')
    # check adj values returned from IncomeUnderReporting.adjust method
    adf = pd.DataFrame(adj, columns=df_columns).astype('int')
    assert adf.shape[0] == SIZE_SD
    adf_b_mean = adf.inc[adf.inc < THRESHOLD_SD].mean()
    sdf_b_mean = sdf.inc[sdf.inc < THRESHOLD_SD].mean()
    if not np.allclose(adf_b_mean, sdf_b_mean):
        print(f'adf,sdf_below_mean: {adf_b_mean:.2f} != {sdf_b_mean:.2f}')
    print('ADJT df:\n', adf.describe(percentiles=FRACTILES))
    if CSV_WRITE:
        adf.to_csv('test5x_adjt.csv', index=False)
    return 0
# end of main function code


def process_command_line_arguments():
    """
    Process command-line arguments returning a dictionary with values for
    these variables: DIST
    """
    usage_str = 'python test5x.py pure|mixed [--help]'
    parser = argparse.ArgumentParser(
        prog='',
        usage=usage_str,
        description=('Execute test5x.py using specified '
                     'distribution assumption.')
    )
    parser.add_argument('DIST', type=str,
                        help=('Assumed type of income distribution: '
                              'pure Pareto or mixed Pareto and logNormal.'))
    args = parser.parse_args()
    # check command-line arguments
    args_ok = True
    if args.DIST not in ('pure', 'mixed'):
        sys.stderr.write(
            "ERROR: DIST must be either 'pure' or 'mixed'\n"
        )
        args_ok = False
    if args_ok:
        return {'DIST': args.DIST}
    sys.stderr.write(f'USAGE: {usage_str}\n')
    return {}
# end of process_command_line_arguments function


if __name__ == '__main__':
    cliarg = process_command_line_arguments()
    if cliarg:
        sys.exit(main(cliarg['DIST']))
    else:
        sys.exit(1)