Revert "Revert "HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin P. McCabe)""
This reverts commit 7f393a6f61
.
This commit is contained in:
parent
07b0fb996a
commit
de522d2cd4
@ -1448,6 +1448,8 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-9019. Adding informative message to sticky bit permission denied
|
HDFS-9019. Adding informative message to sticky bit permission denied
|
||||||
exception. (xyao)
|
exception. (xyao)
|
||||||
|
|
||||||
|
HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin P. McCabe)
|
||||||
|
|
||||||
HDFS-8716. Introduce a new config specifically for safe mode block count
|
HDFS-8716. Introduce a new config specifically for safe mode block count
|
||||||
(Chang Li via kihwal)
|
(Chang Li via kihwal)
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@
|
|||||||
* This class describes a replica that has been finalized.
|
* This class describes a replica that has been finalized.
|
||||||
*/
|
*/
|
||||||
public class FinalizedReplica extends ReplicaInfo {
|
public class FinalizedReplica extends ReplicaInfo {
|
||||||
private boolean unlinked; // copy-on-write done for block
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
@ -58,7 +57,6 @@ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
|
|||||||
*/
|
*/
|
||||||
public FinalizedReplica(FinalizedReplica from) {
|
public FinalizedReplica(FinalizedReplica from) {
|
||||||
super(from);
|
super(from);
|
||||||
this.unlinked = from.isUnlinked();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ReplicaInfo
|
@Override // ReplicaInfo
|
||||||
@ -66,16 +64,6 @@ public ReplicaState getState() {
|
|||||||
return ReplicaState.FINALIZED;
|
return ReplicaState.FINALIZED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ReplicaInfo
|
|
||||||
public boolean isUnlinked() {
|
|
||||||
return unlinked;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override // ReplicaInfo
|
|
||||||
public void setUnlinked() {
|
|
||||||
unlinked = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getVisibleLength() {
|
public long getVisibleLength() {
|
||||||
return getNumBytes(); // all bytes are visible
|
return getNumBytes(); // all bytes are visible
|
||||||
@ -98,7 +86,6 @@ public int hashCode() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return super.toString()
|
return super.toString();
|
||||||
+ "\n unlinked =" + unlinked;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,18 +18,12 @@
|
|||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.fs.HardLink;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.util.LightWeightResizableGSet;
|
import org.apache.hadoop.util.LightWeightResizableGSet;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
@ -199,22 +193,6 @@ public static ReplicaDirInfo parseBaseDir(File dir) {
|
|||||||
return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
|
return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* check if this replica has already been unlinked.
|
|
||||||
* @return true if the replica has already been unlinked
|
|
||||||
* or no need to be detached; false otherwise
|
|
||||||
*/
|
|
||||||
public boolean isUnlinked() {
|
|
||||||
return true; // no need to be unlinked
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* set that this replica is unlinked
|
|
||||||
*/
|
|
||||||
public void setUnlinked() {
|
|
||||||
// no need to be unlinked
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of bytes reserved for this replica on disk.
|
* Number of bytes reserved for this replica on disk.
|
||||||
*/
|
*/
|
||||||
@ -232,72 +210,6 @@ public long getOriginalBytesReserved() {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Copy specified file into a temporary file. Then rename the
|
|
||||||
* temporary file to the original name. This will cause any
|
|
||||||
* hardlinks to the original file to be removed. The temporary
|
|
||||||
* files are created in the same directory. The temporary files will
|
|
||||||
* be recovered (especially on Windows) on datanode restart.
|
|
||||||
*/
|
|
||||||
private void unlinkFile(File file, Block b) throws IOException {
|
|
||||||
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
|
|
||||||
try {
|
|
||||||
FileInputStream in = new FileInputStream(file);
|
|
||||||
try {
|
|
||||||
FileOutputStream out = new FileOutputStream(tmpFile);
|
|
||||||
try {
|
|
||||||
IOUtils.copyBytes(in, out, 16*1024);
|
|
||||||
} finally {
|
|
||||||
out.close();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
in.close();
|
|
||||||
}
|
|
||||||
if (file.length() != tmpFile.length()) {
|
|
||||||
throw new IOException("Copy of file " + file + " size " + file.length()+
|
|
||||||
" into file " + tmpFile +
|
|
||||||
" resulted in a size of " + tmpFile.length());
|
|
||||||
}
|
|
||||||
FileUtil.replaceFile(tmpFile, file);
|
|
||||||
} catch (IOException e) {
|
|
||||||
boolean done = tmpFile.delete();
|
|
||||||
if (!done) {
|
|
||||||
DataNode.LOG.info("detachFile failed to delete temporary file " +
|
|
||||||
tmpFile);
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove a hard link by copying the block to a temporary place and
|
|
||||||
* then moving it back
|
|
||||||
* @param numLinks number of hard links
|
|
||||||
* @return true if copy is successful;
|
|
||||||
* false if it is already detached or no need to be detached
|
|
||||||
* @throws IOException if there is any copy error
|
|
||||||
*/
|
|
||||||
public boolean unlinkBlock(int numLinks) throws IOException {
|
|
||||||
if (isUnlinked()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
File file = getBlockFile();
|
|
||||||
if (file == null || getVolume() == null) {
|
|
||||||
throw new IOException("detachBlock:Block not found. " + this);
|
|
||||||
}
|
|
||||||
File meta = getMetaFile();
|
|
||||||
|
|
||||||
if (HardLink.getLinkCount(file) > numLinks) {
|
|
||||||
DataNode.LOG.info("CopyOnWrite for block " + this);
|
|
||||||
unlinkFile(file, this);
|
|
||||||
}
|
|
||||||
if (HardLink.getLinkCount(meta) > numLinks) {
|
|
||||||
unlinkFile(meta, this);
|
|
||||||
}
|
|
||||||
setUnlinked();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //Object
|
@Override //Object
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName()
|
return getClass().getSimpleName()
|
||||||
|
@ -86,16 +86,6 @@ public ReplicaInfo getOriginalReplica() {
|
|||||||
return original;
|
return original;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
|
||||||
public boolean isUnlinked() {
|
|
||||||
return original.isUnlinked();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
|
||||||
public void setUnlinked() {
|
|
||||||
original.setUnlinked();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
@Override //ReplicaInfo
|
||||||
public ReplicaState getState() {
|
public ReplicaState getState() {
|
||||||
return ReplicaState.RUR;
|
return ReplicaState.RUR;
|
||||||
|
@ -33,7 +33,6 @@
|
|||||||
* lease recovery.
|
* lease recovery.
|
||||||
*/
|
*/
|
||||||
public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
||||||
private boolean unlinked; // copy-on-write done for block
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
@ -64,7 +63,6 @@ public ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) {
|
|||||||
*/
|
*/
|
||||||
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
|
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
|
||||||
super(from);
|
super(from);
|
||||||
this.unlinked = from.isUnlinked();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
@Override //ReplicaInfo
|
||||||
@ -72,16 +70,6 @@ public ReplicaState getState() {
|
|||||||
return ReplicaState.RWR;
|
return ReplicaState.RWR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
|
||||||
public boolean isUnlinked() {
|
|
||||||
return unlinked;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
|
||||||
public void setUnlinked() {
|
|
||||||
unlinked = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //ReplicaInfo
|
@Override //ReplicaInfo
|
||||||
public long getVisibleLength() {
|
public long getVisibleLength() {
|
||||||
return -1; //no bytes are visible
|
return -1; //no bytes are visible
|
||||||
@ -104,7 +92,6 @@ public int hashCode() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return super.toString()
|
return super.toString();
|
||||||
+ "\n unlinked=" + unlinked;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1128,8 +1128,6 @@ private synchronized ReplicaBeingWritten append(String bpid,
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
// If the block is cached, start uncaching it.
|
// If the block is cached, start uncaching it.
|
||||||
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
|
||||||
// unlink the finalized replica
|
|
||||||
replicaInfo.unlinkBlock(1);
|
|
||||||
|
|
||||||
// construct a RBW replica with the new GS
|
// construct a RBW replica with the new GS
|
||||||
File blkfile = replicaInfo.getBlockFile();
|
File blkfile = replicaInfo.getBlockFile();
|
||||||
@ -2495,7 +2493,6 @@ private FinalizedReplica updateReplicaUnderRecovery(
|
|||||||
+ ", rur=" + rur);
|
+ ", rur=" + rur);
|
||||||
}
|
}
|
||||||
if (rur.getNumBytes() > newlength) {
|
if (rur.getNumBytes() > newlength) {
|
||||||
rur.unlinkBlock(1);
|
|
||||||
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
|
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
|
||||||
if(!copyOnTruncate) {
|
if(!copyOnTruncate) {
|
||||||
// update RUR with the new length
|
// update RUR with the new length
|
||||||
|
@ -109,78 +109,6 @@ private void checkFile(DistributedFileSystem fileSys, Path name, int repl)
|
|||||||
expected, "Read 1", false);
|
expected, "Read 1", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that copy on write for blocks works correctly
|
|
||||||
* @throws IOException an exception might be thrown
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testCopyOnWrite() throws IOException {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
if (simulatedStorage) {
|
|
||||||
SimulatedFSDataset.setFactory(conf);
|
|
||||||
}
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
|
||||||
InetSocketAddress addr = new InetSocketAddress("localhost",
|
|
||||||
cluster.getNameNodePort());
|
|
||||||
DFSClient client = new DFSClient(addr, conf);
|
|
||||||
try {
|
|
||||||
|
|
||||||
// create a new file, write to it and close it.
|
|
||||||
//
|
|
||||||
Path file1 = new Path("/filestatus.dat");
|
|
||||||
FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
|
|
||||||
writeFile(stm);
|
|
||||||
stm.close();
|
|
||||||
|
|
||||||
// Get a handle to the datanode
|
|
||||||
DataNode[] dn = cluster.listDataNodes();
|
|
||||||
assertTrue("There should be only one datanode but found " + dn.length,
|
|
||||||
dn.length == 1);
|
|
||||||
|
|
||||||
LocatedBlocks locations = client.getNamenode().getBlockLocations(
|
|
||||||
file1.toString(), 0, Long.MAX_VALUE);
|
|
||||||
List<LocatedBlock> blocks = locations.getLocatedBlocks();
|
|
||||||
|
|
||||||
//
|
|
||||||
// Create hard links for a few of the blocks
|
|
||||||
//
|
|
||||||
for (int i = 0; i < blocks.size(); i = i + 2) {
|
|
||||||
ExtendedBlock b = blocks.get(i).getBlock();
|
|
||||||
final File f = DataNodeTestUtils.getFile(dn[0],
|
|
||||||
b.getBlockPoolId(), b.getLocalBlock().getBlockId());
|
|
||||||
File link = new File(f.toString() + ".link");
|
|
||||||
System.out.println("Creating hardlink for File " + f + " to " + link);
|
|
||||||
HardLink.createHardLink(f, link);
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Detach all blocks. This should remove hardlinks (if any)
|
|
||||||
//
|
|
||||||
for (int i = 0; i < blocks.size(); i++) {
|
|
||||||
ExtendedBlock b = blocks.get(i).getBlock();
|
|
||||||
System.out.println("testCopyOnWrite detaching block " + b);
|
|
||||||
assertTrue("Detaching block " + b + " should have returned true",
|
|
||||||
DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since the blocks were already detached earlier, these calls should
|
|
||||||
// return false
|
|
||||||
//
|
|
||||||
for (int i = 0; i < blocks.size(); i++) {
|
|
||||||
ExtendedBlock b = blocks.get(i).getBlock();
|
|
||||||
System.out.println("testCopyOnWrite detaching block " + b);
|
|
||||||
assertTrue("Detaching block " + b + " should have returned false",
|
|
||||||
!DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
client.close();
|
|
||||||
fs.close();
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test a simple flush on a simple HDFS file.
|
* Test a simple flush on a simple HDFS file.
|
||||||
* @throws IOException an exception might be thrown
|
* @throws IOException an exception might be thrown
|
||||||
|
@ -35,7 +35,6 @@
|
|||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
@ -159,20 +158,6 @@ public static FsDatasetSpi<?> getFSDataset(DataNode dn) {
|
|||||||
return dn.getFSDataset();
|
return dn.getFSDataset();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static File getFile(DataNode dn, String bpid, long bid) {
|
|
||||||
return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static File getBlockFile(DataNode dn, String bpid, Block b
|
|
||||||
) throws IOException {
|
|
||||||
return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
|
|
||||||
) throws IOException {
|
|
||||||
return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch a copy of ReplicaInfo from a datanode by block id
|
* Fetch a copy of ReplicaInfo from a datanode by block id
|
||||||
* @param dn datanode to retrieve a replicainfo object from
|
* @param dn datanode to retrieve a replicainfo object from
|
||||||
|
@ -55,12 +55,6 @@ public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
|
|||||||
.getGenerationStamp());
|
.getGenerationStamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
|
|
||||||
ExtendedBlock block, int numLinks) throws IOException {
|
|
||||||
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
|
|
||||||
return info.unlinkBlock(numLinks);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
|
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
|
||||||
final String bpid, final long blockId) {
|
final String bpid, final long blockId) {
|
||||||
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
|
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
|
||||||
|
@ -143,79 +143,7 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// test recovering unlinked tmp replicas
|
|
||||||
@Test public void testRecoverReplicas() throws Exception {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
|
|
||||||
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
||||||
cluster.waitActive();
|
|
||||||
try {
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
|
||||||
for (int i=0; i<4; i++) {
|
|
||||||
Path fileName = new Path("/test"+i);
|
|
||||||
DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
|
|
||||||
DFSTestUtil.waitReplication(fs, fileName, (short)1);
|
|
||||||
}
|
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
|
||||||
Iterator<ReplicaInfo> replicasItor =
|
|
||||||
dataset(dn).volumeMap.replicas(bpid).iterator();
|
|
||||||
ReplicaInfo replica = replicasItor.next();
|
|
||||||
createUnlinkTmpFile(replica, true, true); // rename block file
|
|
||||||
createUnlinkTmpFile(replica, false, true); // rename meta file
|
|
||||||
replica = replicasItor.next();
|
|
||||||
createUnlinkTmpFile(replica, true, false); // copy block file
|
|
||||||
createUnlinkTmpFile(replica, false, false); // copy meta file
|
|
||||||
replica = replicasItor.next();
|
|
||||||
createUnlinkTmpFile(replica, true, true); // rename block file
|
|
||||||
createUnlinkTmpFile(replica, false, false); // copy meta file
|
|
||||||
|
|
||||||
cluster.restartDataNodes();
|
|
||||||
cluster.waitActive();
|
|
||||||
dn = cluster.getDataNodes().get(0);
|
|
||||||
|
|
||||||
// check volumeMap: 4 finalized replica
|
|
||||||
Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
|
|
||||||
Assert.assertEquals(4, replicas.size());
|
|
||||||
replicasItor = replicas.iterator();
|
|
||||||
while (replicasItor.hasNext()) {
|
|
||||||
Assert.assertEquals(ReplicaState.FINALIZED,
|
|
||||||
replicasItor.next().getState());
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static FsDatasetImpl dataset(DataNode dn) {
|
private static FsDatasetImpl dataset(DataNode dn) {
|
||||||
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
|
|
||||||
boolean changeBlockFile,
|
|
||||||
boolean isRename) throws IOException {
|
|
||||||
File src;
|
|
||||||
if (changeBlockFile) {
|
|
||||||
src = replicaInfo.getBlockFile();
|
|
||||||
} else {
|
|
||||||
src = replicaInfo.getMetaFile();
|
|
||||||
}
|
|
||||||
File dst = DatanodeUtil.getUnlinkTmpFile(src);
|
|
||||||
if (isRename) {
|
|
||||||
src.renameTo(dst);
|
|
||||||
} else {
|
|
||||||
FileInputStream in = new FileInputStream(src);
|
|
||||||
try {
|
|
||||||
FileOutputStream out = new FileOutputStream(dst);
|
|
||||||
try {
|
|
||||||
IOUtils.copyBytes(in, out, 1);
|
|
||||||
} finally {
|
|
||||||
out.close();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
in.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user