Tax-Analyzer-Framework

test5.py
Login

File taf/dptests/test5.py from the latest check-in


"""
Test IncomeUnderReporting class in a situation where there are no micro tax
data available to adjust for missing and under-reporting high-income people
in survey data, but there is an estimate of the number of and the mean of
post-adjustment incomes above an income threshold.  If we assume incomes at
or above the threshold have a Pareto distribution, we can adjust the survey
data.  We assume that the survey data have been structured so that each
observation has the same weight.
"""

import sys
import time
import argparse
import numpy as np
import pandas as pd
from taf import IncomeUnderReporting

# output parameters:
CSV_WRITE = False
CLI_WRITE = False
ETC_WRITE = False
DO_TIMING = False

# dPlN income distribution parameters:
SEED_SD = 987654321
SIZE_SD = 100_000  # number of observations in micro data
MEAN = 13.2
SDEV = 0.15
ALPHA = 2.3
BETA = 0.85
THRESHOLD_SD = 850e3

# descriptive statistics parameters:
FRACTILES = [0.005, 0.01, 0.05, 0.10, 0.15, 0.20,
             0.25, 0.50, 0.75,  # median and inter-quartile range
             0.80, 0.85, 0.90, 0.95, 0.99, 0.995]

HIGH_TAX = 150e3  # tax liability of 150,000


def main(seed_iur, threshold_iur, num_above_iur, mean_above_iur):
    """
    High-level logic.
    """
    # pylint: disable=too-many-statements,too-many-locals,too-many-branches
    if DO_TIMING:
        start_time = time.time()
    print((f'IUR: seed={seed_iur}  '
           f'threshold={threshold_iur:.0f}  '
           f'num_above={num_above_iur}  '
           f'mean_above={mean_above_iur:.0f}'))
    # ----------------------------------------------------------------
    # (*) construct tax data with no missing high incomes
    # ----------------------------------------------------------------
    # construct data with complete income information
    rng = np.random.default_rng(seed=SEED_SD)
    norm = rng.normal(size=SIZE_SD)
    expa = rng.exponential(size=SIZE_SD)
    expb = rng.exponential(size=SIZE_SD)
    del rng
    tinc = np.exp(MEAN + norm*SDEV + expa/ALPHA - expb/BETA).astype(int)
    trevenue, tallpayers, thipayers = aggregate_tax_statistics(tinc)
    covar = np.arange(tinc.size) + 1
    tdf = pd.DataFrame({'income': tinc, 'covar': covar})
    df_columns = list(tdf)
    if not CLI_WRITE:
        print('TAX df:\n', tdf.describe(percentiles=FRACTILES))
    if CSV_WRITE:
        tdf.to_csv('test5_tax.csv', index=False)
    del tdf
    # tabulate accurate IUR assumptions
    above = tinc >= THRESHOLD_SD
    abinc = tinc[above]
    num_above = abinc.size  # number of incomes above before incomes go missing
    mean_above = abinc.mean()  # mean income above before incomes go missing
    if not CLI_WRITE:
        print(f'num_above= {num_above}')
        print(f'mean_above= {mean_above:.0f}')
    # ----------------------------------------------------------------
    # (*) remove missing high incomes and "reweight" survey data
    # ----------------------------------------------------------------
    rng = np.random.default_rng(seed=SEED_SD)
    urn = rng.random(size=tinc.size)
    del rng
    prob = np.minimum(0.30 + 0.25e-6 * (tinc - THRESHOLD_SD), 1.0)
    missing_prob = np.where(above, prob, 0.0)
    mdf = pd.DataFrame({'missing_prob': missing_prob})
    if not CLI_WRITE:
        print('MISSING PROBABILITY:\n', mdf.describe(percentiles=FRACTILES))
    del mdf
    missing = np.logical_and(above, urn < missing_prob)
    num_missing = np.count_nonzero(missing)
    if not CLI_WRITE:
        print(f'num_missing= {num_missing}')
    oinc = tinc[~missing].copy()  # excludes missing incomes above threshold
    # add some below-threshold incomes to "reweight" survey data
    below = oinc < THRESHOLD_SD
    num_below = np.count_nonzero(below)
    rng = np.random.default_rng(seed=SEED_SD)
    add_indexes = rng.choice(num_below, size=num_missing, replace=False)
    del rng
    for idx in add_indexes:
        oinc = np.append(oinc, [oinc[idx]], axis=0)
    assert oinc.size == tinc.size
    # ----------------------------------------------------------------
    # (*) reduce high incomes to simulate income under-reporting
    # ----------------------------------------------------------------
    ur_above = oinc >= 1.5 * THRESHOLD_SD
    ur_income = (0.5 * oinc[ur_above] + 0.5 * THRESHOLD_SD).astype(int)
    oinc[ur_above] = ur_income
    assert oinc.size == tinc.size
    # ----------------------------------------------------------------
    # (*) summarize observed survey data
    # ----------------------------------------------------------------
    orevenue, oallpayers, ohipayers = aggregate_tax_statistics(oinc)
    odf = pd.DataFrame({'income': oinc, 'covar': covar}).astype('int')
    if not CLI_WRITE:
        print('OBS df:\n', odf.describe(percentiles=FRACTILES))
    if CSV_WRITE:
        odf.to_csv('test5_obs.csv', index=False)
    if ETC_WRITE:
        print('ETC:', np.quantile(oinc, [0.815, 0.902, 0.950, 0.988]).round())
    # ----------------------------------------------------------------
    # (*) adjust observed incomes using synthetic incomes
    # ----------------------------------------------------------------
    # create synthetic adjustment incomes using _mean method
    if DO_TIMING:
        print(f'prep_execution_time= {(time.time() - start_time):.2f} secs')
        start_time = time.time()
    iur = IncomeUnderReporting(threshold_iur, seed_iur)
    adjval = iur.synthetic_adjustment_values_mean(num_above_iur,
                                                  mean_above_iur,
                                                  verbose=not CLI_WRITE)
    assert np.all(adjval >= threshold_iur)
    adj = iur.adjust(df_columns.index('income'), odf.to_numpy(), adjval)
    if DO_TIMING:
        print(f'IUR_execution_time= {(time.time() - start_time):.2f} secs')
        start_time = time.time()
    # check adj values returned from IncomeUnderReporting.adjust method
    assert isinstance(adj, np.ndarray)
    assert adj.shape[0] == SIZE_SD, \
        f'adj.shape[0]={adj.shape[0]} != SIZE_SD={SIZE_SD}'
    adf = pd.DataFrame(adj, columns=df_columns).astype('int')
    ainc = adf['income'].to_numpy()
    arevenue, aallpayers, ahipayers = aggregate_tax_statistics(ainc)
    if not CLI_WRITE:
        print('ADJ df:\n', adf.describe(percentiles=FRACTILES))
    if CSV_WRITE:
        adf.to_csv('test5_adj.csv', index=False)
    del adf
    print(('AGG_TAX_REVENUE:TAX,OBS,ADJ:  '
           f'{trevenue:6.3f}  {orevenue:6.3f}  {arevenue:6.3f}'))
    print(('ALL_TAX_PAYERS: TAX,OBS,ADJ:  '
           f'{tallpayers:6.2f}  {oallpayers:6.2f}  {aallpayers:6.2f}'))
    print(('HIGH_TAX_PAYERS:TAX,OBS,ADJ:  '
           f'{thipayers:6.2f}  {ohipayers:6.2f}  {ahipayers:6.2f}'))
    print(f'HIGH_TAX= {HIGH_TAX:.0f}')
    if DO_TIMING:
        print(f'post_execution_time= {(time.time() - start_time):.2f} secs')
    return 0
# end of main function code


def aggregate_tax_statistics(income):
    """
    Return aggregate tax statistics for the specified income array.
    """
    assert isinstance(income, np.ndarray)
    # specify income tax policy parameters
    brk1 = 360e3  # top of 1st income bracket
    brk2 = 720e3  # top of 2nd income bracket
    # top of 3rd income bracket is infinity
    rate1 = 0.00  # marginal tax rate in 1st income bracket
    rate2 = 0.15  # marginal tax rate in 2nd income bracket
    rate3 = 0.30  # marginal tax rate in 3rd income bracket
    tax1 = rate1 * brk1
    tax2 = tax1 + rate2 * (brk2 - brk1)
    # compute tax liability for each income
    tax = np.where(
        income <= brk1,
        rate1 * income,
        np.where(
            income <= brk2,
            tax1 + rate2 * (income - brk1),
            tax2 + rate3 * (income - brk2)
        )
    )
    # compute aggregate tax statistics
    totaltax = tax.sum() * 1e-9  # total aggregate tax revenue (in billions)
    allpayers = np.count_nonzero(tax) * 1e-3  # all taxpayers (in thousands)
    hipayers = np.count_nonzero(tax >= HIGH_TAX) * 1e-3  # high taxpayers (")
    return (totaltax, allpayers, hipayers)


def process_command_line_arguments():
    """
    Process optional command-line arguments returning a dictionary
    with these values: seed, threshold, num_above, mean_above.
    """
    usage_str = ('python test5.py [--seed SEED] [--threshold LEVEL] '
                 '[--num_above NUM [--mean_above MEAN] [--help]')
    parser = argparse.ArgumentParser(
        prog='',
        usage=usage_str,
        description=('Run test5 using the specified values of the '
                     'optional four command-line parameters.')
    )
    parser.add_argument(
        '--seed', metavar='SEED', type=int, default=789123456,
        help='option that specifies SEED [default=789123456]'
    )
    parser.add_argument(
        '--threshold', metavar='LEVEL', type=float, default=850e3,
        help='option that specifies threshold LEVEL [default=850e3]'
    )
    parser.add_argument(
        '--num_above', metavar='NUM', type=int, default=10108,
        help='option that specifies NUM above LEVEL [default=10108]'
    )
    parser.add_argument(
        '--mean_above', metavar='MEAN', type=float, default=1506.081e3,
        help='option that specifies MEAN above LEVEL [default=1506.081e3]'
    )
    args = parser.parse_args()
    # check command-line arguments
    args_ok = True
    if args.seed <= 0 or args.seed > 999_999_999:
        sys.stderr.write('ERROR: SEED not in [1,999999999] range\n')
        args_ok = False
    if args.threshold <= 0:
        sys.stderr.write('ERROR: threshold LEVEL is not positive\n')
        args_ok = False
    if args.num_above <= 0:
        sys.stderr.write('ERROR: num_above NUM is not positive\n')
        args_ok = False
    if args.mean_above <= args.threshold:
        sys.stderr.write('ERROR: mean_above MEAN <= threshold LEVEL\n')
        args_ok = False
    if args_ok:
        return {'seed': args.seed,
                'threshold': args.threshold,
                'num_above': args.num_above,
                'mean_above': args.mean_above}
    sys.stderr.write(f'USAGE: {usage_str}\n')
    return {}


if __name__ == '__main__':
    arg = process_command_line_arguments()
    if arg:
        sys.exit(main(arg['seed'], arg['threshold'],
                      arg['num_above'], arg['mean_above']))
    else:
        sys.exit(1)