HDFS-12349. Improve log message when it could not alloc enough blocks for EC. (Lei (Eddy) Xu)
parent 3a8d57a0a2
commit fbe06b5880
@@ -260,6 +260,7 @@ private void flipDataBuffers() {
   private final Coordinator coordinator;
   private final CellBuffers cellBuffers;
+  private final ErasureCodingPolicy ecPolicy;
   private final RawErasureEncoder encoder;
   private final List<StripedDataStreamer> streamers;
   private final DFSPacket[] currentPackets; // current Packet of each streamer
@@ -286,7 +287,7 @@ private void flipDataBuffers() {
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }

-    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
+    ecPolicy = stat.getErasureCodingPolicy();
     final int numParityBlocks = ecPolicy.getNumParityUnits();
     cellSize = ecPolicy.getCellSize();
     numDataBlocks = ecPolicy.getNumDataUnits();
@@ -478,11 +479,6 @@ private void allocateNewBlock() throws IOException {
     final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
         currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
     assert lb.isStriped();
-    if (lb.getLocations().length < numDataBlocks) {
-      throw new IOException("Failed to get " + numDataBlocks
-          + " nodes from namenode: blockGroupSize= " + numAllBlocks
-          + ", blocks.length= " + lb.getLocations().length);
-    }
     // assign the new block to the current block group
     currentBlockGroup = lb.getBlock();
     blockGroupIndex++;
@@ -494,11 +490,16 @@ private void allocateNewBlock() throws IOException {
       StripedDataStreamer si = getStripedDataStreamer(i);
       assert si.isHealthy();
       if (blocks[i] == null) {
+        // allocBlock() should guarantee that all data blocks are successfully
+        // allocated.
+        assert i >= numDataBlocks;
         // Set exception and close streamer as there is no block locations
         // found for the parity block.
-        LOG.warn("Failed to get block location for parity block, index=" + i);
+        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
+            "Not enough datanodes? Exclude nodes={}", i, ecPolicy.getName(),
+            excludedNodes);
         si.getLastException().set(
-            new IOException("Failed to get following block, i=" + i));
+            new IOException("Failed to get parity block, index=" + i));
         si.getErrorState().setInternalError();
         si.close(true);
       } else {
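Note: the rewritten warning above uses SLF4J-style {} placeholders, so the message is only assembled when WARN is enabled and the arguments are substituted in order. A minimal standalone sketch of the same pattern; the logger name and the index/policy/exclude values below are illustrative, not taken from a real cluster:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParityWarnSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(ParityWarnSketch.class);

      public static void main(String[] args) {
        int index = 7;                      // hypothetical parity block index
        String policy = "RS-6-3-1024k";     // hypothetical EC policy name
        String excluded = "[dn1:9866]";     // hypothetical excluded-node set
        // Renders roughly as:
        // Cannot allocate parity block(index=7, policy=RS-6-3-1024k).
        // Not enough datanodes? Exclude nodes=[dn1:9866]
        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
            "Not enough datanodes? Exclude nodes={}", index, policy, excluded);
      }
    }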
@@ -2057,6 +2057,7 @@ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final List<String> favoredNodes,
       final byte storagePolicyID,
       final BlockType blockType,
+      final ErasureCodingPolicy ecPolicy,
       final EnumSet<AddBlockFlag> flags) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
@@ -2067,14 +2068,23 @@ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
         favoredDatanodeDescriptors, storagePolicy, flags);
-    if (targets.length < minReplication) {
-      throw new IOException("File " + src + " could only be replicated to "
-          + targets.length + " nodes instead of minReplication (="
-          + minReplication + "). There are "
-          + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
-          + " datanode(s) running and "
-          + (excludedNodes == null? "no": excludedNodes.size())
-          + " node(s) are excluded in this operation.");
+
+    final String errorMessage = "File %s could only be written to %d of " +
+        "the %d %s. There are %d datanode(s) running and %s "
+        + "node(s) are excluded in this operation.";
+    if (blockType == BlockType.CONTIGUOUS && targets.length < minReplication) {
+      throw new IOException(String.format(errorMessage, src,
+          targets.length, minReplication, "minReplication nodes",
+          getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+          (excludedNodes == null? "no": excludedNodes.size())));
+    } else if (blockType == BlockType.STRIPED &&
+        targets.length < ecPolicy.getNumDataUnits()) {
+      throw new IOException(
+          String.format(errorMessage, src, targets.length,
+              ecPolicy.getNumDataUnits(),
+              String.format("required nodes for %s", ecPolicy.getName()),
+              getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
+              (excludedNodes == null ? "no" : excludedNodes.size())));
     }
     return targets;
   }
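For readability, the new code in chooseTarget4NewBlock funnels both the contiguous and the striped failure through one errorMessage template and fills it with String.format. A quick sketch of how the two branches render, using made-up paths and counts rather than values from the commit:

    public class WriteErrorMessageSketch {
      public static void main(String[] args) {
        final String errorMessage = "File %s could only be written to %d of " +
            "the %d %s. There are %d datanode(s) running and %s "
            + "node(s) are excluded in this operation.";
        // CONTIGUOUS branch: minReplication is the threshold.
        System.out.println(String.format(errorMessage, "/tmp/contiguous-file",
            0, 1, "minReplication nodes", 3, "no"));
        // STRIPED branch: the EC policy's data-unit count is the threshold.
        System.out.println(String.format(errorMessage, "/tmp/ec-file",
            5, 6, String.format("required nodes for %s", "RS-6-3-1024k"),
            5, 2));
      }
    }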
@@ -201,7 +201,7 @@ static ValidateAddBlockResult validateAddBlock(
     }
     storagePolicyID = pendingFile.getStoragePolicyID();
     return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
-        clientMachine, blockType);
+        clientMachine, blockType, ecPolicy);
   }

   static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
@@ -286,7 +286,7 @@ static DatanodeStorageInfo[] chooseTargetForNewBlock(
     return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                     excludedNodesSet, r.blockSize,
                                     favoredNodesList, r.storagePolicyID,
-                                    r.blockType, flags);
+                                    r.blockType, r.ecPolicy, flags);
   }

   /**
@@ -831,20 +831,28 @@ private static class FileState {
   }

   static class ValidateAddBlockResult {
-    final long blockSize;
-    final int numTargets;
-    final byte storagePolicyID;
-    final String clientMachine;
-    final BlockType blockType;
+    private final long blockSize;
+    private final int numTargets;
+    private final byte storagePolicyID;
+    private final String clientMachine;
+    private final BlockType blockType;
+    private final ErasureCodingPolicy ecPolicy;

     ValidateAddBlockResult(
         long blockSize, int numTargets, byte storagePolicyID,
-        String clientMachine, BlockType blockType) {
+        String clientMachine, BlockType blockType,
+        ErasureCodingPolicy ecPolicy) {
       this.blockSize = blockSize;
       this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
       this.blockType = blockType;
+      this.ecPolicy = ecPolicy;
+
+      if (blockType == BlockType.STRIPED) {
+        Preconditions.checkArgument(ecPolicy != null,
+            "ecPolicy is not specified for striped block");
+      }
     }
   }
 }
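The widened ValidateAddBlockResult constructor fails fast when a striped block request arrives without an erasure coding policy. A hedged sketch of the same Guava Preconditions idiom, with stand-in types rather than the HDFS classes:

    import com.google.common.base.Preconditions;

    public class EcPolicyCheckSketch {
      enum BlockType { CONTIGUOUS, STRIPED }

      static void check(BlockType blockType, String ecPolicyName) {
        if (blockType == BlockType.STRIPED) {
          // Throws IllegalArgumentException with the given message when false.
          Preconditions.checkArgument(ecPolicyName != null,
              "ecPolicy is not specified for striped block");
        }
      }

      public static void main(String[] args) {
        check(BlockType.CONTIGUOUS, null);        // ok: no policy required
        check(BlockType.STRIPED, "RS-6-3-1024k"); // ok: policy supplied
        check(BlockType.STRIPED, null);           // throws IllegalArgumentException
      }
    }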
@@ -42,6 +42,7 @@
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -284,7 +285,7 @@ public void testBlockTokenExpired() throws Exception {

   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
-      throws IOException {
+      throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     try {
@@ -303,20 +304,18 @@ public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
           DatanodeReportType.LIVE);
       assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
       final Path dirFile = new Path(dir, "ecfile");
-      FSDataOutputStream out;
-      try {
-        out = dfs.create(dirFile, true);
-        out.write("something".getBytes());
-        out.flush();
-        out.close();
-        Assert.fail("Failed to validate available dns against blkGroupSize");
-      } catch (IOException ioe) {
-        // expected
-        GenericTestUtils.assertExceptionContains("Failed to get " +
-            dataBlocks + " nodes from namenode: blockGroupSize= " +
-            (dataBlocks + parityBlocks) + ", blocks.length= " +
-            numDatanodes, ioe);
-      }
+      LambdaTestUtils.intercept(
+          IOException.class,
+          "File " + dirFile + " could only be written to " +
+              numDatanodes + " of the " + dataBlocks + " required nodes for " +
+              getEcPolicy().getName(),
+          () -> {
+            try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+              out.write("something".getBytes());
+              out.flush();
+            }
+            return 0;
+          });
     } finally {
       tearDown();
     }
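The rewritten test relies on LambdaTestUtils.intercept(Class, String, Callable), which runs the callable, asserts that it throws the given exception type, and checks that the exception message contains the given substring. A minimal usage sketch under the assumption that JUnit and the hadoop-common test utilities are on the classpath; the helper method and message text are invented for illustration:

    import java.io.IOException;
    import org.apache.hadoop.test.LambdaTestUtils;
    import org.junit.Test;

    public class InterceptSketch {
      @Test
      public void testInterceptPattern() throws Exception {
        LambdaTestUtils.intercept(
            IOException.class,
            "could only be written to",
            () -> {
              failingWrite();
              // Returning a value keeps the lambda a Callable, mirroring the
              // "return 0" in the rewritten HDFS test above.
              return 0;
            });
      }

      // Hypothetical operation standing in for the striped-file write.
      private static void failingWrite() throws IOException {
        throw new IOException(
            "File /tmp/x could only be written to 0 of the 6 required nodes");
      }
    }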
@@ -118,7 +118,7 @@ public void testClientAndServerDoNotHaveCommonQop() throws Exception {
     HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
     clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
     exception.expect(IOException.class);
-    exception.expectMessage("could only be replicated to 0 nodes");
+    exception.expectMessage("could only be written to 0");
     doTest(clientConf);
   }

@@ -140,7 +140,7 @@ public void testServerSaslNoClientSasl() throws Exception {
           "configured or not supported in client");
     } catch (IOException e) {
       GenericTestUtils.assertMatches(e.getMessage(),
-          "could only be replicated to 0 nodes");
+          "could only be written to 0");
     } finally {
       logs.stopCapturing();
     }
@@ -1030,8 +1030,7 @@ public void testStorageWithRemainingCapacity() throws Exception {
             0x1BAD5EED);
       }
       catch (RemoteException re) {
-        GenericTestUtils.assertExceptionContains("nodes instead of "
-            + "minReplication", re);
+        GenericTestUtils.assertExceptionContains("of the 1 minReplication", re);
       }
     }
     finally {
@@ -182,7 +182,7 @@ public void testStorageTypeStatsWhenStorageFailed() throws Exception {
       fail("Should throw exception, becuase no DISK storage available");
     } catch (Exception e) {
       assertTrue(e.getMessage().contains(
-          "could only be replicated to 0 nodes instead"));
+          "could only be written to 0 of the 1 minReplication"));
     }
     // wait for heartbeat
     Thread.sleep(6000);
@@ -175,8 +175,8 @@ public void testDeadNodeAsBlockTarget() throws Exception {
     // choose the targets, but local node should not get selected as this is not
     // part of the cluster anymore
     DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
-        clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7,
-        BlockType.CONTIGUOUS, null);
+        clientNode, new HashSet<>(), 256 * 1024 * 1024L, null, (byte) 7,
+        BlockType.CONTIGUOUS, null, null);
     for (DatanodeStorageInfo datanodeStorageInfo : results) {
       assertFalse("Dead node should not be choosen", datanodeStorageInfo
           .getDatanodeDescriptor().equals(clientNode));