diff --git a/.jshintrc b/.jshintrc index 8a7f48c00..e6cf7154f 100644 --- a/.jshintrc +++ b/.jshintrc @@ -1,5 +1,5 @@ { - "esversion": 6, + "esversion": 8, "node": true, "curly": true, "eqeqeq": true, diff --git a/package.json b/package.json index 02cf872e6..b21abfe9c 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ "dependencies": { "@hapi/joi": "^16.0.1", "async": "^3.1.0", + "axios": "^1.2.2", "bottleneck": "^2.19.5", "combined-stream": "^1.0.7", "csv-parse": "^5.0.3", @@ -17,7 +18,7 @@ "lodash": "^4.16.0", "minimist": "^1.2.0", "pelias-blacklist-stream": "^1.0.0", - "pelias-config": "^4.12.0", + "pelias-config": "^5.2.0", "pelias-dbclient": "^2.13.0", "pelias-logger": "^1.2.1", "pelias-model": "^9.2.0", diff --git a/schema.js b/schema.js index 904f02543..08bdf5029 100644 --- a/schema.js +++ b/schema.js @@ -12,7 +12,8 @@ module.exports = Joi.object().keys({ dataHost: Joi.string(), s3Options: Joi.string(), adminLookup: Joi.boolean(), - missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no') + missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no'), + token: Joi.string().required(true), }).unknown(false) }).unknown(true) }).unknown(true); diff --git a/test/schema.js b/test/schema.js index 33dba6c34..894fe299e 100644 --- a/test/schema.js +++ b/test/schema.js @@ -142,6 +142,7 @@ tape( 'unknown config fields should throw error', function(test) { imports: { openaddresses: { datapath: 'this is the datapath', + token: 'abc', unknown: 'value' } } @@ -153,11 +154,12 @@ tape( 'unknown config fields should throw error', function(test) { }); -tape( 'configuration with only datapath should not throw error', function(test) { +tape( 'configuration with only datapath & token should not throw error', function(test) { const config = { imports: { openaddresses: { - datapath: 'this is the datapath' + datapath: 'this is the datapath', + token: 'abc' } } }; @@ -172,6 +174,7 @@ tape( 'valid configuration should not throw error', function(test) { imports: { openaddresses: { datapath: 'this is the datapath', + token: 'abc', adminLookup: false, files: ['file 1', 'file 2'] } @@ -188,6 +191,7 @@ tape( 'unknown children of imports should not throw error', function(test) { imports: { openaddresses: { datapath: 'this is the datapath', + token: 'abc', adminLookup: false, files: ['file 1', 'file 2'] }, @@ -205,6 +209,7 @@ tape( 'unknown children of root should not throw error', function(test) { imports: { openaddresses: { datapath: 'this is the datapath', + token: 'abc', adminLookup: false, files: ['file 1', 'file 2'] } diff --git a/utils/OpenAddressesAPI.js b/utils/OpenAddressesAPI.js new file mode 100644 index 000000000..c450446ac --- /dev/null +++ b/utils/OpenAddressesAPI.js @@ -0,0 +1,43 @@ +const _ = require('lodash'); +const axios = require('axios'); +const config = require('pelias-config'); +const HOST = 'https://batch.openaddresses.io'; + +class OpenAddressesAPI { + constructor() { + this.config = _.get(config.generate(), 'imports.openaddresses', {}); + this.token = _.get(this.config, 'token'); + } + + // remove file extensions from 'source' + normalize(source) { + if (!_.isString(source)) { return source; } + return source.replace(/\.[^/.]+$/, ''); + } + + // return the http url for a specific job id + url(job) { + return `${HOST}/api/job/${job}/output/source.geojson.gz`; + } + + // if the 'validated' mode is enabled (for financial supporters only) + isValidatedModeEnabled() { + return _.get(this.config, 'validated') === true; + } + + async lookup(filename) { + // normalize 'source' property + // support the 'validated' property for financial supporters + const params = { + source: this.normalize(filename), + layer: 'addresses', + validated: this.isValidatedModeEnabled() ? 'true' : 'false' + }; + + // request extended info and return the first result + const versions = await axios.get(`${HOST}/api/data`, { params }); + return _.isArray(versions.data) && !_.isEmpty(versions.data) ? _.head(versions.data) : {}; + } +} + +module.exports = OpenAddressesAPI; diff --git a/utils/download_filtered.js b/utils/download_filtered.js index 8bf881cac..062ecaeff 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -1,81 +1,108 @@ const child_process = require('child_process'); -const config = require( 'pelias-config' ).generate(); +const config = require('pelias-config').generate(); const async = require('async'); const fs = require('fs-extra'); +const path = require('path'); const temp = require('temp'); const logger = require('pelias-logger').get('openaddresses-download'); const Bottleneck = require('bottleneck/es5'); function downloadFiltered(config, callback) { const targetDir = config.imports.openaddresses.datapath; + const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal'); - fs.ensureDir(targetDir, (err) => { + fs.ensureDir(targetDir, async (err) => { if (err) { logger.error(`error making directory ${targetDir}`, err); return callback(err); } - const files = getFiles(config, targetDir, callback); - logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`); + // validate sources + const files = config.get('imports.openaddresses.files', []); + const sources = await getSources(files); + const validSources = sources.filter(source => source.url); + + // respect 'imports.openaddresses.missingFilesAreFatal' setting + if (errorsFatal && (sources.length !== validSources.length)) { + callback(sources.find(source => source.error)); // return first error + return; + } + + logger.info(`Attempting to download selected data sources: ${sources.map(source => source.id)}`); // limit requests to avoid being banned by openaddresses.io // current policy is 10 request per minute // https://github.com/pelias/openaddresses/issues/433#issuecomment-527383976 + // @todo: contact OA team to check if this is still required with the batch. endpoint? const options = { maxConcurrent: 1, minTime: 6000 }; const limiter = new Bottleneck(options); - const callbackOnLastOne = () => { + const done = () => { if (limiter.empty()) { callback(); } }; - files.map(file => { - limiter.submit(downloadSource, targetDir, file, callbackOnLastOne); + validSources.map(source => { + limiter.submit(downloadSource, targetDir, source, done); }); process.on('SIGINT', () => { - limiter.stop({dropWaitingJobs: true}); + limiter.stop({ dropWaitingJobs: true }); process.exit(); }); }); } -function getFiles(config, targetDir, main_callback){ - const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal'); - const files = config.imports.openaddresses.files; - files.forEach(file => { - // sources MUST end with '.csv' - if( !file.endsWith('.csv') ){ - const msg = `invalid source '${file}': MUST end with '.csv'`; - logger.warn(msg); - - // respect 'imports.openaddresses.missingFilesAreFatal' setting - return main_callback(errorsFatal ? msg : null); +async function getSources(files) { + const OpenAddressesAPI = require('./OpenAddressesAPI'); + const oa = new OpenAddressesAPI(); + + return await Promise.all(files.map(async file => { + + // source definitions previously required a file extension. + // please remove file extensions from your ~/pelias.json file + // to silence these warning messages. + let id = file.replace(/\.[^\/.]+$/, ''); + if (id !== file) { + logger.warn(`source definitions no longer require a file extension '${file}'`); } - }); - return files.map(file => { - const source = file.replace('.csv', '.zip'); - const name = file.replace('.csv', '').replace(/\//g,'-'); - return { - csv: file, - url: `https://results.openaddresses.io/latest/run/${source}`, - zip: temp.path({prefix: name, dir: targetDir, suffix: '.zip'}) - }; - }); + + // lookup the source using the OpenAddresses API + // to find the most current job id and ensure validity + const version = await oa.lookup(id); + const valid = (version && version.job); + + // invalid source + if (!valid) { + return { id, error: `invalid source '${file}'` }; + } + + // valid source + return { id, url: oa.url(version.job) }; + })); } -function downloadSource(targetDir, file, main_callback) { +function downloadSource(targetDir, source, done) { + const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal'); + const token = config.get('imports.openaddresses.token'); const referer = config.get('imports.openaddresses.dataReferer') || 'https://pelias-results.openaddresses.io'; - logger.info(`Downloading ${file.csv}`); + logger.info(`Downloading ${source.id}`); + + const outFile = path.join(targetDir, `${source.id}.geojson`); + const tmpFile = temp.path({ + prefix: source.id.replace(new RegExp(path.sep, 'g'), '-'), + dir: targetDir, + suffix: '.gz' + }); async.series( [ - // download the zip file into the temp directory + // download the compressed file into the temp directory (callback) => { - logger.debug(`downloading ${file.url}`); + logger.debug(`downloading ${source.url}`); const flags = [ '--request GET', // HTTP GET '--silent', // be quiet @@ -83,15 +110,16 @@ function downloadSource(targetDir, file, main_callback) { '--fail', // exit with a non-zero code for >=400 responses '--write-out "%{http_code}"', // print status code to STDOUT `--referer ${referer}`, // set referer header - `--output ${file.zip}`, // set output filepath + `--output ${tmpFile}`, // set output filepath '--retry 5', // retry this number of times before giving up '--retry-connrefused', // consider ECONNREFUSED as a transient error - '--retry-delay 5' // sleep this many seconds between retry attempts + '--retry-delay 5', // sleep this many seconds between retry attempts + `-H 'Authorization: Bearer ${token}'` // authorization token ].join(' '); // the `--fail*` flags cause an error to be returned as the first arg with `error.code` // as the process exit status, the `-w "%{http_code}"` flag writes the HTTP status to STDOUT. - child_process.exec(`curl ${flags} ${file.url}`, (error, stdout) => { + child_process.exec(`curl ${flags} ${source.url}`, (error, stdout) => { if (!error) { return callback(); } // provide a more user-friendly error message @@ -99,22 +127,33 @@ function downloadSource(targetDir, file, main_callback) { callback(error); }); }, - // unzip file into target directory + // decompress file into target directory (callback) => { - logger.debug(`unzipping ${file.zip} to ${targetDir}`); - child_process.exec(`unzip -o -qq -d ${targetDir} ${file.zip}`, callback); + logger.debug(`decompress ${tmpFile} to ${outFile}`); + child_process.exec(` + mkdir -p ${path.dirname(outFile)}; + gzip -d < ${tmpFile} > ${outFile}; + `, (error, stdout) => { + if (!error) { return callback(); } + + // provide a more user-friendly error message + error.message = `decompress failed, ${stdout}`; + callback(error); + }); }, - // delete the temp downloaded zip file - fs.remove.bind(null, file.zip) ], - function(err) { + (err) => { if (err) { - logger.warn(`failed to download ${file.url}: ${err}`); + logger.warn(`failed to download ${source.url}: ${err}`); } + // ensure temp files are cleaned up + if (fs.existsSync(tmpFile)) { fs.unlinkSync(tmpFile); } + // honour 'imports.openaddresses.missingFilesAreFatal' setting - main_callback(errorsFatal ? err : null); - }); + done(errorsFatal ? err : null); + } + ); } module.exports = downloadFiltered;