"""
Impute HSDATA data to TRDATA and write resulting TRDATA to OUTPUT file.
Only HSDATA with nonfiler=0 (that is, those simulated to be tax filers) are
used in the imputation process, which uses the Tax-Analyzer-Framework MICE
package.
"""
import os
import sys
import argparse
import numpy as np
import pandas as pd
import taf # Tax-Analyzer-Framework
# variables read from both the tax-return data and the household-survey data
COMMON_VARIABLES = [
    'empinc',
    'relief_kids_u18',
    'disabled_indiv',
]
# variables read from the household-survey data only; they are set to missing
# in the tax-return subsample and then filled in by the imputation
IMPUTED_VARIABLES = [
    'gender',
    'age',
    'strata',
    'married',
    'propinc',
    'birinc',
    'traninc',
]
def main(arg):
    """
    High-level logic: read TRDATA and HSDATA, impute, and write OUTPUT.

    The arg parameter is a dictionary with these keys:
      'TRDATA', 'HSDATA': names of the CSV files to read,
      'OUTPUT': name of the CSV file to write, and
      'tabulate': when true, statistics are printed for each DataFrame.
    Returns zero.
    """
    # read TRDATA file into Pandas DataFrame
    # (read_csv already returns a DataFrame, so no extra wrapping is needed)
    tdf = pd.read_csv(arg['TRDATA'])
    if arg['tabulate']:
        print_aggregate_statistics('TRDATA[pre] ', tdf)
    # read HSDATA file into Pandas DataFrame
    idf = pd.read_csv(arg['HSDATA'])
    if arg['tabulate']:
        print_aggregate_statistics('HSDATA', idf)
    # impute missing variables to TRDATA using HSDATA and taf.MICE class
    # ... only HSDATA rows with nonfiler equal to zero are used
    adf = add_imputed_variables(tdf, idf[idf['nonfiler'] == 0].copy())
    # ... drop references to the input DataFrames, which are no longer needed
    del tdf
    del idf
    if arg['tabulate']:
        print_aggregate_statistics('TRDATA[post]', adf)
    # write adf DataFrame to OUTPUT file (floats written with two decimals)
    adf.to_csv(arg['OUTPUT'], index=False, float_format='%.2f')
    return 0
# end of main function
def print_aggregate_statistics(label, xdf):
    """
    Print the observation count, weighted means, and correlation matrix
    for the xdf data, prefixing every printed line with label.

    A label containing 'pre' reports on empinc only; any other label
    reports on empinc plus all the IMPUTED_VARIABLES.  Means are weighted
    by the xdf weight column and multiplied by a per-variable scale
    factor before printing; the correlations are computed without weights.
    """
    num_obs = xdf.shape[0]
    print(f'{label} OBSERVATIONS: {num_obs}')
    wght = xdf['weight']
    wght_sum = wght.sum()
    only_empinc = 'pre' in label
    variables = ['empinc'] if only_empinc else ['empinc'] + IMPUTED_VARIABLES
    # scale factors are listed in the same order as the variables
    scales = (
        [1e-3] if only_empinc else
        [1e-3, 1.0, 1.0, 1.0, 1.0, 1e-3, 1e-3, 1e-3]
    )
    for name, factor in zip(variables, scales):
        weighted_mean = np.dot(xdf[name], wght) / wght_sum
        print(f'{label} mean {name}: {(weighted_mean * factor):.3f}')
    corr_table = xdf[variables].corr().to_string()
    print(f'{label} correlations:\n', corr_table)
def add_imputed_variables(tdf, idf):
    """
    Add imputed variables from idf to tdf and return augmented tdf.
    Missing values in the tdf have a monotone missing data pattern, so can
    impute them sequentially with univariate models using the MICE package.

    The tdf DataFrame must contain the COMMON_VARIABLES columns and the idf
    DataFrame must contain both the COMMON_VARIABLES and IMPUTED_VARIABLES
    columns.  Neither argument is modified; the returned DataFrame is a
    copy of tdf (after convert_dtypes) with the IMPUTED_VARIABLES columns
    assigned, and it has the same index as tdf.
    """
    # construct DataFrame used by MICE package
    # ... first build the tax-returns subsample with many missing variables
    xtdf = pd.DataFrame({var: tdf[var] for var in COMMON_VARIABLES})
    for var in IMPUTED_VARIABLES:
        xtdf.loc[:, var] = np.nan
    # ... second build the survey-individuals subsample with no missing values
    xidf = idf[COMMON_VARIABLES + IMPUTED_VARIABLES]
    # ... combine the two subsamples into a single DataFrame
    # ... (tax-returns rows come first and the index is reset to 0,1,2,...)
    xdf = pd.concat([xtdf, xidf], ignore_index=True)
    # construct list of imputed variables column index numbers
    col = list(xdf)
    x_idx = [col.index(var) for var in IMPUTED_VARIABLES]
    # construct instance of the MICE class and call its impute method
    mice = taf.MICE(xdf.shape[0], xdf.shape[1], x_idx, [],
                    monotone=True, iters=1, min_leaf_node_size=5, verbose=True)
    zarray = mice.impute(xdf.to_numpy())
    # convert the returned numpy array into a DataFrame
    zdf = pd.DataFrame(zarray, columns=col).convert_dtypes()
    # keep only the tax-returns rows, which are the first rows of xdf, and
    # give them the tdf index so that the assignment below matches rows by
    # position even when tdf does not have a default 0,1,2,... index
    # NOTE(review): assumes impute returns rows in the order they were passed
    tax_zdf = zdf.iloc[:tdf.shape[0]].copy()
    tax_zdf.index = tdf.index
    # copy the original tax-returns DataFrame into an augmented DataFrame
    adf = tdf.copy().convert_dtypes()
    # assign the imputed variable values to the augmented tax-returns DataFrame
    for var in IMPUTED_VARIABLES:
        adf.loc[:, var] = tax_zdf[var]
    return adf
def process_command_line_arguments():
    """
    Process command-line arguments returning a dictionary with values for
    these variables: TRDATA, HSDATA, OUTPUT, tabulate

    All three file names must end with .csv and the TRDATA and HSDATA files
    must exist.  When any check fails, one ERROR line per problem and then
    a USAGE line are written to stderr and an empty dictionary is returned.
    """
    usage_str = ('python impute_i2t.py TRDATA HSDATA OUTPUT '
                 '[--tabulate] [--help]')
    parser = argparse.ArgumentParser(
        prog='',
        usage=usage_str,
        description=('Impute HSDATA data to TRDATA and write resulting '
                     'TRDATA to CSV-formatted OUTPUT file.')
    )
    parser.add_argument('TRDATA', type=str,
                        help='Tax-return data file name.')
    parser.add_argument('HSDATA', type=str,
                        help='Household-survey data file name.')
    parser.add_argument('OUTPUT', type=str,
                        help='Merged OUTPUT file name.')
    parser.add_argument(
        '--tabulate', default=False, action='store_true',
        help='option that specifies whether to do tabulations'
    )
    args = parser.parse_args()
    # check command-line arguments, reporting every problem that is found
    args_ok = True
    # ... input files must have a .csv name and must exist
    for name in ('TRDATA', 'HSDATA'):
        filename = getattr(args, name)
        if not filename.endswith('.csv'):
            sys.stderr.write(
                f'ERROR: {name} file name does not end with .csv\n'
            )
            args_ok = False
        if not os.path.isfile(filename):
            sys.stderr.write(
                f'ERROR: {name} file does not exist\n'
            )
            args_ok = False
    # ... output file only needs a .csv name because it is yet to be written
    if not args.OUTPUT.endswith('.csv'):
        sys.stderr.write(
            'ERROR: OUTPUT file name does not end with .csv\n'
        )
        args_ok = False
    if not args_ok:
        sys.stderr.write(f'USAGE: {usage_str}\n')
        return {}
    return {
        'TRDATA': args.TRDATA,
        'HSDATA': args.HSDATA,
        'OUTPUT': args.OUTPUT,
        'tabulate': args.tabulate
    }
# end of process_command_line_arguments function
if __name__ == '__main__':
    # exit with status one when the command-line arguments are invalid;
    # otherwise exit with the value returned by the main function
    cliarg = process_command_line_arguments()
    if not cliarg:
        sys.exit(1)
    sys.exit(main(cliarg))