MYI-Tax-Analyzer

merge.py
Login

File data/merge.py from the latest check-in


"""
Merge B18_read.csv and BE18_read.csv files (created by prep.py)
into gzipped data and weights files,
specifying unique recid values in the merged data file.

These files are written both for a complete sample (as CSV files that
are then moved into all-csv.zip) and for a stratified random sample
(as gzipped CSV files).

USAGE: execute this script in the MYI-Tax-Analyzer/data directory (after
running preprep.py and prep.py) as follows:
$ python preprep.py
$ python prep.py
$ python merge.py
"""

import os
import sys
import subprocess
import numpy as np
import pandas as pd
import taf  # Tax Analyzer Framework


# Form codes whose per-form files are read and merged, in this order.
FORMS = ['B', 'BE']

# Per-form input data files (created by prep.py).
INPUT_FILEPATH = dict(
    B=os.path.join('.', 'B18_read.csv'),
    BE=os.path.join('.', 'BE18_read.csv'),
)

# Base random-number seed used when drawing the stratified sample.
RNSEED = 456_789

# Sampling strata defined on gross income: each stratum holds the records
# with grossinc_floor < grossinc <= grossinc_ceiling, and each record in the
# stratum is sampled with probability sprob.
STRATUM_LIST = [
    {'grossinc_floor': floor, 'grossinc_ceiling': ceiling, 'sprob': prob}
    for floor, ceiling, prob in (
        (-9e99, 100e3, 0.05),
        (100e3, 500e3, 0.25),
        (500e3, 99e99, 1.0),
    )
]

# Output data files for the complete ('all') and sampled ('smpl') records.
DATA_FILEPATH = dict(
    all=os.path.join('.', 'all.csv'),
    smpl=os.path.join('..', 'myitaxanalyzer', 'tax.csv'),
)

# Growrates file and the range of years it is expected to cover.
GROWRATES_FILEPATH = os.path.join('.', 'grow.csv')
FIRST_WGTS_FILE_YEAR = 2018
LAST_WGTS_FILE_YEAR = 2025

# Output weights files for the complete ('all') and sampled ('smpl') records.
WGTS_FILEPATH = dict(
    all=os.path.join('.', 'all_weights.csv'),
    smpl=os.path.join('..', 'myitaxanalyzer', 'tax_weights.csv'),
)

# Per-form files merged into the expect file.
CALC_FILEPATH = dict(
    B=os.path.join('.', 'B18_calc.csv'),
    BE=os.path.join('.', 'BE18_calc.csv'),
)
EXPECT_FILEPATH = os.path.join('.', 'expect.csv')


def create_expect(weights):
    """
    Merge *_calc.csv files into a single file containing expected tax results.

    The calc file of each form in FORMS is read from CALC_FILEPATH and the
    rows are stacked in FORMS order.  The merged rows are given recid values
    1, 2, ..., N (in stacked row order) and a weight column, written to
    EXPECT_FILEPATH, compressed, and the uncompressed CSV file is deleted.

    weights: values for the weight column of the merged data frame.
        NOTE: the values are assigned with ordinary pandas item assignment,
        so a pandas Series is aligned on its index (rows without a matching
        index label get NaN), while a plain array must contain exactly one
        value per merged row.
    """
    # read input file for each form
    calcdata = {form: pd.read_csv(CALC_FILEPATH[form]) for form in FORMS}
    # merge the rows from each calc data frame
    edf = pd.concat(calcdata, ignore_index=True, copy=False)
    # specify unique recid values; item assignment (rather than attribute
    # assignment) guarantees that a recid column exists in the written file
    # even if the calc files do not already contain one
    edf['recid'] = np.arange(1, edf.shape[0] + 1, dtype=np.int64)
    # specify weight variable
    edf['weight'] = weights
    # write expect CSV file
    taf.df2csv(edf, EXPECT_FILEPATH)
    # compress written CSV file in gzip format and remove the original
    taf.compress(EXPECT_FILEPATH)
    taf.delete_file(EXPECT_FILEPATH)


def main():
    """
    High-level logic of the script.

    Steps:
    (1) read the input file of each form in FORMS and stack the rows into
        one whole data frame with recid values 1, 2, ..., N;
    (2) check that the growrates file contains exactly the years
        FIRST_WGTS_FILE_YEAR through LAST_WGTS_FILE_YEAR;
    (3) for the complete sample ('all') and then for the stratified random
        sample ('smpl'), write a data file and a weights file, where the
        sampled weights are 1/sprob for the stratum a record was drawn from
        (only the 'smpl' files are compressed here);
    (4) create the expected tax results file; and
    (5) move the two 'all' CSV files into all-csv.zip.

    Returns 0, which is used as the exit status of the script.
    Raises ValueError if the years in the growrates file are not as expected.
    """
    # pylint: disable=too-many-locals,too-many-statements,too-many-branches

    print('data/merge.py executing ...')

    # read input file for each form
    indata = {form: pd.read_csv(INPUT_FILEPATH[form]) for form in FORMS}

    # merge the rows from each input data frame into the whole data frame
    wdf = pd.concat(indata, ignore_index=True, copy=False)

    # specify unique recid values in the whole data frame; item assignment
    # (rather than attribute assignment) guarantees that a recid column
    # exists even if the input files do not already contain one
    nobs = wdf.shape[0]
    wdf['recid'] = np.arange(1, nobs + 1, dtype=np.int64)

    # read and check info in GROWRATES file; this is external input, so
    # raise an exception rather than use an assert (asserts are removed
    # when Python runs with the -O option)
    grdf = pd.read_csv(GROWRATES_FILEPATH)
    grdf_years = grdf['year'].to_list()
    expected_years = list(range(FIRST_WGTS_FILE_YEAR, LAST_WGTS_FILE_YEAR+1))
    if grdf_years != expected_years:
        raise ValueError(
            f'years in {GROWRATES_FILEPATH} are {grdf_years}, '
            f'but expected {expected_years}'
        )

    # draw all/some records from the whole data frame
    # ... compute gross income variable used to define sampling strata
    wdf['grossinc'] = (wdf.businc + wdf.empinc + wdf.rentinc + wdf.intinc +
                       wdf.pioneerinc + wdf.totinc_transfer)
    # ... specify value of whole data frame pre-sampling weight variable
    wdf['weight'] = np.ones(nobs)
    # ... draw random sample from each sampling stratum
    sinfo = 'stratum {} contains {:7d} records and uses sampling prob {:.3f}'
    wgt_colname = f'WT{FIRST_WGTS_FILE_YEAR}'
    wtdf_by_kind = {}  # weights data frame written for each sample kind
    for skind in ('all', 'smpl'):
        doing_sampling = skind == 'smpl'
        # the seed sequence restarts for each sample kind and advances by
        # 1000 for each stratum, so each stratum uses its own seed
        seed = RNSEED
        print(f'*** Sample kind = {skind}     rng seed = {seed}')
        strata = {}
        gihi = None
        for stratum, info in enumerate(STRATUM_LIST, start=1):
            seed += 1000
            gilo = info['grossinc_floor']
            if stratum > 1:
                # strata must be contiguous: this floor is prior ceiling
                assert gilo == gihi
            gihi = info['grossinc_ceiling']
            sprob = info['sprob']
            assert 0.0 < sprob <= 1.0
            stratum_df = wdf[(gilo < wdf.grossinc) & (wdf.grossinc <= gihi)]
            if doing_sampling:
                print(sinfo.format(stratum, stratum_df.shape[0], sprob))
                sampled_df = stratum_df.sample(frac=sprob, random_state=seed)
                # each sampled record represents 1/sprob stratum records
                sampled_df['weight'] = sampled_df['weight'] / sprob
                strata[stratum] = sampled_df
            else:
                print(sinfo.format(stratum, stratum_df.shape[0], 1.0))
                strata[stratum] = stratum_df
            print(f'   included records {strata[stratum].shape[0]:7d}')
        # ... concatenate the stratum samples into the output data frame
        odf = pd.concat(strata, ignore_index=True, copy=False)
        print(f'total number of records {odf.shape[0]:7d}')
        # ... save post-sampling weight values
        weight = odf.weight.to_numpy(copy=True)
        # ... remove grossinc and weight columns
        odf.drop(columns=['weight', 'grossinc'], inplace=True)
        # ... write merged data to CSV file
        taf.df2csv(odf, DATA_FILEPATH[skind])
        # ... compress written CSV file in gzip format
        if doing_sampling:
            taf.compress(DATA_FILEPATH[skind])
            taf.delete_file(DATA_FILEPATH[skind])
        # ... specify weights dataframe and write to CSV file
        wtdf = pd.DataFrame()
        wtdf[wgt_colname] = weight
        taf.df2csv(wtdf, WGTS_FILEPATH[skind], float_format='%.3f')
        # ... compress written CSV file in gzip format
        if doing_sampling:
            taf.compress(WGTS_FILEPATH[skind])
            taf.delete_file(WGTS_FILEPATH[skind])
        wtdf_by_kind[skind] = wtdf

    # create expected tax results file
    # NOTE: create_expect merges every row of the calc files (presumably
    # one row per input record -- TODO confirm), so it is given the
    # complete-sample weights; the sampled weights cover fewer rows in a
    # different order and would be misaligned with the calc rows
    print('creating expect.csv.gz file ...')
    create_expect(wtdf_by_kind['all'][wgt_colname])

    # replace the two all *.csv files with a single *.zip file
    print('creating all-csv.zip file ...')
    dfname = DATA_FILEPATH['all']
    wfname = WGTS_FILEPATH['all']
    # pass the arguments as a list so that a path containing whitespace
    # is not split into several arguments
    subprocess.run(['zip', '-m', 'all-csv.zip', dfname, wfname], check=True)

    return 0


if __name__ == '__main__':
    # run the merge only when this file is executed as a script and use
    # the value returned by main() as the exit status
    exit_status = main()
    sys.exit(exit_status)