"""
Test data-prep method in a situation where adult population data is sampled
by population survey and aggregate statistics on number tax filers and their
income are known. Objective is to use these aggregate statistics to impute
the missing tax filer variable in the survey data using the aggregate stats
on the number in the population who are tax filers and on the amount of the
income received by tax filers.
The method is to fit a tax filer probability model that generates the known
aggregate statistics.
"""
import sys
import time
import numpy as np
import pandas as pd
from scipy import optimize
# debugging parameters:
CSV_WRITE = False
# population parameters:
SEED_POP = 123456789
SIZE_POP = 1_000_000
# dPlN income distribution parameters:
MEAN = 9.0
SDEV = 1.0
ALPHA = 2.0
BETA = 2.0
# filing probability parameters:
# (calibrated so that 10% of population with 50% of income are filers)
SEED_FILER = 12345
FIND_COEFFS = True
FIND_TOL = 1e-6
FCONST = -9.0 # if FIND_COEFFS, use as root initial value
FSLOPE = 0.20 # if FIND_COEFFS, use as root initial value
FILER_NUMBER_FRAC = 0.10 # if FIND_COEFFS, use as root target value
FILER_INCOME_FRAC = 0.50 # if FIND_COEFFS, use as root target value
# survey sampling parameters:
# (probability calibrated so that survey sample size is 2% of the population)
SEED_SURVEY = 987654321
PROB_SURVEY = 0.0201744
# survey filer parameters:
# (calibrated so that number of imputed filers in the survey data and
# there incomes are close to aggregate targets)
SEED_IMPUTE = 345678912
TIME_IMPUTE = False
# descriptive statistics parameters:
PCTILES = [0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99]
def main():
"""
High-level logic.
"""
# pylint: disable=too-many-statements,too-many-locals
# ----------------------------------------------------------------
# (*) construct large population data set
# ----------------------------------------------------------------
# construct pop data set
rng = np.random.default_rng(seed=SEED_POP)
# ... income variable (dPlN distribution)
norm = rng.normal(size=SIZE_POP)
expa = rng.exponential(size=SIZE_POP)
expb = rng.exponential(size=SIZE_POP)
income = np.exp(MEAN + norm*SDEV + expa/ALPHA - expb/BETA).astype(int)
del rng
# construct filer variable (filing probability depends on income)
rng = np.random.default_rng(seed=SEED_FILER)
urn_filer = rng.random(size=SIZE_POP)
del rng
if FIND_COEFFS:
x_0 = np.array([FCONST, FSLOPE])
target = np.array([FILER_NUMBER_FRAC, FILER_INCOME_FRAC])
res = optimize.root(rfunc, x_0, tol=FIND_TOL,
args=(urn_filer, income, target,))
fconst = res['x'][0]
fslope = res['x'][1]
print('Finding tax filing probability parameters:', res['message'])
solution = f' fconst={fconst:.5f} fslope={fslope:.5f}'
print(f'After {res["nfev"]} function evaluations: {solution}')
else:
fconst = FCONST
fslope = FSLOPE
exp_xb = np.exp(np.clip(fconst + fslope * income * 1e-3, -500, 500))
fprob = exp_xb / (1.0 + exp_xb)
filer = np.where(urn_filer < fprob, 1, 0)
print(('pop_filer_number_fraction= '
f'{(filer[filer == 1].sum() / SIZE_POP):.5f}'))
print(('pop_filer_income_fraction= '
f'{(income[filer == 1].sum() / income.sum()):.5f}'))
# ... create pop dataframe
pop = pd.DataFrame({
'income': income,
'fprob': fprob,
'filer': filer,
'weight': np.ones(SIZE_POP)
})
print('==> pop:\n', pop.describe(percentiles=PCTILES))
print(pop.corr())
# ----------------------------------------------------------------
# (*) sample population data set to get survey data set
# ----------------------------------------------------------------
# transform pop data set into survey data set
rng = np.random.default_rng(seed=SEED_SURVEY)
sampled = np.where(rng.random(size=SIZE_POP) < PROB_SURVEY, 1, 0)
del rng
sur = pop.loc[sampled == 1, :].copy()
sur['weight'] = 1.0 / PROB_SURVEY
print('==> sur:\n', sur.describe(percentiles=PCTILES))
print(sur.corr())
if CSV_WRITE:
sur.to_csv('test2_sur.csv', index=False, float_format='%.2f')
# ----------------------------------------------------------------
# (*) fit logit probability model that predicts filing status
# ----------------------------------------------------------------
# impute tax filing status in survey data
imp = sur.copy()
imp.loc[:, 'filer'] = 0
time0 = time.time()
# ... compute aggregate population statistics used in imputation
pop_filers = pop[pop['filer'] == 1]
agg_number_filers = np.sum(pop_filers['weight'])
agg_income_filers = np.dot(pop_filers['income'], pop_filers['weight'])
print(f'agg_number_filers= {agg_number_filers:.0f}')
print(f'agg_income_filers= {(agg_income_filers * 1e-9):.3f} billion')
# ... set fixed func parameters
rng = np.random.default_rng(seed=SEED_IMPUTE)
urn = rng.random(size=sur.shape[0])
del rng
income = imp['income'].to_numpy()
weight = imp['weight'].to_numpy()
params = (urn, income, weight, agg_number_filers, agg_income_filers,)
# ... set logit coefficient values grid for brute minimization grid search
xslices = (
slice(-40, -10, 1.0), # const term
slice(0.0, 2.0, 0.1), # income slope
)
# ... use Nelder-Mead (amoeba) algorithm to finish brute minimization
res = optimize.brute(func, xslices, args=params,
full_output=True, disp=True, workers=-1,
finish=optimize.fmin)
xvar = res[0]
print('xvar=', xvar) # xvar at func minimum
print('func(xvar)=', res[1]) # minimum function value
# ... use xvar logit-equation coefficients to impute filer variable
const = xvar[0]
slope = xvar[1]
exp_xb = np.exp(np.clip(const + slope * income * 1e-3, -500, 500))
fprob = exp_xb / (1.0 + exp_xb)
imp.loc[:, 'filer'] = np.where(urn < fprob, 1, 0)
# ----------------------------------------------------------------
# (*) compare imputed and actual filing filing status
# ----------------------------------------------------------------
# ... compute aggregate filer income using imputed filing status
imp_filer = imp[imp['filer'] == 1]
afi = np.dot(imp_filer['income'], imp_filer['weight'])
print(f'imputed_aggregate_filer_income= {(afi * 1e-9):.3f} billion')
if TIME_IMPUTE:
print(f'impute_exec_time(secs)= {(time.time() - time0):.1f}')
print('==> imp:\n', imp.describe(percentiles=PCTILES))
print(imp.corr())
# ... generate contingency table for sur and imp filer variables
print('filer variable crosstab:')
print(pd.crosstab(imp['filer'], sur['filer'],
rownames=['impute'], colnames=['survey:'],
dropna=False, margins=True))
if CSV_WRITE:
imp.to_csv('test2_imp.csv', index=False, float_format='%.2f')
return 0
# end of main function code
def rfunc(prm, *args):
"""
Return vector-valued function values used in root finding.
"""
fconst = prm[0]
fslope = prm[1]
urn = args[0]
income = args[1]
target = args[2]
exp_xb = np.exp(np.clip(fconst + fslope * income * 1e-3, -500, 500))
fprob = exp_xb / (1.0 + exp_xb)
filer = np.where(urn < fprob, 1.0, 0.0)
fval = np.empty((2))
fval[0] = filer[filer > 0.5].sum() / len(urn) - target[0]
fval[1] = income[filer > 0.5].sum() / income.sum() - target[1]
return fval
def func(prm, *args):
"""
Return sum of squared deviations from filer number and income targets,
where imputed filing status is modeled as a logit function with a
constant term and an income slope coefficient.
"""
const = prm[0]
slope = prm[1]
urn = args[0]
income = args[1]
weight = args[2]
target_num = args[3]
target_inc = args[4]
exp_xb = np.exp(np.clip(const + slope * income * 1e-3, -500, 500))
fprob = exp_xb / (1.0 + exp_xb)
filer = urn < fprob
filer_num = weight[filer].sum()
filer_inc = np.dot(income[filer], weight[filer])
fval = (filer_num / target_num - 1) ** 2 + \
(filer_inc / target_inc - 1) ** 2
return fval
if __name__ == '__main__':
sys.exit(main())