#!/usr/bin/env python3
'''This script is designed to load quasi-static data into a PostGIS database
for rendering maps. It differs from the usual scripts for this task in that it
takes its configuration from a file rather than being a series of shell
commands.

Some implicit assumptions are:
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be run multiple times in parallel. This is unlikely in
  practice because the script is expected to be called daily or less often,
  not every minute.
- Usage patterns will be similar to typical map rendering
'''
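
# The configuration file layout this script expects; the keys are the ones
# read in main(), while the values below are illustrative placeholders only:
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#     database: gis
#   sources:
#     example_table:
#       url: https://example.com/example.zip
#       archive:
#         format: zip
#         files:
#           - example.shp
#       file: example.shp
#       ogropts: ["-t_srs", "EPSG:3857"]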
import yaml
from urllib.parse import urlparse
import os
import re
import argparse
import shutil
# modules for getting data
import zipfile
import requests
import io
# modules for converting and postgres loading
import subprocess
import psycopg2
import logging
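
# Create the temporary loading schema and the metadata table if they don't
# already exist. The metadata table maps each managed table's name to the
# Last-Modified value of the data it was loaded from.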
def database_setup(conn, temp_schema, schema, metadata_table):
with conn.cursor() as cur:
cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
.format(temp_schema=temp_schema))
cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
''' (name text primary key, last_modified text);''')
.format(schema=schema, metadata_table=metadata_table))
conn.commit()
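
# A Table tracks one external data table through its lifecycle: the working
# copy in the temp schema, the final copy in the destination schema, and its
# row in the metadata table.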
class Table:
def __init__(self, name, conn, temp_schema, schema, metadata_table):
self._name = name
self._conn = conn
self._temp_schema = temp_schema
self._dst_schema = schema
self._metadata_table = metadata_table
# Clean up the temporary schema in preparation for loading
def clean_temp(self):
with self._conn.cursor() as cur:
cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
.format(name=self._name, temp_schema=self._temp_schema))
self._conn.commit()
# get the last modified date from the metadata table
    def last_modified(self):
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
        # Commit in every case so the SELECT doesn't leave a transaction open
        self._conn.commit()
        if results is not None:
            return results[0]
        return None
def grant_access(self, user):
with self._conn.cursor() as cur:
cur.execute('''GRANT SELECT ON "{temp_schema}"."{name}" TO "{user}";'''
.format(name=self._name, temp_schema=self._temp_schema, user=user))
self._conn.commit()
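
    # Prepare the freshly loaded table for rendering: drop the unneeded
    # ogc_fid column and NULL geometries, cluster the rows by geometry,
    # then build the final GiST index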
def index(self):
with self._conn.cursor() as cur:
# Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
.format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates an ogc_fid column we don't need
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
.format(name=self._name, temp_schema=self._temp_schema))
# Null geometries are useless for rendering
cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
            # Clustering static tables helps rendering performance; rewriting
            # the table also reclaims the space freed by the column drop above
cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
'''(ST_Envelope(way));'''
'''CLUSTER "{temp_schema}"."{name}" '''
'''USING "{name}_order";'''
'''DROP INDEX "{temp_schema}"."{name}_order";'''
'''CREATE INDEX ON "{temp_schema}"."{name}" '''
'''USING GIST (way) WITH (fillfactor=100);''')
.format(name=self._name, temp_schema=self._temp_schema))
# Reset autovacuum. The table is static, so this doesn't really
# matter since it'll never need a vacuum.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
.format(name=self._name, temp_schema=self._temp_schema))
self._conn.commit()
        # VACUUM can't be run in a transaction, so autocommit needs to be turned on
old_autocommit = self._conn.autocommit
try:
self._conn.autocommit = True
with self._conn.cursor() as cur:
cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
.format(name=self._name, temp_schema=self._temp_schema))
finally:
self._conn.autocommit = old_autocommit
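
    # Swap the loaded table from the temp schema into the destination schema
    # and record its new last-modified value in the metadata table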
def replace(self, new_last_modified):
with self._conn.cursor() as cur:
cur.execute('''BEGIN;''')
cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
'''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
.format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))
            # Insert or update the metadata row, depending on whether this
            # table already has one
cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[self._name])
if cur.rowcount == 0:
cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
'''(name, last_modified) VALUES (%s, %s)''')
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[self._name, new_last_modified])
else:
cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[new_last_modified, self._name])
self._conn.commit()
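
# Downloader wraps a requests session, adding support for file:// URLs and
# honoring the If-Modified-Since/Last-Modified conditional-download headers.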
class Downloader:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.session.close()
def _download(self, url, headers=None):
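        # file:// URLs are handled locally; the file's mtime stands in for
        # the HTTP Last-Modified value so conditional fetches still work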
if url.startswith('file://'):
filename = url[7:]
if headers and 'If-Modified-Since' in headers:
if str(os.path.getmtime(filename)) == headers['If-Modified-Since']:
return DownloadResult(status_code = requests.codes.not_modified)
with open(filename, 'rb') as fp:
return DownloadResult(status_code = 200, content = fp.read(),
last_modified = str(os.fstat(fp.fileno()).st_mtime))
response = self.session.get(url, headers=headers)
response.raise_for_status()
return DownloadResult(status_code = response.status_code, content = response.content,
last_modified = response.headers.get('Last-Modified', None))
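
    # Work out where this source's data should come from: the on-disk cache,
    # a fresh download, or nowhere at all if the table is already current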
def download(self, url, name, opts, data_dir, table_last_modified):
filename = os.path.join(data_dir, os.path.basename(urlparse(url).path))
filename_lastmod = filename + '.lastmod'
if os.path.exists(filename) and os.path.exists(filename_lastmod):
with open(filename_lastmod, 'r') as fp:
lastmod_cache = fp.read()
with open(filename, 'rb') as fp:
cached_data = DownloadResult(status_code = 200, content = fp.read(),
last_modified = lastmod_cache)
else:
cached_data = None
lastmod_cache = None
result = None
# Variable used to tell if we downloaded something
download_happened = False
if opts.no_update and (cached_data or table_last_modified):
            # cached_data may be None here; in that case table_last_modified
            # is set, so the table already holds data and no import is needed
            # under --no-update
result = cached_data
else:
if opts.force:
headers = {}
else:
                # If neither value exists this is None, which requests treats
                # the same as not sending If-Modified-Since at all
headers = {'If-Modified-Since': table_last_modified or lastmod_cache}
response = self._download(url, headers)
# Check status codes
if response.status_code == requests.codes.ok:
logging.info(" Download complete ({} bytes)".format(len(response.content)))
download_happened = True
if opts.cache:
# Write to cache
with open(filename, 'wb') as fp:
fp.write(response.content)
with open(filename_lastmod, 'w') as fp:
fp.write(response.last_modified)
result = response
elif response.status_code == requests.codes.not_modified:
                # Figure out whether the 304 refers to the cached file or to
                # the data already in the table
if os.path.exists(filename) and os.path.exists(filename_lastmod):
logging.info(" Cached file {} did not require updating".format(url))
result = cached_data
else:
result = None
else:
logging.critical(" Unexpected response code ({}".format(response.status_code))
logging.critical(" Content {} was not downloaded".format(name))
return None
if opts.delete_cache or (not opts.cache and download_happened):
try:
os.remove(filename)
os.remove(filename_lastmod)
except FileNotFoundError:
pass
return result
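
# Minimal container for the parts of a response (HTTP or file://) that the
# loader needs: status code, body, and Last-Modified value.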
class DownloadResult:
def __init__(self, status_code, content=None, last_modified=None):
self.status_code = status_code
self.content = content
self.last_modified = last_modified
def main():
# parse options
parser = argparse.ArgumentParser(
description="Load external data into a database")
parser.add_argument("-f", "--force", action="store_true",
help="Download and import new data, even if not required.")
parser.add_argument("-C", "--cache", action="store_true",
help="Cache downloaded data. Useful if you'll have your database volume deleted in the future")
parser.add_argument("--no-update", action="store_true",
help="Don't download newer data than what is locally available (either in cache or table). Overridden by --force")
parser.add_argument("--delete-cache", action="store_true",
help="Execute as usual, but delete cached data")
parser.add_argument("--force-import", action="store_true",
help="Import data into table even if may not be needed")
parser.add_argument("-c", "--config", action="store", default="external-data.yml",
help="Name of configuration file (default external-data.yml)")
parser.add_argument("-D", "--data", action="store",
help="Override data download directory")
parser.add_argument("-d", "--database", action="store",
help="Override database name to connect to")
parser.add_argument("-H", "--host", action="store",
help="Override database server host or socket directory")
parser.add_argument("-p", "--port", action="store",
help="Override database server port")
parser.add_argument("-U", "--username", action="store",
help="Override database user name")
parser.add_argument("-v", "--verbose", action="store_true",
help="Be more verbose. Overrides -q")
parser.add_argument("-q", "--quiet", action="store_true",
help="Only report serious problems")
parser.add_argument("-w", "--password", action="store",
help="Override database password")
parser.add_argument("-R", "--renderuser", action="store",
help="User to grant access for rendering")
opts = parser.parse_args()
if opts.verbose:
logging.basicConfig(level=logging.DEBUG)
elif opts.quiet:
logging.basicConfig(level=logging.WARNING)
else:
logging.basicConfig(level=logging.INFO)
if opts.force and opts.no_update:
opts.no_update = False
logging.warning("Force (-f) flag overrides --no-update flag")
logging.info("Starting load of external data into database")
with open(opts.config) as config_file:
config = yaml.safe_load(config_file)
data_dir = opts.data or config["settings"]["data_dir"]
os.makedirs(data_dir, exist_ok=True)
    # If a DB option is unspecified both on the command line and in the
    # config file, it stays None and libpq falls back to its usual defaults
database = opts.database or config["settings"].get("database")
host = opts.host or config["settings"].get("host")
port = opts.port or config["settings"].get("port")
user = opts.username or config["settings"].get("username")
password = opts.password or config["settings"].get("password")
renderuser = opts.renderuser or config["settings"].get("renderuser")
with Downloader() as d:
        conn = psycopg2.connect(database=database,
host=host, port=port,
user=user,
password=password)
# DB setup
database_setup(conn, config["settings"]["temp_schema"],
config["settings"]["schema"],
config["settings"]["metadata_table"])
for name, source in config["sources"].items():
logging.info("Checking table {}".format(name))
# Don't attempt to handle strange names
            # Even if there were code to escape them properly here, you don't
            # want them in a style with all the quoting headaches
            if not re.match('''^[a-zA-Z0-9_]+$''', name):
                raise RuntimeError(
                    "Only ASCII alphanumeric table names are supported")
this_table = Table(name, conn,
config["settings"]["temp_schema"],
config["settings"]["schema"],
config["settings"]["metadata_table"])
this_table.clean_temp()
# This will fetch data needed for import
download = d.download(source["url"], name, opts, data_dir, this_table.last_modified())
            # Check whether an import is needed
            if download is None or (not opts.force and not opts.force_import and this_table.last_modified() == download.last_modified):
logging.info(" Table {} did not require updating".format(name))
continue
workingdir = os.path.join(data_dir, name)
shutil.rmtree(workingdir, ignore_errors=True)
os.makedirs(workingdir, exist_ok=True)
if "archive" in source and source["archive"]["format"] == "zip":
logging.info(" Decompressing file")
                with zipfile.ZipFile(io.BytesIO(download.content)) as zf:
                    for member in source["archive"]["files"]:
                        zf.extract(member, workingdir)
ogrpg = "PG:dbname={}".format(database)
if port is not None:
ogrpg = ogrpg + " port={}".format(port)
if user is not None:
ogrpg = ogrpg + " user={}".format(user)
if host is not None:
ogrpg = ogrpg + " host={}".format(host)
if password is not None:
ogrpg = ogrpg + " password={}".format(password)
ogrcommand = ["ogr2ogr",
'-f', 'PostgreSQL',
'-lco', 'GEOMETRY_NAME=way',
'-lco', 'SPATIAL_INDEX=FALSE',
'-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
'-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]
if "ogropts" in source:
ogrcommand += source["ogropts"]
ogrcommand += [ogrpg,
os.path.join(workingdir, source["file"])]
logging.info(" Importing into database")
logging.debug("running {}".format(
subprocess.list2cmdline(ogrcommand)))
# ogr2ogr can raise errors here, so they need to be caught
try:
subprocess.check_output(
ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
except subprocess.CalledProcessError as e:
# Add more detail on stdout for the logs
logging.critical(
"ogr2ogr returned {} with layer {}".format(e.returncode, name))
logging.critical("Command line was {}".format(
subprocess.list2cmdline(e.cmd)))
logging.critical("Output was\n{}".format(e.output))
logging.critical("Error was\n{}".format(e.stderr))
raise RuntimeError(
"ogr2ogr error when loading table {}".format(name))
logging.info(" Import complete")
this_table.index()
if renderuser is not None:
this_table.grant_access(renderuser)
this_table.replace(download.last_modified)
shutil.rmtree(workingdir, ignore_errors=True)
if conn:
conn.close()
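
# Illustrative invocation (values are examples, not defaults):
#   ./get-external-data.py --cache -c external-data.yml -d gis -R renderuser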
if __name__ == '__main__':
main()