From 48da033901d3471ef176a94104158546152353e9 Mon Sep 17 00:00:00 2001 From: Sanjay Radia Date: Mon, 12 Dec 2011 05:36:35 +0000 Subject: [PATCH] HDFS-2651 ClientNameNodeProtocol Translators for Protocol Buffers (sanjay) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213143 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/hdfs/DFSClient.java | 3 +- .../hadoop/hdfs/DistributedFileSystem.java | 3 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 3 +- .../protocolPB/ClientNamenodeProtocolPB.java | 58 ++ ...amenodeProtocolServerSideTranslatorPB.java | 835 +++++++++++++++++ .../ClientNamenodeProtocolTranslatorPB.java | 851 ++++++++++++++++++ ...atanodeProtocolClientSideTranslatorPB.java | 4 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 593 ++++++++++-- ...menodeProtocolServerSideTranslatorR23.java | 3 +- .../ClientNamenodeProtocolTranslatorR23.java | 3 +- .../ClientNamenodeWireProtocol.java | 3 +- .../main/proto/ClientNamenodeProtocol.proto | 16 +- .../hadoop-hdfs/src/main/proto/hdfs.proto | 1 + .../hadoop/hdfs/protocolPB/TestPBHelper.java | 3 +- 15 files changed, 2303 insertions(+), 78 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fd768f1a45..ba995a4ea3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -98,6 +98,8 @@ Trunk (unreleased changes) HDFS-2511. Add dev script to generate HDFS protobufs. (tucu) + HDFS-2651 ClientNameNodeProtocol Translators for Protocol Buffers (sanjay) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. (Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0dc224184c..5762a53d0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1473,7 +1473,8 @@ void saveNamespace() throws AccessControlException, IOException { * * @see ClientProtocol#restoreFailedStorage(String arg) */ - boolean restoreFailedStorage(String arg) throws AccessControlException { + boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException{ return namenode.restoreFailedStorage(arg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 4de8fe4fd7..ea7d4c2b63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -660,7 +660,8 @@ public void saveNamespace() throws AccessControlException, IOException { * * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg) */ - public boolean restoreFailedStorage(String arg) throws AccessControlException { + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException { return dfs.restoreFailedStorage(arg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5dbae4e168..28acc2737c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -665,7 +665,8 @@ public boolean setSafeMode(HdfsConstants.SafeModeAction action) * * @throws AccessControlException if the superuser privilege is violated. */ - public boolean restoreFailedStorage(String arg) throws AccessControlException; + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException; /** * Tells the namenode to reread the hosts and exclude files. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java new file mode 100644 index 0000000000..6e577cab0a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + + +@InterfaceAudience.Private +@InterfaceStability.Stable +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY) +@TokenInfo(DelegationTokenSelector.class) +@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, + protocolVersion = 1) +/** + * Protocol that a clients use to communicate with the NameNode. + * + * Note: This extends the protocolbuffer service based interface to + * add annotations required for security. + */ +public interface ClientNamenodeProtocolPB extends + ClientNamenodeProtocol.BlockingInterface, VersionedProtocol { + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000..10527f5ac1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,835 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is used on the server side. Calls come across the wire for the + * for protocol {@link ClientNamenodeProtocolPB}. + * This class translates the PB data types + * to the native data types used inside the NN as specified in the generic + * ClientProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientNamenodeProtocolServerSideTranslatorPB implements + ClientNamenodeProtocolPB { + final private ClientProtocol server; + + /** + * Constructor + * + * @param server - the NN server + * @throws IOException + */ + public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server) + throws IOException { + this.server = server; + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client's call will never reach here. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + if (!protocol.equals(RPC.getProtocolName( + ClientNamenodeProtocolPB.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(ClientNamenodeProtocolPB.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), + ClientNamenodeProtocolPB.class); + } + + @Override + public ProtocolSignatureWritable + getProtocolSignature2( + String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(InterDatanodeProtocolPB.class); + } + + @Override + public GetBlockLocationsResponseProto getBlockLocations( + RpcController controller, GetBlockLocationsRequestProto req) + throws ServiceException { + try { + return GetBlockLocationsResponseProto + .newBuilder() + .setLocations( + PBHelper.convert(server.getBlockLocations(req.getSrc(), + req.getOffset(), req.getLength()))).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetServerDefaultsResponseProto getServerDefaults( + RpcController controller, GetServerDefaultsRequestProto req) + throws ServiceException { + try { + return GetServerDefaultsResponseProto.newBuilder() + .setServerDefaults(PBHelper.convert(server.getServerDefaults())) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CreateResponseProto create(RpcController controller, + CreateRequestProto req) throws ServiceException { + try { + server.create(req.getSrc(), PBHelper.convert(req.getMasked()), + req.getClientName(), PBHelper.convert(req.getCreateFlag()), + req.getCreateParent(), (short) req.getReplication(), + req.getBlockSize()); + } catch (IOException e) { + throw new ServiceException(e); + } + return CreateResponseProto.newBuilder().build(); + + } + + @Override + public AppendResponseProto append(RpcController controller, + AppendRequestProto req) throws ServiceException { + try { + return AppendResponseProto + .newBuilder() + .setBlock( + PBHelper.convert(server.append(req.getSrc(), req.getClientName()))) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SetReplicationResponseProto setReplication(RpcController controller, + SetReplicationRequestProto req) throws ServiceException { + try { + return SetReplicationResponseProto + .newBuilder() + .setResult( + server.setReplication(req.getSrc(), (short) req.getReplication())) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + + static final SetPermissionResponseProto SET_PERM_RESPONSE = + SetPermissionResponseProto.newBuilder().build(); + + @Override + public SetPermissionResponseProto setPermission(RpcController controller, + SetPermissionRequestProto req) throws ServiceException { + try { + server.setPermission(req.getSrc(), PBHelper.convert(req.getPermission())); + } catch (IOException e) { + throw new ServiceException(e); + } + return SET_PERM_RESPONSE; + } + + static final SetOwnerResponseProto SET_OWNER_RESPONSE = + SetOwnerResponseProto.newBuilder().build(); + + @Override + public SetOwnerResponseProto setOwner(RpcController controller, + SetOwnerRequestProto req) throws ServiceException { + try { + server.setOwner(req.getSrc(), req.getUsername(), req.getGroupname()); + } catch (IOException e) { + throw new ServiceException(e); + } + return SET_OWNER_RESPONSE; + } + + static final AbandonBlockResponseProto ABD_BLOCK_RESPONSE = + AbandonBlockResponseProto.newBuilder().build(); + + @Override + public AbandonBlockResponseProto abandonBlock(RpcController controller, + AbandonBlockRequestProto req) throws ServiceException { + try { + server.abandonBlock(PBHelper.convert(req.getB()), req.getSrc(), + req.getHolder()); + } catch (IOException e) { + throw new ServiceException(e); + } + return ABD_BLOCK_RESPONSE; + } + + @Override + public AddBlockResponseProto addBlock(RpcController controller, + AddBlockRequestProto req) throws ServiceException { + try { + return AddBlockResponseProto.newBuilder().setBlock( + PBHelper.convert( + server.addBlock(req.getSrc(), req.getClientName(), + PBHelper.convert(req.getPrevious()), + PBHelper.convert( + (DatanodeInfoProto[]) req.getExcludeNodesList().toArray())))) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetAdditionalDatanodeResponseProto getAdditionalDatanode( + RpcController controller, GetAdditionalDatanodeRequestProto req) + throws ServiceException { + try { + return GetAdditionalDatanodeResponseProto.newBuilder().setBlock( + PBHelper.convert( + server.getAdditionalDatanode(req.getSrc(), + PBHelper.convert(req.getBlk()), + PBHelper.convert((DatanodeInfoProto[]) req.getExistingsList() + .toArray()), PBHelper + .convert((DatanodeInfoProto[]) req.getExcludesList() + .toArray()), req.getNumAdditionalNodes(), req + .getClientName()))) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CompleteResponseProto complete(RpcController controller, + CompleteRequestProto req) throws ServiceException { + try { + return CompleteResponseProto.newBuilder().setResult( + server.complete(req.getSrc(), req.getClientName(), + PBHelper.convert(req.getLast()))) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE = + ReportBadBlocksResponseProto.newBuilder().build(); + + @Override + public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller, + ReportBadBlocksRequestProto req) throws ServiceException { + try { + server.reportBadBlocks(PBHelper.convertLocatedBlock( + (LocatedBlockProto[]) req.getBlocksList().toArray())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REP_BAD_BLOCK_RESPONSE; + } + + static final ConcatResponseProto VOID_CONCAT_RESPONSE = + ConcatResponseProto.newBuilder().build(); + + @Override + public ConcatResponseProto concat(RpcController controller, + ConcatRequestProto req) throws ServiceException { + try { + server.concat(req.getTrg(), (String[])req.getSrcsList().toArray()); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_CONCAT_RESPONSE; + } + + @Override + public RenameResponseProto rename(RpcController controller, + RenameRequestProto req) throws ServiceException { + try { + boolean result = server.rename(req.getSrc(), req.getDst()); + return RenameResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final Rename2ResponseProto VOID_RENAME2_RESPONSE = + Rename2ResponseProto.newBuilder().build(); + + @Override + public Rename2ResponseProto rename2(RpcController controller, + Rename2RequestProto req) throws ServiceException { + + try { + server.rename2(req.getSrc(), req.getDst(), + req.getOverwriteDest() ? Rename.OVERWRITE : Rename.NONE); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_RENAME2_RESPONSE; + } + + @Override + public DeleteResponseProto delete(RpcController controller, + DeleteRequestProto req) throws ServiceException { + try { + boolean result = server.delete(req.getSrc(), req.getRecursive()); + return DeleteResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public MkdirsResponseProto mkdirs(RpcController controller, + MkdirsRequestProto req) throws ServiceException { + try { + boolean result = server.mkdirs(req.getSrc(), + PBHelper.convert(req.getMasked()), req.getCreateParent()); + return MkdirsResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetListingResponseProto getListing(RpcController controller, + GetListingRequestProto req) throws ServiceException { + try { + DirectoryListingProto result = PBHelper.convert(server.getListing( + req.getSrc(), req.getStartAfter().toByteArray(), + req.getNeedLocation())); + return GetListingResponseProto.newBuilder().setDirList(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = + RenewLeaseResponseProto.newBuilder().build(); + + @Override + public RenewLeaseResponseProto renewLease(RpcController controller, + RenewLeaseRequestProto req) throws ServiceException { + try { + server.renewLease(req.getClientName()); + return VOID_RENEWLEASE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RecoverLeaseResponseProto recoverLease(RpcController controller, + RecoverLeaseRequestProto req) throws ServiceException { + try { + boolean result = server.recoverLease(req.getSrc(), req.getClientName()); + return RecoverLeaseResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetFsStatsResponseProto getFsStats(RpcController controller, + GetFsStatusRequestProto req) throws ServiceException { + try { + return PBHelper.convert(server.getStats()); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetDatanodeReportResponseProto getDatanodeReport( + RpcController controller, GetDatanodeReportRequestProto req) + throws ServiceException { + try { + DatanodeInfoProto[] result = PBHelper.convert(server + .getDatanodeReport(PBHelper.convert(req.getType()))); + return GetDatanodeReportResponseProto.newBuilder() + .addAllDi(Arrays.asList(result)).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetPreferredBlockSizeResponseProto getPreferredBlockSize( + RpcController controller, GetPreferredBlockSizeRequestProto req) + throws ServiceException { + try { + long result = server.getPreferredBlockSize(req.getFilename()); + return GetPreferredBlockSizeResponseProto.newBuilder().setBsize(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SetSafeModeResponseProto setSafeMode(RpcController controller, + SetSafeModeRequestProto req) throws ServiceException { + try { + boolean result = server.setSafeMode(PBHelper.convert(req.getAction())); + return SetSafeModeResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SaveNamespaceResponseProto VOID_SAVENAMESPACE_RESPONSE = + SaveNamespaceResponseProto.newBuilder().build(); + + @Override + public SaveNamespaceResponseProto saveNamespace(RpcController controller, + SaveNamespaceRequestProto req) throws ServiceException { + try { + server.saveNamespace(); + return VOID_SAVENAMESPACE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + @Override + public RestoreFailedStorageResponseProto restoreFailedStorage( + RpcController controller, RestoreFailedStorageRequestProto req) + throws ServiceException { + try { + boolean result = server.restoreFailedStorage(req.getArg()); + return RestoreFailedStorageResponseProto.newBuilder().setResult(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = + RefreshNodesResponseProto.newBuilder().build(); + + @Override + public RefreshNodesResponseProto refreshNodes(RpcController controller, + RefreshNodesRequestProto req) throws ServiceException { + try { + server.refreshNodes(); + return VOID_REFRESHNODES_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE = + FinalizeUpgradeResponseProto.newBuilder().build(); + + @Override + public FinalizeUpgradeResponseProto finalizeUpgrade(RpcController controller, + FinalizeUpgradeRequestProto req) throws ServiceException { + try { + server.finalizeUpgrade(); + return VOID_FINALIZEUPGRADE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public DistributedUpgradeProgressResponseProto distributedUpgradeProgress( + RpcController controller, DistributedUpgradeProgressRequestProto req) + throws ServiceException { + try { + UpgradeStatusReportProto result = PBHelper.convert(server + .distributedUpgradeProgress(PBHelper.convert(req.getAction()))); + return DistributedUpgradeProgressResponseProto.newBuilder() + .setReport(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListCorruptFileBlocksResponseProto listCorruptFileBlocks( + RpcController controller, ListCorruptFileBlocksRequestProto req) + throws ServiceException { + try { + CorruptFileBlocksProto result = PBHelper.convert(server + .listCorruptFileBlocks(req.getPath(), req.getCookie())); + return ListCorruptFileBlocksResponseProto.newBuilder().setCorrupt(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final MetaSaveResponseProto VOID_METASAVE_RESPONSE = + MetaSaveResponseProto.newBuilder().build(); + + @Override + public MetaSaveResponseProto metaSave(RpcController controller, + MetaSaveRequestProto req) throws ServiceException { + try { + server.metaSave(req.getFilename()); + return VOID_METASAVE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + @Override + public GetFileInfoResponseProto getFileInfo(RpcController controller, + GetFileInfoRequestProto req) throws ServiceException { + try { + HdfsFileStatusProto result = + PBHelper.convert(server.getFileInfo(req.getSrc())); + return GetFileInfoResponseProto.newBuilder().setFs(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller, + GetFileLinkInfoRequestProto req) throws ServiceException { + try { + HdfsFileStatusProto result = + PBHelper.convert(server.getFileLinkInfo(req.getSrc())); + return GetFileLinkInfoResponseProto.newBuilder().setFs(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetContentSummaryResponseProto getContentSummary( + RpcController controller, GetContentSummaryRequestProto req) + throws ServiceException { + try { + ContentSummaryProto result = + PBHelper.convert(server.getContentSummary(req.getPath())); + return + GetContentSummaryResponseProto.newBuilder().setSummary(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE = + SetQuotaResponseProto.newBuilder().build(); + + @Override + public SetQuotaResponseProto setQuota(RpcController controller, + SetQuotaRequestProto req) throws ServiceException { + try { + server.setQuota(req.getPath(), req.getNamespaceQuota(), + req.getDiskspaceQuota()); + return VOID_SETQUOTA_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final FsyncResponseProto VOID_FSYNC_RESPONSE = + FsyncResponseProto.newBuilder().build(); + + @Override + public FsyncResponseProto fsync(RpcController controller, + FsyncRequestProto req) throws ServiceException { + try { + server.fsync(req.getSrc(), req.getClient()); + return VOID_FSYNC_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetTimesResponseProto VOID_SETTIMES_RESPONSE = + SetTimesResponseProto.newBuilder().build(); + + @Override + public SetTimesResponseProto setTimes(RpcController controller, + SetTimesRequestProto req) throws ServiceException { + try { + server.setTimes(req.getSrc(), req.getMtime(), req.getAtime()); + return VOID_SETTIMES_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE = + CreateSymlinkResponseProto.newBuilder().build(); + + @Override + public CreateSymlinkResponseProto createSymlink(RpcController controller, + CreateSymlinkRequestProto req) throws ServiceException { + try { + server.createSymlink(req.getTarget(), req.getLink(), + PBHelper.convert(req.getDirPerm()), req.getCreateParent()); + return VOID_CREATESYMLINK_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetLinkTargetResponseProto getLinkTarget(RpcController controller, + GetLinkTargetRequestProto req) throws ServiceException { + try { + String result = server.getLinkTarget(req.getPath()); + return GetLinkTargetResponseProto.newBuilder().setTargetPath(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public UpdateBlockForPipelineResponseProto updateBlockForPipeline( + RpcController controller, UpdateBlockForPipelineRequestProto req) + throws ServiceException { + try { + LocatedBlockProto result = PBHelper.convert(server + .updateBlockForPipeline(PBHelper.convert(req.getBlock()), + req.getClientName())); + return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final UpdatePipelineResponseProto VOID_UPDATEPIPELINE_RESPONSE = + UpdatePipelineResponseProto.newBuilder().build(); + + @Override + public UpdatePipelineResponseProto updatePipeline(RpcController controller, + UpdatePipelineRequestProto req) throws ServiceException { + try { + server + .updatePipeline(req.getClientName(), PBHelper.convert(req + .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper + .convert((DatanodeIDProto[]) req.getNewNodesList().toArray())); + return VOID_UPDATEPIPELINE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetDelegationTokenResponseProto getDelegationToken( + RpcController controller, GetDelegationTokenRequestProto req) + throws ServiceException { + try { + BlockTokenIdentifierProto result = PBHelper.convert(server + .getDelegationToken(new Text(req.getRenewer()))); + return GetDelegationTokenResponseProto.newBuilder().setToken(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RenewDelegationTokenResponseProto renewDelegationToken( + RpcController controller, RenewDelegationTokenRequestProto req) + throws ServiceException { + try { + long result = server.renewDelegationToken(PBHelper + .convertDelegationToken(req.getToken())); + return RenewDelegationTokenResponseProto.newBuilder() + .setNewExireTime(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final CancelDelegationTokenResponseProto + VOID_CANCELDELEGATIONTOKEN_RESPONSE = + CancelDelegationTokenResponseProto.newBuilder().build(); + + @Override + public CancelDelegationTokenResponseProto cancelDelegationToken( + RpcController controller, CancelDelegationTokenRequestProto req) + throws ServiceException { + try { + server.cancelDelegationToken(PBHelper.convertDelegationToken(req + .getToken())); + return VOID_CANCELDELEGATIONTOKEN_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetBalancerBandwidthResponseProto + VOID_SETBALANCERBANDWIDTH_RESPONSE = + SetBalancerBandwidthResponseProto.newBuilder().build(); + + @Override + public SetBalancerBandwidthResponseProto setBalancerBandwidth( + RpcController controller, SetBalancerBandwidthRequestProto req) + throws ServiceException { + try { + server.setBalancerBandwidth(req.getBandwidth()); + return VOID_SETBALANCERBANDWIDTH_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java new file mode 100644 index 0000000000..83aca3987b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -0,0 +1,851 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + +/** + * This class forwards NN's ClientProtocol calls as RPC calls to the NN server + * while translating from the parameter types used in ClientProtocol to those + * used in protocolR23Compatile.*. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientNamenodeProtocolTranslatorPB implements + ClientProtocol, Closeable { + final private ClientNamenodeProtocolPB rpcProxy; + + private static ClientNamenodeProtocolPB createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf, + UserGroupInformation ugi) throws IOException { + RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, + ProtobufRpcEngine.class); + return RPC.getProxy(ClientNamenodeProtocolPB.class, + RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), nameNodeAddr, ugi, conf, + NetUtils.getSocketFactory(conf, ClientNamenodeProtocolPB.class)); + } + + /** Create a {@link NameNode} proxy */ + static ClientNamenodeProtocolPB createNamenodeWithRetry( + ClientNamenodeProtocolPB rpcNamenode) { + RetryPolicy createPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(5, + HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); + + Map, RetryPolicy> remoteExceptionToPolicyMap + = new HashMap, RetryPolicy>(); + remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, + createPolicy); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(RemoteException.class, RetryPolicies + .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, + remoteExceptionToPolicyMap)); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + + methodNameToPolicyMap.put("create", methodPolicy); + + return (ClientNamenodeProtocolPB) RetryProxy.create( + ClientNamenodeProtocolPB.class, rpcNamenode, methodNameToPolicyMap); + } + + public ClientNamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr, + Configuration conf, UserGroupInformation ugi) throws IOException { + + rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi)); + } + + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocolName, + long clientVersion, int clientMethodHash) + throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocolName, clientVersion, clientMethodHash)); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public LocatedBlocks getBlockLocations(String src, long offset, long length) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto + .newBuilder() + .setSrc(src) + .setOffset(offset) + .setLength(length) + .build(); + try { + return PBHelper.convert(rpcProxy.getBlockLocations(null, req).getLocations()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + GetServerDefaultsRequestProto req = GetServerDefaultsRequestProto.newBuilder().build(); + try { + return PBHelper + .convert(rpcProxy.getServerDefaults(null, req).getServerDefaults()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void create(String src, FsPermission masked, String clientName, + EnumSetWritable flag, boolean createParent, + short replication, long blockSize) throws AccessControlException, + AlreadyBeingCreatedException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + CreateRequestProto req = CreateRequestProto.newBuilder() + .setSrc(src) + .setMasked(PBHelper.convert(masked)) + .setClientName(clientName) + .setCreateFlag(PBHelper.convertCreateFlag(flag)) + .setCreateParent(createParent) + .setReplication(replication) + .setBlockSize(blockSize) + .build(); + try { + rpcProxy.create(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + + @Override + public LocatedBlock append(String src, String clientName) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + AppendRequestProto req = AppendRequestProto.newBuilder() + .setSrc(src) + .setClientName(clientName) + .build(); + try { + return PBHelper.convert(rpcProxy.append(null, req).getBlock()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean setReplication(String src, short replication) + throws AccessControlException, DSQuotaExceededException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + SetReplicationRequestProto req = SetReplicationRequestProto.newBuilder() + .setSrc(src) + .setReplication(replication) + .build(); + try { + return rpcProxy.setReplication(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void setPermission(String src, FsPermission permission) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + SetPermissionRequestProto req = SetPermissionRequestProto.newBuilder() + .setSrc(src) + .setPermission(PBHelper.convert(permission)) + .build(); + try { + rpcProxy.setPermission(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void setOwner(String src, String username, String groupname) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + SetOwnerRequestProto req = SetOwnerRequestProto.newBuilder() + .setSrc(src) + .setUsername(username) + .setGroupname(groupname) + .build(); + try { + rpcProxy.setOwner(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void abandonBlock(ExtendedBlock b, String src, String holder) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder() + .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder).build(); + try { + rpcProxy.abandonBlock(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public LocatedBlock addBlock(String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes) + throws AccessControlException, FileNotFoundException, + NotReplicatedYetException, SafeModeException, UnresolvedLinkException, + IOException { + AddBlockRequestProto req = AddBlockRequestProto.newBuilder().setSrc(src) + .setClientName(clientName).setPrevious(PBHelper.convert(previous)) + .addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes))) + .build(); + try { + return PBHelper.convert(rpcProxy.addBlock(null, req).getBlock()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk, + DatanodeInfo[] existings, DatanodeInfo[] excludes, + int numAdditionalNodes, String clientName) throws AccessControlException, + FileNotFoundException, SafeModeException, UnresolvedLinkException, + IOException { + GetAdditionalDatanodeRequestProto req = GetAdditionalDatanodeRequestProto + .newBuilder() + .setSrc(src) + .setBlk(PBHelper.convert(blk)) + .addAllExistings(Arrays.asList(PBHelper.convert(existings))) + .addAllExcludes(Arrays.asList(PBHelper.convert(excludes))) + .setNumAdditionalNodes(numAdditionalNodes) + .setClientName(clientName) + .build(); + try { + return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req) + .getBlock()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean complete(String src, String clientName, ExtendedBlock last) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + CompleteRequestProto req = CompleteRequestProto.newBuilder() + .setSrc(src) + .setClientName(clientName) + .setLast(PBHelper.convert(last)) + .build(); + try { + return rpcProxy.complete(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder() + .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlock(blocks))) + .build(); + try { + rpcProxy.reportBadBlocks(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean rename(String src, String dst) throws UnresolvedLinkException, + IOException { + RenameRequestProto req = RenameRequestProto.newBuilder() + .setSrc(src) + .setDst(dst).build(); + try { + return rpcProxy.rename(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + + @Override + public void rename2(String src, String dst, Rename... options) + throws AccessControlException, DSQuotaExceededException, + FileAlreadyExistsException, FileNotFoundException, + NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, + UnresolvedLinkException, IOException { + boolean overwrite = false; + if (options != null) { + for (Rename option : options) { + if (option == Rename.OVERWRITE) { + overwrite = true; + } + } + } + Rename2RequestProto req = Rename2RequestProto.newBuilder(). + setSrc(src). + setDst(dst).setOverwriteDest(overwrite). + build(); + try { + rpcProxy.rename2(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + + @Override + public void concat(String trg, String[] srcs) throws IOException, + UnresolvedLinkException { + ConcatRequestProto req = ConcatRequestProto.newBuilder(). + setTrg(trg). + addAllSrcs(Arrays.asList(srcs)).build(); + try { + rpcProxy.concat(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + + @Override + public boolean delete(String src, boolean recursive) + throws AccessControlException, FileNotFoundException, SafeModeException, + UnresolvedLinkException, IOException { + DeleteRequestProto req = DeleteRequestProto.newBuilder().setSrc(src).setRecursive(recursive).build(); + try { + return rpcProxy.delete(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws AccessControlException, FileAlreadyExistsException, + FileNotFoundException, NSQuotaExceededException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + MkdirsRequestProto req = MkdirsRequestProto.newBuilder() + .setSrc(src) + .setMasked(PBHelper.convert(masked)) + .setCreateParent(createParent).build(); + + try { + return rpcProxy.mkdirs(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public DirectoryListing getListing(String src, byte[] startAfter, + boolean needLocation) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + GetListingRequestProto req = GetListingRequestProto.newBuilder() + .setSrc(src) + .setStartAfter(ByteString.copyFrom(startAfter)) + .setNeedLocation(needLocation).build(); + try { + return PBHelper.convert(rpcProxy.getListing(null, req).getDirList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void renewLease(String clientName) throws AccessControlException, + IOException { + RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder() + .setClientName(clientName).build(); + try { + rpcProxy.renewLease(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean recoverLease(String src, String clientName) + throws IOException { + RecoverLeaseRequestProto req = RecoverLeaseRequestProto.newBuilder() + .setSrc(src) + .setClientName(clientName).build(); + try { + return rpcProxy.recoverLease(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public long[] getStats() throws IOException { + GetFsStatusRequestProto req = GetFsStatusRequestProto.newBuilder().build(); + try { + return PBHelper.convert(rpcProxy.getFsStats(null, req)); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) + throws IOException { + GetDatanodeReportRequestProto req = GetDatanodeReportRequestProto + .newBuilder() + .setType(PBHelper.convert(type)).build(); + try { + return PBHelper.convert( + rpcProxy.getDatanodeReport(null, req).getDiList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public long getPreferredBlockSize(String filename) throws IOException, + UnresolvedLinkException { + GetPreferredBlockSizeRequestProto req = GetPreferredBlockSizeRequestProto + .newBuilder() + .setFilename(filename) + .build(); + try { + return rpcProxy.getPreferredBlockSize(null, req).getBsize(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean setSafeMode(SafeModeAction action) throws IOException { + SetSafeModeRequestProto req = SetSafeModeRequestProto.newBuilder(). + setAction(PBHelper.convert(action)).build(); + try { + return rpcProxy.setSafeMode(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void saveNamespace() throws AccessControlException, IOException { + SaveNamespaceRequestProto req = SaveNamespaceRequestProto.newBuilder() + .build(); + try { + rpcProxy.saveNamespace(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException{ + RestoreFailedStorageRequestProto req = RestoreFailedStorageRequestProto + .newBuilder() + .setArg(arg).build(); + try { + return rpcProxy.restoreFailedStorage(null, req).getResult(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void refreshNodes() throws IOException { + RefreshNodesRequestProto req = RefreshNodesRequestProto.newBuilder().build(); + try { + rpcProxy.refreshNodes(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void finalizeUpgrade() throws IOException { + FinalizeUpgradeRequestProto req = FinalizeUpgradeRequestProto.newBuilder().build(); + try { + rpcProxy.finalizeUpgrade(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) + throws IOException { + DistributedUpgradeProgressRequestProto req = + DistributedUpgradeProgressRequestProto.newBuilder(). + setAction(PBHelper.convert(action)).build(); + try { + return PBHelper.convert( + rpcProxy.distributedUpgradeProgress(null, req).getReport()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + ListCorruptFileBlocksRequestProto req = ListCorruptFileBlocksRequestProto + .newBuilder().setPath(path).setCookie(cookie).build(); + try { + return PBHelper.convert( + rpcProxy.listCorruptFileBlocks(null, req).getCorrupt()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void metaSave(String filename) throws IOException { + MetaSaveRequestProto req = MetaSaveRequestProto.newBuilder() + .setFilename(filename).build(); + try { + rpcProxy.metaSave(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + + @Override + public HdfsFileStatus getFileInfo(String src) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + GetFileInfoRequestProto req = GetFileInfoRequestProto.newBuilder() + .setSrc(src).build(); + try { + return PBHelper.convert(rpcProxy.getFileInfo(null, req).getFs()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public HdfsFileStatus getFileLinkInfo(String src) + throws AccessControlException, UnresolvedLinkException, IOException { + GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder() + .setSrc(src).build(); + try { + return PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ContentSummary getContentSummary(String path) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + GetContentSummaryRequestProto req = GetContentSummaryRequestProto + .newBuilder() + .setPath(path) + .build(); + try { + return PBHelper.convert(rpcProxy.getContentSummary(null, req) + .getSummary()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void setQuota(String path, long namespaceQuota, long diskspaceQuota) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + SetQuotaRequestProto req = SetQuotaRequestProto.newBuilder() + .setPath(path) + .setNamespaceQuota(namespaceQuota) + .setDiskspaceQuota(diskspaceQuota) + .build(); + try { + rpcProxy.setQuota(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void fsync(String src, String client) throws AccessControlException, + FileNotFoundException, UnresolvedLinkException, IOException { + FsyncRequestProto req = FsyncRequestProto.newBuilder() + .setSrc(src) + .setClient(client) + .build(); + try { + rpcProxy.fsync(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void setTimes(String src, long mtime, long atime) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + SetTimesRequestProto req = SetTimesRequestProto.newBuilder() + .setSrc(src) + .setMtime(mtime) + .setAtime(atime) + .build(); + try { + rpcProxy.setTimes(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void createSymlink(String target, String link, FsPermission dirPerm, + boolean createParent) throws AccessControlException, + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, + IOException { + CreateSymlinkRequestProto req = CreateSymlinkRequestProto.newBuilder() + .setTarget(target) + .setLink(link) + .setDirPerm(PBHelper.convert(dirPerm)) + .setCreateParent(createParent) + .build(); + try { + rpcProxy.createSymlink(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public String getLinkTarget(String path) throws AccessControlException, + FileNotFoundException, IOException { + GetLinkTargetRequestProto req = GetLinkTargetRequestProto.newBuilder() + .setPath(path).build(); + try { + return rpcProxy.getLinkTarget(null, req).getTargetPath(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public LocatedBlock updateBlockForPipeline(ExtendedBlock block, + String clientName) throws IOException { + UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto + .newBuilder() + .setBlock(PBHelper.convert(block)) + .setClientName(clientName) + .build(); + try { + return PBHelper.convert( + rpcProxy.updateBlockForPipeline(null, req).getBlock()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void updatePipeline(String clientName, ExtendedBlock oldBlock, + ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException { + UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() + .setClientName(clientName) + .setOldBlock(PBHelper.convert(oldBlock)) + .setNewBlock(PBHelper.convert(newBlock)) + .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes))) + .build(); + try { + rpcProxy.updatePipeline(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public Token getDelegationToken(Text renewer) + throws IOException { + GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto + .newBuilder() + .setRenewer(renewer.toString()) + .build(); + try { + return PBHelper.convertDelegationToken(rpcProxy.getDelegationToken(null, req).getToken()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public long renewDelegationToken(Token token) + throws IOException { + RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder(). + setToken(PBHelper.convert(token)). + build(); + try { + return rpcProxy.renewDelegationToken(null, req).getNewExireTime(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void cancelDelegationToken(Token token) + throws IOException { + CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto + .newBuilder() + .setToken(PBHelper.convert(token)) + .build(); + try { + rpcProxy.cancelDelegationToken(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + SetBalancerBandwidthRequestProto req = SetBalancerBandwidthRequestProto.newBuilder() + .setBandwidth(bandwidth) + .build(); + try { + rpcProxy.setBalancerBandwidth(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 2745990bf9..e05a884cd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -72,8 +72,8 @@ /** * This class is the client side translator to translate the requests made on - * {@link DatanodeProtocolProtocol} interfaces to the RPC server implementing - * {@link DatanodeProtocolProtocolPB}. + * {@link DatanodeProtocol} interfaces to the RPC server implementing + * {@link DatanodeProtocolPB}. */ @InterfaceAudience.Private @InterfaceStability.Stable diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 15aa1ee721..ea0df18d73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -19,14 +19,34 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; @@ -44,14 +64,22 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsServerDefaultsProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto.FileType; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto; @@ -61,12 +89,15 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; @@ -88,6 +119,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; @@ -97,6 +129,10 @@ * Utilities for converting protobuf classes to and from implementation classes. */ public class PBHelper { + private static final RegisterCommandProto REG_CMD_PROTO = + RegisterCommandProto.newBuilder().build(); + private static final RegisterCommand REG_CMD = new RegisterCommand(); + private PBHelper() { /** Hidden constructor */ } @@ -152,6 +188,7 @@ public static NamenodeRegistration convert(NamenodeRegistrationProto reg) { convert(reg.getStorageInfo()), convert(reg.getRole())); } + // DatanodeId public static DatanodeID convert(DatanodeIDProto dn) { return new DatanodeID(dn.getName(), dn.getStorageID(), dn.getInfoPort(), dn.getIpcPort()); @@ -163,6 +200,28 @@ public static DatanodeIDProto convert(DatanodeID dn) { .setStorageID(dn.getStorageID()).build(); } + // Arrays of DatanodeId + public static DatanodeIDProto[] convert(DatanodeID[] did) { + if (did == null) return null; + final int len = did.length; + DatanodeIDProto[] result = new DatanodeIDProto[len]; + for (int i = 0; i < len; ++i) { + result[i] = convert(did[i]); + } + return result; + } + + public static DatanodeID[] convert(DatanodeIDProto[] did) { + if (did == null) return null; + final int len = did.length; + DatanodeID[] result = new DatanodeID[len]; + for (int i = 0; i < len; ++i) { + result[i] = convert(did[i]); + } + return result; + } + + // Block public static BlockProto convert(Block b) { return BlockProto.newBuilder().setBlockId(b.getBlockId()) .setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes()) @@ -317,18 +376,23 @@ public static NamenodeCommand convert(NamenodeCommandProto cmd) { return new NamenodeCommand(cmd.getAction()); } } - - public static ExtendedBlockProto convert(ExtendedBlock b) { - return ExtendedBlockProto.newBuilder().setBlockId(b.getBlockId()) - .setGenerationStamp(b.getGenerationStamp()) - .setNumBytes(b.getNumBytes()).setPoolId(b.getBlockPoolId()).build(); + + public static ExtendedBlock convert(ExtendedBlockProto eb) { + if (eb == null) return null; + return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(), + eb.getGenerationStamp()); } - - public static ExtendedBlock convert(ExtendedBlockProto b) { - return new ExtendedBlock(b.getPoolId(), b.getBlockId(), b.getNumBytes(), - b.getGenerationStamp()); + + public static ExtendedBlockProto convert(final ExtendedBlock b) { + if (b == null) return null; + return ExtendedBlockProto.newBuilder(). + setPoolId(b.getBlockPoolId()). + setBlockId(b.getBlockId()). + setNumBytes(b.getNumBytes()). + setGenerationStamp(b.getGenerationStamp()). + build(); } - + public static RecoveringBlockProto convert(RecoveringBlock b) { if (b == null) { return null; @@ -343,6 +407,62 @@ public static RecoveringBlock convert(RecoveringBlockProto b) { DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); return new RecoveringBlock(block, locs, b.getNewGenStamp()); } + + public static DatanodeInfoProto.AdminState convert( + final DatanodeInfo.AdminStates inAs) { + switch (inAs) { + case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; + case DECOMMISSION_INPROGRESS: + return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; + default: return DatanodeInfoProto.AdminState.NORMAL; + } + } + + static public DatanodeInfo convert(DatanodeInfoProto di) { + if (di == null) return null; + return new DatanodeInfo( + PBHelper.convert(di.getId()), + di.getLocation(), di.getHostName(), + di.getCapacity(), di.getDfsUsed(), di.getRemaining(), + di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() , + PBHelper.convert(di.getAdminState())); + } + + static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { + if (di == null) return null; + return DatanodeInfoProto.newBuilder(). + setId(PBHelper.convert((DatanodeID) di)). + setLocation(di.getNetworkLocation()). + setHostName(di.getHostName()). + setCapacity(di.getCapacity()). + setDfsUsed(di.getDfsUsed()). + setRemaining(di.getRemaining()). + setBlockPoolUsed(di.getBlockPoolUsed()). + setLastUpdate(di.getLastUpdate()). + setXceiverCount(di.getXceiverCount()). + setAdminState(PBHelper.convert(di.getAdminState())). + build(); + } + + + static public DatanodeInfo[] convert(DatanodeInfoProto di[]) { + if (di == null) return null; + DatanodeInfo[] result = new DatanodeInfo[di.length]; + for (int i = 0; i < di.length; i++) { + result[i] = convert(di[i]); + } + return result; + } + + static public DatanodeInfoProto[] convert(DatanodeInfo[] di) { + if (di == null) return null; + DatanodeInfoProto[] result = new DatanodeInfoProto[di.length]; + for (int i = 0; i < di.length; i++) { + result[i] = PBHelper.convertDatanodeInfo(di[i]); + } + return result; + } public static DatanodeInfo[] convert(List list) { DatanodeInfo[] info = new DatanodeInfo[list.size()]; @@ -351,15 +471,6 @@ public static DatanodeInfo[] convert(List list) { } return info; } - - public static DatanodeInfo convert(DatanodeInfoProto info) { - DatanodeIDProto dnId = info.getId(); - return new DatanodeInfo(dnId.getName(), dnId.getStorageID(), - dnId.getInfoPort(), dnId.getIpcPort(), info.getCapacity(), - info.getDfsUsed(), info.getRemaining(), info.getBlockPoolUsed(), - info.getLastUpdate(), info.getXceiverCount(), info.getLocation(), - info.getHostName(), convert(info.getAdminState())); - } public static DatanodeInfoProto convert(DatanodeInfo info) { DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); @@ -389,22 +500,8 @@ public static AdminStates convert(AdminState adminState) { } } - public static AdminState convert(AdminStates adminState) { - switch(adminState) { - case DECOMMISSION_INPROGRESS: - return AdminState.DECOMMISSION_INPROGRESS; - case DECOMMISSIONED: - return AdminState.DECOMMISSIONED; - case NORMAL: - default: - return AdminState.NORMAL; - } - } - public static LocatedBlockProto convert(LocatedBlock b) { - if (b == null) { - return null; - } + if (b == null) return null; Builder builder = LocatedBlockProto.newBuilder(); DatanodeInfo[] locs = b.getLocations(); for (int i = 0; i < locs.length; i++) { @@ -416,6 +513,7 @@ public static LocatedBlockProto convert(LocatedBlock b) { } public static LocatedBlock convert(LocatedBlockProto proto) { + if (proto == null) return null; List locs = proto.getLocsList(); DatanodeInfo[] targets = new DatanodeInfo[locs.size()]; for (int i = 0; i < locs.size(); i++) { @@ -427,13 +525,12 @@ public static LocatedBlock convert(LocatedBlockProto proto) { return lb; } - public static BlockTokenIdentifierProto convert( - Token token) { - ByteString tokenId = ByteString.copyFrom(token.getIdentifier()); - ByteString password = ByteString.copyFrom(token.getPassword()); - return BlockTokenIdentifierProto.newBuilder().setIdentifier(tokenId) - .setKind(token.getKind().toString()).setPassword(password) - .setService(token.getService().toString()).build(); + public static BlockTokenIdentifierProto convert(Token tok) { + return BlockTokenIdentifierProto.newBuilder(). + setIdentifier(ByteString.copyFrom(tok.getIdentifier())). + setPassword(ByteString.copyFrom(tok.getPassword())). + setKind(tok.getKind().toString()). + setService(tok.getService().toString()).build(); } public static Token convert( @@ -443,6 +540,14 @@ public static Token convert( blockToken.getKind()), new Text(blockToken.getService())); } + + public static Token convertDelegationToken( + BlockTokenIdentifierProto blockToken) { + return new Token(blockToken.getIdentifier() + .toByteArray(), blockToken.getPassword().toByteArray(), new Text( + blockToken.getKind()), new Text(blockToken.getService())); + } + public static ReplicaState convert(ReplicaStateProto state) { switch (state) { case RBW: @@ -503,7 +608,7 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) { case KeyUpdateCommand: return PBHelper.convert(proto.getKeyUpdateCmd()); case RegisterCommand: - return PBHelper.convert(proto.getRegisterCmd()); + return REG_CMD; case UpgradeCommand: return PBHelper.convert(proto.getUpgradeCmd()); } @@ -534,10 +639,6 @@ public static FinalizeCommandProto convert(FinalizeCommand cmd) { return FinalizeCommandProto.newBuilder() .setBlockPoolId(cmd.getBlockPoolId()).build(); } - - public static RegisterCommandProto convert(RegisterCommand cmd) { - return RegisterCommandProto.newBuilder().build(); - } public static BlockCommandProto convert(BlockCommand cmd) { BlockCommandProto.Builder builder = BlockCommandProto.newBuilder() @@ -554,19 +655,17 @@ public static BlockCommandProto convert(BlockCommand cmd) { for (int i = 0; i < blocks.length; i++) { builder.addBlocks(PBHelper.convert(blocks[i])); } - DatanodeInfo[][] infos = cmd.getTargets(); - for (int i = 0; i < infos.length; i++) { - builder.addTargets(PBHelper.convert(infos[i])); - } + builder.addAllTargets(PBHelper.convert(cmd.getTargets())); return builder.build(); } - public static DatanodeInfosProto convert(DatanodeInfo[] datanodeInfos) { - DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); - for (int i = 0; i < datanodeInfos.length; i++) { - builder.addDatanodes(PBHelper.convert(datanodeInfos[i])); + private static List convert(DatanodeInfo[][] targets) { + DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; + for (int i = 0; i < targets.length; i++) { + ret[i] = DatanodeInfosProto.newBuilder() + .addAllDatanodes(Arrays.asList(PBHelper.convert(targets[i]))).build(); } - return builder.build(); + return Arrays.asList(ret); } public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { @@ -593,7 +692,7 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { break; case DatanodeProtocol.DNA_REGISTER: builder.setCmdType(DatanodeCommandProto.Type.RegisterCommand) - .setRegisterCmd(PBHelper.convert((RegisterCommand) datanodeCommand)); + .setRegisterCmd(REG_CMD_PROTO); break; case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_INVALIDATE: @@ -619,10 +718,6 @@ public static UpgradeCommand convert(UpgradeCommandProto upgradeCmd) { (short) upgradeCmd.getUpgradeStatus()); } - public static RegisterCommand convert(RegisterCommandProto registerCmd) { - return new RegisterCommand(); - } - public static KeyUpdateCommand convert(KeyUpdateCommandProto keyUpdateCmd) { return new KeyUpdateCommand(PBHelper.convert(keyUpdateCmd.getKeys())); } @@ -715,4 +810,382 @@ public static NamespaceInfoProto convert(NamespaceInfo info) { .setDistUpgradeVersion(info.getDistributedUpgradeVersion()) .setStorageInfo(PBHelper.convert((StorageInfo)info)).build(); } + + // Located Block Arrays and Lists + public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) { + if (lb == null) return null; + final int len = lb.length; + LocatedBlockProto[] result = new LocatedBlockProto[len]; + for (int i = 0; i < len; ++i) { + result[i] = PBHelper.convert(lb[i]); + } + return result; + } + + public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) { + if (lb == null) return null; + final int len = lb.length; + LocatedBlock[] result = new LocatedBlock[len]; + for (int i = 0; i < len; ++i) { + result[i] = new LocatedBlock( + PBHelper.convert(lb[i].getB()), + PBHelper.convert(lb[i].getLocsList()), + lb[i].getOffset(), lb[i].getCorrupt()); + } + return result; + } + + public static List convertLocatedBlock( + List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = + new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(PBHelper.convert(lb.get(i))); + } + return result; + } + + public static List convertLocatedBlock2(List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(PBHelper.convert(lb.get(i))); + } + return result; + } + + + // LocatedBlocks + public static LocatedBlocks convert(LocatedBlocksProto lb) { + if (lb == null) { + return null; + } + return new LocatedBlocks( + lb.getFileLength(), lb.getUnderConstruction(), + PBHelper.convertLocatedBlock(lb.getBlocksList()), + PBHelper.convert(lb.getLastBlock()), + lb.getIsLastBlockComplete()); + } + + public static LocatedBlocksProto convert(LocatedBlocks lb) { + if (lb == null) { + return null; + } + return LocatedBlocksProto.newBuilder(). + setFileLength(lb.getFileLength()). + setUnderConstruction(lb.isUnderConstruction()). + addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks())). + setLastBlock(PBHelper.convert(lb.getLastLocatedBlock())).setIsLastBlockComplete(lb.isLastBlockComplete()).build(); + } + + public static FsServerDefaults convert(FsServerDefaultsProto fs) { + if (fs == null) return null; + return new FsServerDefaults( + fs.getBlockSize(), fs.getBytesPerChecksum(), + fs.getWritePacketSize(), (short) fs.getReplication(), + fs.getFileBufferSize()); + } + + public static FsServerDefaultsProto convert(FsServerDefaults fs) { + if (fs == null) return null; + return FsServerDefaultsProto.newBuilder(). + setBlockSize(fs.getBlockSize()). + setBytesPerChecksum(fs.getBytesPerChecksum()). + setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build(); + } + + public static FsPermissionProto convert(FsPermission p) { + if (p == null) return null; + return FsPermissionProto.newBuilder().setPerm(p.toShort()).build(); + } + + public static FsPermission convert(FsPermissionProto p) { + if (p == null) return null; + return new FsPermission((short)p.getPerm()); + } + + + // The creatFlag field in PB is a bitmask whose values are the same a the + // emum values of CreateFlag + public static int convertCreateFlag(EnumSetWritable flag) { + int value = 0; + if (flag.contains(CreateFlag.APPEND)) { + value |= CreateFlagProto.APPEND.getNumber(); + } + if (flag.contains(CreateFlag.CREATE)) { + value |= CreateFlagProto.CREATE.getNumber(); + } + if (flag.contains(CreateFlag.OVERWRITE)) { + value |= CreateFlagProto.OVERWRITE.getNumber(); + } + return value; + } + + public static EnumSetWritable convert(int flag) { + EnumSet result = + EnumSet.noneOf(CreateFlag.class); + if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) { + result.add(CreateFlag.APPEND); + } + return new EnumSetWritable(result); + } + + + public static HdfsFileStatus convert(HdfsFileStatusProto fs) { + if (fs == null) + return null; + if (fs.hasLocations()) { + return new HdfsLocatedFileStatus( + fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), + fs.getBlockReplication(), fs.getBlocksize(), + fs.getModificationTime(), fs.getAccessTime(), + PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), + fs.getFileType().equals(FileType.IS_SYMLINK) ? + fs.getSymlink().toByteArray() : null, + fs.getPath().toByteArray(), + PBHelper.convert(fs.hasLocations() ? fs.getLocations() : null)); + } + return new HdfsFileStatus( + fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), + fs.getBlockReplication(), fs.getBlocksize(), + fs.getModificationTime(), fs.getAccessTime(), + PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), + fs.getFileType().equals(FileType.IS_SYMLINK) ? + fs.getSymlink().toByteArray() : null, + fs.getPath().toByteArray()); + } + + public static HdfsFileStatusProto convert(HdfsFileStatus fs) { + if (fs == null) + return null; + FileType fType = FileType.IS_DIR;; + if (fs.isDir()) { + fType = FileType.IS_DIR; + } else if (fs.isSymlink()) { + fType = FileType.IS_SYMLINK; + } + + HdfsFileStatusProto.Builder builder = + HdfsFileStatusProto.newBuilder(). + setLength(fs.getLen()). + setFileType(fType). + setBlockReplication(fs.getReplication()). + setBlocksize(fs.getBlockSize()). + setModificationTime(fs.getModificationTime()). + setAccessTime(fs.getAccessTime()). + setPermission(PBHelper.convert(fs.getPermission())). + setOwner(fs.getOwner()). + setGroup(fs.getGroup()). + setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes())). + setPath(ByteString.copyFrom(fs.getLocalNameInBytes())); + LocatedBlocks locations = null; + if (fs instanceof HdfsLocatedFileStatus) { + builder.setLocations(PBHelper.convert(locations)); + } + return builder.build(); + } + + public static HdfsFileStatusProto[] convert(HdfsFileStatus[] fs) { + if (fs == null) return null; + final int len = fs.length; + HdfsFileStatusProto[] result = new HdfsFileStatusProto[len]; + for (int i = 0; i < len; ++i) { + result[i] = PBHelper.convert(fs[i]); + } + return result; + } + + public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) { + if (fs == null) return null; + final int len = fs.length; + HdfsFileStatus[] result = new HdfsFileStatus[len]; + for (int i = 0; i < len; ++i) { + PBHelper.convert(fs[i]); + } + return result; + } + + public static DirectoryListing convert(DirectoryListingProto dl) { + if (dl == null) + return null; + return new DirectoryListing( + PBHelper.convert((HdfsFileStatusProto[]) + dl.getPartialListingList().toArray()), + dl.getRemainingEntries()); + } + + public static DirectoryListingProto convert(DirectoryListing d) { + if (d == null) + return null; + return DirectoryListingProto.newBuilder(). + addAllPartialListing(Arrays.asList( + PBHelper.convert(d.getPartialListing()))). + setRemainingEntries(d.getRemainingEntries()). + build(); + } + + public static long[] convert(GetFsStatsResponseProto res) { + long[] result = new long[6]; + result[ClientProtocol.GET_STATS_CAPACITY_IDX] = res.getCapacity(); + result[ClientProtocol.GET_STATS_USED_IDX] = res.getUsed(); + result[ClientProtocol.GET_STATS_REMAINING_IDX] = res.getRemaining(); + result[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = res.getUnderReplicated(); + result[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = res.getCorruptBlocks(); + result[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = res.getMissingBlocks(); + return result; + } + + public static GetFsStatsResponseProto convert(long[] fsStats) { + GetFsStatsResponseProto.Builder result = GetFsStatsResponseProto + .newBuilder(); + if (fsStats.length >= ClientProtocol.GET_STATS_CAPACITY_IDX + 1) + result.setCapacity(fsStats[ClientProtocol.GET_STATS_CAPACITY_IDX]); + if (fsStats.length >= ClientProtocol.GET_STATS_USED_IDX + 1) + result.setUsed(fsStats[ClientProtocol.GET_STATS_USED_IDX]); + if (fsStats.length >= ClientProtocol.GET_STATS_REMAINING_IDX + 1) + result.setRemaining(fsStats[ClientProtocol.GET_STATS_REMAINING_IDX]); + if (fsStats.length >= ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX + 1) + result.setUnderReplicated( + fsStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]); + if (fsStats.length >= ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX + 1) + result.setCorruptBlocks( + fsStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]); + if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX + 1) + result.setMissingBlocks( + fsStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]); + return result.build(); + } + + public static DatanodeReportTypeProto + convert(DatanodeReportType t) { + switch (t) { + case ALL: return DatanodeReportTypeProto.ALL; + case LIVE: return DatanodeReportTypeProto.LIVE; + case DEAD: return DatanodeReportTypeProto.DEAD; + default: + throw new IllegalArgumentException("Unexpected data type report:" + t); + } + } + + public static DatanodeReportType + convert(DatanodeReportTypeProto t) { + switch (t) { + case ALL: return DatanodeReportType.ALL; + case LIVE: return DatanodeReportType.LIVE; + case DEAD: return DatanodeReportType.DEAD; + default: + throw new IllegalArgumentException("Unexpected data type report:" + t); + } + } + + public static SafeModeActionProto convert( + SafeModeAction a) { + switch (a) { + case SAFEMODE_LEAVE: + return SafeModeActionProto.SAFEMODE_LEAVE; + case SAFEMODE_ENTER: + return SafeModeActionProto.SAFEMODE_ENTER; + case SAFEMODE_GET: + return SafeModeActionProto.SAFEMODE_GET; + default: + throw new IllegalArgumentException("Unexpected SafeModeAction :" + a); + } + } + + public static SafeModeAction convert( + ClientNamenodeProtocolProtos.SafeModeActionProto a) { + switch (a) { + case SAFEMODE_LEAVE: + return SafeModeAction.SAFEMODE_LEAVE; + case SAFEMODE_ENTER: + return SafeModeAction.SAFEMODE_ENTER; + case SAFEMODE_GET: + return SafeModeAction.SAFEMODE_GET; + default: + throw new IllegalArgumentException("Unexpected SafeModeAction :" + a); + } + } + + public static UpgradeActionProto convert( + UpgradeAction a) { + switch (a) { + case GET_STATUS: + return UpgradeActionProto.GET_STATUS; + case DETAILED_STATUS: + return UpgradeActionProto.DETAILED_STATUS; + case FORCE_PROCEED: + return UpgradeActionProto.FORCE_PROCEED; + default: + throw new IllegalArgumentException("Unexpected UpgradeAction :" + a); + } + } + + + public static UpgradeAction convert( + UpgradeActionProto a) { + switch (a) { + case GET_STATUS: + return UpgradeAction.GET_STATUS; + case DETAILED_STATUS: + return UpgradeAction.DETAILED_STATUS; + case FORCE_PROCEED: + return UpgradeAction.FORCE_PROCEED; + default: + throw new IllegalArgumentException("Unexpected UpgradeAction :" + a); + } + } + + public static UpgradeStatusReportProto convert(UpgradeStatusReport r) { + if (r == null) + return null; + return UpgradeStatusReportProto.newBuilder() + .setVersion(r.getVersion()) + .setUpgradeStatus(r.getUpgradeStatus()) + .setFinalized(r.isFinalized()) + .build(); + } + + public static UpgradeStatusReport convert(UpgradeStatusReportProto r) { + if (r == null) return null; + return new UpgradeStatusReport(r.getVersion(), + (short) r.getUpgradeStatus(), r.getFinalized()); + } + + public static CorruptFileBlocks convert(CorruptFileBlocksProto c) { + if (c == null) + return null; + return new CorruptFileBlocks((String[]) c.getFilesList().toArray(), + c.getCookie()); + } + + public static CorruptFileBlocksProto convert(CorruptFileBlocks c) { + if (c == null) + return null; + return CorruptFileBlocksProto.newBuilder(). + addAllFiles(Arrays.asList(c.getFiles())). + setCookie(c.getCookie()). + build(); + } + + public static ContentSummary convert(ContentSummaryProto cs) { + if (cs == null) return null; + return new ContentSummary( + cs.getLength(), cs.getFileCount(), cs.getDirectoryCount(), cs.getQuota(), + cs.getSpaceConsumed(), cs.getSpaceQuota()); + } + + public static ContentSummaryProto convert(ContentSummary cs) { + if (cs == null) return null; + return ContentSummaryProto.newBuilder(). + setLength(cs.getLength()). + setFileCount(cs.getFileCount()). + setDirectoryCount(cs.getDirectoryCount()). + setQuota(cs.getQuota()). + setSpaceConsumed(cs.getSpaceConsumed()). + setSpaceQuota(cs.getSpaceQuota()). + build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java index e9c053943e..4b424e977a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolServerSideTranslatorR23.java @@ -322,7 +322,8 @@ public void saveNamespace() throws AccessControlException, IOException { } @Override - public boolean restoreFailedStorage(String arg) throws AccessControlException { + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException { return server.restoreFailedStorage(arg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java index d9a5525670..44c8eb7b4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeProtocolTranslatorR23.java @@ -338,7 +338,8 @@ public void saveNamespace() throws AccessControlException, IOException { } @Override - public boolean restoreFailedStorage(String arg) throws AccessControlException { + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException{ return rpcProxy.restoreFailedStorage(arg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java index c2199e6425..6224fe9d6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientNamenodeWireProtocol.java @@ -325,7 +325,8 @@ public boolean setSafeMode(HdfsConstants.SafeModeAction action) * The specification of this method matches that of * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String)} */ - public boolean restoreFailedStorage(String arg) throws AccessControlException; + public boolean restoreFailedStorage(String arg) + throws AccessControlException, IOException; /** * The specification of this method matches that of diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 9940a299ec..17b4d1d90e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -49,7 +49,7 @@ message GetServerDefaultsResponseProto { required FsServerDefaultsProto serverDefaults = 1; } -enum CreateFlag { +enum CreateFlagProto { CREATE = 0x01; // Create a file OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC APPEND = 0x04; // Append to a file @@ -234,14 +234,14 @@ message GetFsStatsResponseProto { required uint64 missing_blocks = 6; } -enum DatanodeReportType { // type of the datanode report +enum DatanodeReportTypeProto { // type of the datanode report ALL = 1; - LIVE = 3; + LIVE = 2; DEAD = 3; } message GetDatanodeReportRequestProto { - required DatanodeReportType type = 1; + required DatanodeReportTypeProto type = 1; } message GetDatanodeReportResponseProto { @@ -256,14 +256,14 @@ message GetPreferredBlockSizeResponseProto { required uint64 bsize = 1; } -enum SafeModeAction { +enum SafeModeActionProto { SAFEMODE_LEAVE = 1; SAFEMODE_ENTER = 2; SAFEMODE_GET = 3; } message SetSafeModeRequestProto { - required SafeModeAction action = 1; + required SafeModeActionProto action = 1; } message SetSafeModeResponseProto { @@ -296,14 +296,14 @@ message FinalizeUpgradeRequestProto { // no parameters message FinalizeUpgradeResponseProto { // void response } -enum UpgradeAction { +enum UpgradeActionProto { GET_STATUS = 1; DETAILED_STATUS = 2; FORCE_PROCEED = 3; } message DistributedUpgradeProgressRequestProto { - required UpgradeAction action = 1; + required UpgradeActionProto action = 1; } message DistributedUpgradeProgressResponseProto { required UpgradeStatusReportProto report = 1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 1c19b3dc11..9fbf2b969a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -193,6 +193,7 @@ message DirectoryListingProto { message UpgradeStatusReportProto { required uint32 version = 1;; required uint32 upgradeStatus = 2; // % completed in range 0 & 100 + required bool finalized = 3; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index b7add293b2..ee873a9d4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.hdfs.protocolPB; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.*; import java.util.ArrayList; import java.util.Arrays;