HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid explicitly creating Files in the tests code. (lei)
This commit is contained in:
parent
9cb5d35353
commit
71e533a153
@ -1540,6 +1540,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-9250. Add Precondition check to LocatedBlock#addCachedLoc.
|
HDFS-9250. Add Precondition check to LocatedBlock#addCachedLoc.
|
||||||
(Xiao Chen via wang)
|
(Xiao Chen via wang)
|
||||||
|
|
||||||
|
HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
|
||||||
|
explicitly creating Files in the tests code. (lei)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
@ -137,4 +138,58 @@ interface MaterializedReplica {
|
|||||||
*/
|
*/
|
||||||
MaterializedReplica getMaterializedReplica(ExtendedBlock block)
|
MaterializedReplica getMaterializedReplica(ExtendedBlock block)
|
||||||
throws ReplicaNotFoundException;
|
throws ReplicaNotFoundException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a finalized replica and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createFinalizedReplica(ExtendedBlock block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a finalized replica on a particular volume, and add it into
|
||||||
|
* the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaInPipeline} and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createReplicaInPipeline(ExtendedBlock block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaInPipeline} and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createReplicaInPipeline(FsVolumeSpi volume, ExtendedBlock block)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaBeingWritten} and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createRBW(ExtendedBlock block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaBeingWritten} on the particular volume, and add it
|
||||||
|
* into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createRBW(FsVolumeSpi volume, ExtendedBlock block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaWaitingToBeRecovered} object and add it into the
|
||||||
|
* FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createReplicaWaitingToBeRecovered(ExtendedBlock block)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaWaitingToBeRecovered} on the particular volume,
|
||||||
|
* and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createReplicaWaitingToBeRecovered(
|
||||||
|
FsVolumeSpi volume, ExtendedBlock block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link ReplicaUnderRecovery} object and add it into the FsDataset.
|
||||||
|
*/
|
||||||
|
Replica createReplicaUnderRecovery(ExtendedBlock block, long recoveryId)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
@ -23,10 +23,20 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
@ -176,4 +186,103 @@ public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
|
|||||||
blockFile, block.getGenerationStamp());
|
blockFile, block.getGenerationStamp());
|
||||||
return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
|
return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createFinalizedReplica(ExtendedBlock block)
|
||||||
|
throws IOException {
|
||||||
|
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||||
|
return createFinalizedReplica(volumes.get(0), block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
|
||||||
|
throws IOException {
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) volume;
|
||||||
|
ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
|
||||||
|
vol.getCurrentDir().getParentFile());
|
||||||
|
dataset.volumeMap.add(block.getBlockPoolId(), info);
|
||||||
|
info.getBlockFile().createNewFile();
|
||||||
|
info.getMetaFile().createNewFile();
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createReplicaInPipeline(ExtendedBlock block)
|
||||||
|
throws IOException {
|
||||||
|
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||||
|
return createReplicaInPipeline(volumes.get(0), block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createReplicaInPipeline(
|
||||||
|
FsVolumeSpi volume, ExtendedBlock block) throws IOException {
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) volume;
|
||||||
|
ReplicaInPipeline rip = new ReplicaInPipeline(
|
||||||
|
block.getBlockId(), block.getGenerationStamp(), volume,
|
||||||
|
vol.createTmpFile(
|
||||||
|
block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
|
||||||
|
0);
|
||||||
|
dataset.volumeMap.add(block.getBlockPoolId(), rip);
|
||||||
|
return rip;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createRBW(ExtendedBlock eb) throws IOException {
|
||||||
|
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||||
|
return createRBW(volumes.get(0), eb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
|
||||||
|
throws IOException {
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) volume;
|
||||||
|
final String bpid = eb.getBlockPoolId();
|
||||||
|
final Block block = eb.getLocalBlock();
|
||||||
|
ReplicaBeingWritten rbw = new ReplicaBeingWritten(
|
||||||
|
eb.getLocalBlock(), volume,
|
||||||
|
vol.createRbwFile(bpid, block).getParentFile(), null);
|
||||||
|
rbw.getBlockFile().createNewFile();
|
||||||
|
rbw.getMetaFile().createNewFile();
|
||||||
|
dataset.volumeMap.add(bpid, rbw);
|
||||||
|
return rbw;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createReplicaWaitingToBeRecovered(ExtendedBlock eb)
|
||||||
|
throws IOException {
|
||||||
|
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||||
|
return createReplicaInPipeline(volumes.get(0), eb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createReplicaWaitingToBeRecovered(
|
||||||
|
FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) volume;
|
||||||
|
final String bpid = eb.getBlockPoolId();
|
||||||
|
final Block block = eb.getLocalBlock();
|
||||||
|
ReplicaWaitingToBeRecovered rwbr =
|
||||||
|
new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
|
||||||
|
vol.createRbwFile(bpid, block).getParentFile());
|
||||||
|
dataset.volumeMap.add(bpid, rwbr);
|
||||||
|
return rwbr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Replica createReplicaUnderRecovery(
|
||||||
|
ExtendedBlock block, long recoveryId) throws IOException {
|
||||||
|
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
|
||||||
|
ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica(
|
||||||
|
block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()),
|
||||||
|
recoveryId
|
||||||
|
);
|
||||||
|
dataset.volumeMap.add(block.getBlockPoolId(), rur);
|
||||||
|
return rur;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -365,12 +365,14 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeletingBlocks() throws IOException {
|
public void testDeletingBlocks() throws IOException {
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
|
||||||
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
|
FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
|
||||||
|
ds.addBlockPool(BLOCKPOOL, conf);
|
||||||
FsVolumeImpl vol;
|
FsVolumeImpl vol;
|
||||||
try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
|
try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
|
||||||
vol = (FsVolumeImpl)volumes.get(0);
|
vol = (FsVolumeImpl)volumes.get(0);
|
||||||
@ -378,15 +380,11 @@ public void testDeletingBlocks() throws IOException {
|
|||||||
|
|
||||||
ExtendedBlock eb;
|
ExtendedBlock eb;
|
||||||
ReplicaInfo info;
|
ReplicaInfo info;
|
||||||
List<Block> blockList = new ArrayList<Block>();
|
List<Block> blockList = new ArrayList<>();
|
||||||
for (int i = 1; i <= 63; i++) {
|
for (int i = 1; i <= 63; i++) {
|
||||||
eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
|
eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
|
||||||
info = new FinalizedReplica(
|
cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
|
||||||
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
blockList.add(eb.getLocalBlock());
|
||||||
ds.volumeMap.add(BLOCKPOOL, info);
|
|
||||||
info.getBlockFile().createNewFile();
|
|
||||||
info.getMetaFile().createNewFile();
|
|
||||||
blockList.add(info);
|
|
||||||
}
|
}
|
||||||
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
|
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
|
||||||
try {
|
try {
|
||||||
@ -398,12 +396,8 @@ public void testDeletingBlocks() throws IOException {
|
|||||||
|
|
||||||
blockList.clear();
|
blockList.clear();
|
||||||
eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
|
eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
|
||||||
info = new FinalizedReplica(
|
cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
|
||||||
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
blockList.add(eb.getLocalBlock());
|
||||||
ds.volumeMap.add(BLOCKPOOL, info);
|
|
||||||
info.getBlockFile().createNewFile();
|
|
||||||
info.getMetaFile().createNewFile();
|
|
||||||
blockList.add(info);
|
|
||||||
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
|
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -35,16 +34,11 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
@ -70,12 +64,12 @@ public void testClose() throws Exception {
|
|||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
|
||||||
|
|
||||||
// set up replicasMap
|
// set up replicasMap
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
|
||||||
ExtendedBlock[] blocks = setup(bpid, dataSet);
|
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
|
||||||
|
|
||||||
// test close
|
// test close
|
||||||
testClose(dataSet, blocks);
|
testClose(dataSet, blocks);
|
||||||
@ -91,11 +85,11 @@ public void testAppend() throws Exception {
|
|||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
|
||||||
|
|
||||||
// set up replicasMap
|
// set up replicasMap
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
ExtendedBlock[] blocks = setup(bpid, dataSet);
|
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
|
||||||
|
|
||||||
// test append
|
// test append
|
||||||
testAppend(bpid, dataSet, blocks);
|
testAppend(bpid, dataSet, blocks);
|
||||||
@ -115,7 +109,7 @@ public void testWriteToRbw() throws Exception {
|
|||||||
|
|
||||||
// set up replicasMap
|
// set up replicasMap
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
ExtendedBlock[] blocks = setup(bpid, dataSet);
|
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
|
||||||
|
|
||||||
// test writeToRbw
|
// test writeToRbw
|
||||||
testWriteToRbw(dataSet, blocks);
|
testWriteToRbw(dataSet, blocks);
|
||||||
@ -135,7 +129,7 @@ public void testWriteToTemporary() throws Exception {
|
|||||||
|
|
||||||
// set up replicasMap
|
// set up replicasMap
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
ExtendedBlock[] blocks = setup(bpid, dataSet);
|
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
|
||||||
|
|
||||||
// test writeToTemporary
|
// test writeToTemporary
|
||||||
testWriteToTemporary(dataSet, blocks);
|
testWriteToTemporary(dataSet, blocks);
|
||||||
@ -149,11 +143,12 @@ public void testWriteToTemporary() throws Exception {
|
|||||||
* on which to run the tests.
|
* on which to run the tests.
|
||||||
*
|
*
|
||||||
* @param bpid Block pool ID to generate blocks for
|
* @param bpid Block pool ID to generate blocks for
|
||||||
* @param dataSet Namespace in which to insert blocks
|
* @param testUtils FsDatasetTestUtils provides white box access to FsDataset.
|
||||||
* @return Contrived blocks for further testing.
|
* @return Contrived blocks for further testing.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOException {
|
private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
|
||||||
|
throws IOException {
|
||||||
// setup replicas map
|
// setup replicas map
|
||||||
|
|
||||||
ExtendedBlock[] blocks = new ExtendedBlock[] {
|
ExtendedBlock[] blocks = new ExtendedBlock[] {
|
||||||
@ -162,49 +157,25 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep
|
|||||||
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
|
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
|
||||||
};
|
};
|
||||||
|
|
||||||
ReplicaMap replicasMap = dataSet.volumeMap;
|
testUtils.createFinalizedReplica(blocks[FINALIZED]);
|
||||||
try (FsDatasetSpi.FsVolumeReferences references =
|
testUtils.createReplicaInPipeline(blocks[TEMPORARY]);
|
||||||
dataSet.getFsVolumeReferences()) {
|
testUtils.createRBW(blocks[RBW]);
|
||||||
FsVolumeImpl vol = (FsVolumeImpl) references.get(0);
|
testUtils.createReplicaWaitingToBeRecovered(blocks[RWR]);
|
||||||
ReplicaInfo replicaInfo = new FinalizedReplica(
|
testUtils.createReplicaUnderRecovery(blocks[RUR], 2007);
|
||||||
blocks[FINALIZED].getLocalBlock(), vol,
|
|
||||||
vol.getCurrentDir().getParentFile());
|
|
||||||
replicasMap.add(bpid, replicaInfo);
|
|
||||||
replicaInfo.getBlockFile().createNewFile();
|
|
||||||
replicaInfo.getMetaFile().createNewFile();
|
|
||||||
|
|
||||||
replicasMap.add(bpid, new ReplicaInPipeline(
|
|
||||||
blocks[TEMPORARY].getBlockId(),
|
|
||||||
blocks[TEMPORARY].getGenerationStamp(), vol,
|
|
||||||
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock())
|
|
||||||
.getParentFile(), 0));
|
|
||||||
|
|
||||||
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
|
|
||||||
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(),
|
|
||||||
null);
|
|
||||||
replicasMap.add(bpid, replicaInfo);
|
|
||||||
replicaInfo.getBlockFile().createNewFile();
|
|
||||||
replicaInfo.getMetaFile().createNewFile();
|
|
||||||
|
|
||||||
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
|
|
||||||
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
|
|
||||||
blocks[RWR].getLocalBlock()).getParentFile()));
|
|
||||||
replicasMap
|
|
||||||
.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
|
|
||||||
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()),
|
|
||||||
2007));
|
|
||||||
}
|
|
||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
private void testAppend(String bpid, FsDatasetSpi<?> dataSet,
|
||||||
|
ExtendedBlock[] blocks) throws IOException {
|
||||||
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
||||||
final FsVolumeImpl v = (FsVolumeImpl)dataSet.volumeMap.get(
|
final FsVolumeSpi v = dataSet.getVolume(blocks[FINALIZED]);
|
||||||
bpid, blocks[FINALIZED].getLocalBlock()).getVolume();
|
if (v instanceof FsVolumeImpl) {
|
||||||
long available = v.getCapacity()-v.getDfsUsed();
|
FsVolumeImpl fvi = (FsVolumeImpl) v;
|
||||||
|
long available = fvi.getCapacity() - fvi.getDfsUsed();
|
||||||
long expectedLen = blocks[FINALIZED].getNumBytes();
|
long expectedLen = blocks[FINALIZED].getNumBytes();
|
||||||
try {
|
try {
|
||||||
v.onBlockFileDeletion(bpid, -available);
|
fvi.onBlockFileDeletion(bpid, -available);
|
||||||
blocks[FINALIZED].setNumBytes(expectedLen + 100);
|
blocks[FINALIZED].setNumBytes(expectedLen + 100);
|
||||||
dataSet.append(blocks[FINALIZED], newGS, expectedLen);
|
dataSet.append(blocks[FINALIZED], newGS, expectedLen);
|
||||||
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
|
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
|
||||||
@ -212,8 +183,9 @@ private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] bloc
|
|||||||
Assert.assertTrue(e.getMessage().startsWith(
|
Assert.assertTrue(e.getMessage().startsWith(
|
||||||
"Insufficient space for appending to "));
|
"Insufficient space for appending to "));
|
||||||
}
|
}
|
||||||
v.onBlockFileDeletion(bpid, available);
|
fvi.onBlockFileDeletion(bpid, available);
|
||||||
blocks[FINALIZED].setNumBytes(expectedLen);
|
blocks[FINALIZED].setNumBytes(expectedLen);
|
||||||
|
}
|
||||||
|
|
||||||
newGS = blocks[RBW].getGenerationStamp()+1;
|
newGS = blocks[RBW].getGenerationStamp()+1;
|
||||||
dataSet.append(blocks[FINALIZED], newGS,
|
dataSet.append(blocks[FINALIZED], newGS,
|
||||||
@ -317,7 +289,7 @@ private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] bloc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testClose(FsDatasetImpl dataSet, ExtendedBlock [] blocks) throws IOException {
|
private void testClose(FsDatasetSpi<?> dataSet, ExtendedBlock [] blocks) throws IOException {
|
||||||
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
||||||
dataSet.recoverClose(blocks[FINALIZED], newGS,
|
dataSet.recoverClose(blocks[FINALIZED], newGS,
|
||||||
blocks[FINALIZED].getNumBytes()); // successful
|
blocks[FINALIZED].getNumBytes()); // successful
|
||||||
@ -544,27 +516,26 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
|
|||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
|
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
|
||||||
getFSDataset(dn);
|
getFSDataset(dn);
|
||||||
ReplicaMap replicaMap = dataSet.volumeMap;
|
|
||||||
|
|
||||||
List<FsVolumeImpl> volumes = null;
|
List<FsVolumeSpi> volumes = null;
|
||||||
try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
|
try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
|
||||||
// number of volumes should be 2 - [data1, data2]
|
// number of volumes should be 2 - [data1, data2]
|
||||||
assertEquals("number of volumes is wrong", 2, referredVols.size());
|
assertEquals("number of volumes is wrong", 2, referredVols.size());
|
||||||
volumes = new ArrayList<>(referredVols.size());
|
volumes = new ArrayList<>(referredVols.size());
|
||||||
for (FsVolumeSpi vol : referredVols) {
|
for (FsVolumeSpi vol : referredVols) {
|
||||||
volumes.add((FsVolumeImpl) vol);
|
volumes.add(vol);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
|
ArrayList<String> bpList = new ArrayList<>(Arrays.asList(
|
||||||
cluster.getNamesystem(0).getBlockPoolId(),
|
cluster.getNamesystem(0).getBlockPoolId(),
|
||||||
cluster.getNamesystem(1).getBlockPoolId()));
|
cluster.getNamesystem(1).getBlockPoolId()));
|
||||||
|
|
||||||
Assert.assertTrue("Cluster should have 2 block pools",
|
Assert.assertTrue("Cluster should have 2 block pools",
|
||||||
bpList.size() == 2);
|
bpList.size() == 2);
|
||||||
|
|
||||||
createReplicas(bpList, volumes, replicaMap);
|
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
|
||||||
ReplicaMap oldReplicaMap = new ReplicaMap(this);
|
ReplicaMap oldReplicaMap = new ReplicaMap(this);
|
||||||
oldReplicaMap.addAll(replicaMap);
|
oldReplicaMap.addAll(dataSet.volumeMap);
|
||||||
|
|
||||||
cluster.restartDataNode(0);
|
cluster.restartDataNode(0);
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
@ -622,49 +593,30 @@ private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes,
|
private void createReplicas(List<String> bpList, List<FsVolumeSpi> volumes,
|
||||||
ReplicaMap volumeMap) throws IOException {
|
FsDatasetTestUtils testUtils) throws IOException {
|
||||||
Assert.assertTrue("Volume map can't be null" , volumeMap != null);
|
|
||||||
|
|
||||||
// Here we create all different type of replicas and add it
|
// Here we create all different type of replicas and add it
|
||||||
// to volume map.
|
// to volume map.
|
||||||
// Created all type of ReplicaInfo, each under Blkpool corresponding volume
|
// Created all type of ReplicaInfo, each under Blkpool corresponding volume
|
||||||
long id = 1; // This variable is used as both blockId and genStamp
|
long id = 1; // This variable is used as both blockId and genStamp
|
||||||
for (String bpId: bpList) {
|
for (String bpId: bpList) {
|
||||||
for (FsVolumeImpl volume: volumes) {
|
for (FsVolumeSpi volume: volumes) {
|
||||||
ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
|
ExtendedBlock eb = new ExtendedBlock(bpId, id, 1, id);
|
||||||
DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
|
testUtils.createFinalizedReplica(volume, eb);
|
||||||
volumeMap.add(bpId, finalizedReplica);
|
|
||||||
id++;
|
id++;
|
||||||
|
|
||||||
ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume,
|
eb = new ExtendedBlock(bpId, id, 1, id);
|
||||||
volume.getRbwDir(bpId), null, 100);
|
testUtils.createRBW(volume, eb);
|
||||||
volumeMap.add(bpId, rbwReplica);
|
|
||||||
id++;
|
id++;
|
||||||
|
|
||||||
ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id,
|
eb = new ExtendedBlock(bpId, id, 1, id);
|
||||||
volume, volume.getRbwDir(bpId));
|
testUtils.createReplicaWaitingToBeRecovered(volume, eb);
|
||||||
volumeMap.add(bpId, rwrReplica);
|
|
||||||
id++;
|
id++;
|
||||||
|
|
||||||
ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume,
|
eb = new ExtendedBlock(bpId, id, 1, id);
|
||||||
volume.getTmpDir(bpId), 0);
|
testUtils.createReplicaInPipeline(volume, eb);
|
||||||
volumeMap.add(bpId, ripReplica);
|
|
||||||
id++;
|
id++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String bpId: bpList) {
|
|
||||||
for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
|
|
||||||
File parentFile = replicaInfo.getBlockFile().getParentFile();
|
|
||||||
if (!parentFile.exists()) {
|
|
||||||
if (!parentFile.mkdirs()) {
|
|
||||||
throw new IOException("Failed to mkdirs " + parentFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
replicaInfo.getBlockFile().createNewFile();
|
|
||||||
replicaInfo.getMetaFile().createNewFile();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user