diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 7f053380f8..44db3a6824 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -260,6 +260,7 @@ private void flipDataBuffers() {
 
   private final Coordinator coordinator;
   private final CellBuffers cellBuffers;
+  private final ErasureCodingPolicy ecPolicy;
   private final RawErasureEncoder encoder;
   private final List<StripedDataStreamer> streamers;
   private final DFSPacket[] currentPackets; // current Packet of each streamer
@@ -286,7 +287,7 @@ private void flipDataBuffers() {
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }
 
-    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
+    ecPolicy = stat.getErasureCodingPolicy();
     final int numParityBlocks = ecPolicy.getNumParityUnits();
     cellSize = ecPolicy.getCellSize();
     numDataBlocks = ecPolicy.getNumDataUnits();
@@ -478,11 +479,6 @@ private void allocateNewBlock() throws IOException {
     final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
         currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
     assert lb.isStriped();
-    if (lb.getLocations().length < numDataBlocks) {
-      throw new IOException("Failed to get " + numDataBlocks
-          + " nodes from namenode: blockGroupSize= " + numAllBlocks
-          + ", blocks.length= " + lb.getLocations().length);
-    }
     // assign the new block to the current block group
     currentBlockGroup = lb.getBlock();
     blockGroupIndex++;
@@ -494,11 +490,16 @@ private void allocateNewBlock() throws IOException {
       StripedDataStreamer si = getStripedDataStreamer(i);
       assert si.isHealthy();
       if (blocks[i] == null) {
+        // allocBlock() should guarantee that all data blocks are successfully
+        // allocated.
+        assert i >= numDataBlocks;
         // Set exception and close streamer as there is no block locations
         // found for the parity block.
-        LOG.warn("Failed to get block location for parity block, index=" + i);
+        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
+            "Not enough datanodes? Exclude nodes={}", i, ecPolicy.getName(),
+            excludedNodes);
         si.getLastException().set(
-            new IOException("Failed to get following block, i=" + i));
+            new IOException("Failed to get parity block, index=" + i));
         si.getErrorState().setInternalError();
         si.close(true);
       } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f4e5cb4656..f33ec63cf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2057,6 +2057,7 @@ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final List<String> favoredNodes,
       final byte storagePolicyID,
       final BlockType blockType,
+      final ErasureCodingPolicy ecPolicy,
       final EnumSet<AddBlockFlag> flags) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
@@ -2067,14 +2068,23 @@ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
         favoredDatanodeDescriptors, storagePolicy, flags);
-    if (targets.length < minReplication) {
-      throw new IOException("File " + src + " could only be replicated to "
-          + targets.length + " nodes instead of minReplication (="
-          + minReplication + "). There are "
-          + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
-          + " datanode(s) running and "
-          + (excludedNodes == null? "no": excludedNodes.size())
-          + " node(s) are excluded in this operation.");
+
+    final String errorMessage = "File %s could only be written to %d of "
+        + "the %d %s. There are %d datanode(s) running and %s "
+        + "node(s) are excluded in this operation.";
+    if (blockType == BlockType.CONTIGUOUS && targets.length < minReplication) {
+      throw new IOException(String.format(errorMessage, src,
+          targets.length, minReplication, "minReplication nodes",
+          getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+          (excludedNodes == null? "no": excludedNodes.size())));
+    } else if (blockType == BlockType.STRIPED &&
+        targets.length < ecPolicy.getNumDataUnits()) {
+      throw new IOException(
+          String.format(errorMessage, src, targets.length,
+              ecPolicy.getNumDataUnits(),
+              String.format("required nodes for %s", ecPolicy.getName()),
+              getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+              (excludedNodes == null ? "no" : excludedNodes.size())));
     }
     return targets;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 012e916f91..b202212669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -201,7 +201,7 @@ static ValidateAddBlockResult validateAddBlock(
     }
     storagePolicyID = pendingFile.getStoragePolicyID();
     return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
-        clientMachine, blockType);
+        clientMachine, blockType, ecPolicy);
   }
 
   static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
@@ -286,7 +286,7 @@ static DatanodeStorageInfo[] chooseTargetForNewBlock(
     return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
        excludedNodesSet, r.blockSize,
        favoredNodesList, r.storagePolicyID,
-       r.blockType, flags);
+       r.blockType, r.ecPolicy, flags);
   }
 
   /**
@@ -831,20 +831,28 @@ private static class FileState {
   }
 
   static class ValidateAddBlockResult {
-    final long blockSize;
-    final int numTargets;
-    final byte storagePolicyID;
-    final String clientMachine;
-    final BlockType blockType;
+    private final long blockSize;
+    private final int numTargets;
+    private final byte storagePolicyID;
+    private final String clientMachine;
+    private final BlockType blockType;
+    private final ErasureCodingPolicy ecPolicy;
 
     ValidateAddBlockResult(
         long blockSize, int numTargets, byte storagePolicyID,
-        String clientMachine, BlockType blockType) {
+        String clientMachine, BlockType blockType,
+        ErasureCodingPolicy ecPolicy) {
      this.blockSize = blockSize;
      this.numTargets = numTargets;
      this.storagePolicyID = storagePolicyID;
      this.clientMachine = clientMachine;
      this.blockType = blockType;
+      this.ecPolicy = ecPolicy;
+
+      if (blockType == BlockType.STRIPED) {
+        Preconditions.checkArgument(ecPolicy != null,
+            "ecPolicy is not specified for striped block");
+      }
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 231f260c69..ea889e38a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -284,7 +285,7 @@ public void testBlockTokenExpired() throws Exception {
 
   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
-      throws IOException {
+      throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     try {
@@ -303,20 +304,18 @@ public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
           DatanodeReportType.LIVE);
       assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
       final Path dirFile = new Path(dir, "ecfile");
-      FSDataOutputStream out;
-      try {
-        out = dfs.create(dirFile, true);
-        out.write("something".getBytes());
-        out.flush();
-        out.close();
-        Assert.fail("Failed to validate available dns against blkGroupSize");
-      } catch (IOException ioe) {
-        // expected
-        GenericTestUtils.assertExceptionContains("Failed to get " +
-            dataBlocks + " nodes from namenode: blockGroupSize= " +
-            (dataBlocks + parityBlocks) + ", blocks.length= " +
-            numDatanodes, ioe);
-      }
+      LambdaTestUtils.intercept(
+          IOException.class,
+          "File " + dirFile + " could only be written to " +
+              numDatanodes + " of the " + dataBlocks + " required nodes for " +
+              getEcPolicy().getName(),
+          () -> {
+            try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+              out.write("something".getBytes());
+              out.flush();
+            }
+            return 0;
+          });
     } finally {
       tearDown();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
index 8555e5d0ad..2fe0a1c295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
@@ -118,7 +118,7 @@ public void testClientAndServerDoNotHaveCommonQop() throws Exception {
     HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
     clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
     exception.expect(IOException.class);
-    exception.expectMessage("could only be replicated to 0 nodes");
+    exception.expectMessage("could only be written to 0");
     doTest(clientConf);
   }
 
@@ -140,7 +140,7 @@ public void testServerSaslNoClientSasl() throws Exception {
           "configured or not supported in client");
     } catch (IOException e) {
       GenericTestUtils.assertMatches(e.getMessage(),
-          "could only be replicated to 0 nodes");
+          "could only be written to 0");
     } finally {
       logs.stopCapturing();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 4c1ea7b763..10289ed0e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -1030,8 +1030,7 @@ public void testStorageWithRemainingCapacity() throws Exception {
             0x1BAD5EED);
       }
       catch (RemoteException re) {
-        GenericTestUtils.assertExceptionContains("nodes instead of "
-            + "minReplication", re);
+        GenericTestUtils.assertExceptionContains("of the 1 minReplication", re);
       }
     }
     finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index b7583c4e97..bcf38d6f7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -182,7 +182,7 @@ public void testStorageTypeStatsWhenStorageFailed() throws Exception {
       fail("Should throw exception, becuase no DISK storage available");
     } catch (Exception e) {
       assertTrue(e.getMessage().contains(
-          "could only be replicated to 0 nodes instead"));
+          "could only be written to 0 of the 1 minReplication"));
     }
     // wait for heartbeat
     Thread.sleep(6000);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 74be90cfb5..b6c13188c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -175,8 +175,8 @@ public void testDeadNodeAsBlockTarget() throws Exception {
     // choose the targets, but local node should not get selected as this is not
     // part of the cluster anymore
     DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
-        clientNode, new HashSet(), 256 * 1024 * 1024L, null, (byte) 7,
-        BlockType.CONTIGUOUS, null);
+        clientNode, new HashSet<>(), 256 * 1024 * 1024L, null, (byte) 7,
+        BlockType.CONTIGUOUS, null, null);
     for (DatanodeStorageInfo datanodeStorageInfo : results) {
       assertFalse("Dead node should not be choosen", datanodeStorageInfo
           .getDatanodeDescriptor().equals(clientNode));