diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1e2f13fd2d..4bf2997402 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -17,6 +17,8 @@ Trunk (unreleased changes) HDFS-2618. Implement protobuf service for NamenodeProtocol. (suresh) + HDFS-2629. Implement protobuf service for InterDatanodeProtocol. (suresh) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java index e6633ce3be..a722d8acba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java @@ -8,7 +8,7 @@ private HdfsProtos() {} public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } - public enum ReplicaState + public enum ReplicaStateProto implements com.google.protobuf.ProtocolMessageEnum { FINALIZED(0, 0), RBW(1, 1), @@ -26,7 +26,7 @@ public enum ReplicaState public final int getNumber() { return value; } - public static ReplicaState valueOf(int value) { + public static ReplicaStateProto valueOf(int value) { switch (value) { case 0: return FINALIZED; case 1: return RBW; @@ -37,15 +37,15 @@ public static ReplicaState valueOf(int value) { } } - public static com.google.protobuf.Internal.EnumLiteMap + public static com.google.protobuf.Internal.EnumLiteMap internalGetValueMap() { return internalValueMap; } - private static com.google.protobuf.Internal.EnumLiteMap + private static com.google.protobuf.Internal.EnumLiteMap internalValueMap = - new com.google.protobuf.Internal.EnumLiteMap() { - public ReplicaState findValueByNumber(int number) { - return ReplicaState.valueOf(number); + new com.google.protobuf.Internal.EnumLiteMap() { + public ReplicaStateProto findValueByNumber(int number) { + return ReplicaStateProto.valueOf(number); } }; @@ -62,11 +62,11 @@ public ReplicaState findValueByNumber(int number) { return org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.getDescriptor().getEnumTypes().get(0); } - private static final ReplicaState[] VALUES = { + private static final ReplicaStateProto[] VALUES = { FINALIZED, RBW, RWR, RUR, TEMPORARY, }; - public static ReplicaState valueOf( + public static ReplicaStateProto valueOf( com.google.protobuf.Descriptors.EnumValueDescriptor desc) { if (desc.getType() != getDescriptor()) { throw new java.lang.IllegalArgumentException( @@ -78,12 +78,12 @@ public static ReplicaState valueOf( private final int index; private final int value; - private ReplicaState(int index, int value) { + private ReplicaStateProto(int index, int value) { this.index = index; this.value = value; } - // @@protoc_insertion_point(enum_scope:ReplicaState) + // @@protoc_insertion_point(enum_scope:ReplicaStateProto) } public interface ExtendedBlockProtoOrBuilder @@ -20192,10 +20192,10 @@ public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProtoOrBuild "ntKey\030\004 \002(\0132\016.BlockKeyProto\022\037\n\007allKeys\030\005" + " \003(\0132\016.BlockKeyProto\"N\n\024RecoveringBlockP" + "roto\022\023\n\013newGenStamp\030\001 \002(\004\022!\n\005block\030\002 \002(\013" + - "2\022.LocatedBlockProto*G\n\014ReplicaState\022\r\n\t" + - "FINALIZED\020\000\022\007\n\003RBW\020\001\022\007\n\003RWR\020\002\022\007\n\003RUR\020\003\022\r" + - "\n\tTEMPORARY\020\004B6\n%org.apache.hadoop.hdfs.", - "protocol.protoB\nHdfsProtos\240\001\001" + "2\022.LocatedBlockProto*L\n\021ReplicaStateProt" + + "o\022\r\n\tFINALIZED\020\000\022\007\n\003RBW\020\001\022\007\n\003RWR\020\002\022\007\n\003RU" + + "R\020\003\022\r\n\tTEMPORARY\020\004B6\n%org.apache.hadoop.", + "hdfs.protocol.protoB\nHdfsProtos\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/InterDatanodeProtocolProtos.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/InterDatanodeProtocolProtos.java index 281a6a6670..70a01a687a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/InterDatanodeProtocolProtos.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/InterDatanodeProtocolProtos.java @@ -484,9 +484,9 @@ public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProtoOrBu public interface InitReplicaRecoveryResponseProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required .ReplicaState state = 1; + // required .ReplicaStateProto state = 1; boolean hasState(); - org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState(); + org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState(); // required .BlockProto block = 2; boolean hasBlock(); @@ -522,13 +522,13 @@ public InitReplicaRecoveryResponseProto getDefaultInstanceForType() { } private int bitField0_; - // required .ReplicaState state = 1; + // required .ReplicaStateProto state = 1; public static final int STATE_FIELD_NUMBER = 1; - private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState state_; + private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto state_; public boolean hasState() { return ((bitField0_ & 0x00000001) == 0x00000001); } - public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState() { + public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState() { return state_; } @@ -546,7 +546,7 @@ public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProtoOrBuilder getB } private void initFields() { - state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED; + state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED; block_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto.getDefaultInstance(); } private byte memoizedIsInitialized = -1; @@ -763,7 +763,7 @@ private static Builder create() { public Builder clear() { super.clear(); - state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED; + state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED; bitField0_ = (bitField0_ & ~0x00000001); if (blockBuilder_ == null) { block_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto.getDefaultInstance(); @@ -888,7 +888,7 @@ public Builder mergeFrom( } case 8: { int rawValue = input.readEnum(); - org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState value = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.valueOf(rawValue); + org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto value = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.valueOf(rawValue); if (value == null) { unknownFields.mergeVarintField(1, rawValue); } else { @@ -912,15 +912,15 @@ public Builder mergeFrom( private int bitField0_; - // required .ReplicaState state = 1; - private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED; + // required .ReplicaStateProto state = 1; + private org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED; public boolean hasState() { return ((bitField0_ & 0x00000001) == 0x00000001); } - public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState getState() { + public org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto getState() { return state_; } - public Builder setState(org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState value) { + public Builder setState(org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto value) { if (value == null) { throw new NullPointerException(); } @@ -931,7 +931,7 @@ public Builder setState(org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.Replica } public Builder clearState() { bitField0_ = (bitField0_ & ~0x00000001); - state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaState.FINALIZED; + state_ = org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto.FINALIZED; onChanged(); return this; } @@ -2448,22 +2448,23 @@ public org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateR java.lang.String[] descriptorData = { "\n\033InterDatanodeProtocol.proto\032\nhdfs.prot" + "o\"G\n\037InitReplicaRecoveryRequestProto\022$\n\005" + - "block\030\001 \002(\0132\025.RecoveringBlockProto\"\\\n In" + - "itReplicaRecoveryResponseProto\022\034\n\005state\030" + - "\001 \002(\0162\r.ReplicaState\022\032\n\005block\030\002 \002(\0132\013.Bl" + - "ockProto\"s\n&UpdateReplicaUnderRecoveryRe" + - "questProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedBloc" + - "kProto\022\022\n\nrecoveryId\030\002 \002(\004\022\021\n\tnewLength\030" + - "\003 \002(\004\"M\n\'UpdateReplicaUnderRecoveryRespo" + - "nseProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedBlockP", - "roto2\353\001\n\034InterDatanodeProtocolService\022Z\n" + - "\023initReplicaRecovery\022 .InitReplicaRecove" + - "ryRequestProto\032!.InitReplicaRecoveryResp" + - "onseProto\022o\n\032updateReplicaUnderRecovery\022" + - "\'.UpdateReplicaUnderRecoveryRequestProto" + - "\032(.UpdateReplicaUnderRecoveryResponsePro" + - "toBJ\n%org.apache.hadoop.hdfs.protocol.pr" + - "otoB\033InterDatanodeProtocolProtos\210\001\001\240\001\001" + "block\030\001 \002(\0132\025.RecoveringBlockProto\"a\n In" + + "itReplicaRecoveryResponseProto\022!\n\005state\030" + + "\001 \002(\0162\022.ReplicaStateProto\022\032\n\005block\030\002 \002(\013" + + "2\013.BlockProto\"s\n&UpdateReplicaUnderRecov" + + "eryRequestProto\022\"\n\005block\030\001 \002(\0132\023.Extende" + + "dBlockProto\022\022\n\nrecoveryId\030\002 \002(\004\022\021\n\tnewLe" + + "ngth\030\003 \002(\004\"M\n\'UpdateReplicaUnderRecovery" + + "ResponseProto\022\"\n\005block\030\001 \002(\0132\023.ExtendedB", + "lockProto2\353\001\n\034InterDatanodeProtocolServi" + + "ce\022Z\n\023initReplicaRecovery\022 .InitReplicaR" + + "ecoveryRequestProto\032!.InitReplicaRecover" + + "yResponseProto\022o\n\032updateReplicaUnderReco" + + "very\022\'.UpdateReplicaUnderRecoveryRequest" + + "Proto\032(.UpdateReplicaUnderRecoveryRespon" + + "seProtoBJ\n%org.apache.hadoop.hdfs.protoc" + + "ol.protoB\033InterDatanodeProtocolProtos\210\001\001" + + "\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolPB.java new file mode 100644 index 0000000000..38a148462c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolPB.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; + +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, + clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface InterDatanodeProtocolPB extends + InterDatanodeProtocolService.BlockingInterface, VersionedProtocol { + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000..1a1c887bcd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryResponseProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.VersionedProtocol; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link InterDatanodeProtocolPB} to the + * {@link InterDatanodeProtocol} server implementation. + */ +@InterfaceAudience.Private +public class InterDatanodeProtocolServerSideTranslatorPB implements + InterDatanodeProtocolPB { + private final InterDatanodeProtocol impl; + + public InterDatanodeProtocolServerSideTranslatorPB(InterDatanodeProtocol impl) { + this.impl = impl; + } + + @Override + public InitReplicaRecoveryResponseProto initReplicaRecovery( + RpcController unused, InitReplicaRecoveryRequestProto request) + throws ServiceException { + RecoveringBlock b = PBHelper.convert(request.getBlock()); + ReplicaRecoveryInfo r; + try { + r = impl.initReplicaRecovery(b); + } catch (IOException e) { + throw new ServiceException(e); + } + return InitReplicaRecoveryResponseProto.newBuilder() + .setBlock(PBHelper.convert(r)).build(); + } + + @Override + public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery( + RpcController unused, UpdateReplicaUnderRecoveryRequestProto request) + throws ServiceException { + ExtendedBlock b; + try { + b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()), + request.getRecoveryId(), request.getNewLength()); + } catch (IOException e) { + throw new ServiceException(e); + } + return UpdateReplicaUnderRecoveryResponseProto.newBuilder() + .setBlock(PBHelper.convert(b)).build(); + } + + /** @see VersionedProtocol#getProtocolVersion */ + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(InterDatanodeProtocolPB.class); + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client will never call this method. + * + * @see VersionedProtocol#getProtocolVersion + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link InterDatanodeProtocol} + */ + if (!protocol.equals(RPC.getProtocolName(InterDatanodeProtocol.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(InterDatanodeProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(InterDatanodeProtocolPB.class), + InterDatanodeProtocol.class); + } + + + @Override + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link InterDatanodeProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java new file mode 100644 index 0000000000..5f39043302 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is the client side translator to translate the requests made on + * {@link InterDatanodeProtocol} interfaces to the RPC server implementing + * {@link InterDatanodeProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class InterDatanodeProtocolTranslatorPB implements + InterDatanodeProtocol, Closeable { + /** RpcController is not used and hence is set to null */ + private final static RpcController NULL_CONTROLLER = null; + final private InterDatanodeProtocolPB rpcProxy; + + public InterDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr, + Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + rpcProxy = RPC.getProxy(InterDatanodeProtocolPB.class, + RPC.getProtocolVersion(InterDatanodeProtocolPB.class), nameNodeAddr, + conf); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocol, clientVersion, clientMethodsHash)); + } + + @Override + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) + throws IOException { + InitReplicaRecoveryRequestProto req = InitReplicaRecoveryRequestProto + .newBuilder().setBlock(PBHelper.convert(rBlock)).build(); + InitReplicaRecoveryResponseProto resp; + try { + resp = rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + BlockProto b = resp.getBlock(); + return new ReplicaRecoveryInfo(b.getBlockId(), b.getNumBytes(), + b.getGenStamp(), PBHelper.convert(resp.getState())); + } + + @Override + public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, + long recoveryId, long newLength) throws IOException { + UpdateReplicaUnderRecoveryRequestProto req = + UpdateReplicaUnderRecoveryRequestProto.newBuilder() + .setBlock(PBHelper.convert(oldBlock)) + .setNewLength(newLength).setRecoveryId(recoveryId).build(); + try { + return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery( + NULL_CONTROLLER, req).getBlock()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java index 389bf154d3..27dda01547 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; @@ -37,6 +38,7 @@ * received on {@link JournalProtocolPB} to the * {@link JournalProtocol} server implementation. */ +@InterfaceAudience.Private public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB { /** Server side implementation to delegate the requests to */ private final JournalProtocol impl; @@ -118,4 +120,4 @@ public ProtocolSignatureWritable getProtocolSignature2(String protocol, return ProtocolSignatureWritable.convert( this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 86e0d9c81e..511b71b96b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; @@ -150,7 +151,7 @@ public long getProtocolVersion(String protocolName, long clientVersion) public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() - .setDatanode(PBHelper.convert(datanode)).setSize(size) + .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size) .build(); try { return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 9e5ae94cfb..6886d6fa12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -23,26 +23,41 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; @@ -51,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.security.token.Token; import com.google.protobuf.ByteString; @@ -155,7 +171,13 @@ public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { } public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) { - return new BlocksWithLocations(convert(blocks.getBlocksList())); + List b = blocks.getBlocksList(); + BlockWithLocations[] ret = new BlockWithLocations[b.size()]; + int i = 0; + for (BlockWithLocationsProto entry : b) { + ret[i++] = convert(entry); + } + return new BlocksWithLocations(ret); } public static BlockKeyProto convert(BlockKey key) { @@ -247,15 +269,6 @@ public static NamenodeCommandProto convert(NamenodeCommand cmd) { return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build(); } - public static BlockWithLocations[] convert(List b) { - BlockWithLocations[] ret = new BlockWithLocations[b.size()]; - int i = 0; - for (BlockWithLocationsProto entry : b) { - ret[i++] = convert(entry); - } - return ret; - } - public static BlockKey[] convertBlockKeys(List list) { BlockKey[] ret = new BlockKey[list.size()]; int i = 0; @@ -281,4 +294,119 @@ public static NamenodeCommand convert(NamenodeCommandProto cmd) { return new NamenodeCommand(cmd.getAction()); } } -} \ No newline at end of file + + public static ExtendedBlockProto convert(ExtendedBlock b) { + return ExtendedBlockProto.newBuilder().setBlockId(b.getBlockId()) + .setGenerationStamp(b.getGenerationStamp()) + .setNumBytes(b.getNumBytes()).setPoolId(b.getBlockPoolId()).build(); + } + + public static ExtendedBlock convert(ExtendedBlockProto b) { + return new ExtendedBlock(b.getPoolId(), b.getBlockId(), b.getNumBytes(), + b.getGenerationStamp()); + } + + public static RecoveringBlockProto convert(RecoveringBlock b) { + LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b); + return RecoveringBlockProto.newBuilder().setBlock(lb) + .setNewGenStamp(b.getNewGenerationStamp()).build(); + } + + public static RecoveringBlock convert(RecoveringBlockProto b) { + ExtendedBlock block = convert(b.getBlock().getB()); + DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); + return new RecoveringBlock(block, locs, b.getNewGenStamp()); + } + + public static DatanodeInfo[] convert(List list) { + DatanodeInfo[] info = new DatanodeInfo[list.size()]; + for (int i = 0; i < info.length; i++) { + info[i] = convert(list.get(i)); + } + return info; + } + + public static DatanodeInfo convert(DatanodeInfoProto info) { + DatanodeIDProto dnId = info.getId(); + return new DatanodeInfo(dnId.getName(), dnId.getStorageID(), + dnId.getInfoPort(), dnId.getIpcPort(), info.getCapacity(), + info.getDfsUsed(), info.getRemaining(), info.getBlockPoolUsed(), + info.getLastUpdate(), info.getXceiverCount(), info.getLocation(), + info.getHostName(), convert(info.getAdminState())); + } + + public static DatanodeInfoProto convert(DatanodeInfo info) { + return DatanodeInfoProto.newBuilder() + .setAdminState(PBHelper.convert(info.getAdminState())) + .setBlockPoolUsed(info.getBlockPoolUsed()) + .setCapacity(info.getCapacity()) + .setDfsUsed(info.getDfsUsed()) + .setHostName(info.getHostName()) + .setId(PBHelper.convert((DatanodeID)info)) + .setLastUpdate(info.getLastUpdate()) + .setLocation(info.getNetworkLocation()) + .setRemaining(info.getRemaining()) + .setXceiverCount(info.getXceiverCount()) + .build(); + } + + public static AdminStates convert(AdminState adminState) { + switch(adminState) { + case DECOMMISSION_INPROGRESS: + return AdminStates.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: + return AdminStates.DECOMMISSIONED; + case NORMAL: + default: + return AdminStates.NORMAL; + } + } + + public static AdminState convert(AdminStates adminState) { + switch(adminState) { + case DECOMMISSION_INPROGRESS: + return AdminState.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: + return AdminState.DECOMMISSIONED; + case NORMAL: + default: + return AdminState.NORMAL; + } + } + + public static LocatedBlockProto convert(LocatedBlock b) { + Builder builder = LocatedBlockProto.newBuilder(); + DatanodeInfo[] locs = b.getLocations(); + for(DatanodeInfo loc : locs) { + builder.addLocs(PBHelper.convert(loc)); + } + return builder.setB(PBHelper.convert(b.getBlock())) + .setBlockToken(PBHelper.convert(b.getBlockToken())) + .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); + } + + public static BlockTokenIdentifierProto convert( + Token token) { + ByteString tokenId = ByteString.copyFrom(token.getIdentifier()); + ByteString password = ByteString.copyFrom(token.getPassword()); + return BlockTokenIdentifierProto.newBuilder().setIdentifier(tokenId) + .setKind(token.getKind().toString()).setPassword(password) + .setService(token.getService().toString()).build(); + } + + public static ReplicaState convert(ReplicaStateProto state) { + switch (state) { + case RBW: + return ReplicaState.RBW; + case RUR: + return ReplicaState.RUR; + case RWR: + return ReplicaState.RWR; + case TEMPORARY: + return ReplicaState.TEMPORARY; + case FINALIZED: + default: + return ReplicaState.FINALIZED; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto index 9f36e29e0f..e27824d13a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto @@ -38,7 +38,7 @@ message InitReplicaRecoveryRequestProto { * Repica recovery information */ message InitReplicaRecoveryResponseProto { - required ReplicaState state = 1; // State fo the replica + required ReplicaStateProto state = 1; // State fo the replica required BlockProto block = 2; // block information } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto index 3e8431829b..cc48e26b0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto @@ -329,7 +329,7 @@ message ExportedBlockKeysProto { /** * State of a block replica at a datanode */ -enum ReplicaState { +enum ReplicaStateProto { FINALIZED = 0; // State of a replica when it is not modified RBW = 1; // State of replica that is being written to RWR = 2; // State of replica that is waiting to be recovered diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index ddf0ad0723..166c252e2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; @@ -33,8 +35,10 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; @@ -43,11 +47,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.io.Text; import org.junit.Test; /** @@ -241,4 +247,59 @@ public void testConvertRemoteEditLogManifest() { compare(logs.get(i), logs1.get(i)); } } + + public ExtendedBlock getExtendedBlock() { + return new ExtendedBlock("bpid", 1, 100, 2); + } + + public DatanodeInfo getDNInfo() { + return new DatanodeInfo(new DatanodeID("node", "sid", 1, 2)); + } + + private void compare(DatanodeInfo dn1, DatanodeInfo dn2) { + assertEquals(dn1.getAdminState(), dn2.getAdminState()); + assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed()); + assertEquals(dn1.getBlockPoolUsedPercent(), dn2.getBlockPoolUsedPercent()); + assertEquals(dn1.getCapacity(), dn2.getCapacity()); + assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport()); + assertEquals(dn1.getDfsUsed(), dn1.getDfsUsed()); + assertEquals(dn1.getDfsUsedPercent(), dn1.getDfsUsedPercent()); + assertEquals(dn1.getHost(), dn2.getHost()); + assertEquals(dn1.getHostName(), dn2.getHostName()); + assertEquals(dn1.getInfoPort(), dn2.getInfoPort()); + assertEquals(dn1.getIpcPort(), dn2.getIpcPort()); + assertEquals(dn1.getLastUpdate(), dn2.getLastUpdate()); + assertEquals(dn1.getLevel(), dn2.getLevel()); + assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation()); + } + + @Test + public void testConvertExtendedBlock() { + ExtendedBlock b = getExtendedBlock(); + ExtendedBlockProto bProto = PBHelper.convert(b); + ExtendedBlock b1 = PBHelper.convert(bProto); + assertEquals(b, b1); + } + + @Test + public void testConvertRecoveringBlock() { + DatanodeInfo[] dnInfo = new DatanodeInfo[] { getDNInfo(), getDNInfo() }; + RecoveringBlock b = new RecoveringBlock(getExtendedBlock(), dnInfo, 3); + RecoveringBlockProto bProto = PBHelper.convert(b); + RecoveringBlock b1 = PBHelper.convert(bProto); + assertEquals(b.getBlock(), b1.getBlock()); + DatanodeInfo[] dnInfo1 = b1.getLocations(); + assertEquals(dnInfo.length, dnInfo1.length); + for (int i=0; i < dnInfo.length; i++) { + compare(dnInfo[0], dnInfo1[0]); + } + } + + @Test + public void testConvertText() { + Text t = new Text("abc".getBytes()); + String s = t.toString(); + Text t1 = new Text(s); + assertEquals(t, t1); + } }