MYI-Tax-Analyzer

impute_i2t.py
Login

impute_i2t.py

File data/survey_data_task1/impute_i2t.py from the latest check-in


"""
Impute HSDATA data to TRDATA and write resulting TRDATA to OUTPUT file.
Only HSDATA with nonfiler=0 (that is, those simulated to be tax filers) are
used in the imputation process, which uses the Tax-Analyzer-Framework MICE
package.
"""

import os
import sys
import argparse
import numpy as np
import pandas as pd
import taf  # Tax-Analyzer-Framework


# variables read from both the tax-return (TRDATA) and survey (HSDATA) data
COMMON_VARIABLES = ['empinc', 'relief_kids_u18', 'disabled_indiv']
# variables read from the survey data and imputed to the tax-return data
IMPUTED_VARIABLES = [
    'gender',
    'age',
    'strata',
    'married',
    'propinc',
    'birinc',
    'traninc',
]


def main(arg):
    """
    Run the whole workflow: read both input files, impute, write OUTPUT.

    The arg dictionary supplies the TRDATA, HSDATA and OUTPUT file names
    plus the tabulate flag that switches the statistics printouts on.
    Always returns zero.
    """
    # load the tax-return records
    returns_df = pd.DataFrame(pd.read_csv(arg['TRDATA']))
    show_stats = arg['tabulate']
    if show_stats:
        print_aggregate_statistics('TRDATA[pre] ', returns_df)

    # load the household-survey records
    survey_df = pd.DataFrame(pd.read_csv(arg['HSDATA']))
    if show_stats:
        print_aggregate_statistics('HSDATA', survey_df)

    # only survey rows with nonfiler equal to zero take part in the imputation
    filers_df = survey_df[survey_df['nonfiler'] == 0].copy()
    augmented_df = add_imputed_variables(returns_df, filers_df)
    # the inputs are no longer needed once the augmented data exist
    del returns_df, survey_df, filers_df
    if show_stats:
        print_aggregate_statistics('TRDATA[post]', augmented_df)

    # save the augmented tax-return data without the index column
    augmented_df.to_csv(arg['OUTPUT'], index=False, float_format='%.2f')

    return 0
# end of main function


def print_aggregate_statistics(label, xdf):
    """
    Print the row count, weighted means and correlation matrix of xdf.

    Every printed line is prefixed with label.  When label contains 'pre'
    only empinc is tabulated; otherwise empinc and all the
    IMPUTED_VARIABLES are.  Means use the weight column of xdf as weights
    and are multiplied by a per-variable scale factor before printing;
    the correlations are computed by DataFrame.corr without weights.
    """
    num_rows = xdf.shape[0]
    print(f'{label} OBSERVATIONS: {num_rows}')
    weights = xdf['weight']
    weight_sum = weights.sum()
    only_empinc = 'pre' in label
    if only_empinc:
        variables = ['empinc']
        scales = [1e-3]
    else:
        variables = ['empinc'] + IMPUTED_VARIABLES
        # one scale factor per entry in variables, in the same order
        scales = [1e-3, 1.0, 1.0, 1.0, 1.0, 1e-3, 1e-3, 1e-3]
    for name, factor in zip(variables, scales):
        weighted_mean = np.dot(xdf[name], weights) / weight_sum
        scaled_mean = weighted_mean * factor
        print(f'{label} mean {name}: {scaled_mean:.3f}')
    corr_table = xdf[variables].corr().to_string()
    print(f'{label} correlations:\n', corr_table)


def add_imputed_variables(tdf, idf):
    """
    Add imputed variables from idf to tdf and return augmented tdf.

    Missing values in the tdf have a monotone missing data pattern, so can
    impute them sequentially with univariate models using the MICE package.

    Parameters:
      tdf: tax-returns DataFrame containing the COMMON_VARIABLES columns;
           it is not modified.
      idf: survey DataFrame containing both the COMMON_VARIABLES and the
           IMPUTED_VARIABLES columns.

    Returns a copy of tdf (after convert_dtypes) with one extra column for
    each of the IMPUTED_VARIABLES.  The imputed values are attached to the
    tdf rows by position, so the result is correct whatever index labels
    tdf carries (for example, after filtering or sorting).
    """
    num_returns = tdf.shape[0]
    # construct DataFrame used by MICE package
    # ... first build the tax-returns subsample with many missing variables
    xtdf = pd.DataFrame({var: tdf[var] for var in COMMON_VARIABLES})
    for var in IMPUTED_VARIABLES:
        xtdf[var] = np.nan
    # ... second build the survey-individuals subsample with no missing values
    xidf = idf[COMMON_VARIABLES + IMPUTED_VARIABLES]
    # ... combine the two subsamples into a single DataFrame in which the
    #     tax-returns rows come first and the rows are relabelled 0..N-1
    xdf = pd.concat([xtdf, xidf], ignore_index=True)
    # construct list of imputed variables column index numbers
    col = list(xdf)
    x_idx = [col.index(var) for var in IMPUTED_VARIABLES]
    # construct instance of the MICE class and call its impute method
    # (positional arguments: row count, column count, column numbers of the
    #  variables to impute, and an empty list whose meaning is not visible
    #  here -- see the taf.MICE documentation)
    mice = taf.MICE(xdf.shape[0], xdf.shape[1], x_idx, [],
                    monotone=True, iters=1, min_leaf_node_size=5, verbose=True)
    zarray = mice.impute(xdf.to_numpy())
    # convert the returned numpy array into a DataFrame
    zdf = pd.DataFrame(zarray, columns=col).convert_dtypes()
    # keep just the tax-returns rows (the first num_returns rows) and give
    # them the tdf index labels, so that the column assignments below cannot
    # be misaligned when tdf does not have a default 0..n-1 index
    imputed = zdf.iloc[:num_returns].set_axis(tdf.index, axis=0)
    # copy the original tax-returns DataFrame into an augmented DataFrame
    adf = tdf.copy().convert_dtypes()
    # assign the imputed variable values to the augmented tax-returns DataFrame
    for var in IMPUTED_VARIABLES:
        adf[var] = imputed[var]
    return adf


def process_command_line_arguments():
    """
    Process command-line arguments returning a dictionary with values for
    these variables: TRDATA, HSDATA, OUTPUT, tabulate

    The three file names must end with .csv, and the TRDATA and HSDATA
    files must already exist.  Every failed check writes its own ERROR
    line to stderr; if any check fails, a USAGE line is also written and
    an empty dictionary is returned.  Arguments that argparse itself
    rejects (and the --help option) end the program via SystemExit.
    """
    usage_str = ('python impute_i2t.py TRDATA HSDATA OUTPUT '
                 '[--tabulate] [--help]')
    parser = argparse.ArgumentParser(
        prog='',
        usage=usage_str,
        description=('Impute HSDATA data to TRDATA and write resulting '
                     'TRDATA to CSV-formatted OUTPUT file.')
    )
    parser.add_argument('TRDATA', type=str,
                        help='Tax-return data file name.')
    parser.add_argument('HSDATA', type=str,
                        help='Household-survey data file name.')
    parser.add_argument('OUTPUT', type=str,
                        help='Merged OUTPUT file name.')
    parser.add_argument(
        '--tabulate', default=False, action='store_true',
        help='option that specifies whether to do tabulations'
    )
    args = parser.parse_args()
    # check command-line arguments, reporting every problem that is found
    # rather than stopping at the first one
    args_ok = True
    # each entry: (argument name, whether the file must already exist)
    file_checks = (('TRDATA', True), ('HSDATA', True), ('OUTPUT', False))
    for name, must_exist in file_checks:
        file_name = getattr(args, name)
        if not file_name.endswith('.csv'):
            sys.stderr.write(
                f'ERROR: {name} file name does not end with .csv\n'
            )
            args_ok = False
        if must_exist and not os.path.isfile(file_name):
            sys.stderr.write(
                f'ERROR: {name} file does not exist\n'
            )
            args_ok = False
    if not args_ok:
        sys.stderr.write(f'USAGE: {usage_str}\n')
        return {}
    return {
        'TRDATA': args.TRDATA,
        'HSDATA': args.HSDATA,
        'OUTPUT': args.OUTPUT,
        'tabulate': args.tabulate
    }
# end of process_command_line_arguments function


if __name__ == '__main__':
    cliarg = process_command_line_arguments()
    # an empty dictionary means the arguments were rejected: exit status 1
    if not cliarg:
        sys.exit(1)
    # otherwise the exit status is whatever main returns
    sys.exit(main(cliarg))