HDFS-8327. Compute storage type quotas in INodeFile.computeQuotaDeltaForTruncate(). Contributed by Haohui Mai.
This commit is contained in:
parent
3becc3af83
commit
02a4a22b9c
@ -535,6 +535,8 @@ Release 2.8.0 - UNRELEASED
|
||||
|
||||
HDFS-6757. Simplify lease manager with INodeID. (wheat9)
|
||||
|
||||
HDFS-8327. Simplify quota calculations for snapshots and truncate. (wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
@ -393,7 +393,7 @@ static Block[] unprotectedSetReplication(
|
||||
// if replication > oldBR, then newBR == replication.
|
||||
// if replication < oldBR, we don't know newBR yet.
|
||||
if (replication > oldBR) {
|
||||
long dsDelta = file.storagespaceConsumed()/oldBR;
|
||||
long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / oldBR;
|
||||
fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
|
||||
}
|
||||
|
||||
@ -402,7 +402,7 @@ static Block[] unprotectedSetReplication(
|
||||
final short newBR = file.getBlockReplication();
|
||||
// check newBR < oldBR case.
|
||||
if (newBR < oldBR) {
|
||||
long dsDelta = file.storagespaceConsumed()/newBR;
|
||||
long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / newBR;
|
||||
fsd.updateCount(iip, 0L, dsDelta, oldBR, newBR, true);
|
||||
}
|
||||
|
||||
|
@ -532,8 +532,8 @@ boolean unprotectedRemoveBlock(String path, INodesInPath iip,
|
||||
INodeFile fileNode, Block block) throws IOException {
|
||||
// modify file-> block and blocksMap
|
||||
// fileNode should be under construction
|
||||
boolean removed = fileNode.removeLastBlock(block);
|
||||
if (!removed) {
|
||||
BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
|
||||
if (uc == null) {
|
||||
return false;
|
||||
}
|
||||
getBlockManager().removeBlockFromMap(block);
|
||||
@ -1134,18 +1134,9 @@ private void verifyQuotaForTruncate(INodesInPath iip, INodeFile file,
|
||||
// Do not check quota if edit log is still being processed
|
||||
return;
|
||||
}
|
||||
final long diff = file.computeQuotaDeltaForTruncate(newLength);
|
||||
final short repl = file.getBlockReplication();
|
||||
delta.addStorageSpace(diff * repl);
|
||||
final BlockStoragePolicy policy = getBlockStoragePolicySuite()
|
||||
.getPolicy(file.getStoragePolicyID());
|
||||
List<StorageType> types = policy.chooseStorageTypes(repl);
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
delta.addTypeSpace(t, diff);
|
||||
}
|
||||
}
|
||||
if (diff > 0) {
|
||||
file.computeQuotaDeltaForTruncate(newLength, policy, delta);
|
||||
readLock();
|
||||
try {
|
||||
verifyQuota(iip, iip.length() - 1, delta, null);
|
||||
@ -1153,7 +1144,6 @@ private void verifyQuotaForTruncate(INodesInPath iip, INodeFile file,
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is always called with writeLock of FSDirectory held.
|
||||
|
@ -4330,7 +4330,7 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|
||||
|
||||
if (deleteblock) {
|
||||
Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock);
|
||||
boolean remove = iFile.removeLastBlock(blockToDel);
|
||||
boolean remove = iFile.removeLastBlock(blockToDel) != null;
|
||||
if (remove) {
|
||||
blockManager.removeBlock(storedBlock);
|
||||
}
|
||||
|
@ -251,22 +251,24 @@ public BlockInfoContiguousUnderConstruction setLastBlock(
|
||||
* Remove a block from the block list. This block should be
|
||||
* the last one on the list.
|
||||
*/
|
||||
boolean removeLastBlock(Block oldblock) {
|
||||
BlockInfoContiguousUnderConstruction removeLastBlock(Block oldblock) {
|
||||
Preconditions.checkState(isUnderConstruction(),
|
||||
"file is no longer under construction");
|
||||
if (blocks == null || blocks.length == 0) {
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
int size_1 = blocks.length - 1;
|
||||
if (!blocks[size_1].equals(oldblock)) {
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
|
||||
BlockInfoContiguousUnderConstruction uc =
|
||||
(BlockInfoContiguousUnderConstruction)blocks[size_1];
|
||||
//copy to a new list
|
||||
BlockInfoContiguous[] newlist = new BlockInfoContiguous[size_1];
|
||||
System.arraycopy(blocks, 0, newlist, 0, size_1);
|
||||
setBlocks(newlist);
|
||||
return true;
|
||||
return uc;
|
||||
}
|
||||
|
||||
/* End of Under-Construction Feature */
|
||||
@ -416,11 +418,6 @@ public long getHeaderLong() {
|
||||
return header;
|
||||
}
|
||||
|
||||
/** @return the storagespace required for a full block. */
|
||||
final long getPreferredBlockStoragespace() {
|
||||
return getPreferredBlockSize() * getBlockReplication();
|
||||
}
|
||||
|
||||
/** @return the blocks of the file. */
|
||||
@Override
|
||||
public BlockInfoContiguous[] getBlocks() {
|
||||
@ -567,34 +564,41 @@ public final QuotaCounts computeQuotaUsage(
|
||||
QuotaCounts counts, boolean useCache,
|
||||
int lastSnapshotId) {
|
||||
long nsDelta = 1;
|
||||
final long ssDeltaNoReplication;
|
||||
short replication;
|
||||
counts.addNameSpace(nsDelta);
|
||||
|
||||
BlockStoragePolicy bsp = null;
|
||||
if (blockStoragePolicyId != BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||
bsp = bsps.getPolicy(blockStoragePolicyId);
|
||||
}
|
||||
|
||||
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
|
||||
if (sf != null) {
|
||||
if (sf == null) {
|
||||
counts.add(storagespaceConsumed(bsp));
|
||||
return counts;
|
||||
}
|
||||
|
||||
FileDiffList fileDiffList = sf.getDiffs();
|
||||
int last = fileDiffList.getLastSnapshotId();
|
||||
|
||||
if (lastSnapshotId == Snapshot.CURRENT_STATE_ID
|
||||
|| last == Snapshot.CURRENT_STATE_ID) {
|
||||
ssDeltaNoReplication = storagespaceConsumedNoReplication();
|
||||
replication = getBlockReplication();
|
||||
} else if (last < lastSnapshotId) {
|
||||
counts.add(storagespaceConsumed(bsp));
|
||||
return counts;
|
||||
}
|
||||
|
||||
final long ssDeltaNoReplication;
|
||||
short replication;
|
||||
if (last < lastSnapshotId) {
|
||||
ssDeltaNoReplication = computeFileSize(true, false);
|
||||
replication = getFileReplication();
|
||||
} else {
|
||||
int sid = fileDiffList.getSnapshotById(lastSnapshotId);
|
||||
ssDeltaNoReplication = storagespaceConsumedNoReplication(sid);
|
||||
replication = getReplication(sid);
|
||||
ssDeltaNoReplication = computeFileSize(sid);
|
||||
replication = getFileReplication(sid);
|
||||
}
|
||||
} else {
|
||||
ssDeltaNoReplication = storagespaceConsumedNoReplication();
|
||||
replication = getBlockReplication();
|
||||
}
|
||||
counts.addNameSpace(nsDelta);
|
||||
counts.addStorageSpace(ssDeltaNoReplication * replication);
|
||||
|
||||
if (blockStoragePolicyId != BLOCK_STORAGE_POLICY_ID_UNSPECIFIED){
|
||||
BlockStoragePolicy bsp = bsps.getPolicy(blockStoragePolicyId);
|
||||
counts.addStorageSpace(ssDeltaNoReplication * replication);
|
||||
if (bsp != null) {
|
||||
List<StorageType> storageTypes = bsp.chooseStorageTypes(replication);
|
||||
for (StorageType t : storageTypes) {
|
||||
if (!t.supportTypeQuota()) {
|
||||
@ -626,7 +630,8 @@ public final ContentSummaryComputationContext computeContentSummary(
|
||||
}
|
||||
}
|
||||
counts.addContent(Content.LENGTH, fileLen);
|
||||
counts.addContent(Content.DISKSPACE, storagespaceConsumed());
|
||||
counts.addContent(Content.DISKSPACE, storagespaceConsumed(null)
|
||||
.getStorageSpace());
|
||||
|
||||
if (getStoragePolicyID() != BLOCK_STORAGE_POLICY_ID_UNSPECIFIED){
|
||||
BlockStoragePolicy bsp = summary.getBlockStoragePolicySuite().
|
||||
@ -709,19 +714,15 @@ public final long computeFileSize(boolean includesLastUcBlock,
|
||||
* including blocks in its snapshots.
|
||||
* Use preferred block size for the last block if it is under construction.
|
||||
*/
|
||||
public final long storagespaceConsumed() {
|
||||
return storagespaceConsumedNoReplication() * getBlockReplication();
|
||||
}
|
||||
|
||||
public final long storagespaceConsumedNoReplication() {
|
||||
public final QuotaCounts storagespaceConsumed(BlockStoragePolicy bsp) {
|
||||
QuotaCounts counts = new QuotaCounts.Builder().build();
|
||||
final Iterable<BlockInfoContiguous> blocks;
|
||||
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
|
||||
if (sf == null) {
|
||||
return computeFileSize(true, true);
|
||||
}
|
||||
|
||||
blocks = Arrays.asList(getBlocks());
|
||||
} else {
|
||||
// Collect all distinct blocks
|
||||
long size = 0;
|
||||
Set<Block> allBlocks = new HashSet<Block>(Arrays.asList(getBlocks()));
|
||||
Set<BlockInfoContiguous> allBlocks = new HashSet<>(Arrays.asList(getBlocks()));
|
||||
List<FileDiff> diffs = sf.getDiffs().asList();
|
||||
for(FileDiff diff : diffs) {
|
||||
BlockInfoContiguous[] diffBlocks = diff.getBlocks();
|
||||
@ -729,41 +730,24 @@ public final long storagespaceConsumedNoReplication() {
|
||||
allBlocks.addAll(Arrays.asList(diffBlocks));
|
||||
}
|
||||
}
|
||||
for(Block block : allBlocks) {
|
||||
size += block.getNumBytes();
|
||||
}
|
||||
// check if the last block is under construction
|
||||
BlockInfoContiguous lastBlock = getLastBlock();
|
||||
if(lastBlock != null &&
|
||||
lastBlock instanceof BlockInfoContiguousUnderConstruction) {
|
||||
size += getPreferredBlockSize() - lastBlock.getNumBytes();
|
||||
}
|
||||
return size;
|
||||
blocks = allBlocks;
|
||||
}
|
||||
|
||||
public final long storagespaceConsumed(int lastSnapshotId) {
|
||||
if (lastSnapshotId != CURRENT_STATE_ID) {
|
||||
return computeFileSize(lastSnapshotId)
|
||||
* getFileReplication(lastSnapshotId);
|
||||
} else {
|
||||
return storagespaceConsumed();
|
||||
final short replication = getBlockReplication();
|
||||
for (BlockInfoContiguous b : blocks) {
|
||||
long blockSize = b.isComplete() ? b.getNumBytes() :
|
||||
getPreferredBlockSize();
|
||||
counts.addStorageSpace(blockSize * replication);
|
||||
if (bsp != null) {
|
||||
List<StorageType> types = bsp.chooseStorageTypes(replication);
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
counts.addTypeSpace(t, blockSize);
|
||||
}
|
||||
}
|
||||
|
||||
public final short getReplication(int lastSnapshotId) {
|
||||
if (lastSnapshotId != CURRENT_STATE_ID) {
|
||||
return getFileReplication(lastSnapshotId);
|
||||
} else {
|
||||
return getBlockReplication();
|
||||
}
|
||||
}
|
||||
|
||||
public final long storagespaceConsumedNoReplication(int lastSnapshotId) {
|
||||
if (lastSnapshotId != CURRENT_STATE_ID) {
|
||||
return computeFileSize(lastSnapshotId);
|
||||
} else {
|
||||
return storagespaceConsumedNoReplication();
|
||||
}
|
||||
return counts;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -832,38 +816,56 @@ public long collectBlocksBeyondMax(final long max,
|
||||
/**
|
||||
* compute the quota usage change for a truncate op
|
||||
* @param newLength the length for truncation
|
||||
* @return the quota usage delta (not considering replication factor)
|
||||
*/
|
||||
long computeQuotaDeltaForTruncate(final long newLength) {
|
||||
**/
|
||||
void computeQuotaDeltaForTruncate(
|
||||
long newLength, BlockStoragePolicy bsps,
|
||||
QuotaCounts delta) {
|
||||
final BlockInfoContiguous[] blocks = getBlocks();
|
||||
if (blocks == null || blocks.length == 0) {
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
|
||||
int n = 0;
|
||||
long size = 0;
|
||||
for (; n < blocks.length && newLength > size; n++) {
|
||||
size += blocks[n].getNumBytes();
|
||||
}
|
||||
final boolean onBoundary = size == newLength;
|
||||
|
||||
long truncateSize = 0;
|
||||
for (int i = (onBoundary ? n : n - 1); i < blocks.length; i++) {
|
||||
truncateSize += blocks[i].getNumBytes();
|
||||
for (BlockInfoContiguous b : blocks) {
|
||||
size += b.getNumBytes();
|
||||
}
|
||||
|
||||
BlockInfoContiguous[] sblocks = null;
|
||||
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
|
||||
if (sf != null) {
|
||||
FileDiff diff = sf.getDiffs().getLast();
|
||||
BlockInfoContiguous[] sblocks = diff != null ? diff.getBlocks() : null;
|
||||
if (sblocks != null) {
|
||||
for (int i = (onBoundary ? n : n-1); i < blocks.length
|
||||
&& i < sblocks.length && blocks[i].equals(sblocks[i]); i++) {
|
||||
truncateSize -= blocks[i].getNumBytes();
|
||||
sblocks = diff != null ? diff.getBlocks() : null;
|
||||
}
|
||||
|
||||
for (int i = blocks.length - 1; i >= 0 && size > newLength;
|
||||
size -= blocks[i].getNumBytes(), --i) {
|
||||
BlockInfoContiguous bi = blocks[i];
|
||||
long truncatedBytes;
|
||||
if (size - newLength < bi.getNumBytes()) {
|
||||
// Record a full block as the last block will be copied during
|
||||
// recovery
|
||||
truncatedBytes = bi.getNumBytes() - getPreferredBlockSize();
|
||||
} else {
|
||||
truncatedBytes = bi.getNumBytes();
|
||||
}
|
||||
|
||||
// The block exist in snapshot, adding back the truncated bytes in the
|
||||
// existing files
|
||||
if (sblocks != null && i < sblocks.length && bi.equals(sblocks[i])) {
|
||||
truncatedBytes -= bi.getNumBytes();
|
||||
}
|
||||
|
||||
delta.addStorageSpace(-truncatedBytes * getBlockReplication());
|
||||
if (bsps != null) {
|
||||
List<StorageType> types = bsps.chooseStorageTypes(
|
||||
getBlockReplication());
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
delta.addTypeSpace(t, -truncatedBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return onBoundary ? -truncateSize : (getPreferredBlockSize() - truncateSize);
|
||||
}
|
||||
|
||||
void truncateBlocksTo(int n) {
|
||||
|
@ -19,8 +19,8 @@
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
@ -32,7 +32,6 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
|
||||
import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.util.EnumCounters;
|
||||
|
||||
/**
|
||||
* Feature for file with snapshot-related information.
|
||||
@ -145,50 +144,36 @@ public void clearDiffs() {
|
||||
public QuotaCounts updateQuotaAndCollectBlocks(BlockStoragePolicySuite bsps, INodeFile file,
|
||||
FileDiff removed, BlocksMapUpdateInfo collectedBlocks,
|
||||
final List<INode> removedINodes) {
|
||||
long oldStoragespace = file.storagespaceConsumed();
|
||||
|
||||
byte storagePolicyID = file.getStoragePolicyID();
|
||||
BlockStoragePolicy bsp = null;
|
||||
EnumCounters<StorageType> typeSpaces =
|
||||
new EnumCounters<StorageType>(StorageType.class);
|
||||
if (storagePolicyID != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||
bsp = bsps.getPolicy(file.getStoragePolicyID());
|
||||
}
|
||||
|
||||
|
||||
QuotaCounts oldCounts = file.storagespaceConsumed(null);
|
||||
long oldStoragespace;
|
||||
if (removed.snapshotINode != null) {
|
||||
short replication = removed.snapshotINode.getFileReplication();
|
||||
short currentRepl = file.getBlockReplication();
|
||||
if (currentRepl == 0) {
|
||||
long oldFileSizeNoRep = file.computeFileSize(true, true);
|
||||
if (replication > currentRepl) {
|
||||
long oldFileSizeNoRep = currentRepl == 0
|
||||
? file.computeFileSize(true, true)
|
||||
: oldCounts.getStorageSpace() / file.getBlockReplication();
|
||||
oldStoragespace = oldFileSizeNoRep * replication;
|
||||
oldCounts.setStorageSpace(oldStoragespace);
|
||||
|
||||
if (bsp != null) {
|
||||
List<StorageType> oldTypeChosen = bsp.chooseStorageTypes(replication);
|
||||
for (StorageType t : oldTypeChosen) {
|
||||
if (t.supportTypeQuota()) {
|
||||
typeSpaces.add(t, -oldFileSizeNoRep);
|
||||
oldCounts.addTypeSpace(t, oldFileSizeNoRep);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (replication > currentRepl) {
|
||||
long oldFileSizeNoRep = file.storagespaceConsumedNoReplication();
|
||||
oldStoragespace = oldFileSizeNoRep * replication;
|
||||
|
||||
if (bsp != null) {
|
||||
List<StorageType> oldTypeChosen = bsp.chooseStorageTypes(replication);
|
||||
for (StorageType t : oldTypeChosen) {
|
||||
if (t.supportTypeQuota()) {
|
||||
typeSpaces.add(t, -oldFileSizeNoRep);
|
||||
}
|
||||
}
|
||||
List<StorageType> newTypeChosen = bsp.chooseStorageTypes(currentRepl);
|
||||
for (StorageType t: newTypeChosen) {
|
||||
if (t.supportTypeQuota()) {
|
||||
typeSpaces.add(t, oldFileSizeNoRep);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
AclFeature aclFeature = removed.getSnapshotINode().getAclFeature();
|
||||
if (aclFeature != null) {
|
||||
AclStorage.removeAclFeature(aclFeature);
|
||||
@ -198,11 +183,9 @@ public QuotaCounts updateQuotaAndCollectBlocks(BlockStoragePolicySuite bsps, INo
|
||||
getDiffs().combineAndCollectSnapshotBlocks(
|
||||
bsps, file, removed, collectedBlocks, removedINodes);
|
||||
|
||||
long ssDelta = oldStoragespace - file.storagespaceConsumed();
|
||||
return new QuotaCounts.Builder().
|
||||
storageSpace(ssDelta).
|
||||
typeSpaces(typeSpaces).
|
||||
build();
|
||||
QuotaCounts current = file.storagespaceConsumed(bsp);
|
||||
oldCounts.subtract(current);
|
||||
return oldCounts;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -69,7 +69,7 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
|
||||
blockInfo.setBlockCollection(file);
|
||||
blockInfo.setGenerationStamp(genStamp);
|
||||
blockInfo.initializeBlockRecovery(genStamp);
|
||||
doReturn(true).when(file).removeLastBlock(any(Block.class));
|
||||
doReturn(blockInfo).when(file).removeLastBlock(any(Block.class));
|
||||
doReturn(true).when(file).isUnderConstruction();
|
||||
|
||||
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
|
||||
@ -152,7 +152,7 @@ public void testCommitBlockSynchronizationWithDelete() throws IOException {
|
||||
true, newTargets, null);
|
||||
|
||||
// Simulate removing the last block from the file.
|
||||
doReturn(false).when(file).removeLastBlock(any(Block.class));
|
||||
doReturn(null).when(file).removeLastBlock(any(Block.class));
|
||||
|
||||
// Repeat the call to make sure it does not throw
|
||||
namesystemSpy.commitBlockSynchronization(
|
||||
|
@ -17,19 +17,21 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.junit.After;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Make sure we correctly update the quota usage for truncate.
|
||||
@ -45,204 +47,119 @@
|
||||
public class TestTruncateQuotaUpdate {
|
||||
private static final int BLOCKSIZE = 1024;
|
||||
private static final short REPLICATION = 4;
|
||||
private static final long DISKQUOTA = BLOCKSIZE * 20;
|
||||
static final long seed = 0L;
|
||||
private static final Path dir = new Path("/TestTruncateQuotaUpdate");
|
||||
private static final Path file = new Path(dir, "file");
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private FSDirectory fsdir;
|
||||
private DistributedFileSystem dfs;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
|
||||
fsdir = cluster.getNamesystem().getFSDirectory();
|
||||
dfs = cluster.getFileSystem();
|
||||
|
||||
dfs.mkdirs(dir);
|
||||
dfs.setQuota(dir, Long.MAX_VALUE - 1, DISKQUOTA);
|
||||
dfs.setQuotaByStorageType(dir, StorageType.DISK, DISKQUOTA);
|
||||
dfs.setStoragePolicy(dir, HdfsServerConstants.HOT_STORAGE_POLICY_NAME);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
private long nextMockBlockId;
|
||||
private long nextMockGenstamp;
|
||||
private long nextMockINodeId;
|
||||
|
||||
@Test
|
||||
public void testTruncateQuotaUpdate() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
public interface TruncateCase {
|
||||
public void prepare() throws Exception;
|
||||
public void run() throws Exception;
|
||||
}
|
||||
|
||||
private void testTruncate(long newLength, long expectedDiff,
|
||||
long expectedUsage) throws Exception {
|
||||
// before doing the real truncation, make sure the computation is correct
|
||||
final INodesInPath iip = fsdir.getINodesInPath4Write(file.toString());
|
||||
final INodeFile fileNode = iip.getLastINode().asFile();
|
||||
fileNode.recordModification(iip.getLatestSnapshotId(), true);
|
||||
final long diff = fileNode.computeQuotaDeltaForTruncate(newLength);
|
||||
Assert.assertEquals(expectedDiff, diff);
|
||||
|
||||
// do the real truncation
|
||||
dfs.truncate(file, newLength);
|
||||
// wait for truncate to finish
|
||||
TestFileTruncate.checkBlockRecovery(file, dfs);
|
||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
||||
.asDirectory();
|
||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getStorageSpace();
|
||||
final long diskUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getTypeSpaces().get(StorageType.DISK);
|
||||
Assert.assertEquals(expectedUsage, spaceUsed);
|
||||
Assert.assertEquals(expectedUsage, diskUsed);
|
||||
}
|
||||
|
||||
/**
|
||||
* case 1~3
|
||||
*/
|
||||
private class TruncateWithoutSnapshot implements TruncateCase {
|
||||
@Override
|
||||
public void prepare() throws Exception {
|
||||
// original file size: 2.5 block
|
||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
|
||||
REPLICATION, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
public void testTruncateWithoutSnapshot() {
|
||||
INodeFile file = createMockFile(BLOCKSIZE * 2 + BLOCKSIZE / 2, REPLICATION);
|
||||
// case 1: first truncate to 1.5 blocks
|
||||
long newLength = BLOCKSIZE + BLOCKSIZE / 2;
|
||||
// we truncate 1 blocks, but not on the boundary, thus the diff should
|
||||
// be -block + (block - 0.5 block) = -0.5 block
|
||||
long diff = -BLOCKSIZE / 2;
|
||||
// the new quota usage should be BLOCKSIZE * 1.5 * replication
|
||||
long usage = (BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
QuotaCounts count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE + BLOCKSIZE / 2, null, count);
|
||||
Assert.assertEquals(-BLOCKSIZE / 2 * REPLICATION, count.getStorageSpace());
|
||||
|
||||
// case 2: truncate to 1 block
|
||||
newLength = BLOCKSIZE;
|
||||
// the diff should be -0.5 block since this is not on boundary
|
||||
diff = -BLOCKSIZE / 2;
|
||||
// after truncation the quota usage should be BLOCKSIZE * replication
|
||||
usage = BLOCKSIZE * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE, null, count);
|
||||
Assert.assertEquals(-(BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION,
|
||||
count.getStorageSpace());
|
||||
|
||||
// case 3: truncate to 0
|
||||
testTruncate(0, -BLOCKSIZE, 0);
|
||||
}
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(0, null, count);
|
||||
Assert.assertEquals(-(BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION,
|
||||
count.getStorageSpace());
|
||||
}
|
||||
|
||||
/**
|
||||
* case 4~6
|
||||
*/
|
||||
private class TruncateWithSnapshot implements TruncateCase {
|
||||
@Override
|
||||
public void prepare() throws Exception {
|
||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
|
||||
REPLICATION, 0L);
|
||||
SnapshotTestHelper.createSnapshot(dfs, dir, "s1");
|
||||
}
|
||||
@Test
|
||||
public void testTruncateWithSnapshotNoDivergence() {
|
||||
INodeFile file = createMockFile(BLOCKSIZE * 2 + BLOCKSIZE / 2, REPLICATION);
|
||||
addSnapshotFeature(file, file.getBlocks());
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
// case 4: truncate to 1.5 blocks
|
||||
long newLength = BLOCKSIZE + BLOCKSIZE / 2;
|
||||
// all the blocks are in snapshot. truncate need to allocate a new block
|
||||
// diff should be +BLOCKSIZE
|
||||
long diff = BLOCKSIZE;
|
||||
// the new quota usage should be BLOCKSIZE * 3 * replication
|
||||
long usage = BLOCKSIZE * 3 * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
QuotaCounts count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE + BLOCKSIZE / 2, null, count);
|
||||
Assert.assertEquals(BLOCKSIZE * REPLICATION, count.getStorageSpace());
|
||||
|
||||
// case 5: truncate to 1 block
|
||||
newLength = BLOCKSIZE;
|
||||
// the block for truncation is not in snapshot, diff should be -0.5 block
|
||||
diff = -BLOCKSIZE / 2;
|
||||
// after truncation the quota usage should be 2.5 block * repl
|
||||
usage = (BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
// case 2: truncate to 1 block
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE, null, count);
|
||||
Assert.assertEquals(0, count.getStorageSpace());
|
||||
|
||||
// case 6: truncate to 0
|
||||
testTruncate(0, 0, usage);
|
||||
}
|
||||
// case 3: truncate to 0
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(0, null, count);
|
||||
Assert.assertEquals(0, count.getStorageSpace());
|
||||
}
|
||||
|
||||
/**
|
||||
* case 7~9
|
||||
*/
|
||||
private class TruncateWithSnapshot2 implements TruncateCase {
|
||||
@Override
|
||||
public void prepare() throws Exception {
|
||||
// original size: 2.5 blocks
|
||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
|
||||
REPLICATION, 0L);
|
||||
SnapshotTestHelper.createSnapshot(dfs, dir, "s1");
|
||||
|
||||
// truncate to 1.5 block
|
||||
dfs.truncate(file, BLOCKSIZE + BLOCKSIZE / 2);
|
||||
TestFileTruncate.checkBlockRecovery(file, dfs);
|
||||
|
||||
// append another 1 BLOCK
|
||||
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
// case 8: truncate to 2 blocks
|
||||
long newLength = BLOCKSIZE * 2;
|
||||
// the original 2.5 blocks are in snapshot. the block truncated is not
|
||||
// in snapshot. diff should be -0.5 block
|
||||
long diff = -BLOCKSIZE / 2;
|
||||
// the new quota usage should be BLOCKSIZE * 3.5 * replication
|
||||
long usage = (BLOCKSIZE * 3 + BLOCKSIZE / 2) * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
@Test
|
||||
public void testTruncateWithSnapshotAndDivergence() {
|
||||
INodeFile file = createMockFile(BLOCKSIZE * 2 + BLOCKSIZE / 2, REPLICATION);
|
||||
BlockInfoContiguous[] blocks = new BlockInfoContiguous
|
||||
[file.getBlocks().length];
|
||||
System.arraycopy(file.getBlocks(), 0, blocks, 0, blocks.length);
|
||||
addSnapshotFeature(file, blocks);
|
||||
// Update the last two blocks in the current inode
|
||||
file.getBlocks()[1] = newBlock(BLOCKSIZE, REPLICATION);
|
||||
file.getBlocks()[2] = newBlock(BLOCKSIZE / 2, REPLICATION);
|
||||
|
||||
// case 7: truncate to 1.5 block
|
||||
newLength = BLOCKSIZE + BLOCKSIZE / 2;
|
||||
// the block for truncation is not in snapshot, diff should be
|
||||
// -0.5 block + (block - 0.5block) = 0
|
||||
diff = 0;
|
||||
// after truncation the quota usage should be 3 block * repl
|
||||
usage = (BLOCKSIZE * 3) * REPLICATION;
|
||||
testTruncate(newLength, diff, usage);
|
||||
// the block for truncation is not in snapshot, diff should be the same
|
||||
// as case 1
|
||||
QuotaCounts count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE + BLOCKSIZE / 2, null, count);
|
||||
Assert.assertEquals(-BLOCKSIZE / 2 * REPLICATION, count.getStorageSpace());
|
||||
|
||||
// case 8: truncate to 2 blocks
|
||||
// the original 2.5 blocks are in snapshot. the block truncated is not
|
||||
// in snapshot. diff should be -0.5 block
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(BLOCKSIZE + BLOCKSIZE / 2, null, count);
|
||||
Assert.assertEquals(-BLOCKSIZE / 2 * REPLICATION, count.getStorageSpace());
|
||||
|
||||
// case 9: truncate to 0
|
||||
testTruncate(0, -BLOCKSIZE / 2,
|
||||
(BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION);
|
||||
}
|
||||
count = new QuotaCounts.Builder().build();
|
||||
file.computeQuotaDeltaForTruncate(0, null, count);
|
||||
Assert.assertEquals(-(BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, count
|
||||
.getStorageSpace());
|
||||
}
|
||||
|
||||
private void testTruncateQuotaUpdate(TruncateCase t) throws Exception {
|
||||
t.prepare();
|
||||
t.run();
|
||||
private INodeFile createMockFile(long size, short replication) {
|
||||
ArrayList<BlockInfoContiguous> blocks = new ArrayList<>();
|
||||
long createdSize = 0;
|
||||
while (createdSize < size) {
|
||||
long blockSize = Math.min(BLOCKSIZE, size - createdSize);
|
||||
BlockInfoContiguous bi = newBlock(blockSize, replication);
|
||||
blocks.add(bi);
|
||||
createdSize += BLOCKSIZE;
|
||||
}
|
||||
PermissionStatus perm = new PermissionStatus("foo", "bar", FsPermission
|
||||
.createImmutable((short) 0x1ff));
|
||||
return new INodeFile(
|
||||
++nextMockINodeId, new byte[0], perm, 0, 0,
|
||||
blocks.toArray(new BlockInfoContiguous[blocks.size()]), replication,
|
||||
BLOCKSIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuotaNoSnapshot() throws Exception {
|
||||
testTruncateQuotaUpdate(new TruncateWithoutSnapshot());
|
||||
private BlockInfoContiguous newBlock(long size, short replication) {
|
||||
Block b = new Block(++nextMockBlockId, size, ++nextMockGenstamp);
|
||||
return new BlockInfoContiguous(b, replication);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuotaWithSnapshot() throws Exception {
|
||||
testTruncateQuotaUpdate(new TruncateWithSnapshot());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuotaWithSnapshot2() throws Exception {
|
||||
testTruncateQuotaUpdate(new TruncateWithSnapshot2());
|
||||
private static void addSnapshotFeature(INodeFile file, BlockInfoContiguous[] blocks) {
|
||||
FileDiff diff = mock(FileDiff.class);
|
||||
when(diff.getBlocks()).thenReturn(blocks);
|
||||
FileDiffList diffList = new FileDiffList();
|
||||
@SuppressWarnings("unchecked")
|
||||
ArrayList<FileDiff> diffs = ((ArrayList<FileDiff>)Whitebox.getInternalState
|
||||
(diffList, "diffs"));
|
||||
diffs.add(diff);
|
||||
FileWithSnapshotFeature sf = new FileWithSnapshotFeature(diffList);
|
||||
file.addFeature(sf);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,89 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.apache.hadoop.fs.StorageType.DISK;
|
||||
import static org.apache.hadoop.fs.StorageType.SSD;
|
||||
import static org.mockito.Mockito.anyByte;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestFileWithSnapshotFeature {
|
||||
private static final int BLOCK_SIZE = 1024;
|
||||
private static final short REPL_3 = 3;
|
||||
private static final short REPL_1 = 1;
|
||||
|
||||
@Test
|
||||
public void testUpdateQuotaAndCollectBlocks() {
|
||||
FileDiffList diffs = new FileDiffList();
|
||||
FileWithSnapshotFeature sf = new FileWithSnapshotFeature(diffs);
|
||||
FileDiff diff = mock(FileDiff.class);
|
||||
BlockStoragePolicySuite bsps = mock(BlockStoragePolicySuite.class);
|
||||
BlockStoragePolicy bsp = mock(BlockStoragePolicy.class);
|
||||
BlockInfoContiguous[] blocks = new BlockInfoContiguous[] {
|
||||
new BlockInfoContiguous(new Block(1, BLOCK_SIZE, 1), REPL_1)
|
||||
};
|
||||
|
||||
// No snapshot
|
||||
INodeFile file = mock(INodeFile.class);
|
||||
when(file.getFileWithSnapshotFeature()).thenReturn(sf);
|
||||
when(file.getBlocks()).thenReturn(blocks);
|
||||
when(file.getStoragePolicyID()).thenReturn((byte) 1);
|
||||
when(bsps.getPolicy(anyByte())).thenReturn(bsp);
|
||||
INode.BlocksMapUpdateInfo collectedBlocks = mock(
|
||||
INode.BlocksMapUpdateInfo.class);
|
||||
ArrayList<INode> removedINodes = new ArrayList<>();
|
||||
QuotaCounts counts = sf.updateQuotaAndCollectBlocks(
|
||||
bsps, file, diff, collectedBlocks, removedINodes);
|
||||
Assert.assertEquals(0, counts.getStorageSpace());
|
||||
Assert.assertTrue(counts.getTypeSpaces().allLessOrEqual(0));
|
||||
|
||||
// INode only exists in the snapshot
|
||||
INodeFile snapshotINode = mock(INodeFile.class);
|
||||
when(file.getBlockReplication()).thenReturn(REPL_1);
|
||||
Whitebox.setInternalState(snapshotINode, "header", (long) REPL_3 << 48);
|
||||
Whitebox.setInternalState(diff, "snapshotINode", snapshotINode);
|
||||
when(diff.getSnapshotINode()).thenReturn(snapshotINode);
|
||||
|
||||
when(bsp.chooseStorageTypes(REPL_1))
|
||||
.thenReturn(Lists.newArrayList(SSD));
|
||||
when(bsp.chooseStorageTypes(REPL_3))
|
||||
.thenReturn(Lists.newArrayList(DISK));
|
||||
counts = sf.updateQuotaAndCollectBlocks(
|
||||
bsps, file, diff, collectedBlocks, removedINodes);
|
||||
Assert.assertEquals((REPL_3 - REPL_1) * BLOCK_SIZE,
|
||||
counts.getStorageSpace());
|
||||
Assert.assertEquals(BLOCK_SIZE, counts.getTypeSpaces().get(DISK));
|
||||
Assert.assertEquals(-BLOCK_SIZE, counts.getTypeSpaces().get(SSD));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user