diff --git a/README.md b/README.md index 7a2b079..8bab333 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ Current scoville commands: * `proxy`: Serves a treemap visualisation of tiles on a local HTTP server. * `percentiles`: Calculate the percentile tile sizes for a set of MVT tiles. * `heatmap`: Serves a heatmap visualisation of tile sizes on a local HTTP server. +* `outliers`: Calculates the tiles with the largest per-layer sizes. ### Info command ### @@ -126,6 +127,40 @@ This will run a server on [localhost:8000](http://localhost:8000) by default (us ![Screenshot of the heatmap server](doc/heatmap_screenshot.png) +### Outliers command ### + +This calculates the largest tiles on a per-layer basis. For example, when run on a list of 1,000 frequently accessed tiles: + +``` +scoville outliers -j 4 --cache top-1000-tiles.txt 'https://tile.nextzen.org/tilezen/vector/v1/512/all/{z}/{x}/{y}.mvt?api_key=YOUR_API_KEY' +``` + +It gives something like the following: + +``` +Layer 'boundaries' + 67474 https://tile.nextzen.org/tilezen/vector/v1/512/all/5/17/11.mvt?api_key=YOUR_API_KEY + 68731 https://tile.nextzen.org/tilezen/vector/v1/512/all/0/0/0.mvt?api_key=YOUR_API_KEY + 92467 https://tile.nextzen.org/tilezen/vector/v1/512/all/4/8/5.mvt?api_key=YOUR_API_KEY +Layer 'buildings' + 359667 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3035/4647.mvt?api_key=YOUR_API_KEY + 372929 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3034/4647.mvt?api_key=YOUR_API_KEY + 408946 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3033/4647.mvt?api_key=YOUR_API_KEY +Layer 'earth' + 94603 https://tile.nextzen.org/tilezen/vector/v1/512/all/7/62/44.mvt?api_key=YOUR_API_KEY + 98898 https://tile.nextzen.org/tilezen/vector/v1/512/all/7/68/40.mvt?api_key=YOUR_API_KEY + 110378 https://tile.nextzen.org/tilezen/vector/v1/512/all/0/0/0.mvt?api_key=YOUR_API_KEY +Layer 'landuse' + 191312 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/263/170.mvt?api_key=YOUR_API_KEY + 196733 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/262/170.mvt?api_key=YOUR_API_KEY + 271852 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/263/169.mvt?api_key=YOUR_API_KEY +... +``` + +For each layer, it calculates the tiles which use the most bytes for that layer. The top tile URLs are listed, grouped by layer, with each line showing the size of the layer and the URL. Further investigation can be done by pasting the tile URL into the `info` command. + +By default, it outputs the top 3 tiles, but this can be changed with the `-n` command line option. Runs can be parallelised by using the `-j` option, and cached using the `--cache` option (useful if this is not a one-off, and you might run several commands against the same tile set). + ## Install on Ubuntu: ``` diff --git a/requirements.txt b/requirements.txt index be383ef..17a96fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,7 @@ click enum34 +requests +requests_futures +squarify +msgpack +Pillow diff --git a/scoville/command.py b/scoville/command.py index 5b0f658..6c52529 100644 --- a/scoville/command.py +++ b/scoville/command.py @@ -95,8 +95,21 @@ def info(mvt_file, kind, d3_json): treemap visualisation. """ - with open(mvt_file, 'r') as fh: - tile = Tile(fh.read()) + if mvt_file.startswith('http://') or \ + mvt_file.startswith('https://'): + import requests + + res = requests.get(mvt_file) + if res.status_code == 200: + tile = Tile(res.content) + else: + click.echo("Failed to fetch tile, status was %r" % + (res.status_code,)) + return + + else: + with open(mvt_file, 'r') as fh: + tile = Tile(fh.read()) sizes = {} for layer in tile: @@ -274,6 +287,33 @@ def colour_map(size): serve_http(url, port, heatmap) +@cli.command() +@click.argument('tiles_file', required=1) +@click.argument('url', required=1) +@click.option('--cache/--no-cache', default=False, help='Use a cache for ' + 'tiles. Can speed up multiple runs considerably.') +@click.option('--nprocs', '-j', default=1, type=int, help='Number of ' + 'processes to use to download and do tile size aggregation.') +@click.option('--num-outliers-per-layer', '-n', type=int, default=3, + help='Number of outliers for each layer to report on.') +def outliers(tiles_file, url, cache, nprocs, num_outliers_per_layer): + """ + From the distribution of tile coordinates given in TILES_FILE and fetched + from the URL pattern, pull out some of the outlier tiles which have the + largest sizes in each layer. + """ + + from scoville.percentiles import calculate_outliers + + tiles = read_urls(tiles_file, url) + result = calculate_outliers(tiles, num_outliers_per_layer, cache, nprocs) + + for name in sorted(result.keys()): + click.secho("Layer %r" % name, fg='green', bold=True) + for size, url in sorted(result[name]): + click.echo("%8d %s" % (size, url)) + + def scoville_main(): cli() diff --git a/scoville/percentiles.py b/scoville/percentiles.py index 3bdf216..621a95b 100644 --- a/scoville/percentiles.py +++ b/scoville/percentiles.py @@ -55,6 +55,16 @@ def _fetch_cache(url): return data +def fetch(url, cache=False): + """ + Fetch a tile from url, using cache if cache=True. + """ + + if cache: + return _fetch_cache(url) + return _fetch_http(url) + + class Aggregator(object): """ Core of the algorithm. Fetches tiles and aggregates their total and @@ -76,33 +86,72 @@ def add(self, tile_url): for layer in tile: self.results[layer.name].append(layer.size) + # encode a message to be sent over the "wire" from a worker to the parent + # process. we use msgpack encoding rather than pickle, as pickle was + # producing some very large messages. + def encode(self): + from msgpack import packb + return packb(self.results) -# special object to tell worker threads to exit -class Sentinel(object): - pass + def merge_decode(self, data): + from msgpack import unpackb + results = unpackb(data) + for k, v in results.iteritems(): + self.results[k].extend(v) + + +class LargestN(object): + """ + Keeps a list of the largest N tiles for each layer. + """ + def __init__(self, num, cache=False): + self.num = num + self.fetch_fn = _fetch_http + if cache: + self.fetch_fn = _fetch_cache + + self.results = defaultdict(list) + + def _insert(self, name, size, url): + largest = self.results.get(name, []) + largest.append((size, url)) + if len(largest) > self.num: + largest.sort(reverse=True) + del largest[self.num:] + self.results[name] = largest -# encode a message to be sent over the "wire" from a worker to the parent -# process. we use msgpack encoding rather than pickle, as pickle was producing -# some very large messages. -def mp_encode(data): - from msgpack import packb - return packb(data) + def add(self, tile_url): + data = self.fetch_fn(tile_url) + tile = Tile(data) + for layer in tile: + self._insert(layer.name, layer.size, tile_url) + def encode(self): + from msgpack import packb + return packb(self.results) -def mp_decode(data): - from msgpack import unpackb - return unpackb(data) + def merge_decode(self, data): + from msgpack import unpackb + results = unpackb(data) + for name, values in results.iteritems(): + for size, url in values: + self._insert(name, size, url) -def worker(input_queue, output_queue, cache): +# special object to tell worker threads to exit +class Sentinel(object): + pass + + +def worker(input_queue, output_queue, factory_fn): """ Worker for multi-processing. Reads tasks from a queue and feeds them into the Aggregator. When all tasks are done it reads a Sentinel and sends the aggregated result back on the output queue. """ - agg = Aggregator(cache) + agg = factory_fn() while True: obj = input_queue.get() @@ -113,10 +162,10 @@ def worker(input_queue, output_queue, cache): agg.add(obj) input_queue.task_done() - output_queue.put(mp_encode(agg.results)) + output_queue.put(agg.encode()) -def parallel(tile_urls, cache, nprocs): +def parallel(tile_urls, factory_fn, nprocs): """ Fetch percentile data in parallel, using nprocs processes. @@ -132,7 +181,7 @@ def parallel(tile_urls, cache, nprocs): workers = [] for i in xrange(0, nprocs): - w = Process(target=worker, args=(input_queue, output_queue, cache)) + w = Process(target=worker, args=(input_queue, output_queue, factory_fn)) w.start() workers.append(w) @@ -148,21 +197,19 @@ def parallel(tile_urls, cache, nprocs): # after we've queued the Sentinels, each worker should output an aggregated # result on the output queue. - result = defaultdict(list) + agg = factory_fn() for i in xrange(0, nprocs): - worker_result = mp_decode(output_queue.get()) - for k, v in worker_result.iteritems(): - result[k].extend(v) + agg.merge_decode(output_queue.get()) # and the worker should have exited, so we can clean up the processes. for w in workers: w.join() - return result + return agg.results -def sequential(tile_urls, cache): - agg = Aggregator(cache) +def sequential(tile_urls, factory_fn): + agg = factory_fn() for tile_url in tile_urls: agg.add(tile_url) return agg.results @@ -183,19 +230,51 @@ def calculate_percentiles(tile_urls, percentiles, cache, nprocs): larger number to make concurrent nework requests for tiles. """ + # check that the input values are in the range we need + for p in percentiles: + assert 0 <= p <= 100 + + def factory_fn(): + return Aggregator(cache) + if nprocs > 1: - results = parallel(tile_urls, cache, nprocs) + results = parallel(tile_urls, factory_fn, nprocs) else: - results = sequential(tile_urls, cache) + results = sequential(tile_urls, factory_fn) pct = {} for label, values in results.iteritems(): values.sort() pcts = [] for p in percentiles: - i = int(len(values) * p / 100.0) + i = min(len(values) - 1, int(len(values) * p / 100.0)) pcts.append(values[i]) pct[label] = pcts return pct + + +def calculate_outliers(tile_urls, num_outliers, cache, nprocs): + """ + Fetch tiles and calculate the outlier tiles per layer. + + The number of outliers is per layer - the largest N. + + Cache, if true, uses a local disk cache for the tiles. This can be very + useful if re-running percentile calculations. + + Nprocs is the number of processes to use for both fetching and aggregation. + Even on a system with a single CPU, it can be worth setting this to a + larger number to make concurrent nework requests for tiles. + """ + + def factory_fn(): + return LargestN(num_outliers, cache) + + if nprocs > 1: + results = parallel(tile_urls, factory_fn, nprocs) + else: + results = sequential(tile_urls, factory_fn) + + return results diff --git a/setup.py b/setup.py index 2a7b854..d8ec03f 100644 --- a/setup.py +++ b/setup.py @@ -30,10 +30,11 @@ zip_safe=False, install_requires=[ 'click', - 'PIL', 'requests', 'requests_futures', 'squarify', + 'msgpack', + 'Pillow', ], entry_points=dict( console_scripts=[