HDFS-17408:Reduce the number of quota calculations in FSDirRenameOp (#6653). Contributed by lei w.
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org> Reviewed-by: Dinesh Chitlangia <dineshc@apache.org> Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
This commit is contained in:
parent
5f3eb446f7
commit
36c22400b2
@ -36,6 +36,8 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
class FSDirMkdirOp {
|
class FSDirMkdirOp {
|
||||||
@ -221,8 +223,8 @@ private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId,
|
|||||||
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
|
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
|
||||||
timestamp);
|
timestamp);
|
||||||
|
|
||||||
INodesInPath iip =
|
INodesInPath iip = fsd.addLastINode(parent, dir, permission.getPermission(),
|
||||||
fsd.addLastINode(parent, dir, permission.getPermission(), true);
|
true, Optional.empty());
|
||||||
if (iip != null && aclEntries != null) {
|
if (iip != null && aclEntries != null) {
|
||||||
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
|
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.util.Preconditions;
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
@ -43,6 +45,8 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
|
import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
|
||||||
import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
|
import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
|
||||||
|
|
||||||
@ -68,14 +72,18 @@ static RenameResult renameToInt(
|
|||||||
* Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
|
* Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
|
||||||
* dstInodes[dstInodes.length-1]
|
* dstInodes[dstInodes.length-1]
|
||||||
*/
|
*/
|
||||||
private static void verifyQuotaForRename(FSDirectory fsd, INodesInPath src,
|
private static Pair<Optional<QuotaCounts>, Optional<QuotaCounts>> verifyQuotaForRename(
|
||||||
INodesInPath dst) throws QuotaExceededException {
|
FSDirectory fsd, INodesInPath src, INodesInPath dst) throws QuotaExceededException {
|
||||||
|
Optional<QuotaCounts> srcDelta = Optional.empty();
|
||||||
|
Optional<QuotaCounts> dstDelta = Optional.empty();
|
||||||
if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
|
if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
|
||||||
// Do not check quota if edits log is still being processed
|
// Do not check quota if edits log is still being processed
|
||||||
return;
|
return Pair.of(srcDelta, dstDelta);
|
||||||
}
|
}
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while(src.getINode(i) == dst.getINode(i)) { i++; }
|
while (src.getINode(i) == dst.getINode(i)) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
// src[i - 1] is the last common ancestor.
|
// src[i - 1] is the last common ancestor.
|
||||||
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
|
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
|
||||||
// Assume dstParent existence check done by callers.
|
// Assume dstParent existence check done by callers.
|
||||||
@ -88,13 +96,19 @@ private static void verifyQuotaForRename(FSDirectory fsd, INodesInPath src,
|
|||||||
final QuotaCounts delta = src.getLastINode()
|
final QuotaCounts delta = src.getLastINode()
|
||||||
.computeQuotaUsage(bsps, storagePolicyID, false,
|
.computeQuotaUsage(bsps, storagePolicyID, false,
|
||||||
Snapshot.CURRENT_STATE_ID);
|
Snapshot.CURRENT_STATE_ID);
|
||||||
|
QuotaCounts srcQuota = new QuotaCounts.Builder().quotaCount(delta).build();
|
||||||
|
srcDelta = Optional.of(srcQuota);
|
||||||
|
|
||||||
// Reduce the required quota by dst that is being removed
|
// Reduce the required quota by dst that is being removed
|
||||||
final INode dstINode = dst.getLastINode();
|
final INode dstINode = dst.getLastINode();
|
||||||
if (dstINode != null) {
|
if (dstINode != null) {
|
||||||
delta.subtract(dstINode.computeQuotaUsage(bsps));
|
QuotaCounts counts = dstINode.computeQuotaUsage(bsps);
|
||||||
|
QuotaCounts dstQuota = new QuotaCounts.Builder().quotaCount(counts).build();
|
||||||
|
dstDelta = Optional.of(dstQuota);
|
||||||
|
delta.subtract(counts);
|
||||||
}
|
}
|
||||||
FSDirectory.verifyQuota(dst, dst.length() - 1, delta, src.getINode(i - 1));
|
FSDirectory.verifyQuota(dst, dst.length() - 1, delta, src.getINode(i - 1));
|
||||||
|
return Pair.of(srcDelta, dstDelta);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -202,9 +216,10 @@ static INodesInPath unprotectedRenameTo(FSDirectory fsd,
|
|||||||
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);
|
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);
|
||||||
// Ensure dst has quota to accommodate rename
|
// Ensure dst has quota to accommodate rename
|
||||||
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
|
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
|
||||||
verifyQuotaForRename(fsd, srcIIP, dstIIP);
|
Pair<Optional<QuotaCounts>, Optional<QuotaCounts>> countPair =
|
||||||
|
verifyQuotaForRename(fsd, srcIIP, dstIIP);
|
||||||
|
|
||||||
RenameOperation tx = new RenameOperation(fsd, srcIIP, dstIIP);
|
RenameOperation tx = new RenameOperation(fsd, srcIIP, dstIIP, countPair);
|
||||||
|
|
||||||
boolean added = false;
|
boolean added = false;
|
||||||
|
|
||||||
@ -421,9 +436,10 @@ static RenameResult unprotectedRenameTo(FSDirectory fsd,
|
|||||||
|
|
||||||
// Ensure dst has quota to accommodate rename
|
// Ensure dst has quota to accommodate rename
|
||||||
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
|
verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
|
||||||
verifyQuotaForRename(fsd, srcIIP, dstIIP);
|
Pair<Optional<QuotaCounts>, Optional<QuotaCounts>> quotaPair =
|
||||||
|
verifyQuotaForRename(fsd, srcIIP, dstIIP);
|
||||||
|
|
||||||
RenameOperation tx = new RenameOperation(fsd, srcIIP, dstIIP);
|
RenameOperation tx = new RenameOperation(fsd, srcIIP, dstIIP, quotaPair);
|
||||||
|
|
||||||
boolean undoRemoveSrc = true;
|
boolean undoRemoveSrc = true;
|
||||||
tx.removeSrc();
|
tx.removeSrc();
|
||||||
@ -638,16 +654,22 @@ private static class RenameOperation {
|
|||||||
private final byte[] srcChildName;
|
private final byte[] srcChildName;
|
||||||
private final boolean isSrcInSnapshot;
|
private final boolean isSrcInSnapshot;
|
||||||
private final boolean srcChildIsReference;
|
private final boolean srcChildIsReference;
|
||||||
private final QuotaCounts oldSrcCounts;
|
private final QuotaCounts oldSrcCountsInSnapshot;
|
||||||
|
private final boolean sameStoragePolicy;
|
||||||
|
private final Optional<QuotaCounts> srcSubTreeCount;
|
||||||
|
private final Optional<QuotaCounts> dstSubTreeCount;
|
||||||
private INode srcChild;
|
private INode srcChild;
|
||||||
private INode oldDstChild;
|
private INode oldDstChild;
|
||||||
|
|
||||||
RenameOperation(FSDirectory fsd, INodesInPath srcIIP, INodesInPath dstIIP) {
|
RenameOperation(FSDirectory fsd, INodesInPath srcIIP, INodesInPath dstIIP,
|
||||||
|
Pair<Optional<QuotaCounts>, Optional<QuotaCounts>> quotaPair) {
|
||||||
this.fsd = fsd;
|
this.fsd = fsd;
|
||||||
this.srcIIP = srcIIP;
|
this.srcIIP = srcIIP;
|
||||||
this.dstIIP = dstIIP;
|
this.dstIIP = dstIIP;
|
||||||
this.srcParentIIP = srcIIP.getParentINodesInPath();
|
this.srcParentIIP = srcIIP.getParentINodesInPath();
|
||||||
this.dstParentIIP = dstIIP.getParentINodesInPath();
|
this.dstParentIIP = dstIIP.getParentINodesInPath();
|
||||||
|
this.sameStoragePolicy = isSameStoragePolicy();
|
||||||
|
|
||||||
|
|
||||||
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
|
BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
|
||||||
srcChild = this.srcIIP.getLastINode();
|
srcChild = this.srcIIP.getLastINode();
|
||||||
@ -672,7 +694,7 @@ private static class RenameOperation {
|
|||||||
// check srcChild for reference
|
// check srcChild for reference
|
||||||
srcRefDstSnapshot = srcChildIsReference ?
|
srcRefDstSnapshot = srcChildIsReference ?
|
||||||
srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
|
srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
|
||||||
oldSrcCounts = new QuotaCounts.Builder().build();
|
oldSrcCountsInSnapshot = new QuotaCounts.Builder().build();
|
||||||
if (isSrcInSnapshot) {
|
if (isSrcInSnapshot) {
|
||||||
final INodeReference.WithName withName = srcParent
|
final INodeReference.WithName withName = srcParent
|
||||||
.replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
|
.replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
|
||||||
@ -681,7 +703,7 @@ private static class RenameOperation {
|
|||||||
this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
|
this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
|
||||||
srcChild);
|
srcChild);
|
||||||
// get the counts before rename
|
// get the counts before rename
|
||||||
oldSrcCounts.add(withCount.getReferredINode().computeQuotaUsage(bsps));
|
oldSrcCountsInSnapshot.add(withCount.getReferredINode().computeQuotaUsage(bsps));
|
||||||
} else if (srcChildIsReference) {
|
} else if (srcChildIsReference) {
|
||||||
// srcChild is reference but srcChild is not in latest snapshot
|
// srcChild is reference but srcChild is not in latest snapshot
|
||||||
withCount = (INodeReference.WithCount) srcChild.asReference()
|
withCount = (INodeReference.WithCount) srcChild.asReference()
|
||||||
@ -689,6 +711,38 @@ private static class RenameOperation {
|
|||||||
} else {
|
} else {
|
||||||
withCount = null;
|
withCount = null;
|
||||||
}
|
}
|
||||||
|
// Set quota for src and dst, ignore src is in Snapshot or is Reference
|
||||||
|
this.srcSubTreeCount = withCount == null ?
|
||||||
|
quotaPair.getLeft() : Optional.empty();
|
||||||
|
this.dstSubTreeCount = quotaPair.getRight();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isSameStoragePolicy() {
|
||||||
|
final INode src = srcIIP.getLastINode();
|
||||||
|
final INode dst = dstIIP.getLastINode();
|
||||||
|
// If the source INode has a storagePolicyID, we should use
|
||||||
|
// its storagePolicyId to update dst`s quota usage.
|
||||||
|
if (src.isSetStoragePolicy()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte srcSp;
|
||||||
|
final byte dstSp;
|
||||||
|
if (dst == null) {
|
||||||
|
dstSp = dstIIP.getINode(-2).getStoragePolicyID();
|
||||||
|
} else if (dst.isSymlink()) {
|
||||||
|
dstSp = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
|
} else {
|
||||||
|
dstSp = dst.getStoragePolicyID();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (src.isSymlink()) {
|
||||||
|
srcSp = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
|
} else {
|
||||||
|
// Update src should use src·s storage policyID
|
||||||
|
srcSp = src.getStoragePolicyID();
|
||||||
|
}
|
||||||
|
return srcSp == dstSp;
|
||||||
}
|
}
|
||||||
|
|
||||||
long removeSrc() throws IOException {
|
long removeSrc() throws IOException {
|
||||||
@ -701,7 +755,9 @@ long removeSrc() throws IOException {
|
|||||||
throw new IOException(error);
|
throw new IOException(error);
|
||||||
} else {
|
} else {
|
||||||
// update the quota count if necessary
|
// update the quota count if necessary
|
||||||
fsd.updateCountForDelete(srcChild, srcIIP);
|
Optional<QuotaCounts> countOp = sameStoragePolicy ?
|
||||||
|
srcSubTreeCount : Optional.empty();
|
||||||
|
fsd.updateCountForDelete(srcChild, srcIIP, countOp);
|
||||||
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
|
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
|
||||||
return removedNum;
|
return removedNum;
|
||||||
}
|
}
|
||||||
@ -716,7 +772,9 @@ boolean removeSrc4OldRename() {
|
|||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
// update the quota count if necessary
|
// update the quota count if necessary
|
||||||
fsd.updateCountForDelete(srcChild, srcIIP);
|
Optional<QuotaCounts> countOp = sameStoragePolicy ?
|
||||||
|
srcSubTreeCount : Optional.empty();
|
||||||
|
fsd.updateCountForDelete(srcChild, srcIIP, countOp);
|
||||||
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
|
srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, null);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -727,7 +785,7 @@ long removeDst() {
|
|||||||
if (removedNum != -1) {
|
if (removedNum != -1) {
|
||||||
oldDstChild = dstIIP.getLastINode();
|
oldDstChild = dstIIP.getLastINode();
|
||||||
// update the quota count if necessary
|
// update the quota count if necessary
|
||||||
fsd.updateCountForDelete(oldDstChild, dstIIP);
|
fsd.updateCountForDelete(oldDstChild, dstIIP, dstSubTreeCount);
|
||||||
dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
|
dstIIP = INodesInPath.replace(dstIIP, dstIIP.length() - 1, null);
|
||||||
}
|
}
|
||||||
return removedNum;
|
return removedNum;
|
||||||
@ -745,7 +803,7 @@ INodesInPath addSourceToDestination() {
|
|||||||
toDst = new INodeReference.DstReference(dstParent.asDirectory(),
|
toDst = new INodeReference.DstReference(dstParent.asDirectory(),
|
||||||
withCount, dstIIP.getLatestSnapshotId());
|
withCount, dstIIP.getLatestSnapshotId());
|
||||||
}
|
}
|
||||||
return fsd.addLastINodeNoQuotaCheck(dstParentIIP, toDst);
|
return fsd.addLastINodeNoQuotaCheck(dstParentIIP, toDst, srcSubTreeCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateMtimeAndLease(long timestamp) {
|
void updateMtimeAndLease(long timestamp) {
|
||||||
@ -777,7 +835,9 @@ void restoreSource() {
|
|||||||
} else {
|
} else {
|
||||||
// srcParent is not an INodeDirectoryWithSnapshot, we only need to add
|
// srcParent is not an INodeDirectoryWithSnapshot, we only need to add
|
||||||
// the srcChild back
|
// the srcChild back
|
||||||
fsd.addLastINodeNoQuotaCheck(srcParentIIP, srcChild);
|
Optional<QuotaCounts> countOp = sameStoragePolicy ?
|
||||||
|
srcSubTreeCount : Optional.empty();
|
||||||
|
fsd.addLastINodeNoQuotaCheck(srcParentIIP, srcChild, countOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -787,7 +847,7 @@ void restoreDst(BlockStoragePolicySuite bsps) {
|
|||||||
if (dstParent.isWithSnapshot()) {
|
if (dstParent.isWithSnapshot()) {
|
||||||
dstParent.undoRename4DstParent(bsps, oldDstChild, dstIIP.getLatestSnapshotId());
|
dstParent.undoRename4DstParent(bsps, oldDstChild, dstIIP.getLatestSnapshotId());
|
||||||
} else {
|
} else {
|
||||||
fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild);
|
fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild, dstSubTreeCount);
|
||||||
}
|
}
|
||||||
if (oldDstChild != null && oldDstChild.isReference()) {
|
if (oldDstChild != null && oldDstChild.isReference()) {
|
||||||
final INodeReference removedDstRef = oldDstChild.asReference();
|
final INodeReference removedDstRef = oldDstChild.asReference();
|
||||||
@ -826,7 +886,7 @@ void updateQuotasInSourceTree(BlockStoragePolicySuite bsps) {
|
|||||||
if (isSrcInSnapshot) {
|
if (isSrcInSnapshot) {
|
||||||
// get the counts after rename
|
// get the counts after rename
|
||||||
QuotaCounts newSrcCounts = srcChild.computeQuotaUsage(bsps, false);
|
QuotaCounts newSrcCounts = srcChild.computeQuotaUsage(bsps, false);
|
||||||
newSrcCounts.subtract(oldSrcCounts);
|
newSrcCounts.subtract(oldSrcCountsInSnapshot);
|
||||||
srcParent.addSpaceConsumed(newSrcCounts);
|
srcParent.addSpaceConsumed(newSrcCounts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,6 +79,7 @@
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
@ -1008,10 +1009,12 @@ public void updateCount(INodesInPath iip, INode.QuotaDelta quotaDelta,
|
|||||||
* when image/edits have been loaded and the file/dir to be deleted is not
|
* when image/edits have been loaded and the file/dir to be deleted is not
|
||||||
* contained in snapshots.
|
* contained in snapshots.
|
||||||
*/
|
*/
|
||||||
void updateCountForDelete(final INode inode, final INodesInPath iip) {
|
void updateCountForDelete(final INode inode, final INodesInPath iip,
|
||||||
|
final Optional<QuotaCounts> quotaCounts) {
|
||||||
if (getFSNamesystem().isImageLoaded() &&
|
if (getFSNamesystem().isImageLoaded() &&
|
||||||
!inode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
|
!inode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
|
||||||
QuotaCounts counts = inode.computeQuotaUsage(getBlockStoragePolicySuite());
|
QuotaCounts counts = quotaCounts.orElseGet(() ->
|
||||||
|
inode.computeQuotaUsage(getBlockStoragePolicySuite()));
|
||||||
unprotectedUpdateCount(iip, iip.length() - 1, counts.negation());
|
unprotectedUpdateCount(iip, iip.length() - 1, counts.negation());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1198,7 +1201,7 @@ INodesInPath addINode(INodesInPath existing, INode child,
|
|||||||
cacheName(child);
|
cacheName(child);
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
return addLastINode(existing, child, modes, true);
|
return addLastINode(existing, child, modes, true, Optional.empty());
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
@ -1349,7 +1352,8 @@ private void copyINodeDefaultAcl(INode child, FsPermission modes) {
|
|||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public INodesInPath addLastINode(INodesInPath existing, INode inode,
|
public INodesInPath addLastINode(INodesInPath existing, INode inode,
|
||||||
FsPermission modes, boolean checkQuota) throws QuotaExceededException {
|
FsPermission modes, boolean checkQuota, Optional<QuotaCounts> quotaCount)
|
||||||
|
throws QuotaExceededException {
|
||||||
assert existing.getLastINode() != null &&
|
assert existing.getLastINode() != null &&
|
||||||
existing.getLastINode().isDirectory();
|
existing.getLastINode().isDirectory();
|
||||||
|
|
||||||
@ -1380,13 +1384,10 @@ public INodesInPath addLastINode(INodesInPath existing, INode inode,
|
|||||||
// always verify inode name
|
// always verify inode name
|
||||||
verifyINodeName(inode.getLocalNameBytes());
|
verifyINodeName(inode.getLocalNameBytes());
|
||||||
|
|
||||||
final boolean isSrcSetSp = inode.isSetStoragePolicy();
|
final QuotaCounts counts = quotaCount.orElseGet(() -> inode.
|
||||||
final byte storagePolicyID = isSrcSetSp ?
|
computeQuotaUsage(getBlockStoragePolicySuite(),
|
||||||
inode.getLocalStoragePolicyID() :
|
parent.getStoragePolicyID(), false,
|
||||||
parent.getStoragePolicyID();
|
Snapshot.CURRENT_STATE_ID));
|
||||||
final QuotaCounts counts = inode
|
|
||||||
.computeQuotaUsage(getBlockStoragePolicySuite(),
|
|
||||||
storagePolicyID, false, Snapshot.CURRENT_STATE_ID);
|
|
||||||
updateCount(existing, pos, counts, checkQuota);
|
updateCount(existing, pos, counts, checkQuota);
|
||||||
|
|
||||||
boolean isRename = (inode.getParent() != null);
|
boolean isRename = (inode.getParent() != null);
|
||||||
@ -1404,10 +1405,11 @@ public INodesInPath addLastINode(INodesInPath existing, INode inode,
|
|||||||
return INodesInPath.append(existing, inode, inode.getLocalNameBytes());
|
return INodesInPath.append(existing, inode, inode.getLocalNameBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
INodesInPath addLastINodeNoQuotaCheck(INodesInPath existing, INode i) {
|
INodesInPath addLastINodeNoQuotaCheck(INodesInPath existing, INode i,
|
||||||
|
Optional<QuotaCounts> quotaCount) {
|
||||||
try {
|
try {
|
||||||
// All callers do not have create modes to pass.
|
// All callers do not have create modes to pass.
|
||||||
return addLastINode(existing, i, null, false);
|
return addLastINode(existing, i, null, false, quotaCount);
|
||||||
} catch (QuotaExceededException e) {
|
} catch (QuotaExceededException e) {
|
||||||
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
|
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,161 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
|
import org.apache.hadoop.fs.Options;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HOT_STORAGE_POLICY_NAME;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.ONESSD_STORAGE_POLICY_NAME;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class TestCorrectnessOfQuotaAfterRenameOp {
|
||||||
|
private static MiniDFSCluster cluster;
|
||||||
|
private static DistributedFileSystem dfs;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws IOException {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
dfs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQuotaUsageWhenRenameWithSameStoragePolicy() throws Exception {
|
||||||
|
final int fileLen = 1024;
|
||||||
|
final short replication = 3;
|
||||||
|
final long spaceQuota = dfs.getClient().getConf().getDefaultBlockSize() * 10;
|
||||||
|
final Path root = new Path(PathUtils.getTestDir(getClass()).getPath(),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
assertTrue(dfs.mkdirs(root));
|
||||||
|
// Init test dir
|
||||||
|
Path testParentDir1 = new Path(root, "test1");
|
||||||
|
assertTrue(dfs.mkdirs(testParentDir1));
|
||||||
|
Path testParentDir2 = new Path(root, "test2");
|
||||||
|
assertTrue(dfs.mkdirs(testParentDir2));
|
||||||
|
Path testParentDir3 = new Path(root, "test3");
|
||||||
|
assertTrue(dfs.mkdirs(testParentDir3));
|
||||||
|
// Set quota to update quota cache when rename
|
||||||
|
dfs.setQuota(testParentDir1, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
|
||||||
|
dfs.setQuota(testParentDir2, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
|
||||||
|
dfs.setQuota(testParentDir3, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
|
||||||
|
|
||||||
|
final Path srcDir = new Path(testParentDir1, "src-dir");
|
||||||
|
Path file = new Path(srcDir, "file1");
|
||||||
|
DFSTestUtil.createFile(dfs, file, fileLen, replication, 0);
|
||||||
|
Path file2 = new Path(srcDir, "file2");
|
||||||
|
DFSTestUtil.createFile(dfs, file2, fileLen, replication, 0);
|
||||||
|
|
||||||
|
final Path dstDir1 = new Path(testParentDir2, "dst-dir");
|
||||||
|
ContentSummary cs1 = dfs.getContentSummary(testParentDir1);
|
||||||
|
// srcDir=/root/test1/src/dir
|
||||||
|
// dstDir1=/root/test2/dst-dir dstDir1 not exist
|
||||||
|
boolean rename = dfs.rename(srcDir, dstDir1);
|
||||||
|
assertEquals(true, rename);
|
||||||
|
ContentSummary cs2 = dfs.getContentSummary(testParentDir2);
|
||||||
|
assertTrue(cs1.equals(cs2));
|
||||||
|
|
||||||
|
|
||||||
|
final Path dstDir2 = new Path(testParentDir3, "dst-dir");
|
||||||
|
assertTrue(dfs.mkdirs(dstDir2));
|
||||||
|
ContentSummary cs3 = dfs.getContentSummary(testParentDir2);
|
||||||
|
|
||||||
|
//Src and dst must be same (all file or all dir)
|
||||||
|
// dstDir1=/root/test2/dst-dir
|
||||||
|
// dstDir2=/root/test3/dst-dir
|
||||||
|
dfs.rename(dstDir1, dstDir2, Options.Rename.OVERWRITE);
|
||||||
|
ContentSummary cs4 = dfs.getContentSummary(testParentDir3);
|
||||||
|
assertTrue(cs3.equals(cs4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQuotaUsageWhenRenameWithDifferStoragePolicy() throws Exception {
|
||||||
|
final int fileLen = 1024;
|
||||||
|
final short replication = 3;
|
||||||
|
final long spaceQuota = dfs.getClient().getConf().getDefaultBlockSize() * 10;
|
||||||
|
final Path root = new Path(PathUtils.getTestDir(getClass()).getPath(),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
assertTrue(dfs.mkdirs(root));
|
||||||
|
|
||||||
|
// Init test dir
|
||||||
|
Path testParentDir1 = new Path(root, "test1");
|
||||||
|
assertTrue(dfs.mkdirs(testParentDir1));
|
||||||
|
Path testParentDir2 = new Path(root, "test2");
|
||||||
|
assertTrue(dfs.mkdirs(testParentDir2));
|
||||||
|
|
||||||
|
final Path srcDir = new Path(testParentDir1, "src-dir");
|
||||||
|
Path file = new Path(srcDir, "file1");
|
||||||
|
DFSTestUtil.createFile(dfs, file, fileLen, replication, 0);
|
||||||
|
Path file2 = new Path(srcDir, "file2");
|
||||||
|
DFSTestUtil.createFile(dfs, file2, fileLen, replication, 0);
|
||||||
|
|
||||||
|
// Set quota to update quota cache when rename
|
||||||
|
dfs.setStoragePolicy(testParentDir1, HOT_STORAGE_POLICY_NAME);
|
||||||
|
dfs.setQuota(testParentDir1, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
|
||||||
|
dfs.setStoragePolicy(testParentDir2, ONESSD_STORAGE_POLICY_NAME);
|
||||||
|
dfs.setQuota(testParentDir2, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
|
||||||
|
|
||||||
|
|
||||||
|
final Path dstDir1 = new Path(testParentDir2, "dst-dir");
|
||||||
|
assertTrue(dfs.mkdirs(dstDir1));
|
||||||
|
|
||||||
|
FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
|
||||||
|
BlockStoragePolicySuite bsps = namesystem.getBlockManager().getStoragePolicySuite();
|
||||||
|
INodesInPath iipSrc = namesystem.getFSDirectory().resolvePath(
|
||||||
|
null, srcDir.toString(), FSDirectory.DirOp.READ);
|
||||||
|
INodesInPath iipDst = namesystem.getFSDirectory().resolvePath(
|
||||||
|
null, dstDir1.toString(), FSDirectory.DirOp.READ);
|
||||||
|
|
||||||
|
// Src`s quotaCounts with dst storage policy
|
||||||
|
QuotaCounts srcCounts = iipSrc.getLastINode().computeQuotaUsage(bsps,
|
||||||
|
iipDst.getLastINode().getStoragePolicyID(),
|
||||||
|
false, Snapshot.CURRENT_STATE_ID);
|
||||||
|
|
||||||
|
|
||||||
|
QuotaCounts dstCountsBeforeRename = iipDst.getLastINode().
|
||||||
|
computeQuotaUsage(bsps, iipDst.getLastINode().getStoragePolicyID(),
|
||||||
|
false, Snapshot.CURRENT_STATE_ID);
|
||||||
|
|
||||||
|
boolean rename = dfs.rename(srcDir, dstDir1);
|
||||||
|
assertEquals(true, rename);
|
||||||
|
|
||||||
|
QuotaCounts dstCountsAfterRename = iipDst.getLastINode().
|
||||||
|
computeQuotaUsage(bsps, iipDst.getLastINode().getStoragePolicyID(),
|
||||||
|
false, Snapshot.CURRENT_STATE_ID);
|
||||||
|
|
||||||
|
QuotaCounts subtract = dstCountsAfterRename.subtract(dstCountsBeforeRename);
|
||||||
|
assertTrue(subtract.equals(srcCounts));
|
||||||
|
}
|
||||||
|
}
|
@ -1640,7 +1640,7 @@ public void testRenameUndo_5() throws Exception {
|
|||||||
final Path foo2 = new Path(subdir2, foo.getName());
|
final Path foo2 = new Path(subdir2, foo.getName());
|
||||||
FSDirectory fsdir2 = Mockito.spy(fsdir);
|
FSDirectory fsdir2 = Mockito.spy(fsdir);
|
||||||
Mockito.doThrow(new NSQuotaExceededException("fake exception")).when(fsdir2)
|
Mockito.doThrow(new NSQuotaExceededException("fake exception")).when(fsdir2)
|
||||||
.addLastINode(any(), any(), any(), anyBoolean());
|
.addLastINode(any(), any(), any(), anyBoolean(), any());
|
||||||
Whitebox.setInternalState(fsn, "dir", fsdir2);
|
Whitebox.setInternalState(fsn, "dir", fsdir2);
|
||||||
// rename /test/dir1/foo to /test/dir2/subdir2/foo.
|
// rename /test/dir1/foo to /test/dir2/subdir2/foo.
|
||||||
// FSDirectory#verifyQuota4Rename will pass since the remaining quota is 2.
|
// FSDirectory#verifyQuota4Rename will pass since the remaining quota is 2.
|
||||||
|
Loading…
Reference in New Issue
Block a user