HDFS-7210. Avoid two separate RPC's namenode.append() and namenode.getFileInfo() for an append call from DFSClient. (Vinayakumar B via umamahesh)
This commit is contained in:
parent
c1f2bb2d31
commit
1556f86a31
@ -404,6 +404,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7310. Mover can give first priority to local DN if it has target storage type
|
||||
available in local DN. (Vinayakumar B via umamahesh)
|
||||
|
||||
HDFS-7210. Avoid two separate RPC's namenode.append() and namenode.getFileInfo()
|
||||
for an append call from DFSClient. (Vinayakumar B via umamahesh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -163,6 +163,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
@ -1770,9 +1771,9 @@ public String getLinkTarget(String path) throws IOException {
|
||||
/** Method to get stream returned by append call */
|
||||
private DFSOutputStream callAppend(String src,
|
||||
int buffersize, Progressable progress) throws IOException {
|
||||
LocatedBlock lastBlock = null;
|
||||
LastBlockWithStatus lastBlockWithStatus = null;
|
||||
try {
|
||||
lastBlock = namenode.append(src, clientName);
|
||||
lastBlockWithStatus = namenode.append(src, clientName);
|
||||
} catch(RemoteException re) {
|
||||
throw re.unwrapRemoteException(AccessControlException.class,
|
||||
FileNotFoundException.class,
|
||||
@ -1782,9 +1783,10 @@ private DFSOutputStream callAppend(String src,
|
||||
UnresolvedPathException.class,
|
||||
SnapshotAccessControlException.class);
|
||||
}
|
||||
HdfsFileStatus newStat = getFileInfo(src);
|
||||
HdfsFileStatus newStat = lastBlockWithStatus.getFileStatus();
|
||||
return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
|
||||
lastBlock, newStat, dfsClientConf.createChecksum());
|
||||
lastBlockWithStatus.getLastBlock(), newStat,
|
||||
dfsClientConf.createChecksum());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -203,7 +203,8 @@ public HdfsFileStatus create(String src, FsPermission masked,
|
||||
* Append to the end of the file.
|
||||
* @param src path of the file being created.
|
||||
* @param clientName name of the current client.
|
||||
* @return information about the last partial block if any.
|
||||
* @return wrapper with information about the last partial block and file
|
||||
* status if any
|
||||
* @throws AccessControlException if permission to append file is
|
||||
* denied by the system. As usually on the client side the exception will
|
||||
* be wrapped into {@link org.apache.hadoop.ipc.RemoteException}.
|
||||
@ -224,7 +225,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
|
||||
* @throws UnsupportedOperationException if append is not supported
|
||||
*/
|
||||
@AtMostOnce
|
||||
public LocatedBlock append(String src, String clientName)
|
||||
public LastBlockWithStatus append(String src, String clientName)
|
||||
throws AccessControlException, DSQuotaExceededException,
|
||||
FileNotFoundException, SafeModeException, UnresolvedLinkException,
|
||||
SnapshotAccessControlException, IOException;
|
||||
|
@ -0,0 +1,46 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Class to contain Lastblock and HdfsFileStatus for the Append operation
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class LastBlockWithStatus {
|
||||
|
||||
private final LocatedBlock lastBlock;
|
||||
|
||||
private final HdfsFileStatus fileStatus;
|
||||
|
||||
public LastBlockWithStatus(LocatedBlock lastBlock, HdfsFileStatus fileStatus) {
|
||||
this.lastBlock = lastBlock;
|
||||
this.fileStatus = fileStatus;
|
||||
}
|
||||
|
||||
public LocatedBlock getLastBlock() {
|
||||
return lastBlock;
|
||||
}
|
||||
|
||||
public HdfsFileStatus getFileStatus() {
|
||||
return fileStatus;
|
||||
}
|
||||
}
|
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
@ -247,9 +248,6 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||
private static final CreateResponseProto VOID_CREATE_RESPONSE =
|
||||
CreateResponseProto.newBuilder().build();
|
||||
|
||||
private static final AppendResponseProto VOID_APPEND_RESPONSE =
|
||||
AppendResponseProto.newBuilder().build();
|
||||
|
||||
private static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE =
|
||||
SetPermissionResponseProto.newBuilder().build();
|
||||
|
||||
@ -407,17 +405,21 @@ public CreateResponseProto create(RpcController controller,
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public AppendResponseProto append(RpcController controller,
|
||||
AppendRequestProto req) throws ServiceException {
|
||||
try {
|
||||
LocatedBlock result = server.append(req.getSrc(), req.getClientName());
|
||||
if (result != null) {
|
||||
return AppendResponseProto.newBuilder()
|
||||
.setBlock(PBHelper.convert(result)).build();
|
||||
LastBlockWithStatus result = server.append(req.getSrc(),
|
||||
req.getClientName());
|
||||
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
|
||||
if (result.getLastBlock() != null) {
|
||||
builder.setBlock(PBHelper.convert(result.getLastBlock()));
|
||||
}
|
||||
return VOID_APPEND_RESPONSE;
|
||||
if (result.getFileStatus() != null) {
|
||||
builder.setStat(PBHelper.convert(result.getFileStatus()));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
@ -63,6 +64,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
@ -188,7 +190,6 @@
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
||||
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
|
||||
.EncryptionZoneProto;
|
||||
@ -301,7 +302,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocatedBlock append(String src, String clientName)
|
||||
public LastBlockWithStatus append(String src, String clientName)
|
||||
throws AccessControlException, DSQuotaExceededException,
|
||||
FileNotFoundException, SafeModeException, UnresolvedLinkException,
|
||||
IOException {
|
||||
@ -311,7 +312,11 @@ public LocatedBlock append(String src, String clientName)
|
||||
.build();
|
||||
try {
|
||||
AppendResponseProto res = rpcProxy.append(null, req);
|
||||
return res.hasBlock() ? PBHelper.convert(res.getBlock()) : null;
|
||||
LocatedBlock lastBlock = res.hasBlock() ? PBHelper
|
||||
.convert(res.getBlock()) : null;
|
||||
HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat())
|
||||
: null;
|
||||
return new LastBlockWithStatus(lastBlock, stat);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
|
@ -38,6 +38,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
@ -392,8 +393,12 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
||||
|
||||
// add the op into retry cache is necessary
|
||||
if (toAddRetryCache) {
|
||||
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
|
||||
HdfsFileStatus.EMPTY_NAME, newFile,
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED,
|
||||
Snapshot.CURRENT_STATE_ID, false, iip);
|
||||
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
|
||||
addCloseOp.rpcCallId, lb);
|
||||
addCloseOp.rpcCallId, new LastBlockWithStatus(lb, stat));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -185,6 +185,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
@ -2888,7 +2889,7 @@ private void recoverLeaseInternal(INodeFile fileInode,
|
||||
/**
|
||||
* Append to an existing file in the namespace.
|
||||
*/
|
||||
LocatedBlock appendFile(
|
||||
LastBlockWithStatus appendFile(
|
||||
String src, String holder, String clientMachine, boolean logRetryCache)
|
||||
throws IOException {
|
||||
try {
|
||||
@ -2899,7 +2900,7 @@ LocatedBlock appendFile(
|
||||
}
|
||||
}
|
||||
|
||||
private LocatedBlock appendFileInt(final String srcArg, String holder,
|
||||
private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
|
||||
String clientMachine, boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, FileNotFoundException,
|
||||
@ -2912,6 +2913,7 @@ private LocatedBlock appendFileInt(final String srcArg, String holder,
|
||||
}
|
||||
boolean skipSync = false;
|
||||
LocatedBlock lb = null;
|
||||
HdfsFileStatus stat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
@ -2921,6 +2923,8 @@ private LocatedBlock appendFileInt(final String srcArg, String holder,
|
||||
checkNameNodeSafeMode("Cannot append to file" + src);
|
||||
src = dir.resolvePath(pc, src, pathComponents);
|
||||
lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
|
||||
stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg),
|
||||
true);
|
||||
} catch (StandbyException se) {
|
||||
skipSync = true;
|
||||
throw se;
|
||||
@ -2941,7 +2945,7 @@ private LocatedBlock appendFileInt(final String srcArg, String holder,
|
||||
}
|
||||
}
|
||||
logAuditEvent(true, "append", srcArg);
|
||||
return lb;
|
||||
return new LastBlockWithStatus(lb, stat);
|
||||
}
|
||||
|
||||
ExtendedBlock getExtendedBlock(Block blk) {
|
||||
|
@ -35,6 +35,7 @@
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -86,6 +87,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.FSLimitException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
@ -618,7 +620,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public LocatedBlock append(String src, String clientName)
|
||||
public LastBlockWithStatus append(String src, String clientName)
|
||||
throws IOException {
|
||||
String clientMachine = getClientMachine();
|
||||
if (stateChangeLog.isDebugEnabled()) {
|
||||
@ -627,10 +629,10 @@ public LocatedBlock append(String src, String clientName)
|
||||
}
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (LocatedBlock) cacheEntry.getPayload();
|
||||
return (LastBlockWithStatus) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
LocatedBlock info = null;
|
||||
LastBlockWithStatus info = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
info = namesystem.appendFile(src, clientName, clientMachine,
|
||||
|
@ -90,6 +90,7 @@ message AppendRequestProto {
|
||||
|
||||
message AppendResponseProto {
|
||||
optional LocatedBlockProto block = 1;
|
||||
optional HdfsFileStatusProto stat = 2;
|
||||
}
|
||||
|
||||
message SetReplicationRequestProto {
|
||||
|
@ -45,7 +45,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.ipc.ClientId;
|
||||
@ -232,7 +232,7 @@ public void testAppend() throws Exception {
|
||||
|
||||
// Retried append requests succeed
|
||||
newCall();
|
||||
LocatedBlock b = nnRpc.append(src, "holder");
|
||||
LastBlockWithStatus b = nnRpc.append(src, "holder");
|
||||
Assert.assertEquals(b, nnRpc.append(src, "holder"));
|
||||
Assert.assertEquals(b, nnRpc.append(src, "holder"));
|
||||
|
||||
|
@ -66,6 +66,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
@ -420,7 +421,7 @@ Object getResult() {
|
||||
/** append operation */
|
||||
class AppendOp extends AtMostOnceOp {
|
||||
private final String fileName;
|
||||
private LocatedBlock lbk;
|
||||
private LastBlockWithStatus lbk;
|
||||
|
||||
AppendOp(DFSClient client, String fileName) {
|
||||
super("append", client);
|
||||
|
Loading…
Reference in New Issue
Block a user