"""
Test IncomeUnderReporting class in a situation where there are no micro tax
data available to adjust for missing and under-reporting high-income people
in survey data, but there is an estimate of the number of and the mean of
post-adjustment incomes above an income threshold. If we assume incomes at
or above the threshold have a Pareto distribution, we can adjust the survey
data. We assume that the survey data have been structured so that each
observation has the same weight.
"""
import sys
import time
import argparse
import numpy as np
import pandas as pd
from taf import IncomeUnderReporting
# output parameters:
CSV_WRITE = False
CLI_WRITE = False
ETC_WRITE = False
DO_TIMING = False
# dPlN income distribution parameters:
SEED_SD = 987654321
SIZE_SD = 100_000 # number of observations in micro data
MEAN = 13.2
SDEV = 0.15
ALPHA = 2.3
BETA = 0.85
THRESHOLD_SD = 850e3
# descriptive statistics parameters:
FRACTILES = [0.005, 0.01, 0.05, 0.10, 0.15, 0.20,
0.25, 0.50, 0.75, # median and inter-quartile range
0.80, 0.85, 0.90, 0.95, 0.99, 0.995]
HIGH_TAX = 150e3 # tax liability of 150,000
def main(seed_iur, threshold_iur, num_above_iur, mean_above_iur):
"""
High-level logic.
"""
# pylint: disable=too-many-statements,too-many-locals,too-many-branches
if DO_TIMING:
start_time = time.time()
print((f'IUR: seed={seed_iur} '
f'threshold={threshold_iur:.0f} '
f'num_above={num_above_iur} '
f'mean_above={mean_above_iur:.0f}'))
# ----------------------------------------------------------------
# (*) construct tax data with no missing high incomes
# ----------------------------------------------------------------
# construct data with complete income information
rng = np.random.default_rng(seed=SEED_SD)
norm = rng.normal(size=SIZE_SD)
expa = rng.exponential(size=SIZE_SD)
expb = rng.exponential(size=SIZE_SD)
del rng
tinc = np.exp(MEAN + norm*SDEV + expa/ALPHA - expb/BETA).astype(int)
trevenue, tallpayers, thipayers = aggregate_tax_statistics(tinc)
covar = np.arange(tinc.size) + 1
tdf = pd.DataFrame({'income': tinc, 'covar': covar})
df_columns = list(tdf)
if not CLI_WRITE:
print('TAX df:\n', tdf.describe(percentiles=FRACTILES))
if CSV_WRITE:
tdf.to_csv('test5_tax.csv', index=False)
del tdf
# tabulate accurate IUR assumptions
above = tinc >= THRESHOLD_SD
abinc = tinc[above]
num_above = abinc.size # number of incomes above before incomes go missing
mean_above = abinc.mean() # mean income above before incomes go missing
if not CLI_WRITE:
print(f'num_above= {num_above}')
print(f'mean_above= {mean_above:.0f}')
# ----------------------------------------------------------------
# (*) remove missing high incomes and "reweight" survey data
# ----------------------------------------------------------------
rng = np.random.default_rng(seed=SEED_SD)
urn = rng.random(size=tinc.size)
del rng
prob = np.minimum(0.30 + 0.25e-6 * (tinc - THRESHOLD_SD), 1.0)
missing_prob = np.where(above, prob, 0.0)
mdf = pd.DataFrame({'missing_prob': missing_prob})
if not CLI_WRITE:
print('MISSING PROBABILITY:\n', mdf.describe(percentiles=FRACTILES))
del mdf
missing = np.logical_and(above, urn < missing_prob)
num_missing = np.count_nonzero(missing)
if not CLI_WRITE:
print(f'num_missing= {num_missing}')
oinc = tinc[~missing].copy() # excludes missing incomes above threshold
# add some below-threshold incomes to "reweight" survey data
below = oinc < THRESHOLD_SD
num_below = np.count_nonzero(below)
rng = np.random.default_rng(seed=SEED_SD)
add_indexes = rng.choice(num_below, size=num_missing, replace=False)
del rng
for idx in add_indexes:
oinc = np.append(oinc, [oinc[idx]], axis=0)
assert oinc.size == tinc.size
# ----------------------------------------------------------------
# (*) reduce high incomes to simulate income under-reporting
# ----------------------------------------------------------------
ur_above = oinc >= 1.5 * THRESHOLD_SD
ur_income = (0.5 * oinc[ur_above] + 0.5 * THRESHOLD_SD).astype(int)
oinc[ur_above] = ur_income
assert oinc.size == tinc.size
# ----------------------------------------------------------------
# (*) summarize observed survey data
# ----------------------------------------------------------------
orevenue, oallpayers, ohipayers = aggregate_tax_statistics(oinc)
odf = pd.DataFrame({'income': oinc, 'covar': covar}).astype('int')
if not CLI_WRITE:
print('OBS df:\n', odf.describe(percentiles=FRACTILES))
if CSV_WRITE:
odf.to_csv('test5_obs.csv', index=False)
if ETC_WRITE:
print('ETC:', np.quantile(oinc, [0.815, 0.902, 0.950, 0.988]).round())
# ----------------------------------------------------------------
# (*) adjust observed incomes using synthetic incomes
# ----------------------------------------------------------------
# create synthetic adjustment incomes using _mean method
if DO_TIMING:
print(f'prep_execution_time= {(time.time() - start_time):.2f} secs')
start_time = time.time()
iur = IncomeUnderReporting(threshold_iur, seed_iur)
adjval = iur.synthetic_adjustment_values_mean(num_above_iur,
mean_above_iur,
verbose=not CLI_WRITE)
assert np.all(adjval >= threshold_iur)
adj = iur.adjust(df_columns.index('income'), odf.to_numpy(), adjval)
if DO_TIMING:
print(f'IUR_execution_time= {(time.time() - start_time):.2f} secs')
start_time = time.time()
# check adj values returned from IncomeUnderReporting.adjust method
assert isinstance(adj, np.ndarray)
assert adj.shape[0] == SIZE_SD, \
f'adj.shape[0]={adj.shape[0]} != SIZE_SD={SIZE_SD}'
adf = pd.DataFrame(adj, columns=df_columns).astype('int')
ainc = adf['income'].to_numpy()
arevenue, aallpayers, ahipayers = aggregate_tax_statistics(ainc)
if not CLI_WRITE:
print('ADJ df:\n', adf.describe(percentiles=FRACTILES))
if CSV_WRITE:
adf.to_csv('test5_adj.csv', index=False)
del adf
print(('AGG_TAX_REVENUE:TAX,OBS,ADJ: '
f'{trevenue:6.3f} {orevenue:6.3f} {arevenue:6.3f}'))
print(('ALL_TAX_PAYERS: TAX,OBS,ADJ: '
f'{tallpayers:6.2f} {oallpayers:6.2f} {aallpayers:6.2f}'))
print(('HIGH_TAX_PAYERS:TAX,OBS,ADJ: '
f'{thipayers:6.2f} {ohipayers:6.2f} {ahipayers:6.2f}'))
print(f'HIGH_TAX= {HIGH_TAX:.0f}')
if DO_TIMING:
print(f'post_execution_time= {(time.time() - start_time):.2f} secs')
return 0
# end of main function code
def aggregate_tax_statistics(income):
"""
Return aggregate tax statistics for the specified income array.
"""
assert isinstance(income, np.ndarray)
# specify income tax policy parameters
brk1 = 360e3 # top of 1st income bracket
brk2 = 720e3 # top of 2nd income bracket
# top of 3rd income bracket is infinity
rate1 = 0.00 # marginal tax rate in 1st income bracket
rate2 = 0.15 # marginal tax rate in 2nd income bracket
rate3 = 0.30 # marginal tax rate in 3rd income bracket
tax1 = rate1 * brk1
tax2 = tax1 + rate2 * (brk2 - brk1)
# compute tax liability for each income
tax = np.where(
income <= brk1,
rate1 * income,
np.where(
income <= brk2,
tax1 + rate2 * (income - brk1),
tax2 + rate3 * (income - brk2)
)
)
# compute aggregate tax statistics
totaltax = tax.sum() * 1e-9 # total aggregate tax revenue (in billions)
allpayers = np.count_nonzero(tax) * 1e-3 # all taxpayers (in thousands)
hipayers = np.count_nonzero(tax >= HIGH_TAX) * 1e-3 # high taxpayers (")
return (totaltax, allpayers, hipayers)
def process_command_line_arguments():
"""
Process optional command-line arguments returning a dictionary
with these values: seed, threshold, num_above, mean_above.
"""
usage_str = ('python test5.py [--seed SEED] [--threshold LEVEL] '
'[--num_above NUM [--mean_above MEAN] [--help]')
parser = argparse.ArgumentParser(
prog='',
usage=usage_str,
description=('Run test5 using the specified values of the '
'optional four command-line parameters.')
)
parser.add_argument(
'--seed', metavar='SEED', type=int, default=789123456,
help='option that specifies SEED [default=789123456]'
)
parser.add_argument(
'--threshold', metavar='LEVEL', type=float, default=850e3,
help='option that specifies threshold LEVEL [default=850e3]'
)
parser.add_argument(
'--num_above', metavar='NUM', type=int, default=10108,
help='option that specifies NUM above LEVEL [default=10108]'
)
parser.add_argument(
'--mean_above', metavar='MEAN', type=float, default=1506.081e3,
help='option that specifies MEAN above LEVEL [default=1506.081e3]'
)
args = parser.parse_args()
# check command-line arguments
args_ok = True
if args.seed <= 0 or args.seed > 999_999_999:
sys.stderr.write('ERROR: SEED not in [1,999999999] range\n')
args_ok = False
if args.threshold <= 0:
sys.stderr.write('ERROR: threshold LEVEL is not positive\n')
args_ok = False
if args.num_above <= 0:
sys.stderr.write('ERROR: num_above NUM is not positive\n')
args_ok = False
if args.mean_above <= args.threshold:
sys.stderr.write('ERROR: mean_above MEAN <= threshold LEVEL\n')
args_ok = False
if args_ok:
return {'seed': args.seed,
'threshold': args.threshold,
'num_above': args.num_above,
'mean_above': args.mean_above}
sys.stderr.write(f'USAGE: {usage_str}\n')
return {}
if __name__ == '__main__':
arg = process_command_line_arguments()
if arg:
sys.exit(main(arg['seed'], arg['threshold'],
arg['num_above'], arg['mean_above']))
else:
sys.exit(1)