Merge trunk into HDFS-6581

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
This commit is contained in:
arp 2014-09-07 14:46:46 -07:00
commit eb8284d50e
18 changed files with 448 additions and 32 deletions

View File

@ -612,6 +612,12 @@ Release 2.6.0 - UNRELEASED
HDFS-6862. Add missing timeout annotations to tests. (Xiaoyu Yao via
Arpit Agarwal)
HDFS-6898. DN must reserve space for a full block when an RBW block is
created. (Arpit Agarwal)
HDFS-7025. HDFS Credential Provider related Unit Test Failure.
(Xiaoyu Yao via cnauroth)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -48,7 +48,7 @@ protected HdfsConstants() {
"org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
public static final int MIN_BLOCKS_FOR_WRITE = 5;
public static final int MIN_BLOCKS_FOR_WRITE = 1;
// Long that indicates "leave current quota unchanged"
public static final long QUOTA_DONT_SET = Long.MAX_VALUE;

View File

@ -34,10 +34,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
* @param genStamp replica generation stamp
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
* @param bytesToReserve disk space to reserve for this replica, based on
* the estimated maximum block length.
*/
public ReplicaBeingWritten(long blockId, long genStamp,
FsVolumeSpi vol, File dir) {
super( blockId, genStamp, vol, dir);
FsVolumeSpi vol, File dir, long bytesToReserve) {
super(blockId, genStamp, vol, dir, bytesToReserve);
}
/**
@ -60,10 +62,12 @@ public ReplicaBeingWritten(Block block,
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
* @param writer a thread that is writing to this replica
* @param bytesToReserve disk space to reserve for this replica, based on
* the estimated maximum block length.
*/
public ReplicaBeingWritten(long blockId, long len, long genStamp,
FsVolumeSpi vol, File dir, Thread writer ) {
super( blockId, len, genStamp, vol, dir, writer);
FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
super(blockId, len, genStamp, vol, dir, writer, bytesToReserve);
}
/**

View File

@ -45,16 +45,25 @@ public class ReplicaInPipeline extends ReplicaInfo
private byte[] lastChecksum;
private Thread writer;
/**
* Bytes reserved for this replica on the containing volume.
* Based off difference between the estimated maximum block length and
* the bytes already written to this block.
*/
private long bytesReserved;
/**
* Constructor for a zero length replica
* @param blockId block id
* @param genStamp replica generation stamp
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
* @param bytesToReserve disk space to reserve for this replica, based on
* the estimated maximum block length.
*/
public ReplicaInPipeline(long blockId, long genStamp,
FsVolumeSpi vol, File dir) {
this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
FsVolumeSpi vol, File dir, long bytesToReserve) {
this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
}
/**
@ -67,7 +76,7 @@ public ReplicaInPipeline(long blockId, long genStamp,
ReplicaInPipeline(Block block,
FsVolumeSpi vol, File dir, Thread writer) {
this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
vol, dir, writer);
vol, dir, writer, 0L);
}
/**
@ -78,13 +87,16 @@ public ReplicaInPipeline(long blockId, long genStamp,
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
* @param writer a thread that is writing to this replica
* @param bytesToReserve disk space to reserve for this replica, based on
* the estimated maximum block length.
*/
ReplicaInPipeline(long blockId, long len, long genStamp,
FsVolumeSpi vol, File dir, Thread writer ) {
FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
super( blockId, len, genStamp, vol, dir);
this.bytesAcked = len;
this.bytesOnDisk = len;
this.writer = writer;
this.bytesReserved = bytesToReserve;
}
/**
@ -96,6 +108,7 @@ public ReplicaInPipeline(ReplicaInPipeline from) {
this.bytesAcked = from.getBytesAcked();
this.bytesOnDisk = from.getBytesOnDisk();
this.writer = from.writer;
this.bytesReserved = from.bytesReserved;
}
@Override
@ -115,7 +128,14 @@ public long getBytesAcked() {
@Override // ReplicaInPipelineInterface
public void setBytesAcked(long bytesAcked) {
long newBytesAcked = bytesAcked - this.bytesAcked;
this.bytesAcked = bytesAcked;
// Once bytes are ACK'ed we can release equivalent space from the
// volume's reservedForRbw count. We could have released it as soon
// as the write-to-disk completed but that would be inefficient.
getVolume().releaseReservedSpace(newBytesAcked);
bytesReserved -= newBytesAcked;
}
@Override // ReplicaInPipelineInterface
@ -123,6 +143,11 @@ public long getBytesOnDisk() {
return bytesOnDisk;
}
@Override
public long getBytesReserved() {
return bytesReserved;
}
@Override // ReplicaInPipelineInterface
public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
this.bytesOnDisk = dataLength;

View File

@ -212,6 +212,13 @@ public void setUnlinked() {
// no need to be unlinked
}
/**
* Number of bytes reserved for this replica on disk.
*/
public long getBytesReserved() {
return 0;
}
/**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any

View File

@ -48,4 +48,15 @@ public interface FsVolumeSpi {
/** Returns true if the volume is NOT backed by persistent storage. */
public boolean isTransientStorage();
/**
* Reserve disk space for an RBW block so a writer does not run out of
* space before the block is full.
*/
public void reserveSpaceForRbw(long bytesToReserve);
/**
* Release disk space previously reserved for RBW block.
*/
public void releaseReservedSpace(long bytesToRelease);
}

View File

@ -247,7 +247,7 @@ File createRbwFile(Block b) throws IOException {
return DatanodeUtil.createTmpFile(b, f);
}
File addBlock(Block b, File f) throws IOException {
File addFinalizedBlock(Block b, File f) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
if (!blockDir.exists()) {
if (!blockDir.mkdirs()) {
@ -436,9 +436,11 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir,
// The restart meta file exists
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
// It didn't expire. Load the replica as a RBW.
// We don't know the expected block length, so just use 0
// and don't reserve any more space for writes.
newReplica = new ReplicaBeingWritten(blockId,
validateIntegrityAndSetLength(file, genStamp),
genStamp, volume, file.getParentFile(), null);
genStamp, volume, file.getParentFile(), null, 0);
loadRwr = false;
}
sc.close();

View File

@ -612,7 +612,7 @@ static File moveBlockFiles(Block b, File srcfile, File destdir)
+ " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
+ " and " + srcfile + " to " + dstfile);
}
return dstfile;
@ -760,7 +760,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
File oldmeta = replicaInfo.getMetaFile();
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
v, newBlkFile.getParentFile(), Thread.currentThread());
v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
File newmeta = newReplicaInfo.getMetaFile();
// rename meta file to rbw directory
@ -796,7 +796,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
// Replace finalized replica by a RBW replica in replicas map
volumeMap.add(bpid, newReplicaInfo);
v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
return newReplicaInfo;
}
@ -941,7 +941,7 @@ public synchronized ReplicaInPipeline createRbw(StorageType storageType,
// create an rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
@ -1058,7 +1058,7 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
// create RBW
final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
blockId, numBytes, expectedGs,
v, dest.getParentFile(), Thread.currentThread());
v, dest.getParentFile(), Thread.currentThread(), 0);
rbw.setBytesAcked(visible);
// overwrite the RBW in the volume map
volumeMap.add(b.getBlockPoolId(), rbw);
@ -1079,7 +1079,7 @@ public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
b.getGenerationStamp(), v, f.getParentFile(), 0);
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
}
@ -1144,7 +1144,8 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
" for block " + replicaInfo);
}
File dest = v.addBlock(bpid, replicaInfo, f);
File dest = v.addFinalizedBlock(
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {

View File

@ -28,6 +28,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
@ -62,6 +63,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final DF usage;
private final long reserved;
// Disk space reserved for open blocks.
private AtomicLong reservedForRbw;
// Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
@ -82,6 +86,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.reserved = conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.reservedForRbw = new AtomicLong(0L);
this.currentDir = currentDir;
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
@ -166,7 +171,7 @@ public void setCapacityForTesting(long capacity) {
@Override
public long getAvailable() throws IOException {
long remaining = getCapacity()-getDfsUsed();
long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
long available = usage.getAvailable();
if (remaining > available) {
remaining = available;
@ -174,6 +179,11 @@ public long getAvailable() throws IOException {
return (remaining > 0) ? remaining : 0;
}
@VisibleForTesting
public long getReservedForRbw() {
return reservedForRbw.get();
}
long getReserved(){
return reserved;
}
@ -222,16 +232,58 @@ File createTmpFile(String bpid, Block b) throws IOException {
return getBlockPoolSlice(bpid).createTmpFile(b);
}
@Override
public void reserveSpaceForRbw(long bytesToReserve) {
if (bytesToReserve != 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
}
reservedForRbw.addAndGet(bytesToReserve);
}
}
@Override
public void releaseReservedSpace(long bytesToRelease) {
if (bytesToRelease != 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
}
long oldReservation, newReservation;
do {
oldReservation = reservedForRbw.get();
newReservation = oldReservation - bytesToRelease;
if (newReservation < 0) {
// Failsafe, this should never occur in practice, but if it does we don't
// want to start advertising more space than we have available.
newReservation = 0;
}
} while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
}
}
/**
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
*/
File createRbwFile(String bpid, Block b) throws IOException {
reserveSpaceForRbw(b.getNumBytes());
return getBlockPoolSlice(bpid).createRbwFile(b);
}
File addBlock(String bpid, Block b, File f) throws IOException {
return getBlockPoolSlice(bpid).addBlock(b, f);
/**
*
* @param bytesReservedForRbw Space that was reserved during
* block creation. Now that the block is being finalized we
* can free up this space.
* @return
* @throws IOException
*/
File addFinalizedBlock(String bpid, Block b,
File f, long bytesReservedForRbw)
throws IOException {
releaseReservedSpace(bytesReservedForRbw);
return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
}
Executor getCacheExecutor() {

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
@ -64,8 +65,9 @@ public void setUp() throws Exception {
tmpDir = new File(System.getProperty("test.build.data", "target"),
UUID.randomUUID().toString()).getAbsoluteFile();
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
dfsCluster.waitClusterUp();

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -809,8 +810,9 @@ public void testGetPassword() throws Exception {
"target/test-dir"));
Configuration conf = new Configuration();
final Path jksPath = new Path(testDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(testDir, "test.jks");
file.delete();

View File

@ -99,8 +99,9 @@ public void setup() throws Exception {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
@ -324,7 +325,7 @@ public void testListEncryptionZonesAsNonSuperUser() throws Exception {
final UserGroupInformation user = UserGroupInformation.
createUserForTesting("user", new String[] { "mygroup" });
final Path testRoot = new Path(fsHelper.getTestRootDir());
final Path testRoot = new Path("/tmp/TestEncryptionZones");
final Path superPath = new Path(testRoot, "superuseronly");
final Path allPath = new Path(testRoot, "accessall");
@ -358,7 +359,7 @@ public void testGetEZAsNonSuperUser() throws Exception {
final UserGroupInformation user = UserGroupInformation.
createUserForTesting("user", new String[] { "mygroup" });
final Path testRoot = new Path(fsHelper.getTestRootDir());
final Path testRoot = new Path("/tmp/TestEncryptionZones");
final Path superPath = new Path(testRoot, "superuseronly");
final Path superPathFile = new Path(superPath, "file1");
final Path allPath = new Path(testRoot, "accessall");
@ -451,7 +452,7 @@ public Object run() throws Exception {
* Test success of Rename EZ on a directory which is already an EZ.
*/
private void doRenameEncryptionZone(FSTestWrapper wrapper) throws Exception {
final Path testRoot = new Path(fsHelper.getTestRootDir());
final Path testRoot = new Path("/tmp/TestEncryptionZones");
final Path pathFoo = new Path(testRoot, "foo");
final Path pathFooBaz = new Path(pathFoo, "baz");
wrapper.mkdir(pathFoo, FsPermission.getDirDefault(), true);
@ -598,8 +599,9 @@ public void testCreateEZWithNoProvider() throws Exception {
} catch (IOException e) {
assertExceptionContains("since no key provider is available", e);
}
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
// Try listing EZs as well
assertNumZones(0);

View File

@ -69,8 +69,9 @@ public void setup() throws Exception {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);

View File

@ -429,6 +429,14 @@ public String getStorageID() {
public boolean isTransientStorage() {
return false;
}
@Override
public void reserveSpaceForRbw(long bytesToReserve) {
}
@Override
public void releaseReservedSpace(long bytesToRelease) {
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

View File

@ -0,0 +1,288 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
* replica being written (RBW).
*/
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
private static final int SMALL_BLOCK_SIZE = 1024;
protected MiniDFSCluster cluster;
private Configuration conf;
private DistributedFileSystem fs = null;
private DFSClient client = null;
FsVolumeImpl singletonVolume = null;
private static Random rand = new Random();
private void initConfig(int blockSize) {
conf = new HdfsConfiguration();
// Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
// Disable the scanner
conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
}
static {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.numDataNodes(REPL_FACTOR)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@After
public void shutdownCluster() throws IOException {
if (client != null) {
client.close();
client = null;
}
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void createFileAndTestSpaceReservation(
final String fileNamePrefix, final int fileBlockSize)
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
startCluster(BLOCK_SIZE, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
try {
out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
out.write(buffer);
out.hsync();
int bytesWritten = buffer.length;
// Check that space was reserved for a full block minus the bytesWritten.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
out.close();
out = null;
// Check that the reserved space has been released since we closed the
// file.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
// Reopen the file for appends and write 1 more byte.
out = fs.append(path);
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
// Check that space was again reserved for a full block minus the
// bytesWritten so far.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
// Write once again and again verify the available space. This ensures
// that the reserved space is progressively adjusted to account for bytes
// written to disk.
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
} finally {
if (out != null) {
out.close();
}
}
}
@Test (timeout=300000)
public void testWithDefaultBlockSize()
throws IOException, InterruptedException {
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
}
@Test (timeout=300000)
public void testWithNonDefaultBlockSize()
throws IOException, InterruptedException {
// Same test as previous one, but with a non-default block size.
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
}
/**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
for (int i = 0; i < numWriters; ++i) {
writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
writers[i].start();
}
Thread.sleep(60000);
// Stop the writers.
for (Writer w : writers) {
w.stopWriter();
}
int filesCreated = 0;
int numFailures = 0;
for (Writer w : writers) {
w.join();
filesCreated += w.getFilesCreated();
numFailures += w.getNumFailures();
}
LOG.info("Stress test created " + filesCreated +
" files and hit " + numFailures + " failures");
// Check no space was leaked.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
}
private static class Writer extends Daemon {
private volatile boolean keepRunning;
private final DFSClient localClient;
private int filesCreated = 0;
private int numFailures = 0;
byte[] data;
Writer(DFSClient client, int blockSize) throws IOException {
localClient = client;
keepRunning = true;
filesCreated = 0;
numFailures = 0;
// At least some of the files should span a block boundary.
data = new byte[blockSize * 2];
}
@Override
public void run() {
/**
* Create a file, write up to 3 blocks of data and close the file.
* Do this in a loop until we are told to stop.
*/
while (keepRunning) {
OutputStream os = null;
try {
String filename = "/file-" + rand.nextLong();
os = localClient.create(filename, false);
os.write(data, 0, rand.nextInt(data.length));
IOUtils.closeQuietly(os);
os = null;
localClient.delete(filename, false);
Thread.sleep(50); // Sleep for a bit to avoid killing the system.
++filesCreated;
} catch (IOException ioe) {
// Just ignore the exception and keep going.
++numFailures;
} catch (InterruptedException ie) {
return;
} finally {
if (os != null) {
IOUtils.closeQuietly(os);
}
}
}
}
public void stopWriter() {
keepRunning = false;
}
public int getFilesCreated() {
return filesCreated;
}
public int getNumFailures() {
return numFailures;
}
}
}

View File

@ -158,7 +158,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep
replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(),
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile()));
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);

View File

@ -287,6 +287,9 @@ Release 2.6.0 - UNRELEASED
YARN-2431. NM restart: cgroup is not removed for reacquired containers
(jlowe)
YARN-2519. Credential Provider related unit tests failed on Windows.
(Xiaoyu Yao via cnauroth)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServer2.Builder;
import org.apache.hadoop.security.alias.CredentialProvider;
@ -74,8 +75,9 @@ protected Configuration provisionCredentialsForSSL() throws IOException,
"target/test-dir"));
Configuration conf = new Configuration();
final Path jksPath = new Path(testDir.toString(), "test.jks");
final String ourUrl =
JavaKeyStoreProvider.SCHEME_NAME + "://file/" + testDir + "/test.jks";
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri();
File file = new File(testDir, "test.jks");
file.delete();