MYI-Tax-Analyzer

prep.py
Login

File data/prep.py from the latest check-in


"""
Prepare CSV-formatted file suitable for use by MYI-Tax-Analyzer from the
confidential data supplied to the World Bank by the Malaysia Ministry of
Finance.  The confidential data contain 2018 returns for the PIT Form B
and PIT Form BE.  Form B (with business) and Form BE (without business)
can be merged by defining a set of common variable names and mapping each
form variable to a common variable, leaving some business-related variables
at zero for those filing Form BE.

USAGE: execute this script in the MYI-Tax-Analyzer/data directory as follows:
$ python preprep.py
$ python prep.py
"""

import os
import sys
import numpy as np
import pandas as pd
import taf  # Tax Analyzer Framework


DATA_YEAR = 2018
INPUT_DATA_PATH = os.path.join('.')
INPUT_DATA_FILEPATH = {
    'B': os.path.join(INPUT_DATA_PATH, 'B18.csv'),
    'BE': os.path.join(INPUT_DATA_PATH, 'BE18.csv')
}
FORM_VALUE = {'B': 0, 'BE': 1}
KINDS = ['read', 'calc']  # kinds of output data files
OUTPUT_DATA_FILEPATH = {
    'read': {
        'B': os.path.join('.', 'B18_read.csv'),
        'BE': os.path.join('.', 'BE18_read.csv')
    },
    'calc': {
        'B': os.path.join('.', 'B18_calc.csv'),
        'BE': os.path.join('.', 'BE18_calc.csv')
    }
}
CODE_PATH = os.path.join('..', 'myitaxanalyzer')
RECVARS_JSON_FILEPATH = os.path.join(CODE_PATH, 'records_variables.json')
DTYPE = {'real': 'float64', 'integer': 'int64'}
CHECK_FOR_INCONSISTENCY = False


def compare_act_exp(act, exp, vname, form, zero_components=False):
    """
    Compare act and exp numpy arrays and print comparison results.
    """
    if CHECK_FOR_INCONSISTENCY:
        if np.allclose(act, exp):
            print(f'OK no inconsistencies in {vname} on Form {form}')
        else:
            num = (act != exp).sum()
            print(f'## inconsistencies (n={num}) in {vname} on Form {form}')
    # optionally check for cases with zero act and positive exp values
    if zero_components:
        cases = np.logical_and(act == 0, exp > 0)
        num = cases.sum()
        if num > 0:
            amt = exp[cases].sum()
            print((f'ZC zero act and positive exp values in {num} cases '
                   f'in {vname} on Form {form};\n        '
                   f'AMT(RMb)= {(amt * 1e-9):.3f}'))
        else:
            print(('OK no zero act and positive exp values '
                   f'in {vname} on Form {form}'))


def year_dict(yeardicts, cyr):
    """
    Return dictionary from a list of dictionaries that has cyr as year value.
    """
    assert isinstance(yeardicts, list), 'is not a list'
    for yeardict in yeardicts:
        assert isinstance(yeardict, dict), 'list item is not a dict'
        if yeardict['year'] == cyr:
            return yeardict
    assert cyr == 0, f'does not contain any list item with year={cyr}'
    return {}


def main(form=None):
    """
    High-level logic of the script.
    """
    # pylint: disable=too-many-statements,too-many-locals,too-many-branches
    print('data/prep.py executing ...')

    assert form in ('B', 'BE'), f'unknown form {form}'

    # read input data file into a data frame
    idf = pd.read_csv(INPUT_DATA_FILEPATH[form],
                      encoding='unicode_escape',  # Form B data contain Unicode
                      low_memory=False)  # some columns have mixed data types

    # replace all blank fields with zero
    idf.fillna(0, inplace=True)

    # check that data file has expected DATA_YEAR
    assert (idf.Tahun == DATA_YEAR).all()
    print(f'Form{form}_raw_count= {idf.shape[0]:d}')

    # combine three components of Form B line B1
    if form == 'B':
        idf['B1'] = (idf['B1a_Perniagaan 1'] +
                     idf['B1a_Perniagaan 2'] +
                     idf['B1a_Perniagaan 3 + 4'])

    # specify empty read and calc data frames in odf dictionary
    odf = {}
    for kind in KINDS:
        odf[kind] = pd.DataFrame()

    # specify recid, form, and late variables for both output data frames
    ones = np.ones(idf.shape[0], dtype=np.int64)
    for kind in KINDS:
        odf[kind]['recid'] = ones
        odf[kind]['form'] = ones * FORM_VALUE[form]
        odf[kind]['late'] = idf.late.astype(np.int64)

    # read variable mapping information from records_variables.json file
    # (vmap[kind] is map of output variable from input ival and dtype)
    recvar = taf.json2dict(RECVARS_JSON_FILEPATH)
    vmap = {'read': {}, 'calc': {}}
    for kind in KINDS:
        for avar in recvar[kind]:
            if 'map' in recvar[kind][avar]:
                vmap[kind][avar] = {}
                try:
                    mapdict = year_dict(recvar[kind][avar]['map'], DATA_YEAR)
                except AssertionError as err_msg:
                    msg = str(err_msg).rstrip('\n').lstrip('\n')
                    print(f'ERROR: {kind} {avar} map {msg}')
                    sys.exit(1)
                vmap[kind][avar]['ival'] = mapdict[form]
                vmap[kind][avar]['dtype'] = mapdict['dtype']

    # add common variables to output data frames
    for kind in KINDS:
        for avar in vmap[kind]:
            vmp = vmap[kind][avar]
            ival = vmp['ival']
            if ival == 0:
                odf[kind][avar] = np.zeros_like(ones)
            else:
                assert ival in idf, f'{form} {kind} {avar} {ival}'
                odf[kind][avar] = idf[ival].astype(DTYPE[vmp['dtype']])

    # convert values of read variables disabled_indiv and disabled_spouse
    relief_amt = odf['read']['disabled_indiv']
    odf['read']['disabled_indiv'] = np.where(relief_amt > 1000, 1, 0)
    relief_amt = odf['read']['disabled_spouse']
    odf['read']['disabled_spouse'] = np.where(relief_amt > 1000, 1, 0)

    # check consistency of variables in the two output data frames
    read = odf['read']
    calc = odf['calc']
    # ... check businc_net for consistency
    act = np.maximum(0, read.businc - read.busloss_prior)
    exp = calc.businc_net
    compare_act_exp(act, exp, 'businc_net', form, zero_components=True)
    # ... check agginc for consistency
    act = calc.businc_net + read.empinc + read.rentinc + read.intinc
    exp = calc.agginc
    compare_act_exp(act, exp, 'agginc', form, zero_components=True)
    # ... check gift_total for consistency
    act = (read.gift_gvt + read.gift_org + read.gift_art + read.gift_lib +
           read.gift_dis + read.gift_med + read.gift_paint)
    exp = calc.gift_total
    compare_act_exp(act, exp, 'gift_total', form, zero_components=True)
    # ... compute totinc(self)
    nonnegative = np.maximum(0, (calc.agginc - read.busloss_cyear -
                                 read.otherexp - calc.gift_total))
    totinc_self = nonnegative + read.pioneerinc
    # ... check totinc for consistency
    act = totinc_self + read.totinc_transfer
    exp = calc.totinc
    compare_act_exp(act, exp, 'totinc', form)
    # ... check relief_total for consistency
    act = (read.relief_parent + read.relief_dis_equip +
           read.relief_educ + read.relief_medexps + read.relief_medexam +
           read.relief_lifestyle + read.relief_bfeed + read.relief_childcare +
           read.relief_sspn + read.relief_alimony +
           read.relief_kids_u18 + read.relief_kids_o17 + read.relief_kids_dis +
           read.relief_lifeins + read.relief_prvtret + read.relief_insurance +
           read.relief_socso)
    # subtract from Form relief_total the field F1 value which is not in act
    expraw = calc.relief_total - 9000
    exp = np.where(expraw <= 1e-1, 0, expraw)  # handle possible rounding error
    del expraw
    compare_act_exp(act, exp, 'relief_total', form, zero_components=True)
    # ... check charginc for consistency
    act = np.maximum(0, calc.totinc - calc.relief_total)
    exp = calc.charginc
    compare_act_exp(act, exp, 'charginc', form)
    # ... check tottax_charged for consistency
    act = np.maximum(0, calc.tottax - read.rebate)
    exp = calc.tottax_charged
    compare_act_exp(act, exp, 'tottax_charged', form)
    # ... compute pitax
    pitax = calc.tottax_charged - read.credit
    calc['pitax'] = pitax
    exp = calc.pitax
    # ... check tax_payable for consistency
    act = np.maximum(0, pitax)
    exp = calc.tax_payable
    compare_act_exp(act, exp, 'tax_payable', form)
    # ... check tax_repayable for consistency
    act = np.maximum(0, -pitax)
    exp = calc.tax_repayable
    compare_act_exp(act, exp, 'tax_repayable', form)

    # write output data frames to CSV files
    for kind in KINDS:
        taf.df2csv(odf[kind], OUTPUT_DATA_FILEPATH[kind][form])

    return 0


if __name__ == '__main__':
    if main(form='B') != 0:
        sys.exit(1)
    if main(form='BE') != 0:
        sys.exit(1)
    sys.exit(0)