Skip to content

Commit

Permalink
investigate sit
Browse files Browse the repository at this point in the history
Signed-off-by: qGYdXbY2 <[email protected]>
  • Loading branch information
qGYdXbY2 committed Nov 14, 2024
1 parent 01a3545 commit 92405cd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,11 @@ public ExecutionMode getExecutionMode() {

protected final DataSourceProvider requestResource(Database db, double estimatedMaxAcuLoad) throws TooManyResourcesClaimed {
Map<ExecutionResource, Double> neededResources = getAggregatedNeededResources();

if (!neededResources.containsKey(db) || claimedAcuLoad + estimatedMaxAcuLoad > neededResources.get(db))
throw new TooManyResourcesClaimed("Step " + getId() + " tried to claim further " + estimatedMaxAcuLoad + " ACUs, "
+ claimedAcuLoad + "/" + neededResources.get(db) + " have been claimed before.");
+ claimedAcuLoad + "/" + ( neededResources.containsKey(db) ? neededResources.get(db) : "missing[" + db.getName() + "]" )
+ " have been claimed before.");

claimedAcuLoad += estimatedMaxAcuLoad;
return db.getDataSources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,15 @@ public List<Load> getNeededResources() {

rList.add( new Load().withResource(loadDatabase(targetSpace.getStorage().getId(), WRITER))
.withEstimatedVirtualUnits(calculateNeededAcus()) );

if( isRemoteCopy(sourceSpace, targetSpace) )

boolean bRemoteCopy = isRemoteCopy(sourceSpace, targetSpace);

logger.info("[{}] Copy remote({}) {} -> {}", getGlobalStepId(),
bRemoteCopy,
sourceSpace.getStorage().getId(),
targetSpace.getStorage().getId() );

if( bRemoteCopy )
rList.add( new Load().withResource(loadDatabaseReaderElseWriter(sourceSpace.getStorage().getId()))
.withEstimatedVirtualUnits(calculateNeededAcus()) );

Expand Down Expand Up @@ -292,13 +299,9 @@ public void execute() throws Exception {
@Override
protected void onAsyncSuccess() throws WebClientException,
SQLException, TooManyResourcesClaimed, IOException {
logger.info( "Loading space config for target-space " + getTargetSpaceId());
Space targetSpace = loadSpace(getTargetSpaceId());
logger.info("Getting storage database for space "+getSpaceId());
// Database db = loadDatabase(targetSpace.getStorage().getId(), WRITER);

//@TODO: Add ACU calculation
// runWriteQueryAsync(buildCopySpaceNextVersionUpdate(getSchema(db), getRootTableName(targetSpace)), db, 0, false);
logger.info("[{}] AsyncSuccess Copy {} -> {}", getGlobalStepId(), getSpaceId() , getTargetSpaceId());

}

@Override
Expand Down Expand Up @@ -346,7 +349,7 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw
targetSchema = getSchema( loadDatabase(targetStorageId, WRITER) ),
targetTable = _getRootTableName(targetSpace);

int maxBlkSize = 7;
int maxBlkSize = 1000;

final Map<String, Object> queryContext =
createQueryContext(getId(),
Expand All @@ -370,7 +373,8 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw
jsonb_build_object('updateStrategy','{"onExists":null,"onNotExists":null,"onVersionConflict":null,"onMergeConflict":null}'::jsonb,
'partialUpdates',false,
'featureData', jsonb_build_object( 'type', 'FeatureCollection', 'features', jsonb_agg( iidata.feature ) )))::text
,iidata.author,false,(SELECT nextval('${schema}.${versionSequenceName}')))
,iidata.author,false,(SELECT nextval('${schema}.${versionSequenceName}'))
) as wfresult
from
(
select (row_number() over ())/${{maxblksize}} as rn, idata.author, idata.jsondata || jsonb_build_object('geometry',st_asgeojson(idata.geo)::json) as feature
Expand All @@ -379,7 +383,7 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace) throw
) iidata
group by rn, author
)
select count(1) into dummy_output from ins_data
select sum((wfresult::json->>'count')::bigint)::bigint into dummy_output from ins_data
"""
/**/
)
Expand Down Expand Up @@ -602,9 +606,7 @@ public static PropertiesQuery parsePropertiesQuery(String query, String property
return pq;
}

private double calculateNeededAcus() {
overallNeededAcus = overallNeededAcus != -1 ? overallNeededAcus : ResourceAndTimeCalculator.getInstance()
.calculateNeededAcusFromByteSize(getUncompressedUploadBytesEstimation());
return overallNeededAcus;
private double calculateNeededAcus() {
return 0.5;
}
}

0 comments on commit 92405cd

Please sign in to comment.