diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a4e79c2397..86d1b32efe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -115,6 +115,8 @@ Trunk (unreleased changes) HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh). + HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. (Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 343f4f36b4..73bb2190e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -627,12 +627,12 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, Configuration conf, UserGroupInformation ugi) throws IOException { /** * Currently we have simply burnt-in support for a SINGLE - * protocol - protocolR23Compatible. This will be replaced + * protocol - protocolPB. This will be replaced * by a way to pick the right protocol based on the * version of the target server. */ - return new org.apache.hadoop.hdfs.protocolR23Compatible. - ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi); + return new org.apache.hadoop.hdfs.protocolPB. + ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi); } /** Create a {@link ClientDatanodeProtocol} proxy */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 1204a76c4e..c75c349422 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -19,11 +19,16 @@ import java.io.IOException; import java.util.Arrays; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -124,9 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto; import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.io.Text; @@ -218,9 +221,7 @@ public GetBlockLocationsResponseProto getBlockLocations( Builder builder = GetBlockLocationsResponseProto .newBuilder(); if (b != null) { - builder.setLocations( - PBHelper.convert(server.getBlockLocations(req.getSrc(), - req.getOffset(), req.getLength()))).build(); + builder.setLocations(PBHelper.convert(b)).build(); } return builder.build(); } catch (IOException e) { @@ -233,14 +234,19 @@ public GetServerDefaultsResponseProto getServerDefaults( RpcController controller, GetServerDefaultsRequestProto req) throws ServiceException { try { + FsServerDefaults result = server.getServerDefaults(); return GetServerDefaultsResponseProto.newBuilder() - .setServerDefaults(PBHelper.convert(server.getServerDefaults())) + .setServerDefaults(PBHelper.convert(result)) .build(); } catch (IOException e) { throw new ServiceException(e); } } + + static final CreateResponseProto VOID_CREATE_RESPONSE = + CreateResponseProto.newBuilder().build(); + @Override public CreateResponseProto create(RpcController controller, CreateRequestProto req) throws ServiceException { @@ -252,19 +258,22 @@ public CreateResponseProto create(RpcController controller, } catch (IOException e) { throw new ServiceException(e); } - return CreateResponseProto.newBuilder().build(); - + return VOID_CREATE_RESPONSE; } + static final AppendResponseProto NULL_APPEND_RESPONSE = + AppendResponseProto.newBuilder().build(); + @Override public AppendResponseProto append(RpcController controller, AppendRequestProto req) throws ServiceException { try { - return AppendResponseProto - .newBuilder() - .setBlock( - PBHelper.convert(server.append(req.getSrc(), req.getClientName()))) - .build(); + LocatedBlock result = server.append(req.getSrc(), req.getClientName()); + if (result != null) { + return AppendResponseProto.newBuilder() + .setBlock(PBHelper.convert(result)).build(); + } + return NULL_APPEND_RESPONSE; } catch (IOException e) { throw new ServiceException(e); } @@ -274,18 +283,16 @@ public AppendResponseProto append(RpcController controller, public SetReplicationResponseProto setReplication(RpcController controller, SetReplicationRequestProto req) throws ServiceException { try { - return SetReplicationResponseProto - .newBuilder() - .setResult( - server.setReplication(req.getSrc(), (short) req.getReplication())) - .build(); + boolean result = + server.setReplication(req.getSrc(), (short) req.getReplication()); + return SetReplicationResponseProto.newBuilder().setResult(result).build(); } catch (IOException e) { throw new ServiceException(e); } } - static final SetPermissionResponseProto SET_PERM_RESPONSE = + static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = SetPermissionResponseProto.newBuilder().build(); @Override @@ -296,24 +303,26 @@ public SetPermissionResponseProto setPermission(RpcController controller, } catch (IOException e) { throw new ServiceException(e); } - return SET_PERM_RESPONSE; + return VOID_SET_PERM_RESPONSE; } - static final SetOwnerResponseProto SET_OWNER_RESPONSE = + static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = SetOwnerResponseProto.newBuilder().build(); @Override public SetOwnerResponseProto setOwner(RpcController controller, SetOwnerRequestProto req) throws ServiceException { try { - server.setOwner(req.getSrc(), req.getUsername(), req.getGroupname()); + server.setOwner(req.getSrc(), + req.hasUsername() ? req.getUsername() : null, + req.hasGroupname() ? req.getGroupname() : null); } catch (IOException e) { throw new ServiceException(e); } - return SET_OWNER_RESPONSE; + return VOID_SET_OWNER_RESPONSE; } - static final AbandonBlockResponseProto ABD_BLOCK_RESPONSE = + static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = AbandonBlockResponseProto.newBuilder().build(); @Override @@ -325,20 +334,22 @@ public AbandonBlockResponseProto abandonBlock(RpcController controller, } catch (IOException e) { throw new ServiceException(e); } - return ABD_BLOCK_RESPONSE; + return VOID_ADD_BLOCK_RESPONSE; } @Override public AddBlockResponseProto addBlock(RpcController controller, AddBlockRequestProto req) throws ServiceException { + try { + List excl = req.getExcludeNodesList(); + LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(), + req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, + (excl == null || + excl.size() == 0) ? null : + PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()]))); return AddBlockResponseProto.newBuilder().setBlock( - PBHelper.convert( - server.addBlock(req.getSrc(), req.getClientName(), - req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, - PBHelper.convert( - (DatanodeInfoProto[]) req.getExcludeNodesList().toArray())))) - .build(); + PBHelper.convert(result)).build(); } catch (IOException e) { throw new ServiceException(e); } @@ -349,15 +360,17 @@ public GetAdditionalDatanodeResponseProto getAdditionalDatanode( RpcController controller, GetAdditionalDatanodeRequestProto req) throws ServiceException { try { + List existingList = req.getExistingsList(); + List excludesList = req.getExcludesList(); + LocatedBlock result = server.getAdditionalDatanode( + req.getSrc(), PBHelper.convert(req.getBlk()), + PBHelper.convert(existingList.toArray( + new DatanodeInfoProto[existingList.size()])), + PBHelper.convert(excludesList.toArray( + new DatanodeInfoProto[excludesList.size()])), + req.getNumAdditionalNodes(), req.getClientName()); return GetAdditionalDatanodeResponseProto.newBuilder().setBlock( - PBHelper.convert( - server.getAdditionalDatanode(req.getSrc(), - PBHelper.convert(req.getBlk()), - PBHelper.convert((DatanodeInfoProto[]) req.getExistingsList() - .toArray()), PBHelper - .convert((DatanodeInfoProto[]) req.getExcludesList() - .toArray()), req.getNumAdditionalNodes(), req - .getClientName()))) + PBHelper.convert(result)) .build(); } catch (IOException e) { throw new ServiceException(e); @@ -368,10 +381,10 @@ public GetAdditionalDatanodeResponseProto getAdditionalDatanode( public CompleteResponseProto complete(RpcController controller, CompleteRequestProto req) throws ServiceException { try { - return CompleteResponseProto.newBuilder().setResult( - server.complete(req.getSrc(), req.getClientName(), - PBHelper.convert(req.getLast()))) - .build(); + boolean result = + server.complete(req.getSrc(), req.getClientName(), + req.hasLast() ? PBHelper.convert(req.getLast()) : null); + return CompleteResponseProto.newBuilder().setResult(result).build(); } catch (IOException e) { throw new ServiceException(e); } @@ -384,8 +397,9 @@ public CompleteResponseProto complete(RpcController controller, public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller, ReportBadBlocksRequestProto req) throws ServiceException { try { + List bl = req.getBlocksList(); server.reportBadBlocks(PBHelper.convertLocatedBlock( - (LocatedBlockProto[]) req.getBlocksList().toArray())); + bl.toArray(new LocatedBlockProto[bl.size()]))); } catch (IOException e) { throw new ServiceException(e); } @@ -399,7 +413,8 @@ public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller, public ConcatResponseProto concat(RpcController controller, ConcatRequestProto req) throws ServiceException { try { - server.concat(req.getTrg(), (String[])req.getSrcsList().toArray()); + List srcs = req.getSrcsList(); + server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()])); } catch (IOException e) { throw new ServiceException(e); } @@ -456,14 +471,21 @@ public MkdirsResponseProto mkdirs(RpcController controller, } } + static final GetListingResponseProto NULL_GETLISTING_RESPONSE = + GetListingResponseProto.newBuilder().build(); @Override public GetListingResponseProto getListing(RpcController controller, GetListingRequestProto req) throws ServiceException { try { - DirectoryListingProto result = PBHelper.convert(server.getListing( + DirectoryListing result = server.getListing( req.getSrc(), req.getStartAfter().toByteArray(), - req.getNeedLocation())); - return GetListingResponseProto.newBuilder().setDirList(result).build(); + req.getNeedLocation()); + if (result !=null) { + return GetListingResponseProto.newBuilder().setDirList( + PBHelper.convert(result)).build(); + } else { + return NULL_GETLISTING_RESPONSE; + } } catch (IOException e) { throw new ServiceException(e); } @@ -494,6 +516,19 @@ public RecoverLeaseResponseProto recoverLease(RpcController controller, } } + @Override + public RestoreFailedStorageResponseProto restoreFailedStorage( + RpcController controller, RestoreFailedStorageRequestProto req) + throws ServiceException { + try { + boolean result = server.restoreFailedStorage(req.getArg()); + return RestoreFailedStorageResponseProto.newBuilder().setResult(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public GetFsStatsResponseProto getFsStats(RpcController controller, GetFsStatusRequestProto req) throws ServiceException { @@ -557,19 +592,6 @@ public SaveNamespaceResponseProto saveNamespace(RpcController controller, } - @Override - public RestoreFailedStorageResponseProto restoreFailedStorage( - RpcController controller, RestoreFailedStorageRequestProto req) - throws ServiceException { - try { - boolean result = server.restoreFailedStorage(req.getArg()); - return RestoreFailedStorageResponseProto.newBuilder().setResult(result) - .build(); - } catch (IOException e) { - throw new ServiceException(e); - } - } - static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = RefreshNodesResponseProto.newBuilder().build(); @@ -622,9 +644,10 @@ public ListCorruptFileBlocksResponseProto listCorruptFileBlocks( RpcController controller, ListCorruptFileBlocksRequestProto req) throws ServiceException { try { - CorruptFileBlocksProto result = PBHelper.convert(server - .listCorruptFileBlocks(req.getPath(), req.getCookie())); - return ListCorruptFileBlocksResponseProto.newBuilder().setCorrupt(result) + CorruptFileBlocks result = server.listCorruptFileBlocks( + req.getPath(), req.hasCookie() ? req.getCookie(): null); + return ListCorruptFileBlocksResponseProto.newBuilder() + .setCorrupt(PBHelper.convert(result)) .build(); } catch (IOException e) { throw new ServiceException(e); @@ -646,29 +669,40 @@ public MetaSaveResponseProto metaSave(RpcController controller, } + static final GetFileInfoResponseProto NULL_GETFILEINFO_RESPONSE = + GetFileInfoResponseProto.newBuilder().build(); @Override public GetFileInfoResponseProto getFileInfo(RpcController controller, GetFileInfoRequestProto req) throws ServiceException { try { - HdfsFileStatus res = server.getFileInfo(req.getSrc()); - GetFileInfoResponseProto.Builder builder = - GetFileInfoResponseProto.newBuilder(); - if (res != null) { - builder.setFs(PBHelper.convert(res)); + HdfsFileStatus result = server.getFileInfo(req.getSrc()); + + if (result != null) { + return GetFileInfoResponseProto.newBuilder().setFs( + PBHelper.convert(result)).build(); } - return builder.build(); + return NULL_GETFILEINFO_RESPONSE; } catch (IOException e) { throw new ServiceException(e); } } + static final GetFileLinkInfoResponseProto NULL_GETFILELINKINFO_RESPONSE = + GetFileLinkInfoResponseProto.newBuilder().build(); @Override public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller, GetFileLinkInfoRequestProto req) throws ServiceException { try { - HdfsFileStatusProto result = - PBHelper.convert(server.getFileLinkInfo(req.getSrc())); - return GetFileLinkInfoResponseProto.newBuilder().setFs(result).build(); + HdfsFileStatus result = server.getFileLinkInfo(req.getSrc()); + if (result != null) { + System.out.println("got non null result for getFileLinkInfo for " + req.getSrc()); + return GetFileLinkInfoResponseProto.newBuilder().setFs( + PBHelper.convert(result)).build(); + } else { + System.out.println("got null result for getFileLinkInfo for " + req.getSrc()); + return NULL_GETFILELINKINFO_RESPONSE; + } + } catch (IOException e) { throw new ServiceException(e); } @@ -679,10 +713,9 @@ public GetContentSummaryResponseProto getContentSummary( RpcController controller, GetContentSummaryRequestProto req) throws ServiceException { try { - ContentSummaryProto result = - PBHelper.convert(server.getContentSummary(req.getPath())); - return - GetContentSummaryResponseProto.newBuilder().setSummary(result).build(); + ContentSummary result = server.getContentSummary(req.getPath()); + return GetContentSummaryResponseProto.newBuilder() + .setSummary(PBHelper.convert(result)).build(); } catch (IOException e) { throw new ServiceException(e); } @@ -780,10 +813,11 @@ public UpdateBlockForPipelineResponseProto updateBlockForPipeline( public UpdatePipelineResponseProto updatePipeline(RpcController controller, UpdatePipelineRequestProto req) throws ServiceException { try { + List newNodes = req.getNewNodesList(); server .updatePipeline(req.getClientName(), PBHelper.convert(req .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper - .convert((DatanodeIDProto[]) req.getNewNodesList().toArray())); + .convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()]))); return VOID_UPDATEPIPELINE_RESPONSE; } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 75fbc7bc8e..5860d3a13a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; @@ -95,9 +96,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; @@ -121,6 +124,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; @@ -263,7 +268,8 @@ public LocatedBlock append(String src, String clientName) .setClientName(clientName) .build(); try { - return PBHelper.convert(rpcProxy.append(null, req).getBlock()); + AppendResponseProto res = rpcProxy.append(null, req); + return res.hasBlock() ? PBHelper.convert(res.getBlock()) : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -304,13 +310,14 @@ public void setPermission(String src, FsPermission permission) public void setOwner(String src, String username, String groupname) throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { - SetOwnerRequestProto req = SetOwnerRequestProto.newBuilder() - .setSrc(src) - .setUsername(username) - .setGroupname(groupname) - .build(); + SetOwnerRequestProto.Builder req = SetOwnerRequestProto.newBuilder() + .setSrc(src); + if (username != null) + req.setUsername(username); + if (groupname != null) + req.setGroupname(groupname); try { - rpcProxy.setOwner(null, req); + rpcProxy.setOwner(null, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -335,15 +342,14 @@ public LocatedBlock addBlock(String src, String clientName, throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { - AddBlockRequestProto.Builder builder = AddBlockRequestProto.newBuilder(); - builder.setSrc(src) - .setClientName(clientName) - .addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes))); - if (previous != null) { - builder.setPrevious(PBHelper.convert(previous)); - } + AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src) + .setClientName(clientName); + if (previous != null) + req.setPrevious(PBHelper.convert(previous)); + if (excludeNodes != null) + req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes))); try { - return PBHelper.convert(rpcProxy.addBlock(null, builder.build()).getBlock()); + return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -376,13 +382,13 @@ public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk, public boolean complete(String src, String clientName, ExtendedBlock last) throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { - CompleteRequestProto req = CompleteRequestProto.newBuilder() + CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder() .setSrc(src) - .setClientName(clientName) - .setLast(PBHelper.convert(last)) - .build(); + .setClientName(clientName); + if (last != null) + req.setLast(PBHelper.convert(last)); try { - return rpcProxy.complete(null, req).getResult(); + return rpcProxy.complete(null, req.build()).getResult(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -493,7 +499,12 @@ public DirectoryListing getListing(String src, byte[] startAfter, .setStartAfter(ByteString.copyFrom(startAfter)) .setNeedLocation(needLocation).build(); try { - return PBHelper.convert(rpcProxy.getListing(null, req).getDirList()); + GetListingResponseProto result = rpcProxy.getListing(null, req); + + if (result.hasDirList()) { + return PBHelper.convert(result.getDirList()); + } + return null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -635,11 +646,13 @@ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) @Override public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { - ListCorruptFileBlocksRequestProto req = ListCorruptFileBlocksRequestProto - .newBuilder().setPath(path).setCookie(cookie).build(); + ListCorruptFileBlocksRequestProto.Builder req = + ListCorruptFileBlocksRequestProto.newBuilder().setPath(path); + if (cookie != null) + req.setCookie(cookie); try { return PBHelper.convert( - rpcProxy.listCorruptFileBlocks(null, req).getCorrupt()); + rpcProxy.listCorruptFileBlocks(null, req.build()).getCorrupt()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -676,7 +689,9 @@ public HdfsFileStatus getFileLinkInfo(String src) GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder() .setSrc(src).build(); try { - return PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()); + GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req); + return result.hasFs() ? + PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 232b820758..9fc6b7e590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -974,6 +974,13 @@ public static EnumSetWritable convert(int flag) { if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) { result.add(CreateFlag.APPEND); } + if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) { + result.add(CreateFlag.CREATE); + } + if ((flag & CreateFlagProto.OVERWRITE_VALUE) + == CreateFlagProto.OVERWRITE_VALUE) { + result.add(CreateFlag.OVERWRITE); + } return new EnumSetWritable(result); } @@ -1005,7 +1012,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) { public static HdfsFileStatusProto convert(HdfsFileStatus fs) { if (fs == null) return null; - FileType fType = FileType.IS_DIR;; + FileType fType = FileType.IS_FILE; if (fs.isDir()) { fType = FileType.IS_DIR; } else if (fs.isSymlink()) { @@ -1024,8 +1031,7 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) { setOwner(fs.getOwner()). setGroup(fs.getGroup()). setPath(ByteString.copyFrom(fs.getLocalNameInBytes())); - - if (fs.getSymlink() != null) { + if (fs.isSymlink()) { builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes())); } if (fs instanceof HdfsLocatedFileStatus) { @@ -1052,7 +1058,7 @@ public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) { final int len = fs.length; HdfsFileStatus[] result = new HdfsFileStatus[len]; for (int i = 0; i < len; ++i) { - PBHelper.convert(fs[i]); + result[i] = PBHelper.convert(fs[i]); } return result; } @@ -1060,9 +1066,11 @@ public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) { public static DirectoryListing convert(DirectoryListingProto dl) { if (dl == null) return null; - return new DirectoryListing( - PBHelper.convert((HdfsFileStatusProto[]) - dl.getPartialListingList().toArray()), + List partList = dl.getPartialListingList(); + return new DirectoryListing( + partList.isEmpty() ? new HdfsFileStatus[0] + : PBHelper.convert( + partList.toArray(new HdfsFileStatusProto[partList.size()])), dl.getRemainingEntries()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 924e63192c..de2a7f3472 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -58,14 +58,15 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; -import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; @@ -89,6 +90,7 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; @@ -141,9 +143,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT); InetSocketAddress socAddr = nn.getRpcServerAddress(conf); - ClientNamenodeProtocolServerSideTranslatorR23 - clientProtocolServerTranslator = - new ClientNamenodeProtocolServerSideTranslatorR23(this); + RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, + ProtobufRpcEngine.class); + ClientNamenodeProtocolServerSideTranslatorPB + clientProtocolServerTranslator = + new ClientNamenodeProtocolServerSideTranslatorPB(this); + BlockingService clientNNPbService = ClientNamenodeProtocol. + newReflectiveBlockingService(clientProtocolServerTranslator); DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = new DatanodeProtocolServerSideTranslatorPB(this); @@ -152,8 +158,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); - BlockingService service = NamenodeProtocolService - .newReflectiveBlockingService(namenodeProtocolXlator); + BlockingService NNPbService = NamenodeProtocolService + .newReflectiveBlockingService(namenodeProtocolXlator); InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { @@ -162,8 +168,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); // Add all the RPC protocols that the namenode implements this.serviceRpcServer = - RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible. - ClientNamenodeWireProtocol.class, clientProtocolServerTranslator, + RPC.getServer(org.apache.hadoop.hdfs.protocolPB. + ClientNamenodeProtocolPB.class, clientNNPbService, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); @@ -173,7 +179,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) RefreshUserMappingsProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, GetUserMappingsProtocol.class, this); - DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, + DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, serviceRpcServer); @@ -186,9 +192,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) } // Add all the RPC protocols that the namenode implements this.clientRpcServer = RPC.getServer( - org.apache.hadoop.hdfs.protocolR23Compatible. - ClientNamenodeWireProtocol.class, - clientProtocolServerTranslator, socAddr.getHostName(), + org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, + clientNNPbService, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, @@ -197,7 +202,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) RefreshUserMappingsProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, GetUserMappingsProtocol.class, this); - DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, + DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer); @@ -259,7 +264,7 @@ public long getProtocolVersion(String protocol, long clientVersion) throws IOException { if (protocol.equals(ClientProtocol.class.getName())) { throw new IOException("Old Namenode Client protocol is not supported:" + - protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class); + protocol + "Switch your clientside to " + ClientNamenodeProtocol.class); } else if (protocol.equals(DatanodeProtocol.class.getName())){ return DatanodeProtocol.versionID; } else if (protocol.equals(NamenodeProtocol.class.getName())){ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 7a52460ef0..10f39eaa13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -74,7 +74,7 @@ message AppendRequestProto { } message AppendResponseProto { - required LocatedBlockProto block = 1; + optional LocatedBlockProto block = 1; } message SetReplicationRequestProto { @@ -96,8 +96,8 @@ message SetPermissionResponseProto { // void response message SetOwnerRequestProto { required string src = 1; - required string username = 2; - required string groupname = 3; + optional string username = 2; + optional string groupname = 3; } message SetOwnerResponseProto { // void response @@ -139,7 +139,7 @@ message GetAdditionalDatanodeResponseProto { message CompleteRequestProto { required string src = 1; required string clientName = 2; - required ExtendedBlockProto last = 3; + optional ExtendedBlockProto last = 3; } message CompleteResponseProto { @@ -204,7 +204,7 @@ message GetListingRequestProto { required bool needLocation = 3; } message GetListingResponseProto { - required DirectoryListingProto dirList = 1; + optional DirectoryListingProto dirList = 1; } message RenewLeaseRequestProto { @@ -311,7 +311,7 @@ message DistributedUpgradeProgressResponseProto { message ListCorruptFileBlocksRequestProto { required string path = 1; - required string cookie = 2; + optional string cookie = 2; } message ListCorruptFileBlocksResponseProto { @@ -338,7 +338,7 @@ message GetFileLinkInfoRequestProto { } message GetFileLinkInfoResponseProto { - required HdfsFileStatusProto fs = 1; + optional HdfsFileStatusProto fs = 1; } message GetContentSummaryRequestProto {