"""
Simulate tax filing status of each unit in specified DUMP data using
parameters of a tax filing probability function that are fit to match
closely the specified number of taxpayers who file, and then write
implied input data used to create the DUMP file, in addition to the
simulated nonfiler variable, to a CSV-formatted text file.
"""
import os
import sys
import argparse
import numpy as np
import pandas as pd
from scipy import optimize
# selects which parameter grid main() searches: True uses the narrower grid
# with smaller step sizes; False uses the wider grid with larger step sizes
FINER_GRID = True
def main(arg):
    """
    High-level logic.

    Reads the DUMP file, draws one uniform random number per DUMP row
    using the SEED, searches a grid of filing probability function
    parameters for the grid point whose implied weighted number of filers
    (in millions) is closest to FILERS, and then writes the INFILE data
    plus a 0/1 nonfiler column to OUTFILE.

    The arg parameter is a dictionary containing values for these keys:
    DUMP, SEED, FILERS, INFILE, OUTFILE

    Returns 0 on completion.

    Raises ValueError if INFILE does not have the same number of rows as
    the DUMP file.
    """
    # pylint: disable=too-many-locals
    # read DUMP output file into Pandas DataFrame
    idf = pd.read_csv(arg['DUMP'])
    num = idf.shape[0]
    pitax = idf['pitax'].to_numpy()
    weight = idf['weight'].to_numpy()
    # generate uniformly-distributed random numbers
    rng = np.random.default_rng(seed=arg['SEED'])
    urn = rng.random(size=num)
    del rng
    # specify target number of filers
    filers_target = arg['FILERS']
    # set fixed func parameters
    params = (pitax, weight, urn, filers_target,)
    # specify grid points of tax filing probability function parameter values
    # (probability function is piecewise linear with two segments)
    if FINER_GRID:
        xslices = (
            # TX0=0.0 : pitax at start
            slice(0.010, 0.045, 0.005),  # FP0 : filing probability at TX0
            slice(0.050, 0.100, 0.005),  # FPM : filing probability at TXM
            slice(0, 400, 50),           # TXM : pitax at middle
            slice(200, 600, 50)          # TX1 : pitax when reach FP1
            # FP1=1.0 : filing probability at TX1 and above
        )
    else:
        xslices = (
            # TX0=0.0 : pitax at start
            slice(0.00, 0.06, 0.01),     # FP0 : filing probability at TX0
            slice(0.00, 0.13, 0.01),     # FPM : filing probability at TXM
            slice(0., 800., 100.),       # TXM : pitax at middle
            slice(300., 1000., 100.)     # TX1 : pitax when reach FP1
            # FP1=1.0 : filing probability at TX1 and above
        )
    # find grid point that minimizes the value returned by the func
    # function, which is the squared difference between the estimated
    # number of filers and the specified filers_target using brute force
    # (that is, computing the squared difference at each grid point
    # defined by xslices and returning the grid point with the smallest
    # squared difference); finish=None means the grid point itself is
    # returned without any subsequent polishing step
    res = optimize.brute(func, xslices, args=params, full_output=True,
                         disp=True, workers=-1, finish=None)
    xvar = res[0]
    print('xvar=', xvar)  # xvar at func minimum
    print('func(xvar)=', res[1])  # minimum function value
    # generate filer variable using optimized xvar parameter value
    filer = filer_variable(xvar, *params)
    filers = np.dot(filer, weight) * 1e-6
    aggtax = np.dot((filer * pitax), weight) * 1e-9
    print(f'xvar ==> FILERS={filers:.3f} and AGGTAX={aggtax:.2f}')
    # read INFILE, add nonfiler variable, and write OUTFILE
    indf = pd.read_csv(arg['INFILE'])
    # explicit check (rather than assert) so it is not skipped under -O
    if len(filer) != indf.shape[0]:
        raise ValueError('len(filer) not same as indf.shape[0]')
    indf['nonfiler'] = np.where(filer == 0, 1, 0)
    indf.to_csv(arg['OUTFILE'], index=False, float_format='%.2f')
    return 0
# end of main function
def func(xvar, *params):
    """
    Return squared difference between xvar-implied filers and filers_target.

    The xvar argument holds the four optimized parameters (FP0, FPM, TXM,
    TX1) of the piecewise-linear filing probability function, and params
    holds the fixed values: pitax, weight, urn, filers_target.
    The implied number of filers is the weighted count of units whose
    random number is below their filing probability, scaled by 1e-6.
    """
    # pylint: disable=too-many-locals
    pitax, weight, urn, filers_target = params
    # clip the optimized parameters so that the probabilities stay in the
    # [0,1] range and neither the probability knots nor the pitax knots
    # ever decrease from one knot to the next
    tax_start = 0.0
    prob_start = np.clip(xvar[0], 0.0, 1.0)
    prob_mid = np.clip(xvar[1], prob_start, 1.0)
    tax_mid = np.clip(xvar[2], tax_start, None)
    tax_top = np.clip(xvar[3], tax_mid, None)
    tax_knots = [tax_start, tax_mid, tax_top]
    prob_knots = [prob_start, prob_mid, 1.0]
    # a unit files when its random number is below its filing probability
    file_prob = np.interp(pitax, tax_knots, prob_knots)
    is_filer = np.where(urn < file_prob, 1, 0)
    # squared gap between implied filers and the target
    gap = np.dot(is_filer, weight) * 1e-6 - filers_target
    return gap ** 2
def filer_variable(xvar, *params):
    """
    Return filer variable array given xvar parameters and params.

    The returned array contains 1 for each unit whose random number is
    below its filing probability and 0 otherwise.  Only the pitax and urn
    items of params (the first and third) are used here.
    """
    pitax, _, urn, _ = params
    # probability knots: FP0 is kept in [0,1] and FPM is kept in [FP0,1]
    prob_at_start = np.clip(xvar[0], 0.0, 1.0)
    prob_at_middle = np.clip(xvar[1], prob_at_start, 1.0)
    prob_at_top = 1.0
    # pitax knots: TXM is kept at or above TX0 and TX1 at or above TXM
    tax_at_start = 0.0
    tax_at_middle = np.clip(xvar[2], tax_at_start, None)
    tax_at_top = np.clip(xvar[3], tax_at_middle, None)
    # evaluate the piecewise-linear filing probability for each unit
    probability = np.interp(
        pitax,
        [tax_at_start, tax_at_middle, tax_at_top],
        [prob_at_start, prob_at_middle, prob_at_top],
    )
    return np.where(urn < probability, 1, 0)
def process_command_line_arguments():
    """
    Process command-line arguments returning a dictionary with values for
    these variables: DUMP, SEED, FILERS, INFILE, OUTFILE

    INFILE and OUTFILE are not specified on the command line; they are
    derived from the DUMP file name.  The base name of DUMP (without its
    last four characters) is split on hyphens, and the first piece, with
    the directory part of DUMP put back in front of it, is used to form
    INFILE (piece + '.csv') and OUTFILE (piece + 'x.csv').

    When any check fails, error messages and a USAGE line are written to
    stderr and an empty dictionary is returned.
    """
    usage_str = 'python filer.py DUMP SEED FILERS [--help]'
    parser = argparse.ArgumentParser(
        prog='',
        usage=usage_str,
        description=('Find tax filing probability parameters that, '
                     'given MYI-T-A DUMP output and a SEED, generate '
                     'a filer variable that implies FILERS, and write '
                     'new MYI-T-A input file including the additional '
                     'simulated nonfiler variable')
    )
    parser.add_argument('DUMP', type=str,
                        help='DUMP output file name.')
    parser.add_argument('SEED', type=int,
                        help='Random number seed.')
    parser.add_argument('FILERS', type=float,
                        help='Target millions of filing taxpayers.')
    args = parser.parse_args()
    # check command-line arguments
    args_ok = True
    if not os.path.isfile(args.DUMP):
        sys.stderr.write(
            'ERROR: DUMP file does not exist\n'
        )
        args_ok = False
    if not args.DUMP.endswith('.csv'):
        sys.stderr.write(
            'ERROR: DUMP file name does not end with .csv\n'
        )
        args_ok = False
    # split only the file-name part of DUMP on hyphens, so that a hyphen
    # in a directory name does not corrupt the derived file names; the
    # directory prefix is kept exactly as it was typed
    dump_name = os.path.basename(args.DUMP)
    dump_dir = args.DUMP[:len(args.DUMP) - len(dump_name)]
    names = dump_name[:-4].split('-')
    if len(names) < 3:
        sys.stderr.write(
            'ERROR: DUMP not a standard DUMP output file name\n'
        )
        args_ok = False
    infile = dump_dir + names[0] + '.csv'
    if not os.path.isfile(infile):
        sys.stderr.write(
            f'ERROR: implied input file (named {infile}) does not exist\n'
        )
        args_ok = False
    outfile = dump_dir + names[0] + 'x.csv'
    # NOTE(review): any existing OUTFILE is removed here even when a check
    # has failed; presumably so that no stale output survives -- confirm
    if os.path.isfile(outfile):
        os.remove(outfile)
    if not 1 <= args.SEED <= 999_999_999:
        sys.stderr.write(
            'ERROR: SEED must be in [1,999999999] range\n'
        )
        args_ok = False
    if args.FILERS <= 0:
        sys.stderr.write(
            'ERROR: FILERS must be positive\n'
        )
        args_ok = False
    if args_ok:
        return {
            'DUMP': args.DUMP,
            'SEED': args.SEED,
            'FILERS': args.FILERS,
            'INFILE': infile,
            'OUTFILE': outfile
        }
    sys.stderr.write(f'USAGE: {usage_str}\n')
    return {}
# end of process_command_line_arguments function
if __name__ == '__main__':
    cliarg = process_command_line_arguments()
    # an empty dictionary signals invalid command-line arguments, in which
    # case exit with status 1; otherwise exit with the value main returns
    sys.exit(main(cliarg) if cliarg else 1)