diff --git a/config.yaml.sample b/config.yaml.sample index 10d90294..d554f3ae 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -326,5 +326,39 @@ rawr: planet_osm_polygon: *osm planet_osm_ways: *osm planet_osm_rels: *osm + wof_neighbourhood: { name: wof, value: whosonfirst.mapzen.com } + water_polygons: &osmdata { name: shp, value: openstreetmapdata.com } + land_polygons: *osmdata + # when a feature's shape is of the type given in the key and the feature + # appears in the listed layers, then generate a label centroid. multi* + # geometries are considered the same as single ones for the purposes of key + # lookup. + label-placement-layers: + point: ['earth', 'water'] + polygon: ['buildings', 'earth', 'landuse', 'water'] + linestring: ['earth', 'landuse', 'water'] + # indexes to generate on the RAWR tile. features are looked up only if they + # appear in an index, so this list must be exhaustive. all OSM-based features + # are handled by a single index of {type: osm}. other indexes are provided on + # single table/layer combinations with the {type: simple}, providing the + # names of both the table and layer. it's also possible to provide start_zoom + # and end_zoom parameters which control when the data is visible. note that + # the start_zoom is inclusive (and optional; defaults to 0) and the end_zoom + # is exclusive (also optional; defaults to infinity). + indexes: + - type: osm + - type: simple + table: wof_neighbourhood + layer: places + - type: simple + table: water_polygons + layer: water + - type: simple + table: land_polygons + layer: earth + - type: simple + table: ne_10m_urban_areas + layer: landuse + end_zoom: 10 # uncomment this to use RAWR tiles rather than go direct to the database. #use-rawr-tiles: true diff --git a/tests/test_query_rawr.py b/tests/test_query_rawr.py index f3dace70..223187f8 100644 --- a/tests/test_query_rawr.py +++ b/tests/test_query_rawr.py @@ -44,8 +44,9 @@ def _make(self, min_zoom_fn, props_fn, tables, tile_pyramid, layers = {layer_name: LayerInfo(min_zoom_fn, props_fn)} storage = ConstantStorage(tables) + indexes_cfg = [dict(type="osm")] return make_rawr_data_fetcher( - min_z, max_z, storage, layers, + min_z, max_z, storage, layers, indexes_cfg, label_placement_layers=label_placement_layers) @@ -482,7 +483,7 @@ def props_fn(shape, props, fid, meta): layers[name] = LayerInfo(min_zoom_fn, props_fn) storage = ConstantStorage(tables) fetch = make_rawr_data_fetcher( - top_zoom, max_zoom, storage, layers) + top_zoom, max_zoom, storage, layers, [dict(type="osm")]) for fetcher, _ in fetch.fetch_tiles(_wrap(top_tile)): read_rows = fetcher(tile.zoom, coord_to_mercator_bounds(tile)) diff --git a/tilequeue/command.py b/tilequeue/command.py index 4ba153f7..e16deadd 100755 --- a/tilequeue/command.py +++ b/tilequeue/command.py @@ -1725,7 +1725,8 @@ def tilequeue_rawr_process(cfg, peripherals): from raw_tiles.formatter.msgpack import Msgpack from raw_tiles.gen import RawrGenerator from raw_tiles.source.conn import ConnectionContextManager - from raw_tiles.source.osm import OsmSource + from raw_tiles.source import parse_sources + from raw_tiles.source import DEFAULT_SOURCES as DEFAULT_RAWR_SOURCES from tilequeue.log import JsonRawrProcessingLogger from tilequeue.rawr import RawrS3Sink from tilequeue.rawr import RawrStoreSink @@ -1737,6 +1738,12 @@ def tilequeue_rawr_process(cfg, peripherals): # pass through the postgresql yaml config directly conn_ctx = ConnectionContextManager(rawr_postgresql_yaml) + rawr_source_list = rawr_yaml.get('sources', DEFAULT_RAWR_SOURCES) + assert isinstance(rawr_source_list, list), \ + 'RAWR source list should be a list' + assert len(rawr_source_list) > 0, \ + 'RAWR source list should be non-empty' + rawr_store = rawr_yaml.get('store') if rawr_store: store = make_store(rawr_store, @@ -1779,15 +1786,15 @@ def tilequeue_rawr_process(cfg, peripherals): % (toi_type,) logger = make_logger(cfg, 'rawr_process') - rawr_osm_source = OsmSource(conn_ctx) + rawr_source = parse_sources(rawr_source_list) rawr_formatter = Msgpack() - rawr_gen = RawrGenerator(rawr_osm_source, rawr_formatter, rawr_sink) + rawr_gen = RawrGenerator(rawr_source, rawr_formatter, rawr_sink) stats_handler = RawrTilePipelineStatsHandler(peripherals.stats) rawr_proc_logger = JsonRawrProcessingLogger(logger) rawr_pipeline = RawrTileGenerationPipeline( rawr_queue, msg_marshaller, group_by_zoom, rawr_gen, peripherals.queue_writer, rawr_toi_intersector, stats_handler, - rawr_proc_logger) + rawr_proc_logger, conn_ctx) rawr_pipeline() diff --git a/tilequeue/query/__init__.py b/tilequeue/query/__init__.py index cdf8429f..0305a2f8 100644 --- a/tilequeue/query/__init__.py +++ b/tilequeue/query/__init__.py @@ -72,6 +72,16 @@ def _make_rawr_fetcher(cfg, layer_data, query_cfg, io_pool): source_value = data['value'] table_sources[tbl] = Source(source_name, source_value) + label_placement_layers = rawr_yaml.get('label-placement-layers', {}) + for geom_type, layers in label_placement_layers.items(): + assert geom_type in ('point', 'polygon', 'linestring'), \ + 'Geom type %r not understood, expecting point, polygon or ' \ + 'linestring.' % (geom_type,) + label_placement_layers[geom_type] = set(layers) + + indexes_cfg = rawr_yaml.get('indexes') + assert indexes_cfg, 'Missing definitions of table indexes.' + # source types are: # s3 - to fetch RAWR tiles from S3 # store - to fetch RAWR tiles from any tilequeue tile source @@ -124,20 +134,15 @@ def _make_rawr_fetcher(cfg, layer_data, query_cfg, io_pool): assert False, 'Source type %r not understood. ' \ 'Options are s3, generate and store.' % (source_type,) - # TODO: this needs to be configurable, everywhere! + # TODO: this needs to be configurable, everywhere! this is a long term + # refactor - it's hard-coded in a bunch of places :-( max_z = 16 - # TODO: put this in the config! - label_placement_layers = { - 'point': set(['earth', 'water']), - 'polygon': set(['buildings', 'earth', 'landuse', 'water']), - 'linestring': set(['earth', 'landuse', 'water']), - } - layers = _make_layer_info(layer_data, cfg.process_yaml_cfg) return make_rawr_data_fetcher( - group_by_zoom, max_z, storage, layers, label_placement_layers) + group_by_zoom, max_z, storage, layers, indexes_cfg, + label_placement_layers) def _make_layer_info(layer_data, process_yaml_cfg): diff --git a/tilequeue/query/rawr.py b/tilequeue/query/rawr.py index e822f9eb..cdc61fde 100644 --- a/tilequeue/query/rawr.py +++ b/tilequeue/query/rawr.py @@ -306,7 +306,7 @@ def bounds(self): _Metadata = namedtuple('_Metadata', 'source ways relations') -def _make_meta(source, fid, shape_type, osm): +def _associated_ways_and_relations(fid, shape_type, osm): ways = [] rels = [] @@ -328,6 +328,16 @@ def _make_meta(source, fid, shape_type, osm): if rel: rels.append(rel) + return ways, rels + + +def _make_meta(source, fid, shape_type, osm): + if osm: + ways, rels = _associated_ways_and_relations(fid, shape_type, osm) + else: + ways = [] + rels = [] + # have to transform the Relation object into a dict, which is # what the functions called on this data expect. # TODO: reusing the Relation object would be better. @@ -342,6 +352,105 @@ def _make_meta(source, fid, shape_type, osm): return _Metadata(source.name, ways, rel_dicts) +def insert_into_index(tile_pyramid, feature, tile_index, + start_zoom=0, end_zoom=None): + assert isinstance(feature, _Feature) + + layer_min_zooms = feature.layer_min_zooms + # quick exit if the feature didn't have a min zoom in any layer. + if not layer_min_zooms: + return + + # lowest zoom that this feature appears in any layer. note that this + # is clamped to the max zoom, so that all features that appear at some + # zoom level appear at the max zoom. this is different from the min + # zoom in layer_min_zooms, which is a property that will be injected + # for each layer and is used by the _client_ to determine feature + # visibility. + min_zoom = min(tile_pyramid.max_z, min(layer_min_zooms.values())) + + # take the minimum integer zoom - this is the min zoom tile that the + # feature should appear in, and a feature with min_zoom = 1.9 should + # appear in a tile at z=1, not 2, since the tile at z=N is used for + # the zoom range N to N+1. + # + # we cut this off at this index's min zoom, as we aren't interested + # in any tiles outside of that, and the layer's start_zoom, since the + # feature shouldn't appear outside that range. + floor_zoom = max(tile_pyramid.z, int(floor(min_zoom)), start_zoom) + + # seed initial set of tiles at maximum zoom. all features appear at + # least at the max zoom, even if the min_zoom function returns a + # value larger than the max zoom. + zoom = tile_pyramid.max_z + + # make sure that features aren't visible at or beyond the end_zoom + # for the layer, if one was provided. + if end_zoom is not None: + # end_zoom is exclusive, so we have to back up one level. + zoom = min(zoom, end_zoom - 1) + + # if the zoom ranges don't intersect, then this feature does not appear + # in any zoom. + if zoom < floor_zoom: + return + + tiles = shape_tile_coverage(feature.shape, zoom, tile_pyramid.tile()) + + while zoom >= floor_zoom: + parent_tiles = set() + for tile in tiles: + tile_index[tile].append(feature) + parent_tiles.add(tile.parent()) + + zoom -= 1 + tiles = parent_tiles + + +def make_layer_min_zooms(layers, source, fid, shape, props, shape_type): + layer_min_zooms = {} + meta = _make_meta(source, fid, shape_type, None) + for layer_name, info in layers.items(): + if info.shape_types and shape_type not in info.shape_types: + continue + min_zoom = info.min_zoom_fn(shape, props, fid, meta) + if min_zoom is not None: + layer_min_zooms[layer_name] = min_zoom + return layer_min_zooms + + +# TODO: factor out common features of _SimpleLayersIndex & _LayersIndex +class _SimpleLayersIndex(object): + """ + Index features by the tile(s) that they appear in. + + This is the non-relations version, for stand-alone features such as those + from openstreetmapdata.com shapefiles or WOF. + """ + + def __init__(self, layers, tile_pyramid, source, start_zoom, end_zoom): + self.layers = layers + self.tile_pyramid = tile_pyramid + self.tile_index = defaultdict(list) + self.source = source + self.start_zoom = start_zoom + self.end_zoom = end_zoom + + def add_row(self, fid, shape_wkb, props): + shape = _LazyShape(shape_wkb) + shape_type = wkb_shape_type(shape_wkb) + + layer_min_zooms = make_layer_min_zooms( + self.layers, self.source, fid, shape, props, shape_type) + + feature = _Feature(fid, shape, props, layer_min_zooms) + insert_into_index(self.tile_pyramid, feature, self.tile_index, + self.start_zoom, self.end_zoom) + + def __call__(self, tile): + return self.tile_index.get(tile, []) + + class _LayersIndex(object): """ Index features by the tile(s) that they appear in. @@ -372,6 +481,7 @@ def add_row(self, fid, shape_wkb, props): def index(self, osm, source): for feature in self.delayed_features: self._index_feature(feature, osm, source) + self.source = source del self.delayed_features def _index_feature(self, feature, osm, source): @@ -394,49 +504,81 @@ def _index_feature(self, feature, osm, source): if min_zoom is not None: layer_min_zooms[layer_name] = min_zoom - # quick exit if the feature didn't have a min zoom in any layer. - if not layer_min_zooms: - return - - # lowest zoom that this feature appears in any layer. note that this - # is clamped to the max zoom, so that all features that appear at some - # zoom level appear at the max zoom. this is different from the min - # zoom in layer_min_zooms, which is a property that will be injected - # for each layer and is used by the _client_ to determine feature - # visibility. - min_zoom = min(self.tile_pyramid.max_z, min(layer_min_zooms.values())) - - # take the minimum integer zoom - this is the min zoom tile that the - # feature should appear in, and a feature with min_zoom = 1.9 should - # appear in a tile at z=1, not 2, since the tile at z=N is used for - # the zoom range N to N+1. - # - # we cut this off at this index's min zoom, as we aren't interested - # in any tiles outside of that. - floor_zoom = max(self.tile_pyramid.z, int(floor(min_zoom))) - - # seed initial set of tiles at maximum zoom. all features appear at - # least at the max zoom, even if the min_zoom function returns a - # value larger than the max zoom. - zoom = self.tile_pyramid.max_z - tiles = shape_tile_coverage(shape, zoom, self.tile_pyramid.tile()) - - while zoom >= floor_zoom: - parent_tiles = set() - for tile in tiles: - self.tile_index[tile].append(feature) - parent_tiles.add(tile.parent()) - - zoom -= 1 - tiles = parent_tiles + insert_into_index(self.tile_pyramid, feature, self.tile_index) def __call__(self, tile): return self.tile_index.get(tile, []) +def osm_index(layers, tables, tile_pyramid): + from raw_tiles.index.index import index_table + + table_indexes = defaultdict(list) + + index = _LayersIndex(layers, tile_pyramid) + for shape_type in ('point', 'line', 'polygon'): + table_name = 'planet_osm_' + shape_type + table_indexes[table_name].append(index) + + # source for all these layers has to be the same + source = None + + osm = OsmRawrLookup() + # NOTE: order here is different from that in raw_tiles index() + # function. this is because here we want to gather up some + # "interesting" feature IDs before we look at the ways/rels tables. + for typ in ('point', 'line', 'polygon', 'ways', 'rels'): + table_name = 'planet_osm_' + typ + table = tables(table_name) + extra_indexes = table_indexes[table_name] + index_table(table.rows, osm, *extra_indexes) + + if source is None: + source = table.source + else: + assert source == table.source, 'Mismatched sources' + + assert source + + # there's a chicken and egg problem with the indexes: we want to know + # which features to index, but also calculate the feature's min zoom, + # which might depend on ways and relations not seen yet. one solution + # would be to do this in two passes, but that might mean paying a cost + # to decompress or deserialize the data twice. instead, the index + # buffers the features and indexes them in the following step. this + # might mean we buffer more information in memory than we technically + # need if many of the features are not visible, but means we get one + # single set of _Feature objects. + index.index(osm, source) + + return index, osm + + +def simple_index(layers, tables, tile_pyramid, index_cfg): + from raw_tiles.index.index import index_table + + table_name = index_cfg.get('table') + assert table_name, 'Simple index must have a table name.' + + layer_name = index_cfg.get('layer') + assert layer_name, 'Simple index must have a layer name.' + + start_zoom = index_cfg.get('start_zoom', 0) + end_zoom = index_cfg.get('end_zoom') + + table = tables(table_name) + # only using a single layer + simple_layers = {layer_name: layers[layer_name]} + index = _SimpleLayersIndex( + simple_layers, tile_pyramid, table.source, start_zoom, end_zoom) + index_table(table.rows, index) + return index + + class RawrTile(object): - def __init__(self, layers, tables, tile_pyramid, label_placement_layers): + def __init__(self, layers, tables, tile_pyramid, label_placement_layers, + indexes_cfg): """ Expect layers to be a dict of layer name to LayerInfo (see fixture.py). Tables should be a callable which returns a Table object (namedtuple @@ -444,51 +586,29 @@ def __init__(self, layers, tables, tile_pyramid, label_placement_layers): that table's name. """ - from raw_tiles.index.index import index_table - self.layers = layers self.tile_pyramid = tile_pyramid self.label_placement_layers = label_placement_layers - self.layer_indexes = {} - - table_indexes = defaultdict(list) - - self.layers_index = _LayersIndex(self.layers, self.tile_pyramid) - for shape_type in ('point', 'line', 'polygon'): - table_name = 'planet_osm_' + shape_type - table_indexes[table_name].append(self.layers_index) - - # source for all these layers has to be the same - source = None - - self.osm = OsmRawrLookup() - # NOTE: order here is different from that in raw_tiles index() - # function. this is because here we want to gather up some - # "interesting" feature IDs before we look at the ways/rels tables. - for typ in ('point', 'line', 'polygon', 'ways', 'rels'): - table_name = 'planet_osm_' + typ - table = tables(table_name) - extra_indexes = table_indexes[table_name] - index_table(table.rows, self.osm, *extra_indexes) - - if source is None: - source = table.source + self.osm = None + + indexes = [] + for index_cfg in indexes_cfg: + typ = index_cfg.get('type') + assert typ, 'Index configuration must provide a type.' + + if typ == 'osm': + index, osm = osm_index(layers, tables, tile_pyramid) + assert self.osm is None, 'Cannot have more than one OSM index.' + self.osm = osm + indexes.append(index) + + elif typ == 'simple': + indexes.append(simple_index( + layers, tables, tile_pyramid, index_cfg)) else: - assert source == table.source, 'Mismatched sources' - - assert source - self.layers_index_source = source - - # there's a chicken and egg problem with the indexes: we want to know - # which features to index, but also calculate the feature's min zoom, - # which might depend on ways and relations not seen yet. one solution - # would be to do this in two passes, but that might mean paying a cost - # to decompress or deserialize the data twice. instead, the index - # buffers the features and indexes them in the following step. this - # might mean we buffer more information in memory than we technically - # need if many of the features are not visible, but means we get one - # single set of _Feature objects. - self.layers_index.index(self.osm, self.layers_index_source) + raise ValueError('Unknown index type %r' % (typ,)) + + self.indexes = indexes def _named_layer(self, layer_min_zooms): # we want only one layer from ('pois', 'landuse', 'buildings') for @@ -503,18 +623,21 @@ def _named_layer(self, layer_min_zooms): return None def _lookup(self, zoom, unpadded_bounds): - features = [] + source_features = defaultdict(list) seen_ids = set() + def _add_feature(source, feature): + feature_id = id(feature) + if feature_id not in seen_ids: + seen_ids.add(feature_id) + source_features[source].append(feature) + for tile in _tiles(zoom, unpadded_bounds): - tile_features = self.layers_index(tile) - for feature in tile_features: - feature_id = id(feature) - if feature_id not in seen_ids: - seen_ids.add(feature_id) - features.append(feature) + for index in self.indexes: + for feature in index(tile): + _add_feature(index.source, feature) - return {self.layers_index_source: features}.iteritems() + return source_features.iteritems() def __call__(self, zoom, unpadded_bounds): read_rows = [] @@ -609,11 +732,13 @@ def _parse_row(self, zoom, unpadded_bounds, bbox, source, fid, shape, class DataFetcher(object): - def __init__(self, min_z, max_z, storage, layers, label_placement_layers): + def __init__(self, min_z, max_z, storage, layers, indexes_cfg, + label_placement_layers): self.min_z = min_z self.max_z = max_z self.storage = storage self.layers = layers + self.indexes_cfg = indexes_cfg self.label_placement_layers = label_placement_layers def fetch_tiles(self, all_data): @@ -635,7 +760,7 @@ def fetch_tiles(self, all_data): tables = self.storage(tile_pyramid.tile()) fetcher = RawrTile(self.layers, tables, tile_pyramid, - self.label_placement_layers) + self.label_placement_layers, self.indexes_cfg) for coord, data in coord_group: yield fetcher, data @@ -649,11 +774,16 @@ def fetch_tiles(self, all_data): # returning a "tables" callable. The "tables" callable returns a # list of rows given the table name as its only argument. # - layers: A dict of layer name to LayerInfo (see fixture.py). +# - indexes_cfg: A list of index configurations. Each entry should be a dict +# with a "type" key in (osm, simple). OSM has no further +# configuration, but simple requires a further "table" and "layer" +# entry to give the table name and layer name. # - label_placement_layers: # A dict of geometry type ('point', 'linestring', 'polygon') to # set (or other in-supporting collection) of layer names. # Geometries of that type in that layer will have a label # placement generated for them. -def make_rawr_data_fetcher(min_z, max_z, storage, layers, +def make_rawr_data_fetcher(min_z, max_z, storage, layers, indexes_cfg, label_placement_layers={}): - return DataFetcher(min_z, max_z, storage, layers, label_placement_layers) + return DataFetcher(min_z, max_z, storage, layers, indexes_cfg, + label_placement_layers) diff --git a/tilequeue/rawr.py b/tilequeue/rawr.py index 7bef4f43..f4b0fcde 100644 --- a/tilequeue/rawr.py +++ b/tilequeue/rawr.py @@ -6,6 +6,7 @@ from ModestMaps.Core import Coordinate from msgpack import Unpacker from raw_tiles.tile import Tile +from raw_tiles.source.table_reader import TableReader from tilequeue.command import explode_and_intersect from tilequeue.format import zip_format from tilequeue.queue.message import MessageHandle @@ -277,7 +278,7 @@ class RawrTileGenerationPipeline(object): def __init__( self, rawr_queue, msg_marshaller, group_by_zoom, rawr_gen, queue_writer, rawr_toi_intersector, stats_handler, - rawr_proc_logger): + rawr_proc_logger, conn_ctx): self.rawr_queue = rawr_queue self.msg_marshaller = msg_marshaller self.group_by_zoom = group_by_zoom @@ -286,6 +287,7 @@ def __init__( self.rawr_toi_intersector = rawr_toi_intersector self.stats_handler = stats_handler self.rawr_proc_logger = rawr_proc_logger + self.conn_ctx = conn_ctx def _atexit_log(self): self.rawr_proc_logger.lifecycle('Processing stopped') @@ -331,7 +333,17 @@ def __call__(self): try: rawr_gen_timing = {} with time_block(rawr_gen_timing, 'total'): - rawr_gen_specific_timing = self.rawr_gen(rawr_tile_coord) + # grab connection + with self.conn_ctx() as conn: + # commit transaction + with conn as conn: + # cleanup cursor resources + with conn.cursor() as cur: + table_reader = TableReader(cur) + + rawr_gen_specific_timing = self.rawr_gen( + table_reader, rawr_tile_coord) + rawr_gen_timing.update(rawr_gen_specific_timing) timing['rawr_gen'] = rawr_gen_timing