From 92405cda7049a034e80a36883fcacdb0a094657c Mon Sep 17 00:00:00 2001 From: qGYdXbY2 <47661341+qGYdXbY2@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:05:46 +0100 Subject: [PATCH] investigate sit Signed-off-by: qGYdXbY2 <47661341+qGYdXbY2@users.noreply.github.com> --- .../steps/execution/db/DatabaseBasedStep.java | 4 ++- .../jobs/steps/impl/transport/CopySpace.java | 32 ++++++++++--------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/db/DatabaseBasedStep.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/db/DatabaseBasedStep.java index b15ba7055..bf867bb9e 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/db/DatabaseBasedStep.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/db/DatabaseBasedStep.java @@ -306,9 +306,11 @@ public ExecutionMode getExecutionMode() { protected final DataSourceProvider requestResource(Database db, double estimatedMaxAcuLoad) throws TooManyResourcesClaimed { Map neededResources = getAggregatedNeededResources(); + if (!neededResources.containsKey(db) || claimedAcuLoad + estimatedMaxAcuLoad > neededResources.get(db)) throw new TooManyResourcesClaimed("Step " + getId() + " tried to claim further " + estimatedMaxAcuLoad + " ACUs, " - + claimedAcuLoad + "/" + neededResources.get(db) + " have been claimed before."); + + claimedAcuLoad + "/" + ( neededResources.containsKey(db) ? neededResources.get(db) : "missing[" + db.getName() + "]" ) + + " have been claimed before."); claimedAcuLoad += estimatedMaxAcuLoad; return db.getDataSources(); diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java index 2e0b9b6fa..1af259617 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CopySpace.java @@ -194,8 +194,15 @@ public List getNeededResources() { rList.add( new Load().withResource(loadDatabase(targetSpace.getStorage().getId(), WRITER)) .withEstimatedVirtualUnits(calculateNeededAcus()) ); - - if( isRemoteCopy(sourceSpace, targetSpace) ) + + boolean bRemoteCopy = isRemoteCopy(sourceSpace, targetSpace); + + logger.info("[{}] Copy remote({}) {} -> {}", getGlobalStepId(), + bRemoteCopy, + sourceSpace.getStorage().getId(), + targetSpace.getStorage().getId() ); + + if( bRemoteCopy ) rList.add( new Load().withResource(loadDatabaseReaderElseWriter(sourceSpace.getStorage().getId())) .withEstimatedVirtualUnits(calculateNeededAcus()) ); @@ -292,13 +299,9 @@ public void execute() throws Exception { @Override protected void onAsyncSuccess() throws WebClientException, SQLException, TooManyResourcesClaimed, IOException { - logger.info( "Loading space config for target-space " + getTargetSpaceId()); - Space targetSpace = loadSpace(getTargetSpaceId()); - logger.info("Getting storage database for space "+getSpaceId()); - // Database db = loadDatabase(targetSpace.getStorage().getId(), WRITER); - //@TODO: Add ACU calculation -// runWriteQueryAsync(buildCopySpaceNextVersionUpdate(getSchema(db), getRootTableName(targetSpace)), db, 0, false); + logger.info("[{}] AsyncSuccess Copy {} -> {}", getGlobalStepId(), getSpaceId() , getTargetSpaceId()); + } @Override @@ -346,7 +349,7 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw targetSchema = getSchema( loadDatabase(targetStorageId, WRITER) ), targetTable = _getRootTableName(targetSpace); - int maxBlkSize = 7; + int maxBlkSize = 1000; final Map queryContext = createQueryContext(getId(), @@ -370,7 +373,8 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw jsonb_build_object('updateStrategy','{"onExists":null,"onNotExists":null,"onVersionConflict":null,"onMergeConflict":null}'::jsonb, 'partialUpdates',false, 'featureData', jsonb_build_object( 'type', 'FeatureCollection', 'features', jsonb_agg( iidata.feature ) )))::text - ,iidata.author,false,(SELECT nextval('${schema}.${versionSequenceName}'))) + ,iidata.author,false,(SELECT nextval('${schema}.${versionSequenceName}')) + ) as wfresult from ( select (row_number() over ())/${{maxblksize}} as rn, idata.author, idata.jsondata || jsonb_build_object('geometry',st_asgeojson(idata.geo)::json) as feature @@ -379,7 +383,7 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw ) iidata group by rn, author ) - select count(1) into dummy_output from ins_data + select sum((wfresult::json->>'count')::bigint)::bigint into dummy_output from ins_data """ /**/ ) @@ -602,9 +606,7 @@ public static PropertiesQuery parsePropertiesQuery(String query, String property return pq; } - private double calculateNeededAcus() { - overallNeededAcus = overallNeededAcus != -1 ? overallNeededAcus : ResourceAndTimeCalculator.getInstance() - .calculateNeededAcusFromByteSize(getUncompressedUploadBytesEstimation()); - return overallNeededAcus; + private double calculateNeededAcus() { + return 0.5; } }