Tax-Analyzer-Framework

test2.py
Login

File taf/dptests/test2.py from the latest check-in


"""
Test data-prep method in a situation where adult population data is sampled
by population survey and aggregate statistics on number tax filers and their
income are known.  Objective is to use these aggregate statistics to impute
the missing tax filer variable in the survey data using the aggregate stats
on the number in the population who are tax filers and on the amount of the
income received by tax filers.

The method is to fit a tax filer probability model that generates the known
aggregate statistics.
"""

import sys
import time
import numpy as np
import pandas as pd
from scipy import optimize

# debugging parameters:
CSV_WRITE = False

# population parameters:
SEED_POP = 123456789
SIZE_POP = 1_000_000

# dPlN income distribution parameters:
MEAN = 9.0
SDEV = 1.0
ALPHA = 2.0
BETA = 2.0

# filing probability parameters:
# (calibrated so that 10% of population with 50% of income are filers)
SEED_FILER = 12345
FIND_COEFFS = True
FIND_TOL = 1e-6
FCONST = -9.0  # if FIND_COEFFS, use as root initial value
FSLOPE = 0.20  # if FIND_COEFFS, use as root initial value
FILER_NUMBER_FRAC = 0.10  # if FIND_COEFFS, use as root target value
FILER_INCOME_FRAC = 0.50  # if FIND_COEFFS, use as root target value

# survey sampling parameters:
# (probability calibrated so that survey sample size is 2% of the population)
SEED_SURVEY = 987654321
PROB_SURVEY = 0.0201744

# survey filer parameters:
# (calibrated so that number of imputed filers in the survey data and
#  there incomes are close to aggregate targets)
SEED_IMPUTE = 345678912
TIME_IMPUTE = False

# descriptive statistics parameters:
PCTILES = [0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99]


def main():
    """
    High-level logic.
    """
    # pylint: disable=too-many-statements,too-many-locals
    # ----------------------------------------------------------------
    # (*) construct large population data set
    # ----------------------------------------------------------------
    # construct pop data set
    rng = np.random.default_rng(seed=SEED_POP)
    # ... income variable (dPlN distribution)
    norm = rng.normal(size=SIZE_POP)
    expa = rng.exponential(size=SIZE_POP)
    expb = rng.exponential(size=SIZE_POP)
    income = np.exp(MEAN + norm*SDEV + expa/ALPHA - expb/BETA).astype(int)
    del rng
    # construct filer variable (filing probability depends on income)
    rng = np.random.default_rng(seed=SEED_FILER)
    urn_filer = rng.random(size=SIZE_POP)
    del rng
    if FIND_COEFFS:
        x_0 = np.array([FCONST, FSLOPE])
        target = np.array([FILER_NUMBER_FRAC, FILER_INCOME_FRAC])
        res = optimize.root(rfunc, x_0, tol=FIND_TOL,
                            args=(urn_filer, income, target,))
        fconst = res['x'][0]
        fslope = res['x'][1]
        print('Finding tax filing probability parameters:', res['message'])
        solution = f' fconst={fconst:.5f}  fslope={fslope:.5f}'
        print(f'After {res["nfev"]} function evaluations: {solution}')
    else:
        fconst = FCONST
        fslope = FSLOPE
    exp_xb = np.exp(np.clip(fconst + fslope * income * 1e-3, -500, 500))
    fprob = exp_xb / (1.0 + exp_xb)
    filer = np.where(urn_filer < fprob, 1, 0)
    print(('pop_filer_number_fraction= '
           f'{(filer[filer == 1].sum() / SIZE_POP):.5f}'))
    print(('pop_filer_income_fraction= '
           f'{(income[filer == 1].sum() / income.sum()):.5f}'))
    # ... create pop dataframe
    pop = pd.DataFrame({
        'income': income,
        'fprob': fprob,
        'filer': filer,
        'weight': np.ones(SIZE_POP)
    })
    print('==> pop:\n', pop.describe(percentiles=PCTILES))
    print(pop.corr())
    # ----------------------------------------------------------------
    # (*) sample population data set to get survey data set
    # ----------------------------------------------------------------
    # transform pop data set into survey data set
    rng = np.random.default_rng(seed=SEED_SURVEY)
    sampled = np.where(rng.random(size=SIZE_POP) < PROB_SURVEY, 1, 0)
    del rng
    sur = pop.loc[sampled == 1, :].copy()
    sur['weight'] = 1.0 / PROB_SURVEY
    print('==> sur:\n', sur.describe(percentiles=PCTILES))
    print(sur.corr())
    if CSV_WRITE:
        sur.to_csv('test2_sur.csv', index=False, float_format='%.2f')
    # ----------------------------------------------------------------
    # (*) fit logit probability model that predicts filing status
    # ----------------------------------------------------------------
    # impute tax filing status in survey data
    imp = sur.copy()
    imp.loc[:, 'filer'] = 0
    time0 = time.time()
    # ... compute aggregate population statistics used in imputation
    pop_filers = pop[pop['filer'] == 1]
    agg_number_filers = np.sum(pop_filers['weight'])
    agg_income_filers = np.dot(pop_filers['income'], pop_filers['weight'])
    print(f'agg_number_filers= {agg_number_filers:.0f}')
    print(f'agg_income_filers= {(agg_income_filers * 1e-9):.3f} billion')
    # ... set fixed func parameters
    rng = np.random.default_rng(seed=SEED_IMPUTE)
    urn = rng.random(size=sur.shape[0])
    del rng
    income = imp['income'].to_numpy()
    weight = imp['weight'].to_numpy()
    params = (urn, income, weight, agg_number_filers, agg_income_filers,)
    # ... set logit coefficient values grid for brute minimization grid search
    xslices = (
        slice(-40, -10, 1.0),  # const term
        slice(0.0, 2.0, 0.1),  # income slope
    )
    # ... use Nelder-Mead (amoeba) algorithm to finish brute minimization
    res = optimize.brute(func, xslices, args=params,
                         full_output=True, disp=True, workers=-1,
                         finish=optimize.fmin)
    xvar = res[0]
    print('xvar=', xvar)  # xvar at func minimum
    print('func(xvar)=', res[1])  # minimum function value
    # ... use xvar logit-equation coefficients to impute filer variable
    const = xvar[0]
    slope = xvar[1]
    exp_xb = np.exp(np.clip(const + slope * income * 1e-3, -500, 500))
    fprob = exp_xb / (1.0 + exp_xb)
    imp.loc[:, 'filer'] = np.where(urn < fprob, 1, 0)
    # ----------------------------------------------------------------
    # (*) compare imputed and actual filing filing status
    # ----------------------------------------------------------------
    # ...  compute aggregate filer income using imputed filing status
    imp_filer = imp[imp['filer'] == 1]
    afi = np.dot(imp_filer['income'], imp_filer['weight'])
    print(f'imputed_aggregate_filer_income= {(afi * 1e-9):.3f} billion')
    if TIME_IMPUTE:
        print(f'impute_exec_time(secs)= {(time.time() - time0):.1f}')
    print('==> imp:\n', imp.describe(percentiles=PCTILES))
    print(imp.corr())
    # ... generate contingency table for sur and imp filer variables
    print('filer variable crosstab:')
    print(pd.crosstab(imp['filer'], sur['filer'],
                      rownames=['impute'], colnames=['survey:'],
                      dropna=False, margins=True))
    if CSV_WRITE:
        imp.to_csv('test2_imp.csv', index=False, float_format='%.2f')
    return 0
# end of main function code


def rfunc(prm, *args):
    """
    Return vector-valued function values used in root finding.
    """
    fconst = prm[0]
    fslope = prm[1]
    urn = args[0]
    income = args[1]
    target = args[2]
    exp_xb = np.exp(np.clip(fconst + fslope * income * 1e-3, -500, 500))
    fprob = exp_xb / (1.0 + exp_xb)
    filer = np.where(urn < fprob, 1.0, 0.0)
    fval = np.empty((2))
    fval[0] = filer[filer > 0.5].sum() / len(urn) - target[0]
    fval[1] = income[filer > 0.5].sum() / income.sum() - target[1]
    return fval


def func(prm, *args):
    """
    Return sum of squared deviations from filer number and income targets,
    where imputed filing status is modeled as a logit function with a
    constant term and an income slope coefficient.
    """
    const = prm[0]
    slope = prm[1]
    urn = args[0]
    income = args[1]
    weight = args[2]
    target_num = args[3]
    target_inc = args[4]
    exp_xb = np.exp(np.clip(const + slope * income * 1e-3, -500, 500))
    fprob = exp_xb / (1.0 + exp_xb)
    filer = urn < fprob
    filer_num = weight[filer].sum()
    filer_inc = np.dot(income[filer], weight[filer])
    fval = (filer_num / target_num - 1) ** 2 + \
           (filer_inc / target_inc - 1) ** 2
    return fval


if __name__ == '__main__':
    sys.exit(main())