diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4412b30710..5a30d0a70d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -612,6 +612,12 @@ Release 2.6.0 - UNRELEASED HDFS-6862. Add missing timeout annotations to tests. (Xiaoyu Yao via Arpit Agarwal) + HDFS-6898. DN must reserve space for a full block when an RBW block is + created. (Arpit Agarwal) + + HDFS-7025. HDFS Credential Provider related Unit Test Failure. + (Xiaoyu Yao via cnauroth) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 77fe543784..240dcd01ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -48,7 +48,7 @@ protected HdfsConstants() { "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol"; - public static final int MIN_BLOCKS_FOR_WRITE = 5; + public static final int MIN_BLOCKS_FOR_WRITE = 1; // Long that indicates "leave current quota unchanged" public static final long QUOTA_DONT_SET = Long.MAX_VALUE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java index 728dd3806f..4a89493f03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java @@ -34,10 +34,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline { * @param genStamp replica generation stamp * @param vol volume where replica is located * @param dir directory path where block and meta files are located + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. */ public ReplicaBeingWritten(long blockId, long genStamp, - FsVolumeSpi vol, File dir) { - super( blockId, genStamp, vol, dir); + FsVolumeSpi vol, File dir, long bytesToReserve) { + super(blockId, genStamp, vol, dir, bytesToReserve); } /** @@ -60,10 +62,12 @@ public ReplicaBeingWritten(Block block, * @param vol volume where replica is located * @param dir directory path where block and meta files are located * @param writer a thread that is writing to this replica + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. */ public ReplicaBeingWritten(long blockId, long len, long genStamp, - FsVolumeSpi vol, File dir, Thread writer ) { - super( blockId, len, genStamp, vol, dir, writer); + FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) { + super(blockId, len, genStamp, vol, dir, writer, bytesToReserve); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index dc57688834..45862ca771 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -44,6 +44,13 @@ public class ReplicaInPipeline extends ReplicaInfo private long bytesOnDisk; private byte[] lastChecksum; private Thread writer; + + /** + * Bytes reserved for this replica on the containing volume. + * Based off difference between the estimated maximum block length and + * the bytes already written to this block. + */ + private long bytesReserved; /** * Constructor for a zero length replica @@ -51,10 +58,12 @@ public class ReplicaInPipeline extends ReplicaInfo * @param genStamp replica generation stamp * @param vol volume where replica is located * @param dir directory path where block and meta files are located + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. */ public ReplicaInPipeline(long blockId, long genStamp, - FsVolumeSpi vol, File dir) { - this( blockId, 0L, genStamp, vol, dir, Thread.currentThread()); + FsVolumeSpi vol, File dir, long bytesToReserve) { + this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve); } /** @@ -67,7 +76,7 @@ public ReplicaInPipeline(long blockId, long genStamp, ReplicaInPipeline(Block block, FsVolumeSpi vol, File dir, Thread writer) { this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), - vol, dir, writer); + vol, dir, writer, 0L); } /** @@ -78,13 +87,16 @@ public ReplicaInPipeline(long blockId, long genStamp, * @param vol volume where replica is located * @param dir directory path where block and meta files are located * @param writer a thread that is writing to this replica + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. */ ReplicaInPipeline(long blockId, long len, long genStamp, - FsVolumeSpi vol, File dir, Thread writer ) { + FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) { super( blockId, len, genStamp, vol, dir); this.bytesAcked = len; this.bytesOnDisk = len; this.writer = writer; + this.bytesReserved = bytesToReserve; } /** @@ -96,6 +108,7 @@ public ReplicaInPipeline(ReplicaInPipeline from) { this.bytesAcked = from.getBytesAcked(); this.bytesOnDisk = from.getBytesOnDisk(); this.writer = from.writer; + this.bytesReserved = from.bytesReserved; } @Override @@ -115,13 +128,25 @@ public long getBytesAcked() { @Override // ReplicaInPipelineInterface public void setBytesAcked(long bytesAcked) { + long newBytesAcked = bytesAcked - this.bytesAcked; this.bytesAcked = bytesAcked; + + // Once bytes are ACK'ed we can release equivalent space from the + // volume's reservedForRbw count. We could have released it as soon + // as the write-to-disk completed but that would be inefficient. + getVolume().releaseReservedSpace(newBytesAcked); + bytesReserved -= newBytesAcked; } @Override // ReplicaInPipelineInterface public long getBytesOnDisk() { return bytesOnDisk; } + + @Override + public long getBytesReserved() { + return bytesReserved; + } @Override // ReplicaInPipelineInterface public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index a10e45b179..940d3eb516 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -211,6 +211,13 @@ public boolean isUnlinked() { public void setUnlinked() { // no need to be unlinked } + + /** + * Number of bytes reserved for this replica on disk. + */ + public long getBytesReserved() { + return 0; + } /** * Copy specified file into a temporary file. Then rename the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index e141cd77e6..8ebf2b4a8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -48,4 +48,15 @@ public interface FsVolumeSpi { /** Returns true if the volume is NOT backed by persistent storage. */ public boolean isTransientStorage(); -} \ No newline at end of file + + /** + * Reserve disk space for an RBW block so a writer does not run out of + * space before the block is full. + */ + public void reserveSpaceForRbw(long bytesToReserve); + + /** + * Release disk space previously reserved for RBW block. + */ + public void releaseReservedSpace(long bytesToRelease); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index d8b1f5f9f6..57e05cc927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -247,7 +247,7 @@ File createRbwFile(Block b) throws IOException { return DatanodeUtil.createTmpFile(b, f); } - File addBlock(Block b, File f) throws IOException { + File addFinalizedBlock(Block b, File f) throws IOException { File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); if (!blockDir.exists()) { if (!blockDir.mkdirs()) { @@ -436,9 +436,11 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, // The restart meta file exists if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { // It didn't expire. Load the replica as a RBW. + // We don't know the expected block length, so just use 0 + // and don't reserve any more space for writes. newReplica = new ReplicaBeingWritten(blockId, validateIntegrityAndSetLength(file, genStamp), - genStamp, volume, file.getParentFile(), null); + genStamp, volume, file.getParentFile(), null, 0); loadRwr = false; } sc.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 606b0679e5..b243f273b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -612,7 +612,7 @@ static File moveBlockFiles(Block b, File srcfile, File destdir) + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e); } if (LOG.isDebugEnabled()) { - LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta + LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta + " and " + srcfile + " to " + dstfile); } return dstfile; @@ -760,7 +760,7 @@ private synchronized ReplicaBeingWritten append(String bpid, File oldmeta = replicaInfo.getMetaFile(); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, - v, newBlkFile.getParentFile(), Thread.currentThread()); + v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen); File newmeta = newReplicaInfo.getMetaFile(); // rename meta file to rbw directory @@ -796,7 +796,7 @@ private synchronized ReplicaBeingWritten append(String bpid, // Replace finalized replica by a RBW replica in replicas map volumeMap.add(bpid, newReplicaInfo); - + v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()); return newReplicaInfo; } @@ -941,7 +941,7 @@ public synchronized ReplicaInPipeline createRbw(StorageType storageType, // create an rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile()); + b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; @@ -1058,7 +1058,7 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw( // create RBW final ReplicaBeingWritten rbw = new ReplicaBeingWritten( blockId, numBytes, expectedGs, - v, dest.getParentFile(), Thread.currentThread()); + v, dest.getParentFile(), Thread.currentThread(), 0); rbw.setBytesAcked(visible); // overwrite the RBW in the volume map volumeMap.add(b.getBlockPoolId(), rbw); @@ -1079,7 +1079,7 @@ public synchronized ReplicaInPipeline createTemporary(StorageType storageType, // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile()); + b.getGenerationStamp(), v, f.getParentFile(), 0); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; } @@ -1144,7 +1144,8 @@ private synchronized FinalizedReplica finalizeReplica(String bpid, " for block " + replicaInfo); } - File dest = v.addBlock(bpid, replicaInfo, f); + File dest = v.addFinalizedBlock( + bpid, replicaInfo, f, replicaInfo.getBytesReserved()); newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); if (v.isTransientStorage()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index ccfb4495e5..276aa5b946 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -62,6 +63,9 @@ public class FsVolumeImpl implements FsVolumeSpi { private final DF usage; private final long reserved; + // Disk space reserved for open blocks. + private AtomicLong reservedForRbw; + // Capacity configured. This is useful when we want to // limit the visible capacity for tests. If negative, then we just // query from the filesystem. @@ -82,6 +86,7 @@ public class FsVolumeImpl implements FsVolumeSpi { this.reserved = conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); + this.reservedForRbw = new AtomicLong(0L); this.currentDir = currentDir; File parent = currentDir.getParentFile(); this.usage = new DF(parent, conf); @@ -166,13 +171,18 @@ public void setCapacityForTesting(long capacity) { @Override public long getAvailable() throws IOException { - long remaining = getCapacity()-getDfsUsed(); + long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get(); long available = usage.getAvailable(); if (remaining > available) { remaining = available; } return (remaining > 0) ? remaining : 0; } + + @VisibleForTesting + public long getReservedForRbw() { + return reservedForRbw.get(); + } long getReserved(){ return reserved; @@ -222,16 +232,58 @@ File createTmpFile(String bpid, Block b) throws IOException { return getBlockPoolSlice(bpid).createTmpFile(b); } + @Override + public void reserveSpaceForRbw(long bytesToReserve) { + if (bytesToReserve != 0) { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath()); + } + reservedForRbw.addAndGet(bytesToReserve); + } + } + + @Override + public void releaseReservedSpace(long bytesToRelease) { + if (bytesToRelease != 0) { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath()); + } + + long oldReservation, newReservation; + do { + oldReservation = reservedForRbw.get(); + newReservation = oldReservation - bytesToRelease; + if (newReservation < 0) { + // Failsafe, this should never occur in practice, but if it does we don't + // want to start advertising more space than we have available. + newReservation = 0; + } + } while (!reservedForRbw.compareAndSet(oldReservation, newReservation)); + } + } + /** * RBW files. They get moved to the finalized block directory when * the block is finalized. */ File createRbwFile(String bpid, Block b) throws IOException { + reserveSpaceForRbw(b.getNumBytes()); return getBlockPoolSlice(bpid).createRbwFile(b); } - File addBlock(String bpid, Block b, File f) throws IOException { - return getBlockPoolSlice(bpid).addBlock(b, f); + /** + * + * @param bytesReservedForRbw Space that was reserved during + * block creation. Now that the block is being finalized we + * can free up this space. + * @return + * @throws IOException + */ + File addFinalizedBlock(String bpid, Block b, + File f, long bytesReservedForRbw) + throws IOException { + releaseReservedSpace(bytesReservedForRbw); + return getBlockPoolSlice(bpid).addFinalizedBlock(b, f); } Executor getCacheExecutor() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java index 1c83829102..adeabfe856 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java @@ -37,6 +37,7 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HDFSPolicyProvider; @@ -64,8 +65,9 @@ public void setUp() throws Exception { tmpDir = new File(System.getProperty("test.build.data", "target"), UUID.randomUUID().toString()).getAbsoluteFile(); + final Path jksPath = new Path(tmpDir.toString(), "test.jks"); conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, - JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks"); + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); dfsCluster.waitClusterUp(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index 5ffd3b5bd8..046265f552 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -809,8 +810,9 @@ public void testGetPassword() throws Exception { "target/test-dir")); Configuration conf = new Configuration(); + final Path jksPath = new Path(testDir.toString(), "test.jks"); final String ourUrl = - JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks"; + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri(); File file = new File(testDir, "test.jks"); file.delete(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 1cf9263c25..0ef538d80b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -99,8 +99,9 @@ public void setup() throws Exception { // Set up java key store String testRoot = fsHelper.getTestRootDir(); testRootDir = new File(testRoot).getAbsoluteFile(); + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, - JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks" + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri() ); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); // Lower the batch size for testing @@ -324,7 +325,7 @@ public void testListEncryptionZonesAsNonSuperUser() throws Exception { final UserGroupInformation user = UserGroupInformation. createUserForTesting("user", new String[] { "mygroup" }); - final Path testRoot = new Path(fsHelper.getTestRootDir()); + final Path testRoot = new Path("/tmp/TestEncryptionZones"); final Path superPath = new Path(testRoot, "superuseronly"); final Path allPath = new Path(testRoot, "accessall"); @@ -358,7 +359,7 @@ public void testGetEZAsNonSuperUser() throws Exception { final UserGroupInformation user = UserGroupInformation. createUserForTesting("user", new String[] { "mygroup" }); - final Path testRoot = new Path(fsHelper.getTestRootDir()); + final Path testRoot = new Path("/tmp/TestEncryptionZones"); final Path superPath = new Path(testRoot, "superuseronly"); final Path superPathFile = new Path(superPath, "file1"); final Path allPath = new Path(testRoot, "accessall"); @@ -451,7 +452,7 @@ public Object run() throws Exception { * Test success of Rename EZ on a directory which is already an EZ. */ private void doRenameEncryptionZone(FSTestWrapper wrapper) throws Exception { - final Path testRoot = new Path(fsHelper.getTestRootDir()); + final Path testRoot = new Path("/tmp/TestEncryptionZones"); final Path pathFoo = new Path(testRoot, "foo"); final Path pathFooBaz = new Path(pathFoo, "baz"); wrapper.mkdir(pathFoo, FsPermission.getDirDefault(), true); @@ -598,8 +599,9 @@ public void testCreateEZWithNoProvider() throws Exception { } catch (IOException e) { assertExceptionContains("since no key provider is available", e); } + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, - JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks" + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri() ); // Try listing EZs as well assertNumZones(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java index 2a20954a39..20e4f4edf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java @@ -69,8 +69,9 @@ public void setup() throws Exception { // Set up java key store String testRoot = fsHelper.getTestRootDir(); File testRootDir = new File(testRoot).getAbsoluteFile(); + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, - JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks" + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri() ); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index ae147e9feb..89649aaee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -429,6 +429,14 @@ public String getStorageID() { public boolean isTransientStorage() { return false; } + + @Override + public void reserveSpaceForRbw(long bytesToReserve) { + } + + @Override + public void releaseReservedSpace(long bytesToRelease) { + } } private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java new file mode 100644 index 0000000000..74ac16708f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.fs.DU; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Random; + +/** + * Ensure that the DN reserves disk space equivalent to a full block for + * replica being written (RBW). + */ +public class TestRbwSpaceReservation { + static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class); + + private static final short REPL_FACTOR = 1; + private static final int DU_REFRESH_INTERVAL_MSEC = 500; + private static final int STORAGES_PER_DATANODE = 1; + private static final int BLOCK_SIZE = 1024 * 1024; + private static final int SMALL_BLOCK_SIZE = 1024; + + protected MiniDFSCluster cluster; + private Configuration conf; + private DistributedFileSystem fs = null; + private DFSClient client = null; + FsVolumeImpl singletonVolume = null; + + private static Random rand = new Random(); + + private void initConfig(int blockSize) { + conf = new HdfsConfiguration(); + + // Refresh disk usage information frequently. + conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC); + conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize); + + // Disable the scanner + conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + } + + static { + ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); + } + + private void startCluster(int blockSize, long perVolumeCapacity) throws IOException { + initConfig(blockSize); + + cluster = new MiniDFSCluster + .Builder(conf) + .storagesPerDatanode(STORAGES_PER_DATANODE) + .numDataNodes(REPL_FACTOR) + .build(); + fs = cluster.getFileSystem(); + client = fs.getClient(); + cluster.waitActive(); + + if (perVolumeCapacity >= 0) { + List volumes = + cluster.getDataNodes().get(0).getFSDataset().getVolumes(); + + assertThat(volumes.size(), is(1)); + singletonVolume = ((FsVolumeImpl) volumes.get(0)); + singletonVolume.setCapacityForTesting(perVolumeCapacity); + } + } + + @After + public void shutdownCluster() throws IOException { + if (client != null) { + client.close(); + client = null; + } + + if (fs != null) { + fs.close(); + fs = null; + } + + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private void createFileAndTestSpaceReservation( + final String fileNamePrefix, final int fileBlockSize) + throws IOException, InterruptedException { + // Enough for 1 block + meta files + some delta. + final long configuredCapacity = fileBlockSize * 2 - 1; + startCluster(BLOCK_SIZE, configuredCapacity); + FSDataOutputStream out = null; + Path path = new Path("/" + fileNamePrefix + ".dat"); + + try { + out = fs.create(path, false, 4096, (short) 1, fileBlockSize); + + byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)]; + out.write(buffer); + out.hsync(); + int bytesWritten = buffer.length; + + // Check that space was reserved for a full block minus the bytesWritten. + assertThat(singletonVolume.getReservedForRbw(), + is((long) fileBlockSize - bytesWritten)); + out.close(); + out = null; + + // Check that the reserved space has been released since we closed the + // file. + assertThat(singletonVolume.getReservedForRbw(), is(0L)); + + // Reopen the file for appends and write 1 more byte. + out = fs.append(path); + out.write(buffer); + out.hsync(); + bytesWritten += buffer.length; + + // Check that space was again reserved for a full block minus the + // bytesWritten so far. + assertThat(singletonVolume.getReservedForRbw(), + is((long) fileBlockSize - bytesWritten)); + + // Write once again and again verify the available space. This ensures + // that the reserved space is progressively adjusted to account for bytes + // written to disk. + out.write(buffer); + out.hsync(); + bytesWritten += buffer.length; + assertThat(singletonVolume.getReservedForRbw(), + is((long) fileBlockSize - bytesWritten)); + } finally { + if (out != null) { + out.close(); + } + } + } + + @Test (timeout=300000) + public void testWithDefaultBlockSize() + throws IOException, InterruptedException { + createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE); + } + + @Test (timeout=300000) + public void testWithNonDefaultBlockSize() + throws IOException, InterruptedException { + // Same test as previous one, but with a non-default block size. + createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2); + } + + /** + * Stress test to ensure we are not leaking reserved space. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=600000) + public void stressTest() throws IOException, InterruptedException { + final int numWriters = 5; + startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10); + Writer[] writers = new Writer[numWriters]; + + // Start a few writers and let them run for a while. + for (int i = 0; i < numWriters; ++i) { + writers[i] = new Writer(client, SMALL_BLOCK_SIZE); + writers[i].start(); + } + + Thread.sleep(60000); + + // Stop the writers. + for (Writer w : writers) { + w.stopWriter(); + } + int filesCreated = 0; + int numFailures = 0; + for (Writer w : writers) { + w.join(); + filesCreated += w.getFilesCreated(); + numFailures += w.getNumFailures(); + } + + LOG.info("Stress test created " + filesCreated + + " files and hit " + numFailures + " failures"); + + // Check no space was leaked. + assertThat(singletonVolume.getReservedForRbw(), is(0L)); + } + + private static class Writer extends Daemon { + private volatile boolean keepRunning; + private final DFSClient localClient; + private int filesCreated = 0; + private int numFailures = 0; + byte[] data; + + Writer(DFSClient client, int blockSize) throws IOException { + localClient = client; + keepRunning = true; + filesCreated = 0; + numFailures = 0; + + // At least some of the files should span a block boundary. + data = new byte[blockSize * 2]; + } + + @Override + public void run() { + /** + * Create a file, write up to 3 blocks of data and close the file. + * Do this in a loop until we are told to stop. + */ + while (keepRunning) { + OutputStream os = null; + try { + String filename = "/file-" + rand.nextLong(); + os = localClient.create(filename, false); + os.write(data, 0, rand.nextInt(data.length)); + IOUtils.closeQuietly(os); + os = null; + localClient.delete(filename, false); + Thread.sleep(50); // Sleep for a bit to avoid killing the system. + ++filesCreated; + } catch (IOException ioe) { + // Just ignore the exception and keep going. + ++numFailures; + } catch (InterruptedException ie) { + return; + } finally { + if (os != null) { + IOUtils.closeQuietly(os); + } + } + } + } + + public void stopWriter() { + keepRunning = false; + } + + public int getFilesCreated() { + return filesCreated; + } + + public int getNumFailures() { + return numFailures; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index 8c73c30aec..73b0a5f1cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -158,7 +158,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep replicasMap.add(bpid, new ReplicaInPipeline( blocks[TEMPORARY].getBlockId(), blocks[TEMPORARY].getGenerationStamp(), vol, - vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile())); + vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0)); replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 34a206af6e..beafc22518 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -287,6 +287,9 @@ Release 2.6.0 - UNRELEASED YARN-2431. NM restart: cgroup is not removed for reacquired containers (jlowe) + YARN-2519. Credential Provider related unit tests failed on Windows. + (Xiaoyu Yao via cnauroth) + Release 2.5.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/util/TestWebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/util/TestWebAppUtils.java index 18600fdea6..2bd91b4ac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/util/TestWebAppUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/util/TestWebAppUtils.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.HttpServer2.Builder; import org.apache.hadoop.security.alias.CredentialProvider; @@ -74,8 +75,9 @@ protected Configuration provisionCredentialsForSSL() throws IOException, "target/test-dir")); Configuration conf = new Configuration(); + final Path jksPath = new Path(testDir.toString(), "test.jks"); final String ourUrl = - JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks"; + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri(); File file = new File(testDir, "test.jks"); file.delete();