HDFS-8493. Consolidate truncate() related implementation in a single class. Contributed by Rakesh R.
parent 8e333720e1
commit d3797f9f3c
CHANGES.txt
@@ -685,6 +685,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and
     DatanodeStorageInfo. (Zhe Zhang via wang)

+    HDFS-8493. Consolidate truncate() related implementation in a single class.
+    (Rakesh R via wheat9)
+
   OPTIMIZATIONS

     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
FSDirTruncateOp.java (new file)
@@ -0,0 +1,361 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;

import com.google.common.annotations.VisibleForTesting;

/**
 * Helper class to perform truncate operation.
 */
final class FSDirTruncateOp {

  /**
   * Private constructor for preventing FSDirTruncateOp object creation.
   * Static-only class.
   */
  private FSDirTruncateOp() {}

  /**
   * Truncate a file to a given size.
   *
   * @param fsn namespace
   * @param srcArg path name
   * @param newLength the target file size
   * @param clientName client name
   * @param clientMachine client machine info
   * @param mtime modified time
   * @param toRemoveBlocks to be removed blocks
   * @param pc permission checker to check fs permission
   * @return truncate result
   * @throws IOException
   */
  static TruncateResult truncate(final FSNamesystem fsn, final String srcArg,
      final long newLength, final String clientName,
      final String clientMachine, final long mtime,
      final BlocksMapUpdateInfo toRemoveBlocks, final FSPermissionChecker pc)
      throws IOException, UnresolvedLinkException {
    assert fsn.hasWriteLock();

    FSDirectory fsd = fsn.getFSDirectory();
    byte[][] pathComponents = FSDirectory
        .getPathComponentsForReservedPath(srcArg);
    final String src;
    final INodesInPath iip;
    final boolean onBlockBoundary;
    Block truncateBlock = null;
    fsd.writeLock();
    try {
      src = fsd.resolvePath(pc, srcArg, pathComponents);
      iip = fsd.getINodesInPath4Write(src, true);
      if (fsn.isPermissionEnabled()) {
        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
      }
      INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
      final BlockStoragePolicy lpPolicy = fsn.getBlockManager()
          .getStoragePolicy("LAZY_PERSIST");

      if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
        throw new UnsupportedOperationException(
            "Cannot truncate lazy persist file " + src);
      }

      // Check if the file is already being truncated with the same length
      final BlockInfo last = file.getLastBlock();
      if (last != null && last.getBlockUCState()
          == BlockUCState.UNDER_RECOVERY) {
        final Block truncatedBlock = ((BlockInfoUnderConstruction) last)
            .getTruncateBlock();
        if (truncatedBlock != null) {
          final long truncateLength = file.computeFileSize(false, false)
              + truncatedBlock.getNumBytes();
          if (newLength == truncateLength) {
            return new TruncateResult(false, fsd.getAuditFileInfo(iip));
          }
        }
      }

      // Opening an existing file for truncate. May need lease recovery.
      fsn.recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE, iip, src,
          clientName, clientMachine, false);
      // Truncate length check.
      long oldLength = file.computeFileSize();
      if (oldLength == newLength) {
        return new TruncateResult(true, fsd.getAuditFileInfo(iip));
      }
      if (oldLength < newLength) {
        throw new HadoopIllegalArgumentException(
            "Cannot truncate to a larger file size. Current size: " + oldLength
                + ", truncate size: " + newLength + ".");
      }
      // Perform INodeFile truncation.
      final QuotaCounts delta = new QuotaCounts.Builder().build();
      onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,
          toRemoveBlocks, mtime, delta);
      if (!onBlockBoundary) {
        // Open file for write, but don't log into edits
        long lastBlockDelta = file.computeFileSize() - newLength;
        assert lastBlockDelta > 0 : "delta is 0 only if on block boundary";
        truncateBlock = prepareFileForTruncate(fsn, iip, clientName,
            clientMachine, lastBlockDelta, null);
      }

      // update the quota: use the preferred block size for UC block
      fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
    } finally {
      fsd.writeUnlock();
    }

    fsn.getEditLog().logTruncate(src, clientName, clientMachine, newLength,
        mtime, truncateBlock);
    return new TruncateResult(onBlockBoundary, fsd.getAuditFileInfo(iip));
  }

  /**
   * Unprotected truncate implementation. Unlike
   * {@link FSDirTruncateOp#truncate}, this will not schedule block recovery.
   *
   * @param fsn namespace
   * @param src path name
   * @param clientName client name
   * @param clientMachine client machine info
   * @param newLength the target file size
   * @param mtime modified time
   * @param truncateBlock truncate block
   * @throws IOException
   */
  static void unprotectedTruncate(final FSNamesystem fsn, final String src,
      final String clientName, final String clientMachine,
      final long newLength, final long mtime, final Block truncateBlock)
      throws UnresolvedLinkException, QuotaExceededException,
      SnapshotAccessControlException, IOException {
    assert fsn.hasWriteLock();

    FSDirectory fsd = fsn.getFSDirectory();
    INodesInPath iip = fsd.getINodesInPath(src, true);
    INodeFile file = iip.getLastINode().asFile();
    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
    boolean onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,
        collectedBlocks, mtime, null);

    if (!onBlockBoundary) {
      BlockInfo oldBlock = file.getLastBlock();
      Block tBlk = prepareFileForTruncate(fsn, iip, clientName, clientMachine,
          file.computeFileSize() - newLength, truncateBlock);
      assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) &&
          tBlk.getNumBytes() == truncateBlock.getNumBytes() :
          "Should be the same block.";
      if (oldBlock.getBlockId() != tBlk.getBlockId()
          && !file.isBlockInLatestSnapshot(oldBlock)) {
        fsn.getBlockManager().removeBlockFromMap(oldBlock);
      }
    }
    assert onBlockBoundary == (truncateBlock == null) :
        "truncateBlock is null iff on block boundary: " + truncateBlock;
    fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
  }

  /**
   * Convert current INode to UnderConstruction. Recreate lease. Create new
   * block for the truncated copy. Schedule truncation of the replicas.
   *
   * @param fsn namespace
   * @param iip inodes in the path containing the file
   * @param leaseHolder lease holder
   * @param clientMachine client machine info
   * @param lastBlockDelta last block delta size
   * @param newBlock new block
   * @return the returned block will be written to editLog and passed back
   *         into this method upon loading.
   * @throws IOException
   */
  @VisibleForTesting
  static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
      String leaseHolder, String clientMachine, long lastBlockDelta,
      Block newBlock) throws IOException {
    assert fsn.hasWriteLock();

    INodeFile file = iip.getLastINode().asFile();
    file.recordModification(iip.getLatestSnapshotId());
    file.toUnderConstruction(leaseHolder, clientMachine);
    assert file.isUnderConstruction() : "inode should be under construction.";
    fsn.getLeaseManager().addLease(
        file.getFileUnderConstructionFeature().getClientName(), file.getId());
    boolean shouldRecoverNow = (newBlock == null);
    BlockInfo oldBlock = file.getLastBlock();
    boolean shouldCopyOnTruncate = shouldCopyOnTruncate(fsn, file, oldBlock);
    if (newBlock == null) {
      newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block(
          oldBlock.getBlockId(), oldBlock.getNumBytes(),
          fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
              oldBlock)));
    }

    BlockInfoUnderConstruction truncatedBlockUC;
    if (shouldCopyOnTruncate) {
      // Add new truncateBlock into blocksMap and
      // use oldBlock as a source for copy-on-truncate recovery
      truncatedBlockUC = new BlockInfoUnderConstructionContiguous(newBlock,
          file.getPreferredBlockReplication());
      truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
      truncatedBlockUC.setTruncateBlock(oldBlock);
      file.setLastBlock(truncatedBlockUC,
          fsn.getBlockManager().getStorages(oldBlock));
      fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file);

      NameNode.stateChangeLog.debug(
          "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
              + " size {} new block {} old block {}",
          truncatedBlockUC.getNumBytes(), newBlock,
          truncatedBlockUC.getTruncateBlock());
    } else {
      // Use new generation stamp for in-place truncate recovery
      fsn.getBlockManager().convertLastBlockToUnderConstruction(file,
          lastBlockDelta);
      oldBlock = file.getLastBlock();
      assert !oldBlock.isComplete() : "oldBlock should be under construction";
      truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
      truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
      truncatedBlockUC.getTruncateBlock().setNumBytes(
          oldBlock.getNumBytes() - lastBlockDelta);
      truncatedBlockUC.getTruncateBlock().setGenerationStamp(
          newBlock.getGenerationStamp());

      NameNode.stateChangeLog.debug(
          "BLOCK* prepareFileForTruncate: {} Scheduling in-place block "
              + "truncate to new size {}", truncatedBlockUC.getTruncateBlock()
              .getNumBytes(), truncatedBlockUC);
    }
    if (shouldRecoverNow) {
      truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
    }

    return newBlock;
  }

  /**
   * Truncate has the following properties:
   * 1.) Any block deletions occur now.
   * 2.) INode length is truncated now - new clients can only read up to
   *     the truncated length.
   * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
   * 4.) NN will trigger DN truncation recovery and waits for DNs to report.
   * 5.) File is considered UNDER_RECOVERY until truncation recovery
   *     completes.
   * 6.) Soft and hard Lease expiration require truncation recovery to
   *     complete.
   *
   * @return true if on the block boundary or false if recovery is needed
   */
  private static boolean unprotectedTruncate(FSNamesystem fsn,
      INodesInPath iip, long newLength, BlocksMapUpdateInfo collectedBlocks,
      long mtime, QuotaCounts delta) throws IOException {
    assert fsn.hasWriteLock();

    INodeFile file = iip.getLastINode().asFile();
    int latestSnapshot = iip.getLatestSnapshotId();
    file.recordModification(latestSnapshot, true);

    verifyQuotaForTruncate(fsn, iip, file, newLength, delta);

    long remainingLength =
        file.collectBlocksBeyondMax(newLength, collectedBlocks);
    file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
    file.setModificationTime(mtime);
    // return whether on a block boundary
    return (remainingLength - newLength) == 0;
  }

  private static void verifyQuotaForTruncate(FSNamesystem fsn,
      INodesInPath iip, INodeFile file, long newLength, QuotaCounts delta)
      throws QuotaExceededException {
    FSDirectory fsd = fsn.getFSDirectory();
    if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
      // Do not check quota if edit log is still being processed
      return;
    }
    final BlockStoragePolicy policy = fsd.getBlockStoragePolicySuite()
        .getPolicy(file.getStoragePolicyID());
    file.computeQuotaDeltaForTruncate(newLength, policy, delta);
    fsd.readLock();
    try {
      FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Defines if a replica needs to be copied on truncate or
   * can be truncated in place.
   */
  private static boolean shouldCopyOnTruncate(FSNamesystem fsn, INodeFile file,
      BlockInfo blk) {
    if (!fsn.isUpgradeFinalized()) {
      return true;
    }
    if (fsn.isRollingUpgrade()) {
      return true;
    }
    return file.isBlockInLatestSnapshot(blk);
  }

  /**
   * Result of truncate operation.
   */
  static class TruncateResult {
    private final boolean result;
    private final HdfsFileStatus stat;

    public TruncateResult(boolean result, HdfsFileStatus stat) {
      this.result = result;
      this.stat = stat;
    }

    /**
     * @return true if client does not need to wait for block recovery,
     *         false if client needs to wait for block recovery.
     */
    boolean getResult() {
      return result;
    }

    /**
     * @return file information.
     */
    HdfsFileStatus getFileStatus() {
      return stat;
    }
  }
}
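Note on the boundary check: the private unprotectedTruncate above returns (remainingLength - newLength) == 0, where collectBlocksBeyondMax deletes whole blocks past newLength and reports the length that remains; only a non-boundary truncate needs last-block recovery. A standalone sketch of that arithmetic (block sizes and the class name are illustrative, not part of this patch):

public class TruncateBoundary {
  /**
   * Illustrative mirror of collectBlocksBeyondMax plus the boundary check:
   * blocks wholly beyond newLength are deleted outright, and last-block
   * recovery is needed only when the kept length overshoots newLength.
   */
  static boolean onBlockBoundary(long[] blockSizes, long newLength) {
    long remaining = 0;
    for (long size : blockSizes) {
      if (remaining >= newLength) {
        break; // this block and the rest are deleted outright
      }
      remaining += size;
    }
    return (remaining - newLength) == 0;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long[] blocks = {128 * mb, 128 * mb, 44 * mb}; // illustrative file
    // Truncating to 256 MB falls on a block boundary: no recovery needed.
    System.out.println(onBlockBoundary(blocks, 256 * mb)); // true
    // Truncating to 200 MB cuts into the second block: recovery needed.
    System.out.println(onBlockBoundary(blocks, 200 * mb)); // false
  }
}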
FSDirectory.java
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
@@ -49,11 +48,9 @@
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
@@ -907,98 +904,6 @@ public INodeMap getINodeMap() {
     return inodeMap;
   }
-
-  /**
-   * FSEditLogLoader implementation.
-   * Unlike FSNamesystem.truncate, this will not schedule block recovery.
-   */
-  void unprotectedTruncate(String src, String clientName, String clientMachine,
-                           long newLength, long mtime, Block truncateBlock)
-      throws UnresolvedLinkException, QuotaExceededException,
-      SnapshotAccessControlException, IOException {
-    INodesInPath iip = getINodesInPath(src, true);
-    INodeFile file = iip.getLastINode().asFile();
-    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    boolean onBlockBoundary =
-        unprotectedTruncate(iip, newLength, collectedBlocks, mtime, null);
-
-    if(! onBlockBoundary) {
-      BlockInfo oldBlock = file.getLastBlock();
-      Block tBlk =
-          getFSNamesystem().prepareFileForTruncate(iip,
-              clientName, clientMachine, file.computeFileSize() - newLength,
-              truncateBlock);
-      assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) &&
-          tBlk.getNumBytes() == truncateBlock.getNumBytes() :
-          "Should be the same block.";
-      if(oldBlock.getBlockId() != tBlk.getBlockId() &&
-          !file.isBlockInLatestSnapshot(oldBlock)) {
-        getBlockManager().removeBlockFromMap(oldBlock);
-      }
-    }
-    assert onBlockBoundary == (truncateBlock == null) :
-        "truncateBlock is null iff on block boundary: " + truncateBlock;
-    getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
-  }
-
-  boolean truncate(INodesInPath iip, long newLength,
-                   BlocksMapUpdateInfo collectedBlocks,
-                   long mtime, QuotaCounts delta)
-      throws IOException {
-    writeLock();
-    try {
-      return unprotectedTruncate(iip, newLength, collectedBlocks, mtime, delta);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Truncate has the following properties:
-   * 1.) Any block deletions occur now.
-   * 2.) INode length is truncated now – new clients can only read up to
-   * the truncated length.
-   * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
-   * 4.) NN will trigger DN truncation recovery and waits for DNs to report.
-   * 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
-   * 6.) Soft and hard Lease expiration require truncation recovery to complete.
-   *
-   * @return true if on the block boundary or false if recovery is need
-   */
-  boolean unprotectedTruncate(INodesInPath iip, long newLength,
-                              BlocksMapUpdateInfo collectedBlocks,
-                              long mtime, QuotaCounts delta) throws IOException {
-    assert hasWriteLock();
-    INodeFile file = iip.getLastINode().asFile();
-    int latestSnapshot = iip.getLatestSnapshotId();
-    file.recordModification(latestSnapshot, true);
-
-    verifyQuotaForTruncate(iip, file, newLength, delta);
-
-    long remainingLength =
-        file.collectBlocksBeyondMax(newLength, collectedBlocks);
-    file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
-    file.setModificationTime(mtime);
-    // return whether on a block boundary
-    return (remainingLength - newLength) == 0;
-  }
-
-  private void verifyQuotaForTruncate(INodesInPath iip, INodeFile file,
-      long newLength, QuotaCounts delta) throws QuotaExceededException {
-    if (!getFSNamesystem().isImageLoaded() || shouldSkipQuotaChecks()) {
-      // Do not check quota if edit log is still being processed
-      return;
-    }
-    final BlockStoragePolicy policy = getBlockStoragePolicySuite()
-        .getPolicy(file.getStoragePolicyID());
-    file.computeQuotaDeltaForTruncate(newLength, policy, delta);
-    readLock();
-    try {
-      verifyQuota(iip, iip.length() - 1, delta, null);
-    } finally {
-      readUnlock();
-    }
-  }
-
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
FSEditLogLoader.java
@@ -901,9 +901,9 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
     }
     case OP_TRUNCATE: {
       TruncateOp truncateOp = (TruncateOp) op;
-      fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName,
-          truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp,
-          truncateOp.truncateBlock);
+      FSDirTruncateOp.unprotectedTruncate(fsNamesys, truncateOp.src,
+          truncateOp.clientName, truncateOp.clientMachine,
+          truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
       break;
     }
     case OP_SET_STORAGE_POLICY: {
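Why the replay path is safe: the static unprotectedTruncate called above never schedules block recovery. The Block recorded by logTruncate is handed back into prepareFileForTruncate, whose shouldRecoverNow = (newBlock == null) guard only initializes DataNode recovery when no block was passed in. A trivial illustration of that guard (class and parameter names are made up for this sketch):

public class ReplayRecoveryGuard {
  /**
   * Illustrative reduction of the shouldRecoverNow logic in
   * FSDirTruncateOp#prepareFileForTruncate: on the live path no block is
   * passed in (null), so recovery starts immediately; on edit-log replay
   * the recorded truncate block is passed back in, so recovery is skipped.
   */
  static boolean shouldRecoverNow(Object truncateBlockFromEditLog) {
    return truncateBlockFromEditLog == null;
  }

  public static void main(String[] args) {
    System.out.println(shouldRecoverNow(null));         // live truncate: true
    System.out.println(shouldRecoverNow(new Object())); // replay: false
  }
}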
FSNamesystem.java
@@ -201,7 +201,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -1831,218 +1830,44 @@ void setTimes(String src, long mtime, long atime) throws IOException {
    * block recovery to truncate the last block of the file.
    *
    * @return true if client does not need to wait for block recovery,
    * false if client needs to wait for block recovery.
    */
-  boolean truncate(String src, long newLength,
-                   String clientName, String clientMachine,
-                   long mtime)
-      throws IOException, UnresolvedLinkException {
+  boolean truncate(String src, long newLength, String clientName,
+      String clientMachine, long mtime) throws IOException,
+      UnresolvedLinkException {
     requireEffectiveLayoutVersionForFeature(Feature.TRUNCATE);
-    boolean ret;
+    final FSDirTruncateOp.TruncateResult r;
     try {
-      ret = truncateInt(src, newLength, clientName, clientMachine, mtime);
+      NameNode.stateChangeLog.debug(
+          "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
+      if (newLength < 0) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot truncate to a negative file size: " + newLength + ".");
+      }
+      final FSPermissionChecker pc = getPermissionChecker();
+      checkOperation(OperationCategory.WRITE);
+      writeLock();
+      BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
+      try {
+        checkOperation(OperationCategory.WRITE);
+        checkNameNodeSafeMode("Cannot truncate for " + src);
+        r = FSDirTruncateOp.truncate(this, src, newLength, clientName,
+            clientMachine, mtime, toRemoveBlocks, pc);
+      } finally {
+        writeUnlock();
+      }
+      getEditLog().logSync();
+      if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
+        removeBlocks(toRemoveBlocks);
+        toRemoveBlocks.clear();
+      }
+      logAuditEvent(true, "truncate", src, null, r.getFileStatus());
     } catch (AccessControlException e) {
       logAuditEvent(false, "truncate", src);
       throw e;
     }
-    return ret;
+    return r.getResult();
   }
-
-  boolean truncateInt(String srcArg, long newLength,
-                      String clientName, String clientMachine,
-                      long mtime)
-      throws IOException, UnresolvedLinkException {
-    String src = srcArg;
-    NameNode.stateChangeLog.debug(
-        "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
-    if (newLength < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Cannot truncate to a negative file size: " + newLength + ".");
-    }
-    HdfsFileStatus stat = null;
-    FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
-    boolean res;
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    writeLock();
-    BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
-    try {
-      checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot truncate for " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      res = truncateInternal(src, newLength, clientName,
-          clientMachine, mtime, pc, toRemoveBlocks);
-      stat = dir.getAuditFileInfo(dir.getINodesInPath4Write(src, false));
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync();
-    if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
-      removeBlocks(toRemoveBlocks);
-      toRemoveBlocks.clear();
-    }
-    logAuditEvent(true, "truncate", src, null, stat);
-    return res;
-  }
-
-  /**
-   * Truncate a file to a given size
-   * Update the count at each ancestor directory with quota
-   */
-  boolean truncateInternal(String src, long newLength,
-                           String clientName, String clientMachine,
-                           long mtime, FSPermissionChecker pc,
-                           BlocksMapUpdateInfo toRemoveBlocks)
-      throws IOException, UnresolvedLinkException {
-    assert hasWriteLock();
-    INodesInPath iip = dir.getINodesInPath4Write(src, true);
-    if (isPermissionEnabled) {
-      dir.checkPathAccess(pc, iip, FsAction.WRITE);
-    }
-    INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
-    final BlockStoragePolicy lpPolicy =
-        blockManager.getStoragePolicy("LAZY_PERSIST");
-
-    if (lpPolicy != null &&
-        lpPolicy.getId() == file.getStoragePolicyID()) {
-      throw new UnsupportedOperationException(
-          "Cannot truncate lazy persist file " + src);
-    }
-
-    // Check if the file is already being truncated with the same length
-    final BlockInfo last = file.getLastBlock();
-    if (last != null && last.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
-      final Block truncateBlock
-          = ((BlockInfoUnderConstruction)last).getTruncateBlock();
-      if (truncateBlock != null) {
-        final long truncateLength = file.computeFileSize(false, false)
-            + truncateBlock.getNumBytes();
-        if (newLength == truncateLength) {
-          return false;
-        }
-      }
-    }
-
-    // Opening an existing file for truncate. May need lease recovery.
-    recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE,
-        iip, src, clientName, clientMachine, false);
-    // Truncate length check.
-    long oldLength = file.computeFileSize();
-    if(oldLength == newLength) {
-      return true;
-    }
-    if(oldLength < newLength) {
-      throw new HadoopIllegalArgumentException(
-          "Cannot truncate to a larger file size. Current size: " + oldLength +
-              ", truncate size: " + newLength + ".");
-    }
-    // Perform INodeFile truncation.
-    final QuotaCounts delta = new QuotaCounts.Builder().build();
-    boolean onBlockBoundary = dir.truncate(iip, newLength, toRemoveBlocks,
-        mtime, delta);
-    Block truncateBlock = null;
-    if(!onBlockBoundary) {
-      // Open file for write, but don't log into edits
-      long lastBlockDelta = file.computeFileSize() - newLength;
-      assert lastBlockDelta > 0 : "delta is 0 only if on block bounday";
-      truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine,
-          lastBlockDelta, null);
-    }
-
-    // update the quota: use the preferred block size for UC block
-    dir.writeLock();
-    try {
-      dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
-    } finally {
-      dir.writeUnlock();
-    }
-
-    getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime,
-        truncateBlock);
-    return onBlockBoundary;
-  }
-
-  /**
-   * Convert current INode to UnderConstruction.
-   * Recreate lease.
-   * Create new block for the truncated copy.
-   * Schedule truncation of the replicas.
-   *
-   * @return the returned block will be written to editLog and passed back into
-   * this method upon loading.
-   */
-  Block prepareFileForTruncate(INodesInPath iip,
-                               String leaseHolder,
-                               String clientMachine,
-                               long lastBlockDelta,
-                               Block newBlock)
-      throws IOException {
-    INodeFile file = iip.getLastINode().asFile();
-    file.recordModification(iip.getLatestSnapshotId());
-    file.toUnderConstruction(leaseHolder, clientMachine);
-    assert file.isUnderConstruction() : "inode should be under construction.";
-    leaseManager.addLease(
-        file.getFileUnderConstructionFeature().getClientName(), file.getId());
-    boolean shouldRecoverNow = (newBlock == null);
-    BlockInfo oldBlock = file.getLastBlock();
-    boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
-    if(newBlock == null) {
-      newBlock = (shouldCopyOnTruncate) ? createNewBlock() :
-          new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
-              nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock)));
-    }
-
-    BlockInfoUnderConstruction truncatedBlockUC;
-    if(shouldCopyOnTruncate) {
-      // Add new truncateBlock into blocksMap and
-      // use oldBlock as a source for copy-on-truncate recovery
-      truncatedBlockUC = new BlockInfoUnderConstructionContiguous(newBlock,
-          file.getPreferredBlockReplication());
-      truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
-      truncatedBlockUC.setTruncateBlock(oldBlock);
-      file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
-      getBlockManager().addBlockCollection(truncatedBlockUC, file);
-
-      NameNode.stateChangeLog.debug(
-          "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new" +
-              " size {} new block {} old block {}",
-          truncatedBlockUC.getNumBytes(), newBlock,
-          truncatedBlockUC.getTruncateBlock());
-    } else {
-      // Use new generation stamp for in-place truncate recovery
-      blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
-      oldBlock = file.getLastBlock();
-      assert !oldBlock.isComplete() : "oldBlock should be under construction";
-      truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
-      truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
-      truncatedBlockUC.getTruncateBlock().setNumBytes(
-          oldBlock.getNumBytes() - lastBlockDelta);
-      truncatedBlockUC.getTruncateBlock().setGenerationStamp(
-          newBlock.getGenerationStamp());
-
-      NameNode.stateChangeLog.debug(
-          "BLOCK* prepareFileForTruncate: {} Scheduling in-place block " +
-              "truncate to new size {}",
-          truncatedBlockUC.getTruncateBlock().getNumBytes(), truncatedBlockUC);
-    }
-    if (shouldRecoverNow) {
-      truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
-    }
-
-    return newBlock;
-  }
-
-  /**
-   * Defines if a replica needs to be copied on truncate or
-   * can be truncated in place.
-   */
-  boolean shouldCopyOnTruncate(INodeFile file, BlockInfo blk) {
-    if(!isUpgradeFinalized()) {
-      return true;
-    }
-    if (isRollingUpgrade()) {
-      return true;
-    }
-    return file.isBlockInLatestSnapshot(blk);
   }

   /**
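The boolean threaded from TruncateResult.getResult() through FSNamesystem#truncate is what a client ultimately sees as the return value of FileSystem#truncate(Path, long): true means the truncation fell on a block boundary and is already complete, false means the NameNode scheduled last-block recovery. A hedged client-side sketch of that contract (the path, target size, and poll interval are illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TruncateClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    Path file = new Path("/tmp/data.log"); // illustrative path
    long newLength = 1024L;                // illustrative target size

    // true: truncation fell on a block boundary and is already complete;
    // false: the NameNode scheduled last-block recovery and the file stays
    // under construction until recovery finishes on the DataNodes.
    boolean done = dfs.truncate(file, newLength);
    while (!done) {
      Thread.sleep(100);                   // illustrative poll interval
      done = dfs.isFileClosed(file);
    }
    dfs.close();
  }
}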
TestFileTruncate.java
@@ -1008,8 +1008,8 @@ public void testTruncateRecovery() throws IOException {
     fsn.writeLock();
     try {
       Block oldBlock = file.getLastBlock();
-      Block truncateBlock =
-          fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null);
+      Block truncateBlock = FSDirTruncateOp.prepareFileForTruncate(fsn, iip,
+          client, clientMachine, 1, null);
       // In-place truncate uses old block id with new genStamp.
       assertThat(truncateBlock.getBlockId(),
           is(equalTo(oldBlock.getBlockId())));
@@ -1041,8 +1041,8 @@ public void testTruncateRecovery() throws IOException {
     fsn.writeLock();
     try {
       Block oldBlock = file.getLastBlock();
-      Block truncateBlock =
-          fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null);
+      Block truncateBlock = FSDirTruncateOp.prepareFileForTruncate(fsn, iip,
+          client, clientMachine, 1, null);
       // Copy-on-write truncate makes new block with new id and genStamp
       assertThat(truncateBlock.getBlockId(),
           is(not(equalTo(oldBlock.getBlockId()))));
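For tests like the ones above that truncate mid-block, assertions on the final replica state are only safe once block recovery completes. A sketch of a wait helper built on the stock GenericTestUtils.waitFor (helper class name and timeouts are illustrative):

import com.google.common.base.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.test.GenericTestUtils;

public class TruncateTestUtil {
  /**
   * Illustrative helper: block until the truncated file is closed again,
   * i.e. last-block recovery has finished and the lease was released.
   */
  static void waitForTruncateRecovery(final DistributedFileSystem dfs,
      final Path p) throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          return dfs.isFileClosed(p);
        } catch (Exception e) {
          return false;
        }
      }
    }, 100, 60000); // poll every 100 ms, give up after 60 s (illustrative)
  }
}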