Skip to content

Commit

Permalink
HDFS-17564. EC: Fix the issue of inaccurate metrics when decommission…
Browse files Browse the repository at this point in the history
… mark busy DN. (#6911). Contributed by Haiyang Hu.

Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
haiyang1987 authored Jul 5, 2024
1 parent a571054 commit ae76e94
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,9 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
}

// Add block to the datanode's task list
rw.addTaskToDatanode(numReplicas);
if (!rw.addTaskToDatanode(numReplicas)) {
return false;
}
DatanodeStorageInfo.incrementBlocksScheduled(targets);

// Move the block-replication into a "pending" state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,5 @@ abstract void chooseTargets(BlockPlacementPolicy blockplacement,
*
* @param numberReplicas replica details
* @return true if a reconstruction/replication task was added to a
*         datanode; false if no task could be scheduled
*/
abstract void addTaskToDatanode(NumberReplicas numberReplicas);
abstract boolean addTaskToDatanode(NumberReplicas numberReplicas);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ private int chooseSource4SimpleReplication() {
}

@Override
void addTaskToDatanode(NumberReplicas numberReplicas) {
boolean addTaskToDatanode(NumberReplicas numberReplicas) {
final DatanodeStorageInfo[] targets = getTargets();
assert targets.length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();

boolean flag = true;
if (hasNotEnoughRack()) {
// if we already have all the internal blocks, but not enough racks,
// we only need to replicate one internal block to a new rack
Expand All @@ -152,6 +152,9 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
List<Integer> leavingServiceSources = findLeavingServiceSources();
// decommissioningSources.size() should be >= targets.length
final int num = Math.min(leavingServiceSources.size(), targets.length);
if (num == 0) {
flag = false;
}
for (int i = 0; i < num; i++) {
createReplicationWork(leavingServiceSources.get(i), targets[i]);
}
Expand All @@ -160,6 +163,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
}
return flag;
}

private void createReplicationWork(int sourceIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ assert getSrcNodes().length > 0
}

@Override
void addTaskToDatanode(NumberReplicas numberReplicas) {
boolean addTaskToDatanode(NumberReplicas numberReplicas) {
getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,60 @@ public void testFileChecksumAfterDecommission() throws Exception {
fileChecksum1.equals(fileChecksum2));
}

/**
 * Test decommission when a DN is marked as busy.
 *
 * Regression test for HDFS-17564: when the source for an EC block is a
 * decommissioning DN that is "busy" (pending replication count at the
 * replication-streams hard limit), no reconstruction task should be
 * scheduled, so the blocks-scheduled and pending-reconstruction metrics
 * must stay at zero while the block remains in the low-redundancy queue.
 *
 * @throws Exception if the test fails
 */
@Test(timeout = 120000)
public void testBusyAfterDecommissionNode() throws Exception {
  int busyDNIndex = 0;
  // 1. Create an EC file covering one full stripe (cellSize * dataBlocks).
  final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
  int writeBytes = cellSize * dataBlocks;
  writeStripedFile(dfs, ecFile, writeBytes);
  Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
  // NOTE(review): fileChecksum1 is computed but never compared afterwards —
  // confirm whether a checksum assertion was intended here.
  FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);

  // 2. Mark one DN busy by raising its pending-replication count up to the
  // hard limit so it cannot be used as a reconstruction source.
  final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
      .getINode4Write(ecFile.toString()).asFile();
  BlockInfo firstBlock = fileNode.getBlocks()[0];
  DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
  DatanodeDescriptor busyNode =
      dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
  for (int j = 0; j < replicationStreamsHardLimit; j++) {
    busyNode.incrementPendingReplicationWithoutTargets();
  }

  // 3. Decommission the busy node and verify it is the only live node in
  // the DECOMMISSION_INPROGRESS state.
  List<DatanodeInfo> decommisionNodes = new ArrayList<>();
  decommisionNodes.add(busyNode);
  decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);

  final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  bm.getDatanodeManager().fetchDatanodes(live, null, false);
  int liveDecommissioning = 0;
  for (DatanodeDescriptor node : live) {
    liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
  }
  assertEquals(decommisionNodes.size(), liveDecommissioning);

  // 4. Wait until the block is queued as low-redundancy. Because the only
  // source is busy, no task must be scheduled: blocks-scheduled and
  // pending-reconstruction stay 0 and the block stays in the
  // low-redundancy queue (the metric fix under test).
  GenericTestUtils.waitFor(() -> bm.getLowRedundancyBlocksCount() == 1,
      100, 3000);

  int blocksScheduled = 0;
  final List<DatanodeDescriptor> dnList = new ArrayList<>();
  fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null,
      false);
  for (DatanodeDescriptor dn : dnList) {
    blocksScheduled += dn.getBlocksScheduled();
  }
  assertEquals(0, blocksScheduled);
  assertEquals(0, bm.getPendingReconstructionBlocksCount());
  assertEquals(1, bm.getLowRedundancyBlocksCount());
}

private void testDecommission(int writeBytes, int storageCount,
int decomNodeCount, String filename) throws IOException, Exception {
Path ecFile = new Path(ecDir, filename);
Expand Down

0 comments on commit ae76e94

Please sign in to comment.