-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtabs.py
131 lines (113 loc) · 3.98 KB
/
tabs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- coding: utf-8 -*-
#As a work of the United States government, this project is in the public
#domain within the United States. Additionally, we waive copyright and related
#rights in the work worldwide through the CC0 1.0 Universal public domain
#dedication (https://creativecommons.org/publicdomain/zero/1.0/)
"""
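Module that runs Tabber workers over every county in allcounties.txt and
stitches the resulting per-county block-count CSV files into one national
CSV file.

Args:
    1 (int) - number of workers
    2 (str) - data source label to tabulate (e.g. cef, hdf, cmrcl)

Example execution:
    $ setsid python tabs.py 20 cef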
Part of the replication archive for The U.S. Census Bureau's Ex Post
Confidentiality Analysis of the 2010 Census Data Publications
(https://github.com/uscensusbureau/recon_replication)
"""
import re
import csv
import time
import multiprocessing as mp
import os
import glob
import psutil
import sys
import logging
import pandas as pd
import numpy as np
import subprocess
import threading
import json
import pathlib
import argparse
import itertools
import math
# MODULE LEVEL VARIABLES
# Directory containing this program; the shared "common" directory is
# located relative to it.
myRoot = os.path.dirname(os.path.abspath(__file__))
# Parameter dictionary loaded from the shared config file. The file is read
# inside a ``with`` block so its handle is closed as soon as parsing is done.
with open(myRoot+'/../common/config.json','r') as _configFile:
    params = json.load(_configFile)
# Make the shared "common" directory importable, then import Tabber from it.
# The import must stay below the sys.path change.
sys.path.insert(0,myRoot+'/../common/')
from tabber import Tabber
# Date-time stamp (MM-DD-YY_HH:MM:SS, local time) to be put in outfile names.
# Formatted in-process; this yields the same string the two `date` shell
# calls (`+"%m-%d-%y"` and `+"%T"`) used to produce after clean-up.
date = time.strftime('%m-%d-%y_%H:%M:%S')
def _resultDir(src):
    """Return the directory prefix (ending in '/') that holds src's tab files."""
    if src in ('cef','hdf','cmrcl'):
        # No separator is inserted between rsltbase and src (unlike the
        # rhdfbasersltdir branch) -- presumably the config value already ends
        # with one; verify against config.json.
        return params['rsltbase']+src+'/'
    return params['rhdfbasersltdir']+'/'+src+'/'


def _loadCounties():
    """Return the county codes from allcounties.txt that should be tabulated.

    Codes starting with '72' are excluded (presumably the Puerto Rico state
    FIPS prefix -- confirm). Blank lines are skipped so that a stray empty
    line cannot be queued as a county or looked up as a county file.
    """
    counties = []
    with open(params['geolookup']+'allcounties.txt') as f:
        for line in f:
            county = line.replace('\n','')
            if county and county[0:2] != '72':
                counties.append(county)
    return counties


def _stitchCountyFiles(src, counties):
    """Concatenate the per-county block-count CSVs into one national CSV.

    The first county file is copied whole; for every later file the first
    line is dropped so that only one header line ends up in the output.
    Afterwards every per-county file matching the glob pattern is deleted.

    Raises:
        OSError: if the national file cannot be written or a county file
            cannot be read or removed.
    """
    resultDir = _resultDir(src)
    outFile = resultDir+src+'_block_counts.csv'
    # Opening with 'w' discards any national file left by an earlier run, and
    # the file is opened once rather than re-opened for every county.
    with open(outFile,'w') as outF:
        for counter, county in enumerate(counties, start=1):
            inFile = resultDir+src+'_block_counts_'+county+'.csv'
            with open(inFile,'r') as inF:
                if counter > 1:
                    # skip the repeated header line
                    inF.readline()
                outF.writelines(inF)
            if counter%100 == 0:
                logging.info('Filling national file, counties done {0}'.format(counter))
    # The pattern requires an underscore after "block_counts", so the national
    # file written above is not matched.
    for countyFile in glob.glob(resultDir+src+'_block_counts_*.csv'):
        os.remove(countyFile)


def main(numWorkers,src):
    """Main body of tab program.

    Queues one (county, src) task per county, runs numWorkers processes with
    tabber.tabWorker as their target, waits for all of them, then stitches the
    per-county output files into a single national file.

    Args:
        numWorkers: number of worker processes; anything int() accepts.
        src (str): data source label; selects the result directory and is
            used as the prefix of every output file name.
    """
    numWorkers = int(numWorkers)
    # Send anything written to stderr from here on to a timestamped .error
    # file next to this script; the handle stays open for the rest of the run.
    sys.stderr = open(myRoot+'/tabs_{0}.error'.format(date),'a')
    logging.info('\n\n###### BEGINNING OF TABS PROGRAM ######\n\n')
    logging.info('date time: {0}'.format(date))
    logging.info('number of workers: {0}'.format(numWorkers))
    # initialize tabber
    lock = mp.Lock()
    tabber = Tabber(lock, params)
    # fill queue (the county list is read once and reused for stitching)
    counties = _loadCounties()
    for county in counties:
        tabber.tabQueue.put((county,src))
    # create and kick off workers
    processes = []
    for _ in range(numWorkers):
        p = mp.Process(target=tabber.tabWorker)
        p.start()
        processes.append(p)
        # one-second pause between launches, kept from the original
        time.sleep(1)
    for p in processes:
        p.join()
        logging.info('Tab process: {0} finished and joined'.format(p))
    tabber.logTime()
    # stitch together county-level tabs
    _stitchCountyFiles(src, counties)
    tabber.logTime()
    logging.info('\n\n###### END OF TABS PROGRAM ######\n\n')
if __name__ == '__main__':
    # Two positional arguments are required: the worker count and the source
    # label. Exit with a usage message instead of an IndexError traceback
    # when either is missing.
    if len(sys.argv) < 3:
        sys.exit('usage: python tabs.py <numWorkers> <src>')
    main(sys.argv[1],sys.argv[2])