HDFS-12942. Synchronization issue in FSDataSetImpl#moveBlock. Contributed by Ajay Kumar.

This commit is contained in:
Anu Engineer 2018-02-01 18:03:01 -08:00
parent 09dd709d6e
commit aa45faf0b2
2 changed files with 178 additions and 20 deletions

View File

@ -971,24 +971,72 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
* @throws IOException * @throws IOException
*/ */
private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo, private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws FsVolumeReference volumeRef) throws IOException {
IOException { ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);
finalizeNewReplica(newReplicaInfo, block);
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
return newReplicaInfo;
}
/**
* Cleanup the replicaInfo object passed.
*
* @param bpid - block pool id
* @param replicaInfo - ReplicaInfo
*/
private void cleanupReplica(String bpid, ReplicaInfo replicaInfo) {
if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
}
}
}
/**
* Create a new temporary replica of replicaInfo object in specified volume.
*
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @return newReplicaInfo new replica object created in specified volume.
* @throws IOException
*/
@VisibleForTesting
ReplicaInfo copyReplicaToVolume(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume(); FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Copy files to temp dir first // Copy files to temp dir first
ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block, ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
replicaInfo, smallBufferSize, conf); replicaInfo, smallBufferSize, conf);
return newReplicaInfo;
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
try (AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.incrNumBlocks(block.getBlockPoolId());
} }
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId()); /**
return newReplicaInfo; * Finalizes newReplica by calling finalizeReplica internally.
*
* @param newReplicaInfo - ReplicaInfo
* @param block - Extended Block
* @throws IOException
*/
@VisibleForTesting
void finalizeNewReplica(ReplicaInfo newReplicaInfo,
ExtendedBlock block) throws IOException {
// Finalize the copied files
try {
String bpid = block.getBlockPoolId();
finalizeReplica(bpid, newReplicaInfo);
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.incrNumBlocks(bpid);
} catch (IOException ioe) {
// Cleanup block data and metadata
// Decrement of dfsUsed and noOfBlocks for volume not required
newReplicaInfo.deleteBlockData();
newReplicaInfo.deleteMetadata();
throw ioe;
}
} }
/** /**
@ -1664,6 +1712,13 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
throw new IOException("Generation Stamp should be monotonically "
+ "increased.");
}
ReplicaInfo newReplicaInfo = null; ReplicaInfo newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR && if (replicaInfo.getState() == ReplicaState.RUR &&
replicaInfo.getOriginalReplica().getState() replicaInfo.getOriginalReplica().getState()
@ -1689,6 +1744,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
} }
assert newReplicaInfo.getState() == ReplicaState.FINALIZED assert newReplicaInfo.getState() == ReplicaState.FINALIZED
: "Replica should be finalized"; : "Replica should be finalized";
volumeMap.add(bpid, newReplicaInfo); volumeMap.add(bpid, newReplicaInfo);
return newReplicaInfo; return newReplicaInfo;
} }
@ -2940,6 +2996,13 @@ private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
} }
} }
/**
* Cleanup the old replica and notifies the NN about new replica.
*
* @param replicaInfo - Old replica to be deleted
* @param newReplicaInfo - New replica object
* @param bpid - block pool id
*/
private void removeOldReplica(ReplicaInfo replicaInfo, private void removeOldReplica(ReplicaInfo replicaInfo,
ReplicaInfo newReplicaInfo, final String bpid) { ReplicaInfo newReplicaInfo, final String bpid) {
// Before deleting the files from old storage we must notify the // Before deleting the files from old storage we must notify the
@ -2958,13 +3021,7 @@ private void removeOldReplica(ReplicaInfo replicaInfo,
newReplicaInfo.isOnTransientStorage()); newReplicaInfo.isOnTransientStorage());
// Remove the old replicas // Remove the old replicas
if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) { cleanupReplica(bpid, replicaInfo);
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
}
}
// If deletion failed then the directory scanner will cleanup the blocks // If deletion failed then the directory scanner will cleanup the blocks
// eventually. // eventually.

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Assert; import org.junit.Assert;
@ -747,4 +748,104 @@ public void testReportBadBlocks() throws Exception {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout = 30000)
public void testMoveBlockFailure() {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.build();
FileSystem fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
Path filePath = new Path("testData");
DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
// Append to file to update its GS
FSDataOutputStream out = fs.append(filePath, (short) 1);
out.write(100);
out.hflush();
// Call finalizeNewReplica
LOG.info("GenerationStamp of old replica: {}",
block.getGenerationStamp());
LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
.getReplicaInfo(block.getBlockPoolId(), newReplicaInfo.getBlockId())
.getGenerationStamp());
LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
+ "should be monotonically increased.",
() -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
} catch (Exception ex) {
LOG.info("Exception in testMoveBlockFailure ", ex);
fail("Exception while testing testMoveBlockFailure ");
} finally {
if (cluster.isClusterUp()) {
cluster.shutdown();
}
}
}
@Test(timeout = 30000)
public void testMoveBlockSuccess() {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.build();
FileSystem fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
Path filePath = new Path("testData");
DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block);
} catch (Exception ex) {
LOG.info("Exception in testMoveBlockSuccess ", ex);
fail("MoveBlock operation should succeed");
} finally {
if (cluster.isClusterUp()) {
cluster.shutdown();
}
}
}
/**
* Create a new temporary replica of replicaInfo object in another volume.
*
* @param block - Extended Block
* @param fsDataSetImpl - FsDatasetImpl reference
* @throws IOException
*/
private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
fsDataSetImpl) throws IOException {
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
FsVolumeSpi destVolume = null;
final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
try (FsVolumeReferences volumeReferences =
fsDataSetImpl.getFsVolumeReferences()) {
for (int i = 0; i < volumeReferences.size(); i++) {
if (!volumeReferences.get(i).getStorageID().equals(srcStorageId)) {
destVolume = volumeReferences.get(i);
break;
}
}
}
return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
destVolume.obtainReference());
}
} }