diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8f65673806bd0..41845152514fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2396,7 +2396,9 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
     }
 
     // Add block to the datanode's task list
-    rw.addTaskToDatanode(numReplicas);
+    if (!rw.addTaskToDatanode(numReplicas)) {
+      return false;
+    }
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
     // Move the block-replication into a "pending" state.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
index df76a15c733f0..6ea046204fbda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
@@ -145,5 +145,5 @@ abstract void chooseTargets(BlockPlacementPolicy blockplacement,
    *
    * @param numberReplicas replica details
    */
-  abstract void addTaskToDatanode(NumberReplicas numberReplicas);
+  abstract boolean addTaskToDatanode(NumberReplicas numberReplicas);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index b8c396696ab11..5726fac0b3d79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -136,11 +136,11 @@ private int chooseSource4SimpleReplication() {
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     final DatanodeStorageInfo[] targets = getTargets();
     assert targets.length > 0;
     BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
-
+    boolean flag = true;
     if (hasNotEnoughRack()) {
       // if we already have all the internal blocks, but not enough racks,
       // we only need to replicate one internal block to a new rack
@@ -152,6 +152,9 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
       List<Integer> leavingServiceSources = findLeavingServiceSources();
       // decommissioningSources.size() should be >= targets.length
       final int num = Math.min(leavingServiceSources.size(), targets.length);
+      if (num == 0) {
+        flag = false;
+      }
       for (int i = 0; i < num; i++) {
         createReplicationWork(leavingServiceSources.get(i), targets[i]);
       }
@@ -160,6 +163,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
           new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
           liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
     }
+    return flag;
   }
 
   private void createReplicationWork(int sourceIndex,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index 15e5d5cdc2729..19b56f171523a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -61,7 +61,8 @@ assert getSrcNodes().length > 0
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
+    return true;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 1de8fc17ee802..83332cc3134b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -462,6 +462,60 @@ public void testFileChecksumAfterDecommission() throws Exception {
         fileChecksum1.equals(fileChecksum2));
   }
 
+  /**
+   * Test decommission when a DN is marked as busy.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testBusyAfterDecommissionNode() throws Exception {
+    int busyDNIndex = 0;
+    //1. create EC file.
+    final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy.
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node.
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(busyNode);
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    final List<DatanodeDescriptor> live = new ArrayList<>();
+    bm.getDatanodeManager().fetchDatanodes(live, null, false);
+    int liveDecommissioning = 0;
+    for (DatanodeDescriptor node : live) {
+      liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
+    }
+    assertEquals(decommisionNodes.size(), liveDecommissioning);
+
+    //4. wait for the decommissioning block to enter the low-redundancy queue.
+    GenericTestUtils.waitFor(() -> bm.getLowRedundancyBlocksCount() == 1,
+        100, 3000);
+
+    int blocksScheduled = 0;
+    final List<DatanodeDescriptor> dnList = new ArrayList<>();
+    fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null,
+        false);
+    for (DatanodeDescriptor dn : dnList) {
+      blocksScheduled += dn.getBlocksScheduled();
+    }
+    assertEquals(0, blocksScheduled);
+    assertEquals(0, bm.getPendingReconstructionBlocksCount());
+    assertEquals(1, bm.getLowRedundancyBlocksCount());
+  }
+
   private void testDecommission(int writeBytes, int storageCount,
       int decomNodeCount, String filename) throws IOException, Exception {
     Path ecFile = new Path(ecDir, filename);
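In short: `addTaskToDatanode` now reports whether any task was actually queued, and `validateReconstructionWork` returns early, before `DatanodeStorageInfo.incrementBlocksScheduled(targets)`, when `ErasureCodingWork` found no usable leaving-service source (every candidate DN was busy). The sketch below is a minimal, self-contained illustration of that caller-side pattern; the `Work` interface and `Scheduler` class are hypothetical stand-ins, not HDFS types:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BlockReconstructionWork's new contract:
// return false when no task could be handed to any datanode.
interface Work {
  boolean addTaskToDatanode();
}

class Scheduler {
  private final AtomicInteger blocksScheduled = new AtomicInteger();

  boolean validate(Work work) {
    if (!work.addTaskToDatanode()) {
      return false;                      // nothing queued, counters untouched
    }
    blocksScheduled.incrementAndGet();   // count only work that really exists
    return true;
  }

  public static void main(String[] args) {
    Scheduler s = new Scheduler();
    s.validate(() -> false);  // busy sources: no task queued, counter stays 0
    s.validate(() -> true);   // normal case: task queued, counter becomes 1
    System.out.println(s.blocksScheduled.get());  // prints 1
  }
}
```

Without the early return, the scheduled-blocks counter on each target would be incremented for work that was silently dropped; the new test asserts exactly this, checking that `getBlocksScheduled()` and the pending-reconstruction count stay at 0 while the block remains in the low-redundancy queue.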