HDFS-12776. [READ] Increasing replication for PROVIDED files should create local replicas
parent 87dc026bee
commit 90d1b47a2a
@@ -187,20 +187,23 @@ public int getCapacity() {
    */
   DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
     int len = getCapacity();
+    DatanodeStorageInfo providedStorageInfo = null;
     for(int idx = 0; idx < len; idx++) {
       DatanodeStorageInfo cur = getStorageInfo(idx);
       if(cur != null) {
         if (cur.getStorageType() == StorageType.PROVIDED) {
           //if block resides on provided storage, only match the storage ids
           if (dn.getStorageInfo(cur.getStorageID()) != null) {
-            return cur;
+            // do not return here as we have to check the other
+            // DatanodeStorageInfos for this block which could be local
+            providedStorageInfo = cur;
           }
         } else if (cur.getDatanodeDescriptor() == dn) {
           return cur;
         }
       }
     }
-    return null;
+    return providedStorageInfo;
   }
 
   /**
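The behavioral change in this hunk is that a block's PROVIDED storage no longer short-circuits the search: a local replica on the queried datanode is preferred, and the PROVIDED storage is only returned as a fallback (instead of null). Below is a minimal, self-contained sketch of that lookup order; the Storage and Datanode records are simplified stand-ins for DatanodeStorageInfo and DatanodeDescriptor, not the real HDFS classes.

import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class FindStorageSketch {

  enum StorageType { DISK, PROVIDED }

  // Simplified stand-in for DatanodeStorageInfo.
  record Storage(String storageId, StorageType type, String datanodeId) { }

  // Simplified stand-in for DatanodeDescriptor: the storage ids it reports.
  record Datanode(String id, Set<String> storageIds) { }

  // Mirrors the patched loop: a matching PROVIDED storage is remembered,
  // but the scan continues so a local replica on the same datanode wins.
  static Storage findStorage(List<Storage> blockStorages, Datanode dn) {
    Storage provided = null;
    for (Storage cur : blockStorages) {
      if (cur == null) {
        continue;
      }
      if (cur.type() == StorageType.PROVIDED) {
        // only match the storage id; do not return yet, since a later
        // entry may be a local replica on this datanode
        if (dn.storageIds().contains(cur.storageId())) {
          provided = cur;
        }
      } else if (cur.datanodeId().equals(dn.id())) {
        return cur;                // local replica wins
      }
    }
    return provided;               // previously this path returned null
  }

  public static void main(String[] args) {
    Datanode dn1 = new Datanode("dn1", Set.of("provided-storage", "disk-1"));
    List<Storage> storages = Arrays.asList(
        new Storage("provided-storage", StorageType.PROVIDED, "dn1"),
        new Storage("disk-1", StorageType.DISK, "dn1"));
    // With the old early return, the PROVIDED entry would mask the local
    // DISK replica; with the change the DISK storage is returned.
    System.out.println(findStorage(storages, dn1));
  }
}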
@@ -1512,6 +1512,13 @@ public ReplicaInPipeline convertTemporaryToRbw(
     }
   }
 
+  private boolean isReplicaProvided(ReplicaInfo replicaInfo) {
+    if (replicaInfo == null) {
+      return false;
+    }
+    return replicaInfo.getVolume().getStorageType() == StorageType.PROVIDED;
+  }
+
   @Override // FsDatasetSpi
   public ReplicaHandler createTemporary(StorageType storageType,
       String storageId, ExtendedBlock b, boolean isTransfer)
@@ -1530,12 +1537,14 @@ public ReplicaHandler createTemporary(StorageType storageType,
           isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
               || currentReplicaInfo.getState() == ReplicaState.RBW;
           /*
-           * If the current block is old, reject.
+           * If the current block is not PROVIDED and old, reject.
            * else If transfer request, then accept it.
            * else if state is not RBW/Temporary, then reject
+           * If current block is PROVIDED, ignore the replica.
            */
-          if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
-              || (!isTransfer && !isInPipeline)) {
+          if (((currentReplicaInfo.getGenerationStamp() >= b
+              .getGenerationStamp()) || (!isTransfer && !isInPipeline))
+              && !isReplicaProvided(currentReplicaInfo)) {
             throw new ReplicaAlreadyExistsException("Block " + b
                 + " already exists in state " + currentReplicaInfo.getState()
                 + " and thus cannot be created.");
@@ -1555,11 +1564,17 @@ public ReplicaHandler createTemporary(StorageType storageType,
             + " after " + writerStopMs + " miniseconds.");
       }
 
+      // if lastFoundReplicaInfo is PROVIDED and FINALIZED,
+      // stopWriter isn't required.
+      if (isReplicaProvided(lastFoundReplicaInfo) &&
+          lastFoundReplicaInfo.getState() == ReplicaState.FINALIZED) {
+        continue;
+      }
       // Stop the previous writer
       ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
     } while (true);
-    if (lastFoundReplicaInfo != null) {
+    if (lastFoundReplicaInfo != null
+        && !isReplicaProvided(lastFoundReplicaInfo)) {
       // Old blockfile should be deleted synchronously as it might collide
       // with the new block if allocated in same volume.
       // Do the deletion outside of lock as its DISK IO.
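Taken together with the isReplicaProvided helper added above, createTemporary now treats an existing PROVIDED replica specially: it is not grounds for ReplicaAlreadyExistsException, it needs no stopWriter call when FINALIZED, and its block file is not deleted. The following is a minimal sketch of that decision logic only; the Replica record and handleExisting method are simplified stand-ins, not the real FsDatasetImpl/ReplicaInfo code.

public class CreateTemporarySketch {

  enum StorageType { DISK, PROVIDED }
  enum ReplicaState { TEMPORARY, RBW, FINALIZED }

  // Simplified stand-in for ReplicaInfo.
  record Replica(long genStamp, ReplicaState state, StorageType storageType) { }

  static boolean isReplicaProvided(Replica r) {
    return r != null && r.storageType() == StorageType.PROVIDED;
  }

  // Decide what to do with an existing replica of the block being created,
  // mirroring the three hunks above.
  static String handleExisting(Replica current, long newGenStamp,
      boolean isTransfer) {
    boolean isInPipeline = current.state() == ReplicaState.TEMPORARY
        || current.state() == ReplicaState.RBW;
    if ((current.genStamp() >= newGenStamp || (!isTransfer && !isInPipeline))
        && !isReplicaProvided(current)) {
      return "reject: ReplicaAlreadyExistsException";
    }
    if (isReplicaProvided(current)
        && current.state() == ReplicaState.FINALIZED) {
      return "ignore provided replica: no stopWriter, keep its block file";
    }
    return "stop previous writer, then delete the old block file";
  }

  public static void main(String[] args) {
    Replica providedFinalized =
        new Replica(100, ReplicaState.FINALIZED, StorageType.PROVIDED);
    Replica diskFinalized =
        new Replica(100, ReplicaState.FINALIZED, StorageType.DISK);
    // A FINALIZED PROVIDED replica no longer blocks creating a local copy.
    System.out.println(handleExisting(providedFinalized, 100, false));
    // A FINALIZED local replica with the same genstamp is still rejected.
    System.out.println(handleExisting(diskFinalized, 100, false));
  }
}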
@@ -401,33 +401,37 @@ private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
   public void testSetReplicationForProvidedFiles() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
-    startCluster(NNDIRPATH, 2, null,
-        new StorageType[][]{
-            {StorageType.PROVIDED},
-            {StorageType.DISK}},
+    // 10 Datanodes with both DISK and PROVIDED storage
+    startCluster(NNDIRPATH, 10,
+        new StorageType[]{
+            StorageType.PROVIDED, StorageType.DISK},
+        null,
         false);
 
     String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
     Path file = new Path(filename);
     FileSystem fs = cluster.getFileSystem();
 
-    //set the replication to 2, and test that the file has
-    //the required replication.
-    fs.setReplication(file, (short) 2);
+    // set the replication to 4, and test that the file has
+    // the required replication.
+    short newReplication = 4;
+    LOG.info("Setting replication of file {} to {}", filename, newReplication);
+    fs.setReplication(file, newReplication);
     DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
-        file, (short) 2, 10000);
+        file, newReplication, 10000);
     DFSClient client = new DFSClient(new InetSocketAddress("localhost",
         cluster.getNameNodePort()), cluster.getConfiguration(0));
-    getAndCheckBlockLocations(client, filename, 2);
+    getAndCheckBlockLocations(client, filename, newReplication);
 
-    //set the replication back to 1
-    fs.setReplication(file, (short) 1);
+    // set the replication back to 1
+    newReplication = 1;
+    LOG.info("Setting replication of file {} back to {}",
+        filename, newReplication);
+    fs.setReplication(file, newReplication);
     DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
-        file, (short) 1, 10000);
-    //the only replica left should be the PROVIDED datanode
-    DatanodeInfo[] infos = getAndCheckBlockLocations(client, filename, 1);
-    assertEquals(cluster.getDataNodes().get(0).getDatanodeUuid(),
-        infos[0].getDatanodeUuid());
+        file, newReplication, 10000);
+    // the only replica left should be the PROVIDED datanode
+    getAndCheckBlockLocations(client, filename, newReplication);
   }
 
   @Test
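The test leans on a private helper named in the hunk header, getAndCheckBlockLocations, whose body is not part of this diff. The sketch below is only a guess at the kind of check such a helper performs, built from public DFSClient/LocatedBlocks APIs; the method and parameter names and the Long.MAX_VALUE length are assumptions, not taken from the patch.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

import static org.junit.Assert.assertEquals;

public class BlockLocationCheckSketch {

  // Fetch the block locations of a file, assert that every block is reported
  // on the expected number of datanodes, and return the locations of the
  // first block (assumes the file has at least one block).
  static DatanodeInfo[] checkBlockLocations(DFSClient client, String path,
      int expectedLocations) throws IOException {
    LocatedBlocks blocks = client.getLocatedBlocks(path, 0, Long.MAX_VALUE);
    List<LocatedBlock> blockList = blocks.getLocatedBlocks();
    for (LocatedBlock lb : blockList) {
      assertEquals(expectedLocations, lb.getLocations().length);
    }
    return blockList.get(0).getLocations();
  }
}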