diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6755f0041e..7fe585041b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1448,8 +1448,6 @@ Release 2.8.0 - UNRELEASED HDFS-9019. Adding informative message to sticky bit permission denied exception. (xyao) - HDFS-8860. Remove unused Replica copyOnWrite code (Lei (Eddy) Xu via Colin P. McCabe) - HDFS-8716. Introduce a new config specifically for safe mode block count (Chang Li via kihwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java index 8daeb51e0d..cc3287406a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java @@ -27,6 +27,7 @@ * This class describes a replica that has been finalized. */ public class FinalizedReplica extends ReplicaInfo { + private boolean unlinked; // copy-on-write done for block /** * Constructor @@ -57,6 +58,7 @@ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) { */ public FinalizedReplica(FinalizedReplica from) { super(from); + this.unlinked = from.isUnlinked(); } @Override // ReplicaInfo @@ -64,6 +66,16 @@ public ReplicaState getState() { return ReplicaState.FINALIZED; } + @Override // ReplicaInfo + public boolean isUnlinked() { + return unlinked; + } + + @Override // ReplicaInfo + public void setUnlinked() { + unlinked = true; + } + @Override public long getVisibleLength() { return getNumBytes(); // all bytes are visible @@ -86,6 +98,7 @@ public int hashCode() { @Override public String toString() { - return super.toString(); + return super.toString() + + "\n unlinked =" + unlinked; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index d19e656c9d..e41cce0535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -18,12 +18,18 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.LightWeightResizableGSet; import com.google.common.annotations.VisibleForTesting; @@ -193,6 +199,22 @@ public static ReplicaDirInfo parseBaseDir(File dir) { return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs); } + /** + * check if this replica has already been unlinked. + * @return true if the replica has already been unlinked + * or no need to be detached; false otherwise + */ + public boolean isUnlinked() { + return true; // no need to be unlinked + } + + /** + * set that this replica is unlinked + */ + public void setUnlinked() { + // no need to be unlinked + } + /** * Number of bytes reserved for this replica on disk. */ @@ -210,6 +232,72 @@ public long getOriginalBytesReserved() { return 0; } + /** + * Copy specified file into a temporary file. Then rename the + * temporary file to the original name. This will cause any + * hardlinks to the original file to be removed. The temporary + * files are created in the same directory. The temporary files will + * be recovered (especially on Windows) on datanode restart. + */ + private void unlinkFile(File file, Block b) throws IOException { + File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); + try { + FileInputStream in = new FileInputStream(file); + try { + FileOutputStream out = new FileOutputStream(tmpFile); + try { + IOUtils.copyBytes(in, out, 16*1024); + } finally { + out.close(); + } + } finally { + in.close(); + } + if (file.length() != tmpFile.length()) { + throw new IOException("Copy of file " + file + " size " + file.length()+ + " into file " + tmpFile + + " resulted in a size of " + tmpFile.length()); + } + FileUtil.replaceFile(tmpFile, file); + } catch (IOException e) { + boolean done = tmpFile.delete(); + if (!done) { + DataNode.LOG.info("detachFile failed to delete temporary file " + + tmpFile); + } + throw e; + } + } + + /** + * Remove a hard link by copying the block to a temporary place and + * then moving it back + * @param numLinks number of hard links + * @return true if copy is successful; + * false if it is already detached or no need to be detached + * @throws IOException if there is any copy error + */ + public boolean unlinkBlock(int numLinks) throws IOException { + if (isUnlinked()) { + return false; + } + File file = getBlockFile(); + if (file == null || getVolume() == null) { + throw new IOException("detachBlock:Block not found. " + this); + } + File meta = getMetaFile(); + + if (HardLink.getLinkCount(file) > numLinks) { + DataNode.LOG.info("CopyOnWrite for block " + this); + unlinkFile(file, this); + } + if (HardLink.getLinkCount(meta) > numLinks) { + unlinkFile(meta, this); + } + setUnlinked(); + return true; + } + @Override //Object public String toString() { return getClass().getSimpleName() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java index 558ee21753..2cd8a01218 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java @@ -85,6 +85,16 @@ public void setRecoveryID(long recoveryId) { public ReplicaInfo getOriginalReplica() { return original; } + + @Override //ReplicaInfo + public boolean isUnlinked() { + return original.isUnlinked(); + } + + @Override //ReplicaInfo + public void setUnlinked() { + original.setUnlinked(); + } @Override //ReplicaInfo public ReplicaState getState() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java index 220649d1eb..26ab3dbe24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java @@ -33,6 +33,7 @@ * lease recovery. */ public class ReplicaWaitingToBeRecovered extends ReplicaInfo { + private boolean unlinked; // copy-on-write done for block /** * Constructor @@ -63,6 +64,7 @@ public ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) { */ public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) { super(from); + this.unlinked = from.isUnlinked(); } @Override //ReplicaInfo @@ -70,6 +72,16 @@ public ReplicaState getState() { return ReplicaState.RWR; } + @Override //ReplicaInfo + public boolean isUnlinked() { + return unlinked; + } + + @Override //ReplicaInfo + public void setUnlinked() { + unlinked = true; + } + @Override //ReplicaInfo public long getVisibleLength() { return -1; //no bytes are visible @@ -92,6 +104,7 @@ public int hashCode() { @Override public String toString() { - return super.toString(); + return super.toString() + + "\n unlinked=" + unlinked; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 466c7e9325..1d8c705f7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1109,6 +1109,8 @@ private synchronized ReplicaBeingWritten append(String bpid, throws IOException { // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); + // unlink the finalized replica + replicaInfo.unlinkBlock(1); // construct a RBW replica with the new GS File blkfile = replicaInfo.getBlockFile(); @@ -2478,6 +2480,7 @@ private FinalizedReplica updateReplicaUnderRecovery( + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { + rur.unlinkBlock(1); truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); if(!copyOnTruncate) { // update RUR with the new length diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 7b7f41573b..85d92c91b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -109,6 +109,78 @@ private void checkFile(DistributedFileSystem fileSys, Path name, int repl) expected, "Read 1", false); } + /** + * Test that copy on write for blocks works correctly + * @throws IOException an exception might be thrown + */ + @Test + public void testCopyOnWrite() throws IOException { + Configuration conf = new HdfsConfiguration(); + if (simulatedStorage) { + SimulatedFSDataset.setFactory(conf); + } + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + FileSystem fs = cluster.getFileSystem(); + InetSocketAddress addr = new InetSocketAddress("localhost", + cluster.getNameNodePort()); + DFSClient client = new DFSClient(addr, conf); + try { + + // create a new file, write to it and close it. + // + Path file1 = new Path("/filestatus.dat"); + FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1); + writeFile(stm); + stm.close(); + + // Get a handle to the datanode + DataNode[] dn = cluster.listDataNodes(); + assertTrue("There should be only one datanode but found " + dn.length, + dn.length == 1); + + LocatedBlocks locations = client.getNamenode().getBlockLocations( + file1.toString(), 0, Long.MAX_VALUE); + List blocks = locations.getLocatedBlocks(); + + // + // Create hard links for a few of the blocks + // + for (int i = 0; i < blocks.size(); i = i + 2) { + ExtendedBlock b = blocks.get(i).getBlock(); + final File f = DataNodeTestUtils.getFile(dn[0], + b.getBlockPoolId(), b.getLocalBlock().getBlockId()); + File link = new File(f.toString() + ".link"); + System.out.println("Creating hardlink for File " + f + " to " + link); + HardLink.createHardLink(f, link); + } + + // + // Detach all blocks. This should remove hardlinks (if any) + // + for (int i = 0; i < blocks.size(); i++) { + ExtendedBlock b = blocks.get(i).getBlock(); + System.out.println("testCopyOnWrite detaching block " + b); + assertTrue("Detaching block " + b + " should have returned true", + DataNodeTestUtils.unlinkBlock(dn[0], b, 1)); + } + + // Since the blocks were already detached earlier, these calls should + // return false + // + for (int i = 0; i < blocks.size(); i++) { + ExtendedBlock b = blocks.get(i).getBlock(); + System.out.println("testCopyOnWrite detaching block " + b); + assertTrue("Detaching block " + b + " should have returned false", + !DataNodeTestUtils.unlinkBlock(dn[0], b, 1)); + } + + } finally { + client.close(); + fs.close(); + cluster.shutdown(); + } + } + /** * Test a simple flush on a simple HDFS file. * @throws IOException an exception might be thrown diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 1d4719279f..6bcbb1d3be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; @@ -158,6 +159,20 @@ public static FsDatasetSpi getFSDataset(DataNode dn) { return dn.getFSDataset(); } + public static File getFile(DataNode dn, String bpid, long bid) { + return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid); + } + + public static File getBlockFile(DataNode dn, String bpid, Block b + ) throws IOException { + return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b); + } + + public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks + ) throws IOException { + return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks); + } + /** * Fetch a copy of ReplicaInfo from a datanode by block id * @param dn datanode to retrieve a replicainfo object from diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java index f4480a16b9..9c297e881c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java @@ -54,6 +54,12 @@ public static File getMetaFile(FsDatasetSpi fsd, String bpid, Block b) return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b .getGenerationStamp()); } + + public static boolean unlinkBlock(FsDatasetSpi fsd, + ExtendedBlock block, int numLinks) throws IOException { + final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block); + return info.unlinkBlock(numLinks); + } public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi fsd, final String bpid, final long blockId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java index 8bbac9f2f0..4516696f1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java @@ -143,7 +143,79 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) } } + // test recovering unlinked tmp replicas + @Test public void testRecoverReplicas() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L); + conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + try { + FileSystem fs = cluster.getFileSystem(); + for (int i=0; i<4; i++) { + Path fileName = new Path("/test"+i); + DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L); + DFSTestUtil.waitReplication(fs, fileName, (short)1); + } + String bpid = cluster.getNamesystem().getBlockPoolId(); + DataNode dn = cluster.getDataNodes().get(0); + Iterator replicasItor = + dataset(dn).volumeMap.replicas(bpid).iterator(); + ReplicaInfo replica = replicasItor.next(); + createUnlinkTmpFile(replica, true, true); // rename block file + createUnlinkTmpFile(replica, false, true); // rename meta file + replica = replicasItor.next(); + createUnlinkTmpFile(replica, true, false); // copy block file + createUnlinkTmpFile(replica, false, false); // copy meta file + replica = replicasItor.next(); + createUnlinkTmpFile(replica, true, true); // rename block file + createUnlinkTmpFile(replica, false, false); // copy meta file + + cluster.restartDataNodes(); + cluster.waitActive(); + dn = cluster.getDataNodes().get(0); + + // check volumeMap: 4 finalized replica + Collection replicas = dataset(dn).volumeMap.replicas(bpid); + Assert.assertEquals(4, replicas.size()); + replicasItor = replicas.iterator(); + while (replicasItor.hasNext()) { + Assert.assertEquals(ReplicaState.FINALIZED, + replicasItor.next().getState()); + } + } finally { + cluster.shutdown(); + } + } + private static FsDatasetImpl dataset(DataNode dn) { return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); } + + private static void createUnlinkTmpFile(ReplicaInfo replicaInfo, + boolean changeBlockFile, + boolean isRename) throws IOException { + File src; + if (changeBlockFile) { + src = replicaInfo.getBlockFile(); + } else { + src = replicaInfo.getMetaFile(); + } + File dst = DatanodeUtil.getUnlinkTmpFile(src); + if (isRename) { + src.renameTo(dst); + } else { + FileInputStream in = new FileInputStream(src); + try { + FileOutputStream out = new FileOutputStream(dst); + try { + IOUtils.copyBytes(in, out, 1); + } finally { + out.close(); + } + } finally { + in.close(); + } + } + } }