Skip to content

Commit

Permalink
pool manager: retry request on pool up
Browse files Browse the repository at this point in the history
Motivation:
If a pool with the file is online and a tape copy available, then
dCache will trigger stage and wait until file is restored on disk.
However, if pool becomes available again, the stage request is not
interrupted and client will wait for tape.

Modification:
Update request container 'onPoolUp' logic to retry the request if
the file expected to be on that pool. Added unit test to validate the
behavior.

Result:
pool selection succeeds then a pool with the file becomes online
despite the on-going stage request.

NOTE (1): the stage request is not interrupted
NOTE (2): if newly enabled pool doesn't contains the expected file, then
double stage is very likely.

Target: master
Acked-by: Lea Morschel
Require-book: no
Require-notes: yes
(cherry picked from commit f945c3d)
Signed-off-by: Tigran Mkrtchyan <[email protected]>
  • Loading branch information
kofemann authored and lemora committed Oct 9, 2024
1 parent ae3fd77 commit 2423686
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2021-2022 Deutsches Elektronen-Synchrotron
* Copyright (C) 2021-2024 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -20,6 +20,8 @@

import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.StorageInfo;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -85,6 +87,11 @@ public FileAttributesBuilder withChecksum(Checksum checksum) {
return this;
}

public FileAttributesBuilder withLocations(String...locations) {
_attributes.setLocations(Arrays.asList(locations));
return this;
}

public FileAttributes build() {
if (!_checksums.isEmpty()) {
_attributes.setChecksums(_checksums);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.dcache.cells.CellStub;
import org.dcache.namespace.FileAttribute;
import org.dcache.poolmanager.CostException;
import org.dcache.poolmanager.Partition;
import org.dcache.poolmanager.PartitionManager;
Expand Down Expand Up @@ -363,10 +364,12 @@ public void poolStatusChanged(String poolName, int poolStatus) {
*
* in this construction we will fall down to next case
*/
if (rph.getPoolCandidate().equals(POOL_UNKNOWN_STRING)) {
if (rph.getPoolCandidate().equals(POOL_UNKNOWN_STRING) || rph.expectedOnPool(poolName)) {
LOGGER.info("Restore Manager : retrying : {}", rph);
rph.retry();
}

// fall through to retry requests scheduled on that pool
case PoolStatusChangedMessage.DOWN:
/*
* if pool is down, re-try all request scheduled to this
Expand Down Expand Up @@ -1039,6 +1042,16 @@ public String getPoolCandidate() {
}
}

/**
* Returns true if file is expected to be on specified pool.
* @param poolName pool name to check.
* @return true if file is expected to be on specified pool.
*/
public boolean expectedOnPool(String poolName) {
return _fileAttributes.isDefined(FileAttribute.LOCATIONS)
&& _fileAttributes.getLocations().contains(poolName);
}

private String getPoolCandidateState() {
if (_stageCandidate.isPresent()) {
return _stageCandidate.get().name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,73 @@ public void shouldCancelStageRequestOnFail() throws Exception {
is("rh kill 80D1B8B90CED30430608C58002811B3285FC"));
}

@Test
public void shouldRetryOnPoolUpEvenForStage() throws Exception {
var stagePool = aPool("stage-pool@dCacheDomain");
given(aPartitionManager().withDefault(aPartition().withStageAllowed(true)));
given(aPoolSelectionUnit().withNetUnit("all-net", "192.168.1.1")
.withProtocolUnit("HTTP", "http/1"));
given(aPoolMonitor().thatReturns(aPoolSelectorThat()
.onReadThrows(aFileNotInCacheException())
.onStageSelects(stagePool)));

given(aContainer("PoolManager@dCacheDomain").thatDoesNotSendHitMessages());

whenReceiving(aReadRequest()
.by(ROOT)
.forFile("80D1B8B90CED30430608C58002811B3285FC")
.withBillingPath("/public/test")
.withTransferPath("/uploads/50/test")
.withFileAttributes(fileAttributes().withSize(10, KiB)
.withLocations("some-pool")
.withStorageInfo(aStorageInfo().withLocation("osm://RZ1/bfid1")))
.withProtocolInfo(aProtocolInfo().withProtocol("http")
.withMajorVersion(1).withIPAddress("192.168.1.1")));

container.setPoolMonitor(poolMonitor);
whenReceiving(aPoolStatusChange().thatPool("some-pool").isUp());

var reply = replySentWith(endpoint);

then(reply).should().setFailed(eq(10021), any());
then(reply).should().setContext(eq(0), any());

then(endpoint).shouldHaveNoMoreInteractions();
}

@Test
public void shouldIgnoreOnRandomPoolUp() throws Exception {
var stagePool = aPool("stage-pool@dCacheDomain");
given(aPartitionManager().withDefault(aPartition().withStageAllowed(true)));
given(aPoolSelectionUnit().withNetUnit("all-net", "192.168.1.1")
.withProtocolUnit("HTTP", "http/1"));
given(aPoolMonitor().thatReturns(aPoolSelectorThat()
.onReadThrows(aFileNotInCacheException())
.onStageSelects(stagePool)));

given(aContainer("PoolManager@dCacheDomain").thatDoesNotSendHitMessages());

whenReceiving(aReadRequest()
.by(ROOT)
.forFile("80D1B8B90CED30430608C58002811B3285FC")
.withBillingPath("/public/test")
.withTransferPath("/uploads/50/test")
.withFileAttributes(fileAttributes().withSize(10, KiB)
.withLocations("some-pool")
.withStorageInfo(aStorageInfo().withLocation("osm://RZ1/bfid1")))
.withProtocolInfo(aProtocolInfo().withProtocol("http")
.withMajorVersion(1).withIPAddress("192.168.1.1")));

container.setPoolMonitor(poolMonitor);
whenReceiving(aPoolStatusChange().thatPool("random-pool").isUp());

var message = stageSentWith(endpoint);

// the only message we have is starting stage
assertThat(message.isReply(), is(false));
then(endpoint).shouldHaveNoMoreInteractions();
}

private void given(ContainerBuilder builder) {
container = builder.build();
}
Expand Down Expand Up @@ -3406,6 +3473,11 @@ private static PoolMgrSelectReadPoolMsg replySentWith(CellEndpoint endpointUsed)
return (PoolMgrSelectReadPoolMsg) envelope.getMessageObject();
}

private static PoolFetchFileMessage stageSentWith(CellEndpoint endpointUsed) {
var envelope = envelopeSentWith(endpointUsed);
return (PoolFetchFileMessage) envelope.getMessageObject();
}

private static List<PoolMgrSelectReadPoolMsg> allRepliesSentWith(CellEndpoint endpointUsed) {
var envelopeArg = ArgumentCaptor.forClass(CellMessage.class);
verify(endpointUsed, Mockito.atLeastOnce()).sendMessage(envelopeArg.capture());
Expand Down

0 comments on commit 2423686

Please sign in to comment.