From 6575b8bb70fc815077988867ca1e946c1f21ced3 Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Thu, 6 Jun 2024 12:38:11 +0100 Subject: [PATCH] refactor: revert default line length 132 --> 88 char --- osm_rawdata/config.py | 19 +++++++-- osm_rawdata/geofabrik.py | 12 ++++-- osm_rawdata/importer.py | 20 ++++++++-- osm_rawdata/overture.py | 8 +++- osm_rawdata/pgasync.py | 61 ++++++++++++++++++++++------- osm_rawdata/postgres.py | 84 ++++++++++++++++++++++++++++++++-------- tests/conftest.py | 5 ++- tests/test_config.py | 15 +++++-- 8 files changed, 177 insertions(+), 47 deletions(-) diff --git a/osm_rawdata/config.py b/osm_rawdata/config.py index c1daa80..e861ae4 100755 --- a/osm_rawdata/config.py +++ b/osm_rawdata/config.py @@ -182,7 +182,12 @@ def _yaml_parse_select_and_keep(self, data): for table in self.config.get("tables", []): # 'select' not tags specified, use 'where' tags instead if data.get("select") is None: - tags = [key for entry in self.config["where"][table] for key in entry.keys() if key != "op"] + tags = [ + key + for entry in self.config["where"][table] + for key in entry.keys() + if key != "op" + ] self.config["select"][table] = [{tag: {}} for tag in tags] # 'select' tags specified, process @@ -256,7 +261,9 @@ def convert_geometry(geom_type): # FIXME needs a refactor to handle all_geometry correctly if geom_type == "all_geometry": for geometry_type in ["nodes", "ways_line", "ways_poly"]: - self.config["select"][geometry_type].append({attribute_name: {}}) + self.config["select"][geometry_type].append( + {attribute_name: {}} + ) self.config["tables"].append(geometry_type) else: self.config["select"][geom_type].append({attribute_name: {}}) @@ -311,7 +318,9 @@ def dump(self): keys.append(v1) if k1 == "op": continue - print(f"\tWhere table '{key}', tag '{k1}' has values '{v1}' {op}") + print( + f"\tWhere table '{key}', tag '{k1}' has values '{v1}' {op}" + ) else: print(f"\tSelecting tag '{key}'") # print("Tables") @@ -344,7 +353,9 @@ def main(): log.setLevel(logging.DEBUG) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(threadName)10s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) log.addHandler(ch) diff --git a/osm_rawdata/geofabrik.py b/osm_rawdata/geofabrik.py index d3b8713..836e6bc 100755 --- a/osm_rawdata/geofabrik.py +++ b/osm_rawdata/geofabrik.py @@ -72,7 +72,9 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", action="store_true", help="verbose output") parser.add_argument("-f", "--file", help="The country or US state to download") - parser.add_argument("-l", "--list", action="store_true", help="List all files on GeoFabrik") + parser.add_argument( + "-l", "--list", action="store_true", help="List all files on GeoFabrik" + ) args = parser.parse_args() if len(argv) <= 1: @@ -84,7 +86,9 @@ def main(): log.setLevel(logging.DEBUG) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(threadName)10s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) log.addHandler(ch) @@ -97,7 +101,9 @@ def main(): if args.file: region = geof.getRegion(args.file) if not region: - log.error(f"{args.file} not found on GeoFabrik! Use the -l option to list all the regions") + log.error( + f"{args.file} not found on GeoFabrik! Use the -l option to list all the regions" + ) quit() uri = f"http://download.geofabrik.de/{region.lower()}/{args.file.lower()}-latest.osm.pbf" diff --git a/osm_rawdata/importer.py b/osm_rawdata/importer.py index e399c8f..a46d482 100755 --- a/osm_rawdata/importer.py +++ b/osm_rawdata/importer.py @@ -187,7 +187,9 @@ def parquetThread( tags=scalar, ) elif hex.geom_type == "MultiPolygon": - gdata = geoalchemy2.shape.from_shape(hex.convex_hull, srid=4326, extended=True) + gdata = geoalchemy2.shape.from_shape( + hex.convex_hull, srid=4326, extended=True + ) sql = insert(ways).values( geom=bytes(gdata.data), tags=scalar, @@ -340,7 +342,11 @@ def importParquet( if len(overture.data[block : block + chunk]) == 0: continue log.debug("Dispatching Block %d:%d" % (block, block + chunk)) - result = executor.submit(parquetThread, overture.data[block : block + chunk], connections[index]) + result = executor.submit( + parquetThread, + overture.data[block : block + chunk], + connections[index], + ) block += chunk index += 1 executor.shutdown() @@ -387,7 +393,11 @@ def importGeoJson( block = 0 while block <= entries: log.debug("Dispatching Block %d:%d" % (block, block + chunk)) - result = executor.submit(importThread, data["features"][block : block + chunk], self.connections[index]) + result = executor.submit( + importThread, + data["features"][block : block + chunk], + self.connections[index], + ) block += chunk index += 1 executor.shutdown() @@ -420,7 +430,9 @@ def main(): log.setLevel(logging.DEBUG) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(threadName)10s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) log.addHandler(ch) diff --git a/osm_rawdata/overture.py b/osm_rawdata/overture.py index 38148e7..04981be 100755 --- a/osm_rawdata/overture.py +++ b/osm_rawdata/overture.py @@ -139,7 +139,9 @@ def main(): ) parser.add_argument("-v", "--verbose", action="store_true", help="verbose output") parser.add_argument("-i", "--infile", required=True, help="Input file") - parser.add_argument("-o", "--outfile", default="overture.geojson", help="Output file") + parser.add_argument( + "-o", "--outfile", default="overture.geojson", help="Output file" + ) args = parser.parse_args() @@ -148,7 +150,9 @@ def main(): log.setLevel(logging.DEBUG) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(threadName)10s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) log.addHandler(ch) diff --git a/osm_rawdata/pgasync.py b/osm_rawdata/pgasync.py index eaf2f75..e4928b5 100755 --- a/osm_rawdata/pgasync.py +++ b/osm_rawdata/pgasync.py @@ -64,13 +64,17 @@ async def connect( if not uri.username: self.dburi["dbuser"] = os.getenv("PGUSER", default=None) if not self.dburi["dbuser"]: - log.error("You must specify the user name in the database URI, or set PGUSER") + log.error( + "You must specify the user name in the database URI, or set PGUSER" + ) else: self.dburi["dbuser"] = uri.username if not uri.password: self.dburi["dbpass"] = os.getenv("PGPASSWORD", default=None) if not self.dburi["dbpass"]: - log.error("You must specify the user password in the database URI, or set PGPASSWORD") + log.error( + "You must specify the user password in the database URI, or set PGPASSWORD" + ) else: self.dburi["dbpass"] = uri.password if not uri.hostname: @@ -88,8 +92,13 @@ async def connect( # Use a persistant connect, better for multiple requests self.session = requests.Session() - self.url = os.getenv("RAW_DATA_API_URL", "https://raw-data-api0.hotosm.org/v1") - self.headers = {"accept": "application/json", "Content-Type": "application/json"} + self.url = os.getenv( + "RAW_DATA_API_URL", "https://raw-data-api0.hotosm.org/v1" + ) + self.headers = { + "accept": "application/json", + "Content-Type": "application/json", + } else: # log.debug(f"Connecting with: {connect}") try: @@ -128,11 +137,20 @@ async def createJson( # This only effects the output file geometrytype = list() # for table in config.config['tables']: - if len(config.config["select"]["nodes"]) > 0 or len(config.config["where"]["nodes"]) > 0: + if ( + len(config.config["select"]["nodes"]) > 0 + or len(config.config["where"]["nodes"]) > 0 + ): geometrytype.append("point") - if len(config.config["select"]["ways_line"]) > 0 or len(config.config["where"]["ways_line"]) > 0: + if ( + len(config.config["select"]["ways_line"]) > 0 + or len(config.config["where"]["ways_line"]) > 0 + ): geometrytype.append("line") - if len(config.config["select"]["ways_poly"]) > 0 or len(config.config["where"]["ways_poly"]) > 0: + if ( + len(config.config["select"]["ways_poly"]) > 0 + or len(config.config["where"]["ways_poly"]) > 0 + ): geometrytype.append("polygon") feature["geometryType"] = geometrytype @@ -412,7 +430,10 @@ async def queryLocal( # If there is no config file, don't modify the results if self.qc: - if len(self.qc.config["where"]["ways_poly"]) == 0 and len(self.qc.config["where"]["nodes"]) == 0: + if ( + len(self.qc.config["where"]["ways_poly"]) == 0 + and len(self.qc.config["where"]["nodes"]) == 0 + ): return result for item in result: @@ -606,11 +627,22 @@ async def main(): ) parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output") parser.add_argument("-u", "--uri", default="underpass", help="Database URI") - parser.add_argument("-b", "--boundary", required=True, help="Boundary polygon to limit the data size") - parser.add_argument("-s", "--sql", help="Custom SQL query to execute against the database") + parser.add_argument( + "-b", + "--boundary", + required=True, + help="Boundary polygon to limit the data size", + ) + parser.add_argument( + "-s", "--sql", help="Custom SQL query to execute against the database" + ) parser.add_argument("-a", "--all", help="All the geometry or just centroids") - parser.add_argument("-c", "--config", help="The config file for the query (json or yaml)") - parser.add_argument("-o", "--outfile", default="extract.geojson", help="The output file") + parser.add_argument( + "-c", "--config", help="The config file for the query (json or yaml)" + ) + parser.add_argument( + "-o", "--outfile", default="extract.geojson", help="The output file" + ) args = parser.parse_args() # if len(argv) <= 1 or (args.sql is None and args.config is None): @@ -624,7 +656,10 @@ async def main(): logging.basicConfig( level=log_level, - format=("%(asctime)s.%(msecs)03d [%(levelname)s] " "%(name)s | %(funcName)s:%(lineno)d | %(message)s"), + format=( + "%(asctime)s.%(msecs)03d [%(levelname)s] " + "%(name)s | %(funcName)s:%(lineno)d | %(message)s" + ), datefmt="%y-%m-%d %H:%M:%S", stream=sys.stdout, ) diff --git a/osm_rawdata/postgres.py b/osm_rawdata/postgres.py index f0ef1c5..0a12186 100755 --- a/osm_rawdata/postgres.py +++ b/osm_rawdata/postgres.py @@ -113,7 +113,13 @@ def uriParser(source): dbhost = "localhost" # print(f"{source}\n\tcolon={colon} rcolon={rcolon} atsign={atsign} slash={slash}") - return {"dbname": dbname, "dbhost": dbhost, "dbuser": dbuser, "dbpass": dbpass, "dbport": dbport} + return { + "dbname": dbname, + "dbhost": dbhost, + "dbuser": dbuser, + "dbpass": dbpass, + "dbport": dbport, + } class DatabaseAccess(object): @@ -132,14 +138,23 @@ def __init__( if self.uri["dbname"] == "underpass": # Use a persistant connect, better for multiple requests self.session = requests.Session() - self.url = os.getenv("RAW_DATA_API_URL", "https://api-prod.raw-data.hotosm.org/v1") - self.headers = {"accept": "application/json", "Content-Type": "application/json"} + self.url = os.getenv( + "RAW_DATA_API_URL", "https://api-prod.raw-data.hotosm.org/v1" + ) + self.headers = { + "accept": "application/json", + "Content-Type": "application/json", + } else: log.info(f"Opening database connection to: {self.uri['dbname']}") connect = "PG: dbname=" + self.uri["dbname"] if "dbname" in self.uri and self.uri["dbname"] is not None: connect = f"dbname={self.uri['dbname']}" - elif "dbhost" in self.uri and self.uri["dbhost"] == "localhost" and self.uri["dbhost"] is not None: + elif ( + "dbhost" in self.uri + and self.uri["dbhost"] == "localhost" + and self.uri["dbhost"] is not None + ): connect = f"host={self.uri['dbhost']} dbname={self.uri['dbname']}" if "dbuser" in self.uri and self.uri["dbuser"] is not None: connect += f" user={self.uri['dbuser']}" @@ -203,8 +218,14 @@ def _get_geometry_types(self, config: QueryConfig) -> Union[list, None]: Union[list, None]: A list of geometry types or None if empty. """ geometry_types = [] - for table, geometry_type in {"nodes": "point", "ways_line": "line", "ways_poly": "polygon"}.items(): - if config.config.get("select", {}).get(table) or config.config.get("where", {}).get(table): + for table, geometry_type in { + "nodes": "point", + "ways_line": "line", + "ways_poly": "polygon", + }.items(): + if config.config.get("select", {}).get(table) or config.config.get( + "where", {} + ).get(table): geometry_types.append(geometry_type) return geometry_types or None @@ -462,7 +483,10 @@ def queryLocal( return FeatureCollection(features) # If there is no config file, don't modify the results - if len(self.qc.config["where"]["ways_poly"]) == 0 and len(self.qc.config["where"]["nodes"]) == 0: + if ( + len(self.qc.config["where"]["ways_poly"]) == 0 + and len(self.qc.config["where"]["nodes"]) == 0 + ): return result for item in result: @@ -544,7 +568,9 @@ def queryRemote( log.debug(f"Raw Data API Query URL: {task_query_url}") polling_interval = 2 # Initial polling interval in seconds - max_polling_duration = 600 # Maximum duration for polling in seconds (10 minutes) + max_polling_duration = ( + 600 # Maximum duration for polling in seconds (10 minutes) + ) elapsed_time = 0 while elapsed_time < max_polling_duration: @@ -556,13 +582,21 @@ def queryRemote( log.debug(f"Current status: {response_status}") # response_status options: STARTED, PENDING, SUCCESS - if response_status != "SUCCESS" or not isinstance(task_info, dict) or not task_info.get("download_url"): + if ( + response_status != "SUCCESS" + or not isinstance(task_info, dict) + or not task_info.get("download_url") + ): # Adjust polling frequency after the first minute if elapsed_time > 60: - polling_interval = 10 # Poll every 10 seconds after the first minute + polling_interval = ( + 10 # Poll every 10 seconds after the first minute + ) # Wait before polling again - log.debug(f"Waiting {polling_interval} seconds before polling API again...") + log.debug( + f"Waiting {polling_interval} seconds before polling API again..." + ) time.sleep(polling_interval) elapsed_time += polling_interval @@ -716,7 +750,10 @@ def execQuery( if (geom_type := boundary.get("type")) == "FeatureCollection": # Convert each feature into a Shapely geometry - geometries = [shape(feature.get("geometry")) for feature in boundary.get("features", [])] + geometries = [ + shape(feature.get("geometry")) + for feature in boundary.get("features", []) + ] merged_geom = unary_union(geometries) elif geom_type == "Feature": merged_geom = shape(boundary.get("geometry")) @@ -769,11 +806,22 @@ def main(): ) parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output") parser.add_argument("-u", "--uri", default="underpass", help="Database URI") - parser.add_argument("-b", "--boundary", required=True, help="Boundary polygon to limit the data size") - parser.add_argument("-s", "--sql", help="Custom SQL query to execute against the database") + parser.add_argument( + "-b", + "--boundary", + required=True, + help="Boundary polygon to limit the data size", + ) + parser.add_argument( + "-s", "--sql", help="Custom SQL query to execute against the database" + ) parser.add_argument("-a", "--all", help="All the geometry or just centroids") - parser.add_argument("-c", "--config", help="The config file for the query (json or yaml)") - parser.add_argument("-o", "--outfile", default="extract.geojson", help="The output file") + parser.add_argument( + "-c", "--config", help="The config file for the query (json or yaml)" + ) + parser.add_argument( + "-o", "--outfile", default="extract.geojson", help="The output file" + ) args = parser.parse_args() if len(argv) <= 1 or (args.sql is None and args.config is None): @@ -785,7 +833,9 @@ def main(): log.setLevel(logging.DEBUG) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(threadName)10s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) log.addHandler(ch) diff --git a/tests/conftest.py b/tests/conftest.py index c6a5b30..0fc8ff0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,7 +25,10 @@ logging.basicConfig( level="DEBUG", - format=("%(asctime)s.%(msecs)03d [%(levelname)s] " "%(name)s | %(funcName)s:%(lineno)d | %(message)s"), + format=( + "%(asctime)s.%(msecs)03d [%(levelname)s] " + "%(name)s | %(funcName)s:%(lineno)d | %(message)s" + ), datefmt="%y-%m-%d %H:%M:%S", stream=sys.stdout, ) diff --git a/tests/test_config.py b/tests/test_config.py index 225b3a8..3a44db6 100755 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -50,7 +50,10 @@ def test_levels(): if "admin_level" in data["select"]["ways_poly"][2]: hits += 1 - if qc.config["fileName"] == "Example export with all features" and qc.config["outputType"] == "geojson": + if ( + qc.config["fileName"] == "Example export with all features" + and qc.config["outputType"] == "geojson" + ): hits += 1 assert hits == 4 @@ -80,7 +83,10 @@ def test_formats(): # this query contains only the geometry and the output file name and type qc = QueryConfig() qc.parseJson(f"{rootdir}/formats.json") - assert qc.config["outputType"] == "shp" and qc.config["fileName"] == "Pokhara_all_features" + assert ( + qc.config["outputType"] == "shp" + and qc.config["fileName"] == "Pokhara_all_features" + ) def test_bytesio(): @@ -88,7 +94,10 @@ def test_bytesio(): with open(f"{rootdir}/formats.json", "rb") as file: json_obj = BytesIO(file.read()) qc.parseJson(json_obj) - assert qc.config["outputType"] == "shp" and qc.config["fileName"] == "Pokhara_all_features" + assert ( + qc.config["outputType"] == "shp" + and qc.config["fileName"] == "Pokhara_all_features" + ) def test_yaml_no_joins():