Merge pull request #209 from azavea/lf/rv1-parallel
Add parallel prediction for old version of object detection
lewfish authored Mar 29, 2018
2 parents d73efdc + c5d9471 commit f8b0ff8
Showing 5 changed files with 152 additions and 46 deletions.
1 change: 1 addition & 0 deletions src/config-samples/detection/train/mobilenet.config
@@ -141,6 +141,7 @@ model {

train_config: {
batch_size: 8
num_steps: 40000 # Set to 0 for an indefinite number of steps
optimizer {
rms_prop_optimizer: {
learning_rate: {
58 changes: 27 additions & 31 deletions src/rv/detection/commands/merge_predictions.py
@@ -11,46 +11,42 @@
MyTemporaryDirectory)


def get_annotations_paths(projects_path, temp_dir):
annotations_paths = []
with open(projects_path, 'r') as projects_file:
projects = json.load(projects_file)
for project_ind, project in enumerate(projects):
annotations_uri = project['annotations']
annotations_path = download_if_needed(
annotations_uri, temp_dir)
annotations_paths.append(annotations_path)
return annotations_paths


def merge_annotations(annotations_list):
all_annotations = copy.deepcopy(annotations_list[0])
for annotations in annotations_list[1:]:
all_annotations['features'].extend(annotations['features'])
return all_annotations
def _merge_predictions(predictions_list):
merged_predictions = copy.deepcopy(predictions_list[0])
for predictions in predictions_list[1:]:
merged_predictions['features'].extend(predictions['features'])
return merged_predictions


@click.command()
@click.argument('projects_uri')
@click.argument('output_uri')
@click.argument('output_dir_uri')
@click.option('--save-temp', is_flag=True)
def merge_predictions(projects_uri, output_uri, save_temp):
def merge_predictions(projects_uri, output_dir_uri, save_temp):
prefix = temp_root_dir
temp_dir = os.path.join(prefix, 'merge-predictions') if save_temp else None
with MyTemporaryDirectory(temp_dir, prefix) as temp_dir:
projects_path = download_if_needed(projects_uri, temp_dir)
output_path = get_local_path(output_uri, temp_dir)

annotation_paths = get_annotations_paths(projects_path, temp_dir)
annotations_list = []
for annotation_path in annotation_paths:
with open(annotation_path, 'r') as annotation_file:
annotations_list.append(json.load(annotation_file))

annotations = merge_annotations(annotations_list)
with open(output_path, 'w') as output_file:
json.dump(annotations, output_file, indent=4)
upload_if_needed(output_path, output_uri)

# For each project:
# download the predictions files, merge them, and upload the merged
# predictions.
projects = json.load(open(projects_path))
for project in projects:
predictions_list = []
for image_ind, image in enumerate(project['images']):
predictions_uri = os.path.join(
output_dir_uri, project['id'],
'{}.json'.format(image_ind))
predictions_path = download_if_needed(
predictions_uri, temp_dir)
predictions_list.append(json.load(open(predictions_path)))

output_uri = project['annotations']
output_path = get_local_path(output_uri, temp_dir)
predictions = _merge_predictions(predictions_list)
json.dump(predictions, open(output_path, 'w'))
upload_if_needed(output_path, output_uri)


if __name__ == '__main__':
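The per-image prediction files are GeoJSON FeatureCollections, so merging them reduces to concatenating their 'features' lists, which is all _merge_predictions does. A minimal, self-contained sketch of that behavior (the feature dicts here are hypothetical placeholders):

import copy

def merge_predictions_sketch(predictions_list):
    # Deep-copy the first collection so the input isn't mutated.
    merged = copy.deepcopy(predictions_list[0])
    for predictions in predictions_list[1:]:
        merged['features'].extend(predictions['features'])
    return merged

a = {'type': 'FeatureCollection', 'features': [{'id': 0}]}
b = {'type': 'FeatureCollection', 'features': [{'id': 1}, {'id': 2}]}
print(merge_predictions_sketch([a, b]))
# {'type': 'FeatureCollection', 'features': [{'id': 0}, {'id': 1}, {'id': 2}]}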
35 changes: 23 additions & 12 deletions src/rv/detection/commands/predict_array.py
@@ -14,34 +14,46 @@
@click.argument('inference_graph_uri')
@click.argument('label_map_uri')
@click.argument('projects_uri')
@click.argument('output_dir_uri')
@click.option('--mask-uri', default=None,
help='URI for mask GeoJSON file to use as filter for detections')
@click.option('--channel-order', nargs=3, type=int,
default=default_channel_order, help='Index of RGB channels')
@click.option('--chip-size', default=300)
@click.option('--score-thresh', default=0.5,
help='Score threshold of predictions to keep')
@click.option('--merge-thresh', default=0.05,
@click.option('--merge-thresh', default=0.5,
help='IOU threshold for merging predictions')
@click.option('--save-temp', is_flag=True)
def predict_array(inference_graph_uri, label_map_uri, projects_uri,
mask_uri, channel_order, chip_size, score_thresh,
merge_thresh, save_temp):
job_index = int(os.environ['AWS_BATCH_JOB_ARRAY_INDEX'])
output_dir_uri, mask_uri, channel_order, chip_size,
score_thresh, merge_thresh, save_temp):
job_ind = int(os.environ['AWS_BATCH_JOB_ARRAY_INDEX'])

prefix = temp_root_dir
temp_dir = os.path.join(prefix, 'predict-array') if save_temp else None
with MyTemporaryDirectory(temp_dir, prefix) as temp_dir:
projects_path = download_if_needed(projects_uri, temp_dir)
with open(projects_path, 'r') as projects_file:
projects = json.load(projects_file)
if job_index >= len(projects):
raise ValueError(
'There are {} projects and job_index is {}!'.format(
len(projects), job_index))
project = projects[job_index]
image_uris = project['images']
output_uri = project['annotations']

def get_image_coords():
image_ind = 0
for project_ind, project in enumerate(projects):
for project_image_ind, image_uri in enumerate(project['images']):
if job_ind == image_ind:
return project_ind, project_image_ind
image_ind += 1

# Run predict for a single image and generate output_uri based on
# the index of the image within the project.
project_ind, project_image_ind = get_image_coords()
project = projects[project_ind]
image_uri = project['images'][project_image_ind]
image_uris = [image_uri]
output_uri = os.path.join(
output_dir_uri, project['id'],
'{}.json'.format(project_image_ind))
output_debug_uri = None

_predict(inference_graph_uri, label_map_uri, image_uris,
@@ -50,6 +62,5 @@ def predict_array(inference_graph_uri, label_map_uri, projects_uri,
save_temp)



if __name__ == '__main__':
predict_array()
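Each element of the AWS Batch array job receives a distinct AWS_BATCH_JOB_ARRAY_INDEX, and get_image_coords maps that flat index onto a (project, image) pair by walking the projects in order. A standalone sketch of the same mapping on hypothetical project data (unlike the inner function above, this version raises when the index is out of range):

projects = [
    {'id': 'a', 'images': ['a0.tif', 'a1.tif']},
    {'id': 'b', 'images': ['b0.tif']},
]

def get_image_coords(projects, job_ind):
    # Walk images in project order until the flat index is reached.
    image_ind = 0
    for project_ind, project in enumerate(projects):
        for project_image_ind in range(len(project['images'])):
            if job_ind == image_ind:
                return project_ind, project_image_ind
            image_ind += 1
    raise ValueError('job_ind {} exceeds image count'.format(job_ind))

print(get_image_coords(projects, 2))  # (1, 0): second project, first image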
39 changes: 36 additions & 3 deletions src/rv/detection/commands/train.py
@@ -1,9 +1,11 @@
from os.path import join, dirname, splitext
from os.path import join, basename
import os
from subprocess import Popen
import zipfile
from threading import Timer
from urllib.parse import urlparse
import glob
import re

import click

@@ -14,6 +16,30 @@
from rv.detection.commands.settings import temp_root_dir


def get_last_checkpoint_path(train_root_dir):
index_paths = glob.glob(join(train_root_dir, 'train', '*.index'))
checkpoint_ids = []
for index_path in index_paths:
match = re.match(r'model\.ckpt-(\d+)\.index', basename(index_path))
checkpoint_ids.append(int(match.group(1)))
checkpoint_id = max(checkpoint_ids)
checkpoint_path = join(
train_root_dir, 'train', 'model.ckpt-{}'.format(checkpoint_id))
return checkpoint_path


def export_inference_graph(train_root_dir, config_path, inference_graph_path):
checkpoint_path = get_last_checkpoint_path(train_root_dir)
print('Exporting checkpoint {}...'.format(checkpoint_path))
train_process = Popen([
'python', '/opt/src/tf/object_detection/export_inference_graph.py',
'--input_type', 'image_tensor',
'--pipeline_config_path', config_path,
'--checkpoint_path', checkpoint_path,
'--inference_graph_path', inference_graph_path])
train_process.wait()


@click.command()
@click.argument('config_uri')
@click.argument('train_dataset_uri')
@@ -41,6 +67,7 @@ def train(config_uri, train_dataset_uri, val_dataset_uri, model_checkpoint_uri,
make_dir(train_root_dir)
train_dir = join(train_root_dir, 'train')
eval_dir = join(train_root_dir, 'eval')
inference_graph_path = join(train_root_dir, 'inference_graph.pb')

def process_zip_file(uri, temp_dir, link_dir):
if uri.endswith('.zip'):
@@ -84,9 +111,15 @@ def sync_train_dir(delete=True):
'tensorboard', '--logdir={}'.format(train_root_dir)],
preexec_fn=on_parent_exit('SIGTERM'))

# After training finishes (once num_steps is exceeded), kill the
# monitor processes, export the inference graph, and upload.
train_process.wait()
eval_process.wait()
tensorboard_process.wait()
eval_process.kill()
tensorboard_process.kill()
export_inference_graph(
train_root_dir, config_path, inference_graph_path)
if urlparse(train_uri).scheme == 's3':
sync_dir(train_root_dir, train_uri, delete=True)


if __name__ == '__main__':
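get_last_checkpoint_path picks the newest checkpoint by parsing the step count out of each model.ckpt-<N>.index filename and taking the maximum. A sketch of that selection logic on hypothetical filenames (lexicographic sorting would get this wrong, which is why the ids are compared as integers):

import re

index_names = ['model.ckpt-5000.index', 'model.ckpt-40000.index',
               'model.ckpt-9000.index']
ids = [int(re.match(r'model\.ckpt-(\d+)\.index', name).group(1))
       for name in index_names]
print('model.ckpt-{}'.format(max(ids)))  # model.ckpt-40000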
65 changes: 65 additions & 0 deletions src/workflows/detection/parallel_predict.py
@@ -0,0 +1,65 @@
import json
import os
import click

from rv.utils.files import (
download_if_needed, MyTemporaryDirectory)
from rv.utils.batch import _batch_submit
from rv.detection.commands.settings import temp_root_dir


def make_predict_array_cmd(inference_graph_uri, label_map_uri, projects_uri,
output_dir_uri):
return 'python -m rv.detection.run predict_array {} {} {} {}'.format(
inference_graph_uri, label_map_uri, projects_uri, output_dir_uri)


def make_merge_predictions_cmd(projects_uri, output_dir_uri):
return 'python -m rv.detection.run merge_predictions {} {}'.format(
projects_uri, output_dir_uri)


def get_nb_images(projects):
nb_images = 0
for project in projects:
nb_images += len(project['images'])
return nb_images


@click.command()
@click.argument('projects_uri')
@click.argument('label_map_uri')
@click.argument('inference_graph_uri')
@click.argument('output_dir_uri')
@click.option('--branch-name', default='develop')
@click.option('--attempts', default=1)
@click.option('--cpu', is_flag=True)
def parallel_predict(projects_uri, label_map_uri, inference_graph_uri,
output_dir_uri,
branch_name, attempts, cpu):
prefix = temp_root_dir
temp_dir = os.path.join(prefix, 'parallel-predict')
with MyTemporaryDirectory(temp_dir, prefix) as temp_dir:
# Load projects and count number of images
projects_path = download_if_needed(projects_uri, temp_dir)
projects = json.load(open(projects_path))
nb_images = get_nb_images(projects)

# Submit an array job with nb_images elements.
command = make_predict_array_cmd(
inference_graph_uri, label_map_uri, projects_uri, output_dir_uri)
'''
predict_job_id = _batch_submit(
branch_name, command, attempts=attempts, cpu=cpu,
array_size=nb_images)
'''
# Submit a dependent merge_predictions job.
command = make_merge_predictions_cmd(
projects_uri, output_dir_uri)
_batch_submit(
branch_name, command, attempts=attempts, cpu=cpu)
#parent_job_ids=[predict_job_id])


if __name__ == '__main__':
parallel_predict()

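As committed, the predict_array submission is wrapped in a string literal and the merge job's parent_job_ids argument is commented out, but those lines show the intended shape: a fan-out array job with one element per image, followed by a merge job that depends on it. A hedged sketch of the underlying AWS Batch calls that _batch_submit presumably wraps (queue and job-definition names are hypothetical):

import boto3

batch = boto3.client('batch')

predict_job = batch.submit_job(
    jobName='predict-array',
    jobQueue='raster-vision-queue',         # hypothetical queue name
    jobDefinition='raster-vision-job-def',  # hypothetical job definition
    arrayProperties={'size': 3},            # one element per image (nb_images)
    containerOverrides={'command': [
        'python', '-m', 'rv.detection.run', 'predict_array', '...']})

# Without dependsOn, the merge job could start before any predictions exist;
# depending on the array job makes it wait for every element to finish.
batch.submit_job(
    jobName='merge-predictions',
    jobQueue='raster-vision-queue',
    jobDefinition='raster-vision-job-def',
    dependsOn=[{'jobId': predict_job['jobId']}],
    containerOverrides={'command': [
        'python', '-m', 'rv.detection.run', 'merge_predictions', '...']})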