-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The BULK 2.0 API is faster and simpler to use. It's also specifically built to work with larger data sets. From a user's point of view, using the classic BULK API is the same. All you need to do is change the API type to "BULK2."
- Loading branch information
1 parent
d5269a7
commit 2eae61d
Showing
4 changed files
with
100 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import time | ||
import csv | ||
import sys | ||
import json | ||
import singer | ||
from singer import metrics | ||
|
||
|
||
BATCH_STATUS_POLLING_SLEEP = 20 | ||
DEFAULT_CHUNK_SIZE = 50000 | ||
|
||
LOGGER = singer.get_logger() | ||
|
||
class Bulk2(): | ||
bulk_url = '{}/services/data/v60.0/jobs/query' | ||
|
||
def __init__(self, sf): | ||
csv.field_size_limit(sys.maxsize) | ||
self.sf = sf | ||
|
||
|
||
def query(self, catalog_entry, state): | ||
job_id = self._create_job(catalog_entry, state) | ||
self._wait_for_job(job_id) | ||
|
||
for batch in self._get_next_batch(job_id): | ||
reader = csv.DictReader(batch.decode('utf-8').splitlines()) | ||
|
||
for row in reader: | ||
yield row | ||
|
||
|
||
def _get_bulk_headers(self): | ||
return {**self.sf.auth.rest_headers, "Content-Type": "application/json"} | ||
|
||
def _create_job(self, catalog_entry, state): | ||
url = self.bulk_url.format(self.sf.instance_url) | ||
start_date = self.sf.get_start_date(state, catalog_entry) | ||
|
||
query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=False) | ||
|
||
body = { | ||
"operation": "query", | ||
"query": query, | ||
} | ||
|
||
with metrics.http_request_timer("create_job") as timer: | ||
timer.tags['sobject'] = catalog_entry['stream'] | ||
resp = self.sf._make_request( | ||
'POST', | ||
url, | ||
headers=self._get_bulk_headers(), | ||
body=json.dumps(body)) | ||
|
||
job = resp.json() | ||
|
||
return job['id'] | ||
|
||
def _wait_for_job(self, job_id): | ||
status_url = self.bulk_url + '/{}' | ||
url = status_url.format(self.sf.instance_url, job_id) | ||
status = None | ||
|
||
while status not in ('JobComplete', 'Failed'): | ||
resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers()).json() | ||
status = resp['state'] | ||
|
||
if status == 'JobComplete': | ||
break | ||
|
||
if status == 'Failed': | ||
raise Exception("Job failed: {}".format(resp.json())) | ||
|
||
time.sleep(BATCH_STATUS_POLLING_SLEEP) | ||
|
||
def _get_next_batch(self, job_id): | ||
url = self.bulk_url + '/{}/results' | ||
url = url.format(self.sf.instance_url, job_id) | ||
locator = '' | ||
|
||
while locator != 'null': | ||
params = {"maxRecords": DEFAULT_CHUNK_SIZE} | ||
|
||
if locator != '': | ||
params['locator'] = locator | ||
|
||
resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers(), params=params) | ||
locator = resp.headers.get('Sforce-Locator') | ||
|
||
yield resp.content |