"""
Merge B18_read.csv and BE18_read.csv files (created by prep.py)
into gzipped data and weights files,
specifying unique recid values in the merged data file.
These files are written both for a complete sample and for a
stratified random sample.
USAGE: execute this script in the MYI-Tax-Analyzer/data directory (after
running prep.py) as follows:
$ python preprep.py
$ python prep.py
$ python merge.py
"""
import os
import sys
import subprocess
import numpy as np
import pandas as pd
import taf # Tax Analyzer Framework
# tax-return form identifiers whose data files are merged by this script
FORMS = ['B', 'BE']
# per-form input CSV files produced by prep.py
INPUT_FILEPATH = {
    'B': os.path.join('.', 'B18_read.csv'),
    'BE': os.path.join('.', 'BE18_read.csv')
}
# seed for reproducible stratified random sampling
RNSEED = 456789
# sampling strata: contiguous gross-income ranges (floor, ceiling] with the
# sampling probability applied to records in each range
STRATUM_LIST = [
    {'grossinc_floor': -9e99, 'grossinc_ceiling': 100e3, 'sprob': 0.0500},
    {'grossinc_floor': 100e3, 'grossinc_ceiling': 500e3, 'sprob': 0.2500},
    {'grossinc_floor': 500e3, 'grossinc_ceiling': 99e99, 'sprob': 1.0000}
]
# output data files: 'all' = complete sample, 'smpl' = stratified sample
DATA_FILEPATH = {
    'all': os.path.join('.', 'all.csv'),
    'smpl': os.path.join('..', 'myitaxanalyzer', 'tax.csv')
}
# growth-rates file whose year range is validated against the years below
GROWRATES_FILEPATH = os.path.join('.', 'grow.csv')
FIRST_WGTS_FILE_YEAR = 2018
LAST_WGTS_FILE_YEAR = 2025
# output weights files corresponding to the data files above
WGTS_FILEPATH = {
    'all': os.path.join('.', 'all_weights.csv'),
    'smpl': os.path.join('..', 'myitaxanalyzer', 'tax_weights.csv')
}
# per-form calculated-results files merged by create_expect()
CALC_FILEPATH = {
    'B': os.path.join('.', 'B18_calc.csv'),
    'BE': os.path.join('.', 'BE18_calc.csv')
}
# expected-tax-results output file written by create_expect()
EXPECT_FILEPATH = os.path.join('.', 'expect.csv')
def create_expect(weights):
    """
    Merge *_calc.csv files into a single file containing expected tax
    results, assign unique recid values, attach the specified weights,
    and write the result as a gzipped CSV file at EXPECT_FILEPATH.

    Parameters
    ----------
    weights : array-like
        Post-sampling weight for each record; length must equal the total
        number of rows in the concatenated *_calc.csv files.

    Returns
    -------
    None
    """
    # read calculated-results input file for each form
    calcdata = {}
    for form in FORMS:
        calcdata[form] = pd.read_csv(CALC_FILEPATH[form])
    # merge the rows from each calc data frame
    edf = pd.concat(calcdata, ignore_index=True, copy=False)
    # specify unique recid values using column (bracket) assignment:
    # attribute assignment (edf.recid = ...) would silently create an
    # instance attribute instead of a column if 'recid' were ever absent
    nobs = edf.shape[0]
    edf['recid'] = np.arange(1, nobs + 1, dtype=np.int64)
    # specify weight variable
    edf['weight'] = weights
    # write expect CSV file
    taf.df2csv(edf, EXPECT_FILEPATH)
    # compress written CSV file in gzip format and remove the plain file
    taf.compress(EXPECT_FILEPATH)
    taf.delete_file(EXPECT_FILEPATH)
def main():
    """
    High-level logic of the script: merge the per-form *_read.csv files,
    assign unique recid values, draw both a complete ('all') and a
    stratified random ('smpl') sample, and write gzipped data, weights,
    and expected-results files.

    Returns
    -------
    int
        Zero on success (used as the process exit code).
    """
    # pylint: disable=too-many-locals,too-many-statements,too-many-branches
    print('data/merge.py executing ...')
    # read input file for each form
    indata = {}
    for form in FORMS:
        indata[form] = pd.read_csv(INPUT_FILEPATH[form])
    # merge the rows from each input data frame into the whole data frame
    wdf = pd.concat(indata, ignore_index=True, copy=False)
    # specify unique recid values in the whole data frame using column
    # (bracket) assignment to avoid the pandas attribute-assignment pitfall
    nobs = wdf.shape[0]
    wdf['recid'] = np.arange(1, nobs + 1, dtype=np.int64)
    # read and check info in GROWRATES file
    grdf = pd.read_csv(GROWRATES_FILEPATH)
    grdf_years = grdf['year'].to_list()
    expected_years = list(range(FIRST_WGTS_FILE_YEAR, LAST_WGTS_FILE_YEAR+1))
    assert grdf_years == expected_years
    # draw all/some records from the whole data frame
    # ... compute gross income variable used to define sampling strata
    wdf['grossinc'] = (wdf.businc + wdf.empinc + wdf.rentinc + wdf.intinc +
                       wdf.pioneerinc + wdf.totinc_transfer)
    # ... specify value of whole data frame pre-sampling weight variable
    wdf_weights = np.ones(wdf.shape[0])
    wdf['weight'] = wdf_weights
    # ... draw random sample from each sampling stratum
    sdf = {}
    sdf['all'] = {}
    sdf['smpl'] = {}
    sinfo = 'stratum {} contains {:7d} records and uses sampling prob {:.3f}'
    for doing_sampling in [False, True]:
        if doing_sampling:
            skind = 'smpl'
        else:
            skind = 'all'
        seed = RNSEED
        print(f'*** Sample kind = {skind} rng seed = {seed}')
        gihi = None  # prior stratum's income ceiling (checked below)
        for stratum, info in enumerate(STRATUM_LIST, start=1):
            seed += 1000  # distinct reproducible seed for each stratum
            gilo = info['grossinc_floor']
            if stratum > 1:
                # strata must be contiguous: this floor == prior ceiling
                assert gilo == gihi
            gihi = info['grossinc_ceiling']
            sprob = info['sprob']
            assert 0.0 < sprob <= 1.0
            stratum_df = wdf[(gilo < wdf.grossinc) & (wdf.grossinc <= gihi)]
            if doing_sampling:
                print(sinfo.format(stratum, stratum_df.shape[0], sprob))
                sdf[skind][stratum] = stratum_df.sample(frac=sprob,
                                                        random_state=seed)
                # inflate weights so the sample still represents its stratum
                sdf[skind][stratum]['weight'] /= sprob
            else:
                print(sinfo.format(stratum, stratum_df.shape[0], 1.0))
                sdf[skind][stratum] = stratum_df
            print(f' included records {sdf[skind][stratum].shape[0]:7d}')
        # ... concatenate the stratum samples into the output data frame
        odf = pd.concat(sdf[skind], ignore_index=True, copy=False)
        print(f'total number of records {odf.shape[0]:7d}')
        # ... save post-sampling weight values
        weight = odf.weight.to_numpy(copy=True)
        # ... remove grossinc and weight columns
        odf.drop(columns=['weight', 'grossinc'], inplace=True)
        # ... write merged data to CSV file
        taf.df2csv(odf, DATA_FILEPATH[skind])
        # ... compress written CSV file in gzip format
        if skind == 'smpl':
            taf.compress(DATA_FILEPATH[skind])
            taf.delete_file(DATA_FILEPATH[skind])
        # ... specify weights dataframe and write to CSV file
        wtdf = pd.DataFrame()
        wtdf[f'WT{FIRST_WGTS_FILE_YEAR}'] = weight
        taf.df2csv(wtdf, WGTS_FILEPATH[skind], float_format='%.3f')
        # ... compress written CSV file in gzip format
        if skind == 'smpl':
            taf.compress(WGTS_FILEPATH[skind])
            taf.delete_file(WGTS_FILEPATH[skind])
    # create expected tax results file using the 'smpl' weights
    # (wtdf holds the final loop iteration's weights, i.e. the sample's);
    # derive the column name from FIRST_WGTS_FILE_YEAR rather than
    # hard-coding 'WT2018', which would silently break if the constant changed
    print('creating expect.csv.gz file ...')
    create_expect(wtdf[f'WT{FIRST_WGTS_FILE_YEAR}'])
    # replace the two all *.csv files with a single *.zip file
    print('creating all-csv.zip file ...')
    dfname = DATA_FILEPATH['all']
    wfname = WGTS_FILEPATH['all']
    cmd = f'zip -m all-csv.zip {dfname} {wfname}'
    subprocess.run(cmd.split(), check=True)
    return 0
# invoke main() only when executed as a script (not when imported),
# propagating its return value as the process exit code
if __name__ == '__main__':
    sys.exit(main())