Merge pull request #314 from IATI/develop
Update method for generating Solr-doc ids
simon-20 authored Feb 22, 2024
2 parents 4b63883 + ccebc9a commit d49b495
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions src/library/solrize.py
@@ -89,6 +89,12 @@ def validateLatLon(point_pos):


 def process_hash_list(document_datasets):
+    """
+    :param list document_datasets: A list of documents to be Solrized. Each item in the
+        list is itself a list with the following elements: doc.hash, doc.id,
+        doc.solr_api_error
+    """

     conn = db.getDirectConnection()
     blob_service_client = BlobServiceClient.from_connection_string(
@@ -99,8 +105,7 @@ def process_hash_list(document_datasets):
         file_hash = file_data[0]
         file_id = file_data[1]

-        flattened_activities = db.getFlattenedActivitiesForDoc(
-            conn, file_id)
+        flattened_activities = db.getFlattenedActivitiesForDoc(conn, file_id)

         if flattened_activities is None or flattened_activities[0] is None:
             raise SolrizeSourceError(
@@ -115,9 +120,7 @@ def process_hash_list(document_datasets):
             try:
                 solr_cores[core_name].ping()
             except Exception as e:
-                e_message = ''
-                if hasattr(e, 'args'):
-                    e_message = e.args[0]
+                e_message = e.args[0] if hasattr(e, 'args') else ''
                 raise SolrPingError('PINGING hash: ' + file_hash + ' and id: ' + file_id +
                                    ', from collection with name ' + core_name + ': ' + e_message)
@@ -131,23 +134,22 @@ def process_hash_list(document_datasets):
                 solr_cores[core_name].delete(
                     q='iati_activities_document_id:' + file_id)
             except Exception as e:
-                e_message = ''
-                if hasattr(e, 'args'):
-                    e_message = e.args[0]
+                e_message = e.args[0] if hasattr(e, 'args') else ''
                 raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
                                 ', from collection with name ' + core_name + ': ' + e_message)

-        logger.info('Adding docs for hash: ' +
-                    file_hash + ' and id: ' + file_id)
+        logger.info('Adding docs for hash: ' + file_hash + ' and id: ' + file_id)

+        identifier_indices = {}

         for fa in flattened_activities[0]:
-            hashed_identifier = utils.get_hash_for_identifier(
-                fa['iati_identifier'])
-            blob_name = '{}/{}.xml'.format(file_id, hashed_identifier)
+            hashed_iati_identifier = utils.get_hash_for_identifier(fa['iati_identifier'])
+            blob_name = '{}/{}.xml'.format(file_id, hashed_iati_identifier)

             try:
                 blob_client = blob_service_client.get_blob_client(
-                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'], blob=blob_name)
+                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
+                    blob=blob_name)
                 downloader = blob_client.download_blob()
             except:
                 db.resetUnfoundLakify(conn, file_id)
@@ -159,17 +161,17 @@ def process_hash_list(document_datasets):
                 )

             try:
-                fa['iati_xml'] = utils.get_text_from_blob(
-                    downloader, blob_name)
+                fa['iati_xml'] = utils.get_text_from_blob(downloader, blob_name)
             except:
                 raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
                                          ', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])

-            json_blob_name = '{}/{}.json'.format(file_id, hashed_identifier)
+            json_blob_name = '{}/{}.json'.format(file_id, hashed_iati_identifier)

             try:
                 json_blob_client = blob_service_client.get_blob_client(
-                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'], blob=json_blob_name)
+                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
+                    blob=json_blob_name)
                 json_downloader = json_blob_client.download_blob()
             except:
                 db.resetUnfoundLakify(conn, file_id)
@@ -188,6 +190,7 @@ def process_hash_list(document_datasets):
                                          ', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])

             fa['iati_activities_document_id'] = file_id
+            fa['iati_activities_document_hash'] = file_hash

             # transform location_point_pos for default SOLR LatLonPointSpatialField
             try:
@@ -210,7 +213,11 @@ def process_hash_list(document_datasets):
                 sub_list_data[element_name] = fa['@'+element_name]
                 del fa['@'+element_name]

-            fa['id'] = utils.get_hash_for_identifier(json.dumps(fa))
+            identifier_indices[hashed_iati_identifier] = identifier_indices.get(hashed_iati_identifier, -1) + 1
+
+            fa['id'] = "{}--{}--{}".format(file_id,
+                                           hashed_iati_identifier,
+                                           identifier_indices[hashed_iati_identifier])

             addToSolr('activity', [fa], file_hash, file_id)

@@ -220,8 +227,8 @@ def process_hash_list(document_datasets):

             # Now index explode_elements
             for element_name, element_data in sub_list_data.items():
-                res = get_explode_element_data(element_name, element_data, fa)
-                addToSolr(element_name, res, file_hash, file_id)
+                results = get_explode_element_data(element_name, element_data, fa)
+                addToSolr(element_name, results, file_hash, file_id)

         logger.info('Updating DB with successful Solrize for hash: ' +
                     file_hash + ' and id: ' + file_id)
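
For context, here is a minimal, self-contained sketch (not code from this repository) contrasting the old and new id schemes shown in the diff above. The md5-based stand-in for utils.get_hash_for_identifier and the sample values are assumptions for illustration only.

import hashlib
import json

def get_hash_for_identifier(value):
    # Stand-in for utils.get_hash_for_identifier; the real helper may hash differently.
    return hashlib.md5(value.encode("utf-8")).hexdigest()

file_id = "example-document-id"
flattened_activities = [
    {"iati_identifier": "XM-EXAMPLE-1"},
    {"iati_identifier": "XM-EXAMPLE-1"},  # repeated identifier within the same document
    {"iati_identifier": "XM-EXAMPLE-2"},
]

# Old scheme: hash the entire flattened activity, so the id changes whenever any field changes.
old_ids = [get_hash_for_identifier(json.dumps(fa)) for fa in flattened_activities]

# New scheme: document id + hashed iati-identifier + per-document running index,
# so ids are deterministic and repeated identifiers within one document stay distinct.
identifier_indices = {}
new_ids = []
for fa in flattened_activities:
    hashed_iati_identifier = get_hash_for_identifier(fa["iati_identifier"])
    identifier_indices[hashed_iati_identifier] = identifier_indices.get(hashed_iati_identifier, -1) + 1
    new_ids.append("{}--{}--{}".format(file_id, hashed_iati_identifier,
                                       identifier_indices[hashed_iati_identifier]))

print(new_ids)
# ['example-document-id--<hash of XM-EXAMPLE-1>--0',
#  'example-document-id--<hash of XM-EXAMPLE-1>--1',
#  'example-document-id--<hash of XM-EXAMPLE-2>--0']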
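
And a sketch of the caller's input shape, matching the docstring added in this commit; the import path and the values are placeholders, and a configured database, Solr, and blob store are needed before process_hash_list can actually run.

from library.solrize import process_hash_list  # assumed import path for src/library/solrize.py

# Each item is [doc.hash, doc.id, doc.solr_api_error], per the new docstring.
document_datasets = [
    ["0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "example-document-id", None],
]

process_hash_list(document_datasets)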
