HDFS-17299. Adding rack failure tolerance when creating a new file (#6566)

ritegarg 2024-03-06 13:08:05 -08:00 committed by GitHub
parent 7012986fc3
commit 58afe43769
8 changed files with 225 additions and 33 deletions
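
The tolerance added here is opt-in through existing client write settings; the new tests at the end of this commit configure it as sketched below. A minimal sketch (key names are taken from the tests in this commit; the values are illustrative, not defaults):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;

// Settings under which a create can survive a rack failure:
Configuration conf = new HdfsConfiguration();
// Spread replicas across racks on the namenode side.
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
    BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
// Do not replace failed datanodes during the write...
conf.setBoolean(
    HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
// ...but require at least 2 live pipeline nodes for the create to continue.
conf.setInt(
    HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 2);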

View File

@@ -87,6 +87,7 @@
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -643,17 +644,17 @@ void setAccessToken(Token<BlockTokenIdentifier> t) {
this.accessToken = t;
}
private void setPipeline(LocatedBlock lb) {
protected void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
String[] storageIDs) {
protected void setPipeline(DatanodeInfo[] newNodes, StorageType[] newStorageTypes,
String[] newStorageIDs) {
synchronized (nodesLock) {
this.nodes = nodes;
this.nodes = newNodes;
}
this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
this.storageTypes = newStorageTypes;
this.storageIDs = newStorageIDs;
}
/**
@@ -748,7 +749,7 @@ public void run() {
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
setPipeline(nextBlockOutputStream());
setupPipelineForCreate();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
@@ -1607,8 +1608,11 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
* it can be written to.
This happens when a file is appended or data streaming fails.
It keeps on trying until a pipeline is set up.
*
Returns whether the pipeline was set up successfully.
Callers use this to decide whether to keep building the pipeline or to throw an exception.
*/
private void setupPipelineForAppendOrRecovery() throws IOException {
private boolean setupPipelineForAppendOrRecovery() throws IOException {
// Check number of datanodes. Note that if there is no healthy datanode,
// this must be an internal error, because we only mark an external error in
// striped outputstream when all the streamers are in the DATA_STREAMING stage
@@ -1618,33 +1622,46 @@ private void setupPipelineForAppendOrRecovery() throws IOException {
LOG.warn(msg);
lastException.set(new IOException(msg));
streamerClosed = true;
return;
return false;
}
setupPipelineInternal(nodes, storageTypes, storageIDs);
return setupPipelineInternal(nodes, storageTypes, storageIDs);
}
protected void setupPipelineInternal(DatanodeInfo[] datanodes,
protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,
StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
throws IOException {
boolean success = false;
long newGS = 0L;
boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
while (!success && !streamerClosed && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
return;
return false;
}
final boolean isRecovery = errorState.hasInternalError();
final boolean isRecovery = errorState.hasInternalError() && !isCreateStage;
if (!handleBadDatanode()) {
return;
return false;
}
handleDatanodeReplacement();
// During create stage, min replication should still be satisfied.
if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
return false;
}
// get a new generation stamp and an access token
final LocatedBlock lb = updateBlockForPipeline();
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
if (isCreateStage) {
block.setCurrentBlock(lb.getBlock());
}
// set up the pipeline again with the remaining nodes
success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
isRecovery);
@@ -1657,6 +1674,7 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
if (success) {
updatePipeline(newGS);
}
return success;
}
/**
@@ -1795,7 +1813,7 @@ DatanodeInfo[] getExcludedNodes() {
* Must get block ID and the IDs of the destinations from the namenode.
* Sets up the pipeline using the target datanodes.
*/
protected LocatedBlock nextBlockOutputStream() throws IOException {
protected void setupPipelineForCreate() throws IOException {
LocatedBlock lb;
DatanodeInfo[] nodes;
StorageType[] nextStorageTypes;
@@ -1806,6 +1824,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
do {
errorState.resetInternalError();
lastException.clear();
streamerClosed = false;
DatanodeInfo[] excluded = getExcludedNodes();
lb = locateFollowingBlock(
@@ -1817,26 +1836,33 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
nodes = lb.getLocations();
nextStorageTypes = lb.getStorageTypes();
nextStorageIDs = lb.getStorageIDs();
setPipeline(lb);
try {
// Connect to first DataNode in the list.
success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L, false)
|| setupPipelineForAppendOrRecovery();
// Connect to first DataNode in the list.
success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
0L, false);
} catch(IOException ie) {
LOG.warn("Exception in setupPipelineForCreate " + this, ie);
success = false;
}
if (!success) {
LOG.warn("Abandoning " + block);
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
stat.getFileId(), src, dfsClient.clientName);
block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
? Iterables.getLast(failed)
: nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
setPipeline(null, null, null);
}
} while (!success && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
return lb;
}
// connects to the first datanode in the pipeline
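
Taken together, the DataStreamer changes above give block creation the same retry loop that append and recovery already had. A condensed sketch of the new flow (method and field names as in this diff; error handling trimmed):

// Inside setupPipelineForCreate(), simplified:
do {
  DatanodeInfo[] excluded = getExcludedNodes();
  lb = locateFollowingBlock(excluded.length > 0 ? excluded : null, oldBlock);
  setPipeline(lb);                               // adopt the NN-chosen nodes up front
  success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L, false)
      || setupPipelineForAppendOrRecovery();     // salvage the surviving nodes
  if (!success) {
    // give the block back, remember the bad node, and start over
    dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
        stat.getFileId(), src, dfsClient.clientName);
    DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
        ? Iterables.getLast(failed) : nodes[errorState.getBadNodeIndex()];
    excludedNodes.put(badNode, badNode);
    setPipeline(null, null, null);
  }
} while (!success && --count >= 0);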

View File

@@ -90,7 +90,7 @@ private LocatedBlock getFollowingBlock() throws IOException {
}
@Override
protected LocatedBlock nextBlockOutputStream() throws IOException {
protected void setupPipelineForCreate() throws IOException {
boolean success;
LocatedBlock lb = getFollowingBlock();
block.setCurrentBlock(lb.getBlock());
@@ -101,7 +101,6 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
DatanodeInfo[] nodes = lb.getLocations();
StorageType[] storageTypes = lb.getStorageTypes();
String[] storageIDs = lb.getStorageIDs();
// Connect to the DataNode. If fail the internal error state will be set.
success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
false);
@@ -113,7 +112,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
excludedNodes.put(badNode, badNode);
throw new IOException("Unable to create new block. " + this);
}
return lb;
setPipeline(lb);
}
@VisibleForTesting
@@ -122,18 +121,18 @@ LocatedBlock peekFollowingBlock() {
}
@Override
protected void setupPipelineInternal(DatanodeInfo[] nodes,
protected boolean setupPipelineInternal(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
throws IOException {
boolean success = false;
while (!success && !streamerClosed() && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
return;
return false;
}
if (!handleBadDatanode()) {
// for the striped streamer, if it is a datanode error then close the stream
// and return; no need to replace the datanode
return;
return false;
}
// get a new generation stamp and an access token
@@ -179,6 +178,7 @@ assert getErrorState().hasExternalError()
setStreamerAsClosed();
}
} // while
return success;
}
void setExternalError() {

View File

@@ -218,7 +218,10 @@ class BlockReceiver implements Closeable {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaHandler = datanode.data.createRbw(storageType, storageId,
block, allowLazyPersist);
block, allowLazyPersist, newGs);
if (newGs != 0L) {
block.setGenerationStamp(newGs);
}
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
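
A non-zero newGs here marks a client retry of pipeline creation: the earlier attempt may have left an RBW replica for the same block on this datanode, and the client bumped the generation stamp via updateBlockForPipeline() before retrying (see the DataStreamer hunk above), so the receiver adopts the new stamp before notifying the namenode. Passing 0L preserves the old create behavior.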

View File

@@ -335,6 +335,16 @@ ReplicaHandler createTemporary(StorageType storageType, String storageId,
ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica.
*
* @param b block
* @param newGS the updated generation stamp when the client retries pipeline
*              creation for the same block; 0 means no retry
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
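
Implementations that do not need retry-aware cleanup can simply delegate this overload to the existing four-argument createRbw, as the two test dataset implementations at the end of this diff do; only the real dataset implementation below gives newGS its cleanup semantics.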
/**
* Recovers a RBW replica and returns the meta info of the replica.
*
@@ -468,7 +478,7 @@ void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
boolean isValidRbw(ExtendedBlock b);
/**
* Invalidates the specified blocks
* Invalidates the specified blocks.
* @param bpid Block pool Id
* @param invalidBlks - the blocks to be invalidated
* @throws IOException

View File

@@ -1587,15 +1587,29 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
return createRbw(storageType, storageId, b, allowLazyPersist, 0L);
}
@Override // FsDatasetSpi
public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist, long newGS) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
// If this is a retry for the same blockPoolId + blockId with an
// updated generation stamp, clean up the old replica so that no
// duplicate copies of the same block are left behind
if (newGS != 0L) {
cleanupReplica(b.getBlockPoolId(), replicaInfo);
} else {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
}
}
// create a new block
FsVolumeReference ref = null;
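
A hypothetical timeline makes the cleanup branch concrete (block IDs and generation stamps are illustrative):

// 1. Create: the client writes blk_1073741825_1001; this DN creates an RBW
//    replica, then the pipeline fails on a node in another rack.
// 2. Retry: the client calls updateBlockForPipeline(), receives generation
//    stamp 1002, and reopens the pipeline with newGS = 1002.
// 3. Same DN again: volumeMap still holds blk_1073741825_1001, but since
//    newGS != 0 the stale replica is removed via cleanupReplica() and a fresh
//    RBW is created, instead of failing with ReplicaAlreadyExistsException.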

View File

@@ -108,6 +108,8 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -2651,5 +2653,130 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup()
}
}
@Test
public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
Configuration conf = getTestConfiguration();
conf.setClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setBoolean(
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
false);
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION, 2);
// 3 racks & 3 nodes. 1 per rack
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
// kill one DN, so only 2 racks remain with an active DN
cluster.stopDataNode(0);
// create a file with replication 3; the rack fault tolerant BPP
// allocates nodes across all 3 racks, and the write succeeds with 2 live nodes
DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
}
}
@Test
public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
throws Exception {
Configuration conf = getTestConfiguration();
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 3);
// 3 racks & 3 nodes. 1 per rack
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
// kill one DN, so only 2 racks remain with an active DN
cluster.stopDataNode(0);
LambdaTestUtils.intercept(IOException.class,
() ->
DFSTestUtil.createFile(fs, new Path("/testFile"),
1024L, (short) 3, 1024L));
}
}
@Test
public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
Configuration conf = getTestConfiguration();
conf.setClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setBoolean(
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
false);
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION, 1);
// 3 racks & 3 nodes. 1 per rack
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
// kill 2 DNs, so only 1 rack remains with an active DN
cluster.stopDataNode(0);
cluster.stopDataNode(1);
// create a file with replication 3; the rack fault tolerant BPP
// allocates nodes across all 3 racks, and the write succeeds with 1 live node
DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
}
}
@Test
public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
throws Exception {
Configuration conf = getTestConfiguration();
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setBoolean(
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
false);
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION, 2);
// 3 racks & 3 nodes. 1 per rack
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
// kill 2 DNs, so only 1 rack remains with an active DN
cluster.stopDataNode(0);
cluster.stopDataNode(1);
LambdaTestUtils.intercept(IOException.class,
() ->
DFSTestUtil.createFile(fs, new Path("/testFile"),
1024L, (short) 3, 1024L));
}
}
@Test
public void testAllRackFailureDuringPipelineSetup() throws Exception {
Configuration conf = getTestConfiguration();
conf.setClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setBoolean(
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
false);
// 3 racks & 3 nodes. 1 per rack
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
// shutdown all DNs
cluster.shutdownDataNodes();
// create a file with replication 3; the rack fault tolerant BPP
// would allocate nodes across all 3 racks, but it fails because no DNs are present.
LambdaTestUtils.intercept(IOException.class,
() ->
DFSTestUtil.createFile(fs, new Path("/testFile"),
1024L, (short) 3, 1024L));
}
}
}
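
In summary, the five new tests sweep surviving datanodes against the client's min-replication floor (3 racks, 1 DN per rack, replication 3):

DNs stopped   MIN_REPLICATION   expected outcome
1             2                 create succeeds (2 live nodes >= 2)
1             3                 IOException     (2 live nodes < 3)
2             1                 create succeeds (1 live node >= 1)
2             2                 IOException     (1 live node < 2)
3 (all)       unset             IOException     (no datanodes at all)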

View File

@@ -1204,6 +1204,12 @@ public synchronized ReplicaHandler createRbw(
return createTemporary(storageType, storageId, b, false);
}
@Override
public ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
return createRbw(storageType, storageId, b, allowLazyPersist);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(StorageType storageType,
String storageId, ExtendedBlock b, boolean isTransfer)

View File

@@ -153,6 +153,12 @@ public ReplicaHandler createRbw(StorageType storageType, String id,
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
@Override
public ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
return createRbw(storageType, storageId, b, allowLazyPersist);
}
@Override
public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS,
long minBytesRcvd, long maxBytesRcvd) throws IOException {