Skip to content

Commit

Permalink
feat(batch): port download code to use the batch endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
missinglink committed Jan 10, 2023
1 parent 00fccf8 commit a269b5d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"esversion": 6,
"esversion": 8,
"node": true,
"curly": true,
"eqeqeq": true,
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"dependencies": {
"@hapi/joi": "^16.0.1",
"async": "^3.1.0",
"axios": "^1.2.2",
"bottleneck": "^2.19.5",
"combined-stream": "^1.0.7",
"csv-parse": "^5.0.3",
Expand All @@ -17,7 +18,7 @@
"lodash": "^4.16.0",
"minimist": "^1.2.0",
"pelias-blacklist-stream": "^1.0.0",
"pelias-config": "^4.12.0",
"pelias-config": "^5.2.0",
"pelias-dbclient": "^2.13.0",
"pelias-logger": "^1.2.1",
"pelias-model": "^9.2.0",
Expand Down
3 changes: 2 additions & 1 deletion schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ module.exports = Joi.object().keys({
dataHost: Joi.string(),
s3Options: Joi.string(),
adminLookup: Joi.boolean(),
missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no')
missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no'),
token: Joi.string().required(true),
}).unknown(false)
}).unknown(true)
}).unknown(true);
9 changes: 7 additions & 2 deletions test/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ tape( 'unknown config fields should throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
unknown: 'value'
}
}
Expand All @@ -153,11 +154,12 @@ tape( 'unknown config fields should throw error', function(test) {

});

tape( 'configuration with only datapath should not throw error', function(test) {
tape( 'configuration with only datapath & token should not throw error', function(test) {
const config = {
imports: {
openaddresses: {
datapath: 'this is the datapath'
datapath: 'this is the datapath',
token: 'abc'
}
}
};
Expand All @@ -172,6 +174,7 @@ tape( 'valid configuration should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
}
Expand All @@ -188,6 +191,7 @@ tape( 'unknown children of imports should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
},
Expand All @@ -205,6 +209,7 @@ tape( 'unknown children of root should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
}
Expand Down
43 changes: 43 additions & 0 deletions utils/OpenAddressesAPI.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const _ = require('lodash');
const axios = require('axios');
const config = require('pelias-config');
const HOST = 'https://batch.openaddresses.io';

class OpenAddressesAPI {
constructor() {
this.config = _.get(config.generate(), 'imports.openaddresses', {});
this.token = _.get(this.config, 'token');
}

// remove file extensions from 'source'
normalize(source) {
if (!_.isString(source)) { return source; }
return source.replace(/\.[^/.]+$/, '');
}

// return the http url for a specific job id
url(job) {
return `${HOST}/api/job/${job}/output/source.geojson.gz`;
}

// if the 'validated' mode is enabled (for financial supporters only)
isValidatedModeEnabled() {
return _.get(this.config, 'validated') === true;
}

async lookup(filename) {
// normalize 'source' property
// support the 'validated' property for financial supporters
const params = {
source: this.normalize(filename),
layer: 'addresses',
validated: this.isValidatedModeEnabled() ? 'true' : 'false'
};

// request extended info and return the first result
const versions = await axios.get(`${HOST}/api/data`, { params });
return _.isArray(versions.data) && !_.isEmpty(versions.data) ? _.head(versions.data) : {};
}
}

module.exports = OpenAddressesAPI;
129 changes: 84 additions & 45 deletions utils/download_filtered.js
Original file line number Diff line number Diff line change
@@ -1,120 +1,159 @@
const child_process = require('child_process');
const config = require( 'pelias-config' ).generate();
const config = require('pelias-config').generate();
const async = require('async');
const fs = require('fs-extra');
const path = require('path');
const temp = require('temp');
const logger = require('pelias-logger').get('openaddresses-download');
const Bottleneck = require('bottleneck/es5');

function downloadFiltered(config, callback) {
const targetDir = config.imports.openaddresses.datapath;
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');

fs.ensureDir(targetDir, (err) => {
fs.ensureDir(targetDir, async (err) => {
if (err) {
logger.error(`error making directory ${targetDir}`, err);
return callback(err);
}

const files = getFiles(config, targetDir, callback);
logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`);
// validate sources
const files = config.get('imports.openaddresses.files', []);
const sources = await getSources(files);
const validSources = sources.filter(source => source.url);

// respect 'imports.openaddresses.missingFilesAreFatal' setting
if (errorsFatal && (sources.length !== validSources.length)) {
callback(sources.find(source => source.error)); // return first error
return;
}

logger.info(`Attempting to download selected data sources: ${sources.map(source => source.id)}`);

// limit requests to avoid being banned by openaddresses.io
// current policy is 10 request per minute
// https://github.com/pelias/openaddresses/issues/433#issuecomment-527383976
// @todo: contact OA team to check if this is still required with the batch. endpoint?
const options = {
maxConcurrent: 1,
minTime: 6000
};
const limiter = new Bottleneck(options);
const callbackOnLastOne = () => {
const done = () => {
if (limiter.empty()) {
callback();
}
};
files.map(file => {
limiter.submit(downloadSource, targetDir, file, callbackOnLastOne);
validSources.map(source => {
limiter.submit(downloadSource, targetDir, source, done);
});
process.on('SIGINT', () => {
limiter.stop({dropWaitingJobs: true});
limiter.stop({ dropWaitingJobs: true });
process.exit();
});
});

}

function getFiles(config, targetDir, main_callback){
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
const files = config.imports.openaddresses.files;
files.forEach(file => {
// sources MUST end with '.csv'
if( !file.endsWith('.csv') ){
const msg = `invalid source '${file}': MUST end with '.csv'`;
logger.warn(msg);

// respect 'imports.openaddresses.missingFilesAreFatal' setting
return main_callback(errorsFatal ? msg : null);
async function getSources(files) {
const OpenAddressesAPI = require('./OpenAddressesAPI');
const oa = new OpenAddressesAPI();

return await Promise.all(files.map(async file => {

// source definitions previously required a file extension.
// please remove file extensions from your ~/pelias.json file
// to silence these warning messages.
let id = file.replace(/\.[^\/.]+$/, '');
if (id !== file) {
logger.warn(`source definitions no longer require a file extension '${file}'`);
}
});
return files.map(file => {
const source = file.replace('.csv', '.zip');
const name = file.replace('.csv', '').replace(/\//g,'-');
return {
csv: file,
url: `https://results.openaddresses.io/latest/run/${source}`,
zip: temp.path({prefix: name, dir: targetDir, suffix: '.zip'})
};
});

// lookup the source using the OpenAddresses API
// to find the most current job id and ensure validity
const version = await oa.lookup(id);
const valid = (version && version.job);

// invalid source
if (!valid) {
return { id, error: `invalid source '${file}'` };
}

// valid source
return { id, url: oa.url(version.job) };
}));
}

function downloadSource(targetDir, file, main_callback) {
function downloadSource(targetDir, source, done) {

const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
const token = config.get('imports.openaddresses.token');
const referer = config.get('imports.openaddresses.dataReferer') || 'https://pelias-results.openaddresses.io';
logger.info(`Downloading ${file.csv}`);
logger.info(`Downloading ${source.id}`);

const outFile = path.join(targetDir, `${source.id}.geojson`);
const tmpFile = temp.path({
prefix: source.id.replace(new RegExp(path.sep, 'g'), '-'),
dir: targetDir,
suffix: '.gz'
});

async.series(
[
// download the zip file into the temp directory
// download the compressed file into the temp directory
(callback) => {
logger.debug(`downloading ${file.url}`);
logger.debug(`downloading ${source.url}`);
const flags = [
'--request GET', // HTTP GET
'--silent', // be quiet
'--location', // follow redirects
'--fail', // exit with a non-zero code for >=400 responses
'--write-out "%{http_code}"', // print status code to STDOUT
`--referer ${referer}`, // set referer header
`--output ${file.zip}`, // set output filepath
`--output ${tmpFile}`, // set output filepath
'--retry 5', // retry this number of times before giving up
'--retry-connrefused', // consider ECONNREFUSED as a transient error
'--retry-delay 5' // sleep this many seconds between retry attempts
'--retry-delay 5', // sleep this many seconds between retry attempts
`-H 'Authorization: Bearer ${token}'` // authorization token
].join(' ');

// the `--fail*` flags cause an error to be returned as the first arg with `error.code`
// as the process exit status, the `-w "%{http_code}"` flag writes the HTTP status to STDOUT.
child_process.exec(`curl ${flags} ${file.url}`, (error, stdout) => {
child_process.exec(`curl ${flags} ${source.url}`, (error, stdout) => {
if (!error) { return callback(); }

// provide a more user-friendly error message
error.message = `cURL request failed, HTTP status: ${stdout}, exit code: ${error.code}`;
callback(error);
});
},
// unzip file into target directory
// decompress file into target directory
(callback) => {
logger.debug(`unzipping ${file.zip} to ${targetDir}`);
child_process.exec(`unzip -o -qq -d ${targetDir} ${file.zip}`, callback);
logger.debug(`decompress ${tmpFile} to ${outFile}`);
child_process.exec(`
mkdir -p ${path.dirname(outFile)};
gzip -d < ${tmpFile} > ${outFile};
`, (error, stdout) => {
if (!error) { return callback(); }

// provide a more user-friendly error message
error.message = `decompress failed, ${stdout}`;
callback(error);
});
},
// delete the temp downloaded zip file
fs.remove.bind(null, file.zip)
],
function(err) {
(err) => {
if (err) {
logger.warn(`failed to download ${file.url}: ${err}`);
logger.warn(`failed to download ${source.url}: ${err}`);
}

// ensure temp files are cleaned up
if (fs.existsSync(tmpFile)) { fs.unlinkSync(tmpFile); }

// honour 'imports.openaddresses.missingFilesAreFatal' setting
main_callback(errorsFatal ? err : null);
});
done(errorsFatal ? err : null);
}
);
}

module.exports = downloadFiltered;

0 comments on commit a269b5d

Please sign in to comment.