HADOOP-18487. Make protobuf 2.5 an optional runtime dependency. (#4996)

Protobuf 2.5 JAR is no longer needed at runtime. 

The option common.protobuf2.scope defines whether the protobuf 2.5.0
dependency is marked as provided or not.

* New package org.apache.hadoop.ipc.internal for internal only protobuf classes
  ...with a ShadedProtobufHelper in there which has shaded protobuf refs
  only, so guaranteed not to need protobuf-2.5 on the CP
* All uses of org.apache.hadoop.ipc.ProtobufHelper have
  been replaced by uses of org.apache.hadoop.ipc.internal.ShadedProtobufHelper
* The scope of protobuf-2.5 is set by the option common.protobuf2.scope.
  In this patch it is still "compile".
* There is explicit reference to it in modules where it may be needed.
* The maven scope of the dependency can be set with the common.protobuf2.scope
  option. It can be set to "provided" in a build:
      -Dcommon.protobuf2.scope=provided
* Add new ipc(callable) method to catch and convert shaded protobuf
  exceptions raised during invocation of the supplied lambda expression;
  see the sketch below.
* This is adopted in the code where the migration is not traumatically
  over-complex. RouterAdminProtocolTranslatorPB is left alone for this
  reason.
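
As an illustrative sketch of that migration pattern (the refreshCallQueue
call is lifted from one of the translators in this patch; only ipc()
itself is new code):

    import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

    // before: catch the shaded ServiceException, translate it by hand
    try {
      rpcProxy.refreshCallQueue(NULL_CONTROLLER, VOID_REFRESH_CALL_QUEUE_REQUEST);
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }

    // after: ipc() invokes the lambda and converts any shaded
    // ServiceException raised into the IOException it wraps
    ipc(() -> rpcProxy.refreshCallQueue(NULL_CONTROLLER, VOID_REFRESH_CALL_QUEUE_REQUEST));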

Contributed by Steve Loughran
Steve Loughran 2023-10-13 13:48:38 +01:00 committed by GitHub
parent 81edbebdd8
commit 9bc159f4ac
53 changed files with 1264 additions and 1633 deletions

View File

@@ -311,6 +311,30 @@ Maven build goals:
 package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
 the build will fail if -Dpmdk.lib is not specified.

+Controlling the redistribution of the protobuf-2.5 dependency
+
+ The protobuf 2.5.0 library is used at compile time to compile the class
+ org.apache.hadoop.ipc.ProtobufHelper; this class is known to have been used by
+ external projects in the past. Protobuf 2.5 is not used elsewhere in
+ the Hadoop codebase; alongside the move to Protobuf 3.x a private successor
+ class, org.apache.hadoop.ipc.internal.ShadedProtobufHelper, is now used.
+
+ The hadoop-common JAR still declares a dependency on protobuf-2.5, but this
+ is likely to change in the future. The maven scope of the dependency can be
+ set with the common.protobuf2.scope option.
+ It can be set to "provided" in a build:
+   -Dcommon.protobuf2.scope=provided
+ If this is done then protobuf-2.5.0.jar will no longer be exported as a dependency,
+ and will then be omitted from the share/hadoop/common/lib/ directory of
+ any Hadoop distribution built. Any application declaring a dependency on hadoop-common
+ will no longer get the dependency; if they need it then they must explicitly declare it:
+
+  <dependency>
+    <groupId>com.google.protobuf</groupId>
+    <artifactId>protobuf-java</artifactId>
+    <version>2.5.0</version>
+  </dependency>

 ----------------------------------------------------------------------------------
 Building components separately
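
(To verify what scope a build resolved protobuf-2.5 to, the stock
maven-dependency-plugin is enough; this is a generic Maven invocation, not
something added by this change, and the module path shown is an assumption
of the usual hadoop-common location:

    mvn dependency:tree -pl hadoop-common-project/hadoop-common \
        -Dincludes=com.google.protobuf:protobuf-java

With -Dcommon.protobuf2.scope=provided the artifact is reported at
"provided" scope and is omitted from the packaged lib/ directory.)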

View File

@@ -451,8 +451,7 @@
 </Match>
 <Match>
-  <Class name="org.apache.hadoop.ipc.ProtobufHelper" />
-  <Method name="getFixedByteString" />
+  <Class name="org.apache.hadoop.ipc.internal.ShadedProtobufHelper" />
   <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
 </Match>
 </FindBugsFilter>

View File

@@ -263,10 +263,11 @@
       <artifactId>re2j</artifactId>
       <scope>compile</scope>
     </dependency>
+    <!-- Needed for compilation, though no longer in production. -->
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <scope>compile</scope>
+      <scope>${common.protobuf2.scope}</scope>
     </dependency>
     <dependency>
       <groupId>com.google.code.gson</groupId>
@@ -504,11 +505,11 @@
             <!--These classes have direct Protobuf references for backward compatibility reasons-->
             <excludes>
               <exclude>**/ProtobufHelper.java</exclude>
-              <exclude>**/RpcWritable.java</exclude>
               <exclude>**/ProtobufRpcEngineCallback.java</exclude>
               <exclude>**/ProtobufRpcEngine.java</exclude>
               <exclude>**/ProtobufRpcEngine2.java</exclude>
               <exclude>**/ProtobufRpcEngineProtos.java</exclude>
+              <exclude>**/ProtobufWrapperLegacy.java</exclude>
             </excludes>
           </configuration>
         </execution>

View File

@@ -37,14 +37,13 @@
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
@@ -84,60 +83,39 @@ public HAServiceProtocolClientSideTranslatorPB(
   @Override
   public void monitorHealth() throws IOException {
-    try {
-      rpcProxy.monitorHealth(NULL_CONTROLLER, MONITOR_HEALTH_REQ);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.monitorHealth(NULL_CONTROLLER, MONITOR_HEALTH_REQ));
   }

   @Override
   public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
-    try {
-      TransitionToActiveRequestProto req =
-          TransitionToActiveRequestProto.newBuilder()
-              .setReqInfo(convert(reqInfo)).build();
-      rpcProxy.transitionToActive(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    TransitionToActiveRequestProto req =
+        TransitionToActiveRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+    ipc(() -> rpcProxy.transitionToActive(NULL_CONTROLLER, req));
   }

   @Override
   public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
-    try {
-      TransitionToStandbyRequestProto req =
-          TransitionToStandbyRequestProto.newBuilder()
-              .setReqInfo(convert(reqInfo)).build();
-      rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    TransitionToStandbyRequestProto req =
+        TransitionToStandbyRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+    ipc(() -> rpcProxy.transitionToStandby(NULL_CONTROLLER, req));
   }

   @Override
   public void transitionToObserver(StateChangeRequestInfo reqInfo)
       throws IOException {
-    try {
-      TransitionToObserverRequestProto req =
-          TransitionToObserverRequestProto.newBuilder()
-              .setReqInfo(convert(reqInfo)).build();
-      rpcProxy.transitionToObserver(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    TransitionToObserverRequestProto req =
+        TransitionToObserverRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+    ipc(() -> rpcProxy.transitionToObserver(NULL_CONTROLLER, req));
   }

   @Override
   public HAServiceStatus getServiceStatus() throws IOException {
     GetServiceStatusResponseProto status;
-    try {
-      status = rpcProxy.getServiceStatus(NULL_CONTROLLER,
-          GET_SERVICE_STATUS_REQ);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    status = ipc(() -> rpcProxy.getServiceStatus(NULL_CONTROLLER,
+        GET_SERVICE_STATUS_REQ));

     HAServiceStatus ret = new HAServiceStatus(
         convert(status.getState()));

View File

@@ -27,15 +27,14 @@
 import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
 import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class ZKFCProtocolClientSideTranslatorPB implements
@@ -57,24 +56,16 @@ public ZKFCProtocolClientSideTranslatorPB(
   @Override
   public void cedeActive(int millisToCede) throws IOException,
       AccessControlException {
-    try {
-      CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder()
-          .setMillisToCede(millisToCede)
-          .build();
-      rpcProxy.cedeActive(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder()
+        .setMillisToCede(millisToCede)
+        .build();
+    ipc(() -> rpcProxy.cedeActive(NULL_CONTROLLER, req));
   }

   @Override
   public void gracefulFailover() throws IOException, AccessControlException {
-    try {
-      rpcProxy.gracefulFailover(NULL_CONTROLLER,
-          GracefulFailoverRequestProto.getDefaultInstance());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.gracefulFailover(NULL_CONTROLLER,
+        GracefulFailoverRequestProto.getDefaultInstance()));
   }

View File

@@ -18,10 +18,10 @@
 package org.apache.hadoop.ipc;

 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -30,31 +30,37 @@
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;

 /**
- * Helper methods for protobuf related RPC implementation
+ * Helper methods for protobuf related RPC implementation.
+ * This is deprecated because it references protobuf 2.5 classes
+ * as well as the shaded ones -and so needs an unshaded protobuf-2.5
+ * JAR on the classpath during execution.
+ * It MUST NOT be used internally; it is retained in case existing,
+ * external applications already use it.
+ * @deprecated hadoop code MUST use {@link ShadedProtobufHelper}.
  */
 @InterfaceAudience.Private
+@Deprecated
 public class ProtobufHelper {

   private ProtobufHelper() {
     // Hidden constructor for class with only static helper methods
   }

   /**
    * Return the IOException thrown by the remote server wrapped in
    * ServiceException as cause.
    * @param se ServiceException that wraps IO exception thrown by the server
    * @return Exception wrapped in ServiceException or
    *         a new IOException that wraps the unexpected ServiceException.
    */
   public static IOException getRemoteException(ServiceException se) {
-    Throwable e = se.getCause();
-    if (e == null) {
-      return new IOException(se);
-    }
-    return e instanceof IOException ? (IOException) e : new IOException(se);
+    return ShadedProtobufHelper.getRemoteException(se);
   }

   /**
-   * Kept for backward compatible.
+   * Extract the remote exception from an unshaded version of the protobuf
+   * libraries.
+   * Kept for backward compatibility.
    * Return the IOException thrown by the remote server wrapped in
    * ServiceException as cause.
    * @param se ServiceException that wraps IO exception thrown by the server
@@ -71,29 +77,13 @@ public static IOException getRemoteException(
     return e instanceof IOException ? (IOException) e : new IOException(se);
   }

-  /**
-   * Map used to cache fixed strings to ByteStrings. Since there is no
-   * automatic expiration policy, only use this for strings from a fixed, small
-   * set.
-   * <p/>
-   * This map should not be accessed directly. Used the getFixedByteString
-   * methods instead.
-   */
-  private final static ConcurrentHashMap<Object, ByteString>
-      FIXED_BYTESTRING_CACHE = new ConcurrentHashMap<>();
-
   /**
    * Get the ByteString for frequently used fixed and small set strings.
    * @param key string
    * @return the ByteString for frequently used fixed and small set strings.
    */
   public static ByteString getFixedByteString(Text key) {
-    ByteString value = FIXED_BYTESTRING_CACHE.get(key);
-    if (value == null) {
-      value = ByteString.copyFromUtf8(key.toString());
-      FIXED_BYTESTRING_CACHE.put(new Text(key.copyBytes()), value);
-    }
-    return value;
+    return ShadedProtobufHelper.getFixedByteString(key);
   }

   /**
@@ -102,34 +92,40 @@ public static ByteString getFixedByteString(Text key) {
    * @return ByteString for frequently used fixed and small set strings.
    */
   public static ByteString getFixedByteString(String key) {
-    ByteString value = FIXED_BYTESTRING_CACHE.get(key);
-    if (value == null) {
-      value = ByteString.copyFromUtf8(key);
-      FIXED_BYTESTRING_CACHE.put(key, value);
-    }
-    return value;
+    return ShadedProtobufHelper.getFixedByteString(key);
   }

+  /**
+   * Get the byte string of a non-null byte array.
+   * If the array is 0 bytes long, return a singleton to reduce object allocation.
+   * @param bytes bytes to convert.
+   * @return a value
+   */
   public static ByteString getByteString(byte[] bytes) {
     // return singleton to reduce object allocation
-    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
+    return ShadedProtobufHelper.getByteString(bytes);
   }

+  /**
+   * Get a token from a TokenProto payload.
+   * @param tokenProto marshalled token
+   * @return the token.
+   */
   public static Token<? extends TokenIdentifier> tokenFromProto(
       TokenProto tokenProto) {
-    Token<? extends TokenIdentifier> token = new Token<>(
-        tokenProto.getIdentifier().toByteArray(),
-        tokenProto.getPassword().toByteArray(), new Text(tokenProto.getKind()),
-        new Text(tokenProto.getService()));
-    return token;
+    return ShadedProtobufHelper.tokenFromProto(tokenProto);
   }

+  /**
+   * Create a {@code TokenProto} instance
+   * from a hadoop token.
+   * This builds and caches the fields
+   * (identifier, password, kind, service) but not
+   * renewer or any payload.
+   * @param tok token
+   * @return a marshallable protobuf class.
+   */
   public static TokenProto protoFromToken(Token<?> tok) {
-    TokenProto.Builder builder = TokenProto.newBuilder().
-        setIdentifier(getByteString(tok.getIdentifier())).
-        setPassword(getByteString(tok.getPassword())).
-        setKindBytes(getFixedByteString(tok.getKind())).
-        setServiceBytes(getFixedByteString(tok.getService()));
-    return builder.build();
+    return ShadedProtobufHelper.protoFromToken(tok);
   }
 }

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.Preconditions;
/**
* A RpcWritable wrapper for unshaded protobuf messages.
* This class isolates unshaded protobuf classes from
* the rest of the RPC codebase, so it can operate without
* needing that on the classpath <i>at runtime</i>.
* The classes are needed at compile time; and if
* unshaded protobuf messages are to be marshalled, they
* will need to be on the classpath then.
* That is implicit: it is impossible to pass in a class
* which is a protobuf message unless that condition is met.
*/
@InterfaceAudience.Private
public class ProtobufWrapperLegacy extends RpcWritable {
private com.google.protobuf.Message message;
/**
* Construct.
* The type of the parameter is Object so as to keep the casting internal
* to this class.
* @param message message to wrap.
* @throws IllegalArgumentException if the class is not a protobuf message.
*/
public ProtobufWrapperLegacy(Object message) {
Preconditions.checkArgument(isUnshadedProtobufMessage(message),
"message class is not an unshaded protobuf message %s",
message.getClass());
this.message = (com.google.protobuf.Message) message;
}
public com.google.protobuf.Message getMessage() {
return message;
}
@Override
public void writeTo(ResponseBuffer out) throws IOException {
int length = message.getSerializedSize();
length += com.google.protobuf.CodedOutputStream.
computeUInt32SizeNoTag(length);
out.ensureCapacity(length);
message.writeDelimitedTo(out);
}
@SuppressWarnings("unchecked")
@Override
protected <T> T readFrom(ByteBuffer bb) throws IOException {
// using the parser with a byte[]-backed coded input stream is the
// most efficient way to deserialize a protobuf. it has a direct
// path to the PB ctor that doesn't create multi-layered streams
// that internally buffer.
com.google.protobuf.CodedInputStream cis =
com.google.protobuf.CodedInputStream.newInstance(
bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
try {
cis.pushLimit(cis.readRawVarint32());
message = message.getParserForType().parseFrom(cis);
cis.checkLastTagWas(0);
} finally {
// advance over the bytes read.
bb.position(bb.position() + cis.getTotalBytesRead());
}
return (T) message;
}
/**
* Has protobuf been looked for and is known as absent?
* Saves a check on every message.
*/
private static final AtomicBoolean PROTOBUF_KNOWN_NOT_FOUND =
new AtomicBoolean(false);
/**
* Is a message an unshaded protobuf message?
* @param payload payload
* @return true if protobuf.jar is on the classpath and the payload is a Message
*/
public static boolean isUnshadedProtobufMessage(Object payload) {
if (PROTOBUF_KNOWN_NOT_FOUND.get()) {
// protobuf is known to be absent. fail fast without examining
// jars or generating exceptions.
return false;
}
// load the protobuf message class.
// if it does not load, then the payload is guaranteed not to be
// an unshaded protobuf message
// this relies on classloader caching for performance
try {
Class<?> protobufMessageClazz =
Class.forName("com.google.protobuf.Message");
return protobufMessageClazz.isAssignableFrom(payload.getClass());
} catch (ClassNotFoundException e) {
PROTOBUF_KNOWN_NOT_FOUND.set(true);
return false;
}
}
}

View File

@@ -31,9 +31,9 @@
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class maintains a cache of protocol versions and corresponding protocol
@@ -122,12 +122,8 @@ public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
     builder.setProtocol(protocol.getName());
     builder.setRpcKind(rpcKind.toString());
     GetProtocolSignatureResponseProto resp;
-    try {
-      resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
-          builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
+        builder.build()));
     versionMap = convertProtocolSignatureProtos(resp
         .getProtocolSignatureList());
     putVersionSignatureMap(serverAddress, protocol.getName(),

View File

@@ -41,9 +41,11 @@ static RpcWritable wrap(Object o) {
     if (o instanceof RpcWritable) {
       return (RpcWritable)o;
     } else if (o instanceof Message) {
+      // hadoop shaded protobuf
       return new ProtobufWrapper((Message)o);
-    } else if (o instanceof com.google.protobuf.Message) {
-      return new ProtobufWrapperLegacy((com.google.protobuf.Message) o);
+    } else if (ProtobufWrapperLegacy.isUnshadedProtobufMessage(o)) {
+      // unshaded protobuf
+      return new ProtobufWrapperLegacy(o);
     } else if (o instanceof Writable) {
       return new WritableWrapper((Writable)o);
     }
@@ -134,49 +136,6 @@ <T> T readFrom(ByteBuffer bb) throws IOException {
     }
   }

-  // adapter for Protobufs.
-  static class ProtobufWrapperLegacy extends RpcWritable {
-    private com.google.protobuf.Message message;
-
-    ProtobufWrapperLegacy(com.google.protobuf.Message message) {
-      this.message = message;
-    }
-
-    com.google.protobuf.Message getMessage() {
-      return message;
-    }
-
-    @Override
-    void writeTo(ResponseBuffer out) throws IOException {
-      int length = message.getSerializedSize();
-      length += com.google.protobuf.CodedOutputStream.
-          computeUInt32SizeNoTag(length);
-      out.ensureCapacity(length);
-      message.writeDelimitedTo(out);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    <T> T readFrom(ByteBuffer bb) throws IOException {
-      // using the parser with a byte[]-backed coded input stream is the
-      // most efficient way to deserialize a protobuf. it has a direct
-      // path to the PB ctor that doesn't create multi-layered streams
-      // that internally buffer.
-      com.google.protobuf.CodedInputStream cis =
-          com.google.protobuf.CodedInputStream.newInstance(
-              bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
-      try {
-        cis.pushLimit(cis.readRawVarint32());
-        message = message.getParserForType().parseFrom(cis);
-        cis.checkLastTagWas(0);
-      } finally {
-        // advance over the bytes read.
-        bb.position(bb.position() + cis.getTotalBytesRead());
-      }
-      return (T)message;
-    }
-  }

   /**
    * adapter to allow decoding of writables and protobufs from a byte buffer.
    */

View File

@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc.internal;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
/**
* Helper methods for protobuf related RPC implementation using the
* hadoop {@code org.apache.hadoop.thirdparty.protobuf} shaded version.
* This is <i>absolutely private to hadoop-* modules</i>.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class ShadedProtobufHelper {
private ShadedProtobufHelper() {
// Hidden constructor for class with only static helper methods
}
/**
* Return the IOException thrown by the remote server wrapped in
* ServiceException as cause.
* The signature of this method changes with updates to the hadoop-thirdparty
* shaded protobuf library.
* @param se ServiceException that wraps IO exception thrown by the server
* @return Exception wrapped in ServiceException or
* a new IOException that wraps the unexpected ServiceException.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static IOException getRemoteException(ServiceException se) {
Throwable e = se.getCause();
if (e == null) {
return new IOException(se);
}
return e instanceof IOException
? (IOException) e
: new IOException(se);
}
/**
* Map used to cache fixed strings to ByteStrings. Since there is no
* automatic expiration policy, only use this for strings from a fixed, small
* set.
* <p>
* This map should not be accessed directly. Use the getFixedByteString
* methods instead.
*/
private static final ConcurrentHashMap<Object, ByteString>
FIXED_BYTESTRING_CACHE = new ConcurrentHashMap<>();
/**
* Get the ByteString for frequently used fixed and small set strings.
* @param key Hadoop Writable Text string
* @return the ByteString for frequently used fixed and small set strings.
*/
public static ByteString getFixedByteString(Text key) {
ByteString value = FIXED_BYTESTRING_CACHE.get(key);
if (value == null) {
value = ByteString.copyFromUtf8(key.toString());
FIXED_BYTESTRING_CACHE.put(new Text(key.copyBytes()), value);
}
return value;
}
/**
* Get the ByteString for frequently used fixed and small set strings.
* @param key string
* @return ByteString for frequently used fixed and small set strings.
*/
public static ByteString getFixedByteString(String key) {
ByteString value = FIXED_BYTESTRING_CACHE.get(key);
if (value == null) {
value = ByteString.copyFromUtf8(key);
FIXED_BYTESTRING_CACHE.put(key, value);
}
return value;
}
/**
* Get the byte string of a non-null byte array.
* If the array is 0 bytes long, return a singleton to reduce object allocation.
* @param bytes bytes to convert.
* @return the protobuf byte string representation of the array.
*/
public static ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
return (bytes.length == 0)
? ByteString.EMPTY
: ByteString.copyFrom(bytes);
}
/**
* Create a hadoop token from a protobuf token.
* @param tokenProto token
* @return a new token
*/
public static Token<? extends TokenIdentifier> tokenFromProto(
TokenProto tokenProto) {
Token<? extends TokenIdentifier> token = new Token<>(
tokenProto.getIdentifier().toByteArray(),
tokenProto.getPassword().toByteArray(),
new Text(tokenProto.getKind()),
new Text(tokenProto.getService()));
return token;
}
/**
* Create a {@code TokenProto} instance
* from a hadoop token.
* This builds and caches the fields
* (identifier, password, kind, service) but not
* renewer or any payload.
* @param tok token
* @return a marshallable protobuf class.
*/
public static TokenProto protoFromToken(Token<?> tok) {
TokenProto.Builder builder = TokenProto.newBuilder().
setIdentifier(getByteString(tok.getIdentifier())).
setPassword(getByteString(tok.getPassword())).
setKindBytes(getFixedByteString(tok.getKind())).
setServiceBytes(getFixedByteString(tok.getService()));
return builder.build();
}
/**
* Evaluate a protobuf call, converting any ServiceException to an IOException.
* @param call invocation to make
* @return the result of the call
* @param <T> type of the result
* @throws IOException any translated protobuf exception
*/
public static <T> T ipc(IpcCall<T> call) throws IOException {
try {
return call.call();
} catch (ServiceException e) {
throw getRemoteException(e);
}
}
@FunctionalInterface
public interface IpcCall<T> {
T call() throws ServiceException;
}
}
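
(Caller-side note: ipc() returns the lambda's result, so translator code can
assign responses directly; the proxy and request names below are taken from
the HAServiceProtocolClientSideTranslatorPB change earlier in this commit:

    GetServiceStatusResponseProto status = ipc(() ->
        rpcProxy.getServiceStatus(NULL_CONTROLLER, GET_SERVICE_STATUS_REQ));

Any ServiceException raised inside the lambda surfaces as the IOException
extracted by getRemoteException(), so method signatures keep their plain
"throws IOException" contract.)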

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* IPC internal classes not for any use by libraries outside
* the apache hadoop source tree.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "YARN"})
@InterfaceStability.Unstable
package org.apache.hadoop.ipc.internal;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@@ -18,8 +18,12 @@

 /**
  * Tools to help define network clients and servers.
+ * Other ASF projects use this package, often with their own shaded/unshaded
+ * versions of protobuf messages.
+ * Changes to the API signatures will break things, especially changes to
+ * {@link org.apache.hadoop.ipc.RPC} and {@link org.apache.hadoop.ipc.RpcEngine}.
  */
-@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"})
+@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce", "YARN", "Hive", "Ozone"})
 @InterfaceStability.Evolving
 package org.apache.hadoop.ipc;

 import org.apache.hadoop.classification.InterfaceAudience;

View File

@@ -25,7 +25,6 @@
 import java.util.Collection;
 import java.util.List;

-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshResponse;
@@ -34,9 +33,9 @@
 import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
 import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
 import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class GenericRefreshProtocolClientSideTranslatorPB implements
     ProtocolMetaInterface, GenericRefreshProtocol, Closeable {
@@ -59,17 +58,13 @@ public void close() throws IOException {
   public Collection<RefreshResponse> refresh(String identifier, String[] args) throws IOException {
     List<String> argList = Arrays.asList(args);

-    try {
-      GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
-          .setIdentifier(identifier)
-          .addAllArgs(argList)
-          .build();
-
-      GenericRefreshResponseCollectionProto resp = rpcProxy.refresh(NULL_CONTROLLER, request);
-      return unpack(resp);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
+        .setIdentifier(identifier).addAllArgs(argList).build();
+
+    GenericRefreshResponseCollectionProto resp = ipc(() ->
+        rpcProxy.refresh(NULL_CONTROLLER, request));
+    return unpack(resp);
   }

   private Collection<RefreshResponse> unpack(GenericRefreshResponseCollectionProto collection) {

View File

@@ -21,16 +21,14 @@
 import java.io.Closeable;
 import java.io.IOException;

-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueRequestProto;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class RefreshCallQueueProtocolClientSideTranslatorPB implements
     ProtocolMetaInterface, RefreshCallQueueProtocol, Closeable {
@@ -55,12 +53,8 @@ public void close() throws IOException {
   @Override
   public void refreshCallQueue() throws IOException {
-    try {
-      rpcProxy.refreshCallQueue(NULL_CONTROLLER,
-          VOID_REFRESH_CALL_QUEUE_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshCallQueue(NULL_CONTROLLER,
+        VOID_REFRESH_CALL_QUEUE_REQUEST));
   }

   @Override

View File

@@ -18,6 +18,7 @@

 package org.apache.hadoop.security;

+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;

 import java.io.BufferedInputStream;
@@ -46,7 +47,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.proto.SecurityProtos.CredentialsKVProto;
@@ -382,7 +382,7 @@ void writeProto(DataOutput out) throws IOException {
       CredentialsKVProto.Builder kv = CredentialsKVProto.newBuilder().
           setAliasBytes(ByteString.copyFrom(
               e.getKey().getBytes(), 0, e.getKey().getLength())).
-          setToken(ProtobufHelper.protoFromToken(e.getValue()));
+          setToken(ShadedProtobufHelper.protoFromToken(e.getValue()));
       storage.addTokens(kv.build());
     }
@@ -404,7 +404,7 @@ void readProto(DataInput in) throws IOException {
     CredentialsProto storage = CredentialsProto.parseDelimitedFrom((DataInputStream)in);
     for (CredentialsKVProto kv : storage.getTokensList()) {
       addToken(new Text(kv.getAliasBytes().toByteArray()),
-          ProtobufHelper.tokenFromProto(kv.getToken()));
+          ShadedProtobufHelper.tokenFromProto(kv.getToken()));
     }
     for (CredentialsKVProto kv : storage.getSecretsList()) {
       addSecretKey(new Text(kv.getAliasBytes().toByteArray()),

View File

@@ -21,16 +21,14 @@
 import java.io.Closeable;
 import java.io.IOException;

-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshServiceAclRequestProto;
-import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class RefreshAuthorizationPolicyProtocolClientSideTranslatorPB implements
     ProtocolMetaInterface, RefreshAuthorizationPolicyProtocol, Closeable {
@@ -55,12 +53,8 @@ public void close() throws IOException {
   @Override
   public void refreshServiceAcl() throws IOException {
-    try {
-      rpcProxy.refreshServiceAcl(NULL_CONTROLLER,
-          VOID_REFRESH_SERVICE_ACL_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshServiceAcl(NULL_CONTROLLER,
+        VOID_REFRESH_SERVICE_ACL_REQUEST));
   }

   @Override

View File

@@ -21,16 +21,15 @@
 import java.io.Closeable;
 import java.io.IOException;

-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto;
 import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class RefreshUserMappingsProtocolClientSideTranslatorPB implements
     ProtocolMetaInterface, RefreshUserMappingsProtocol, Closeable {
@@ -59,22 +58,14 @@ public void close() throws IOException {
   @Override
   public void refreshUserToGroupsMappings() throws IOException {
-    try {
-      rpcProxy.refreshUserToGroupsMappings(NULL_CONTROLLER,
-          VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshUserToGroupsMappings(NULL_CONTROLLER,
+        VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST));
   }

   @Override
   public void refreshSuperUserGroupsConfiguration() throws IOException {
-    try {
-      rpcProxy.refreshSuperUserGroupsConfiguration(NULL_CONTROLLER,
-          VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshSuperUserGroupsConfiguration(NULL_CONTROLLER,
+        VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST));
   }

   @Override

View File

@@ -20,7 +20,7 @@
 import java.io.Closeable;
 import java.io.IOException;

-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
@@ -29,7 +29,8 @@
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 public class GetUserMappingsProtocolClientSideTranslatorPB implements
     ProtocolMetaInterface, GetUserMappingsProtocol, Closeable {
@@ -53,11 +54,7 @@ public String[] getGroupsForUser(String user) throws IOException {
     GetGroupsForUserRequestProto request = GetGroupsForUserRequestProto
         .newBuilder().setUser(user).build();
     GetGroupsForUserResponseProto resp;
-    try {
-      resp = rpcProxy.getGroupsForUser(NULL_CONTROLLER, request);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> rpcProxy.getGroupsForUser(NULL_CONTROLLER, request));
     return resp.getGroupsList().toArray(new String[resp.getGroupsCount()]);
   }

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import org.junit.Test;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
/**
* Test methods in {@link ShadedProtobufHelper}.
*/
public class TestShadedProtobufHelper extends AbstractHadoopTestBase {
@Test
public void testExtractRemoteExceptionNoCause() throws Throwable {
ServiceException source = new ServiceException("empty");
IOException ex = ShadedProtobufHelper.getRemoteException(source);
verifyCause(ServiceException.class, ex);
}
@Test
public void testExtractRemoteExceptionIOECause() throws Throwable {
IOException source = new IOException("ioe");
IOException ex = ShadedProtobufHelper.getRemoteException(
new ServiceException(source));
// if not the same, throw
if (!(ex == source)) {
throw ex;
}
}
@Test
public void testExtractRemoteExceptionOtherCause() throws Throwable {
NullPointerException source = new NullPointerException("npe");
IOException ex = ShadedProtobufHelper.getRemoteException(
new ServiceException(source));
// if not the same, throw
ServiceException c1 = verifyCause(ServiceException.class, ex);
verifyCause(NullPointerException.class, c1);
}
@Test
public void testIPCWrapperServiceException() throws Throwable {
intercept(IOException.class, "expected", () -> {
ipc(() -> {
throw new ServiceException("expected");
});
});
}
@Test
public void testIPCWrapperNPE() throws Throwable {
final IOException ex = intercept(IOException.class, "npe", () -> {
ipc(() -> {
throw new ServiceException(new NullPointerException("npe"));
});
});
ServiceException c1 = verifyCause(ServiceException.class, ex);
verifyCause(NullPointerException.class, c1);
}
}

View File

@@ -819,7 +819,7 @@ public static <E extends Throwable> E verifyCause(
     if (cause == null || !clazz.isAssignableFrom(cause.getClass())) {
       throw caught;
     } else {
-      return (E) caught;
+      return (E) cause;
     }
   }
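
(This one-line fix is what the chained assertions in TestShadedProtobufHelper
above rely on: verifyCause() now returns the cause it just verified, so a
second call can walk one level further down the exception chain:

    ServiceException c1 = verifyCause(ServiceException.class, ex);
    verifyCause(NullPointerException.class, c1);

Previously it returned the outer exception, so the second call would have
re-checked the same level.)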

View File

@@ -67,7 +67,6 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -76,13 +75,13 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
  * {@link ClientDatanodeProtocol} interfaces to the RPC server implementing
@@ -197,31 +196,19 @@ public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
     GetReplicaVisibleLengthRequestProto req =
         GetReplicaVisibleLengthRequestProto.newBuilder()
             .setBlock(PBHelperClient.convert(b)).build();
-    try {
-      return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return ipc(() -> rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength());
   }

   @Override
   public void refreshNamenodes() throws IOException {
-    try {
-      rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES));
   }

   @Override
   public void deleteBlockPool(String bpid, boolean force) throws IOException {
     DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder()
         .setBlockPool(bpid).setForce(force).build();
-    try {
-      rpcProxy.deleteBlockPool(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.deleteBlockPool(NULL_CONTROLLER, req));
   }

   @Override
@@ -232,11 +219,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
         .setBlock(PBHelperClient.convert(block))
         .setToken(PBHelperClient.convert(token)).build();
     GetBlockLocalPathInfoResponseProto resp;
-    try {
-      resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    resp = ipc(() -> rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req));
     return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()),
         resp.getLocalPath(), resp.getLocalMetaPath());
   }
@@ -257,94 +240,61 @@ public Object getUnderlyingProxyObject() {
   public void shutdownDatanode(boolean forUpgrade) throws IOException {
     ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto
         .newBuilder().setForUpgrade(forUpgrade).build();
-    try {
-      rpcProxy.shutdownDatanode(NULL_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.shutdownDatanode(NULL_CONTROLLER, request));
   }

   @Override
   public void evictWriters() throws IOException {
-    try {
-      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS));
   }

   @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;
-    try {
-      response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER,
-          VOID_GET_DATANODE_INFO);
-      return PBHelperClient.convert(response.getLocalInfo());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    response = ipc(() -> rpcProxy.getDatanodeInfo(NULL_CONTROLLER,
+        VOID_GET_DATANODE_INFO));
+    return PBHelperClient.convert(response.getLocalInfo());
   }

   @Override
   public void startReconfiguration() throws IOException {
-    try {
-      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG));
   }

   @Override
   public ReconfigurationTaskStatus getReconfigurationStatus()
       throws IOException {
-    try {
-      return ReconfigurationProtocolUtils.getReconfigurationStatus(
-          rpcProxy
-              .getReconfigurationStatus(
-                  NULL_CONTROLLER,
-                  VOID_GET_RECONFIG_STATUS));
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return ReconfigurationProtocolUtils.getReconfigurationStatus(
+        ipc(() -> rpcProxy.getReconfigurationStatus(
+            NULL_CONTROLLER,
+            VOID_GET_RECONFIG_STATUS)));
   }

   @Override
   public List<String> listReconfigurableProperties() throws IOException {
     ListReconfigurablePropertiesResponseProto response;
-    try {
-      response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
-          VOID_LIST_RECONFIGURABLE_PROPERTIES);
-      return response.getNameList();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    response = ipc(() -> rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+        VOID_LIST_RECONFIGURABLE_PROPERTIES));
+    return response.getNameList();
   }

   @Override
   public void triggerBlockReport(BlockReportOptions options)
       throws IOException {
-    try {
-      TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
-          setIncremental(options.isIncremental());
-      if (options.getNamenodeAddr() != null) {
-        builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
-      }
-      rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
+        setIncremental(options.isIncremental());
+    if (options.getNamenodeAddr() != null) {
+      builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
+    }
+    ipc(() -> rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build()));
   }

   @Override
   public long getBalancerBandwidth() throws IOException {
     GetBalancerBandwidthResponseProto response;
-    try {
-      response = rpcProxy.getBalancerBandwidth(NULL_CONTROLLER,
-          VOID_GET_BALANCER_BANDWIDTH);
-      return response.getBandwidth();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    response = ipc(() -> rpcProxy.getBalancerBandwidth(NULL_CONTROLLER,
+        VOID_GET_BALANCER_BANDWIDTH));
+    return response.getBandwidth();
   }

   /**
@@ -363,19 +313,15 @@ public long getBalancerBandwidth() throws IOException {
   public void submitDiskBalancerPlan(String planID, long planVersion,
       String planFile, String planData, boolean skipDateCheck)
       throws IOException {
-    try {
-      SubmitDiskBalancerPlanRequestProto request =
-          SubmitDiskBalancerPlanRequestProto.newBuilder()
-              .setPlanID(planID)
-              .setPlanVersion(planVersion)
-              .setPlanFile(planFile)
-              .setPlan(planData)
-              .setIgnoreDateCheck(skipDateCheck)
-              .build();
-      rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+    SubmitDiskBalancerPlanRequestProto request =
+        SubmitDiskBalancerPlanRequestProto.newBuilder()
+            .setPlanID(planID)
+            .setPlanVersion(planVersion)
+            .setPlanFile(planFile)
+            .setPlan(planData)
+            .setIgnoreDateCheck(skipDateCheck)
+            .build();
+    ipc(() -> rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request));
}
} }
/** /**
@ -387,13 +333,9 @@ public void submitDiskBalancerPlan(String planID, long planVersion,
@Override @Override
public void cancelDiskBalancePlan(String planID) public void cancelDiskBalancePlan(String planID)
throws IOException { throws IOException {
try { CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder()
CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder() .setPlanID(planID).build();
.setPlanID(planID).build(); ipc(() -> rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request));
rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
/** /**
@ -401,56 +343,44 @@ public void cancelDiskBalancePlan(String planID)
*/ */
@Override @Override
public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException { public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
try { QueryPlanStatusRequestProto request =
QueryPlanStatusRequestProto request = QueryPlanStatusRequestProto.newBuilder().build();
QueryPlanStatusRequestProto.newBuilder().build(); QueryPlanStatusResponseProto response =
QueryPlanStatusResponseProto response = ipc(() -> rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request));
rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request); DiskBalancerWorkStatus.Result result = Result.NO_PLAN;
DiskBalancerWorkStatus.Result result = Result.NO_PLAN; if(response.hasResult()) {
if(response.hasResult()) { result = DiskBalancerWorkStatus.Result.values()[
result = DiskBalancerWorkStatus.Result.values()[ response.getResult()];
response.getResult()];
}
return new DiskBalancerWorkStatus(result,
response.hasPlanID() ? response.getPlanID() : null,
response.hasPlanFile() ? response.getPlanFile() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
} }
return new DiskBalancerWorkStatus(result,
response.hasPlanID() ? response.getPlanID() : null,
response.hasPlanFile() ? response.getPlanFile() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
} }
@Override @Override
public String getDiskBalancerSetting(String key) throws IOException { public String getDiskBalancerSetting(String key) throws IOException {
try { DiskBalancerSettingRequestProto request =
DiskBalancerSettingRequestProto request = DiskBalancerSettingRequestProto.newBuilder().setKey(key).build();
DiskBalancerSettingRequestProto.newBuilder().setKey(key).build(); DiskBalancerSettingResponseProto response =
DiskBalancerSettingResponseProto response = ipc(() -> rpcProxy.getDiskBalancerSetting(NULL_CONTROLLER, request));
rpcProxy.getDiskBalancerSetting(NULL_CONTROLLER, request); return response.hasValue() ? response.getValue() : null;
return response.hasValue() ? response.getValue() : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public List<DatanodeVolumeInfo> getVolumeReport() throws IOException { public List<DatanodeVolumeInfo> getVolumeReport() throws IOException {
try { List<DatanodeVolumeInfo> volumeInfoList = new ArrayList<>();
List<DatanodeVolumeInfo> volumeInfoList = new ArrayList<>(); GetVolumeReportResponseProto volumeReport = ipc(() -> rpcProxy.getVolumeReport(
GetVolumeReportResponseProto volumeReport = rpcProxy.getVolumeReport( NULL_CONTROLLER, VOID_GET_DATANODE_STORAGE_INFO));
NULL_CONTROLLER, VOID_GET_DATANODE_STORAGE_INFO); List<DatanodeVolumeInfoProto> volumeProtoList = volumeReport
List<DatanodeVolumeInfoProto> volumeProtoList = volumeReport .getVolumeInfoList();
.getVolumeInfoList(); for (DatanodeVolumeInfoProto proto : volumeProtoList) {
for (DatanodeVolumeInfoProto proto : volumeProtoList) { volumeInfoList.add(new DatanodeVolumeInfo(proto.getPath(), proto
volumeInfoList.add(new DatanodeVolumeInfo(proto.getPath(), proto .getUsedSpace(), proto.getFreeSpace(), proto.getReservedSpace(),
.getUsedSpace(), proto.getFreeSpace(), proto.getReservedSpace(), proto.getReservedSpaceForReplicas(), proto.getNumBlocks(),
proto.getReservedSpaceForReplicas(), proto.getNumBlocks(), PBHelperClient.convertStorageType(proto.getStorageType())));
PBHelperClient.convertStorageType(proto.getStorageType())));
}
return volumeInfoList;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
} }
return volumeInfoList;
} }
} }
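
Note: every ipc(() -> ...) call above relies on the helper converting a shaded ServiceException into the IOException these translator methods declare. A minimal sketch of such a wrapper follows; the functional-interface name is invented here and the unwrapping logic is an assumption, not code copied from ShadedProtobufHelper:

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    public final class IpcWrapperSketch {
      /** Hypothetical lambda shape: a proxy call that may throw ServiceException. */
      @FunctionalInterface
      public interface IpcCall<T> {
        T call() throws ServiceException;
      }

      /** Invoke the call, rethrowing the wrapped IOException if one is present. */
      public static <T> T ipc(IpcCall<T> call) throws IOException {
        try {
          return call.call();
        } catch (ServiceException e) {
          // assumed behaviour: unwrap an IOException cause, else wrap the
          // ServiceException itself, so callers see only IOException
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause;
          }
          throw new IOException(e);
        }
      }
    }

The per-method try/catch blocks disappear because the lambda carries the checked ServiceException into this one shared conversion point.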

View File

@@ -209,7 +209,7 @@
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ChunkedArrayList;
@@ -237,7 +237,7 @@ public class PBHelperClient {
       FsAction.values();

   private static ByteString getFixedByteString(String key) {
-    return ProtobufHelper.getFixedByteString(key);
+    return ShadedProtobufHelper.getFixedByteString(key);
   }

   /**
@@ -260,7 +260,8 @@ private PBHelperClient() {
   public static ByteString getByteString(byte[] bytes) {
     // return singleton to reduce object allocation
-    return ProtobufHelper.getByteString(bytes);
+    // return singleton to reduce object allocation
+    return ShadedProtobufHelper.getByteString(bytes);
   }

   public static ShmId convert(ShortCircuitShmIdProto shmId) {
@@ -328,7 +329,7 @@ public static ExtendedBlockProto convert(final ExtendedBlock b) {
   }

   public static TokenProto convert(Token<?> tok) {
-    return ProtobufHelper.protoFromToken(tok);
+    return ShadedProtobufHelper.protoFromToken(tok);
   }

   public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -814,8 +815,8 @@ public static StorageType[] convertStorageTypes(
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
-    return (Token<BlockTokenIdentifier>) ProtobufHelper
-        .tokenFromProto(blockToken);
+    return (Token<BlockTokenIdentifier>) ShadedProtobufHelper.tokenFromProto(
+        blockToken);
   }

   // DatanodeId
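
The delegations above keep PBHelperClient's existing behaviour while moving every shaded-protobuf reference behind one internal class. A hypothetical sketch of the fixed byte-string caching being delegated to; the map type and copy call are assumptions, not the real ShadedProtobufHelper internals:

    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.thirdparty.protobuf.ByteString;

    final class FixedByteStringCacheSketch {
      // one ByteString per frequently used key, so repeated conversions
      // of the same string reuse a single immutable object
      private static final ConcurrentHashMap<String, ByteString> CACHE =
          new ConcurrentHashMap<>();

      static ByteString getFixedByteString(String key) {
        // get-then-put is not atomic; a rare duplicate insert is harmless
        // because both threads compute equal ByteString values
        ByteString value = CACHE.get(key);
        if (value == null) {
          value = ByteString.copyFromUtf8(key);
          CACHE.put(key, value);
        }
        return value;
      }
    }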

View File

@@ -33,7 +33,6 @@
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -44,7 +43,8 @@
 import org.slf4j.LoggerFactory;

 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
@@ -102,37 +102,25 @@ public Object getUnderlyingProxyObject() {
   @Override
   public void startReconfiguration() throws IOException {
-    try {
-      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG));
   }

   @Override
   public ReconfigurationTaskStatus getReconfigurationStatus()
       throws IOException {
-    try {
-      return ReconfigurationProtocolUtils.getReconfigurationStatus(
-          rpcProxy
-              .getReconfigurationStatus(
-                  NULL_CONTROLLER,
-                  VOID_GET_RECONFIG_STATUS));
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return ReconfigurationProtocolUtils.getReconfigurationStatus(
+        ipc(() -> rpcProxy
+            .getReconfigurationStatus(
+                NULL_CONTROLLER,
+                VOID_GET_RECONFIG_STATUS)));
   }

   @Override
   public List<String> listReconfigurableProperties() throws IOException {
     ListReconfigurablePropertiesResponseProto response;
-    try {
-      response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
-          VOID_LIST_RECONFIGURABLE_PROPERTIES);
-      return response.getNameList();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    response = ipc(() -> rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+        VOID_LIST_RECONFIGURABLE_PROPERTIES));
+    return response.getNameList();
   }

   @Override
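
Worth noting: the generated rpcProxy methods always return a response proto, even for operations that are logically void, so one wrapper shape covers every call site in this class. An illustrative pair of the two shapes, taken from the methods above rather than new API:

    // fire-and-forget: the returned response proto is simply discarded
    ipc(() -> rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG));

    // value-returning: the response proto is kept and unpacked
    List<String> properties = ipc(() -> rpcProxy.listReconfigurableProperties(
        NULL_CONTROLLER, VOID_LIST_RECONFIGURABLE_PROPERTIES)).getNameList();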

View File

@@ -146,7 +146,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <scope>compile</scope>
+      <scope>${transient.protobuf2.scope}</scope>
     </dependency>
     <dependency>
       <groupId>javax.servlet</groupId>

View File

@@ -103,7 +103,6 @@
 import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -111,6 +110,8 @@
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;

+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.getRemoteException;
+
 /**
  * This class forwards RouterAdminProtocol calls as RPC calls to the RouterAdmin server
  * while translating from the parameter types used in RouterAdminProtocol to the
@@ -156,7 +157,8 @@ public AddMountTableEntryResponse addMountTableEntry(
           rpcProxy.addMountTableEntry(null, proto);
       return new AddMountTableEntryResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -169,7 +171,7 @@ public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesReq
       AddMountTableEntriesResponseProto response = rpcProxy.addMountTableEntries(null, proto);
       return new AddMountTableEntriesResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
     }
   }

@@ -184,7 +186,8 @@ public UpdateMountTableEntryResponse updateMountTableEntry(
           rpcProxy.updateMountTableEntry(null, proto);
       return new UpdateMountTableEntryResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -199,7 +202,8 @@ public RemoveMountTableEntryResponse removeMountTableEntry(
           rpcProxy.removeMountTableEntry(null, proto);
       return new RemoveMountTableEntryResponsePBImpl(responseProto);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -214,7 +218,8 @@ public GetMountTableEntriesResponse getMountTableEntries(
           rpcProxy.getMountTableEntries(null, proto);
       return new GetMountTableEntriesResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -228,7 +233,8 @@ public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
           rpcProxy.enterSafeMode(null, proto);
       return new EnterSafeModeResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -242,7 +248,8 @@ public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
           rpcProxy.leaveSafeMode(null, proto);
       return new LeaveSafeModeResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -256,7 +263,8 @@ public GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
           rpcProxy.getSafeMode(null, proto);
       return new GetSafeModeResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -271,7 +279,8 @@ public DisableNameserviceResponse disableNameservice(
           rpcProxy.disableNameservice(null, proto);
       return new DisableNameserviceResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -286,7 +295,8 @@ public EnableNameserviceResponse enableNameservice(
           rpcProxy.enableNameservice(null, proto);
       return new EnableNameserviceResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -300,7 +310,8 @@ public GetDisabledNameservicesResponse getDisabledNameservices(
           rpcProxy.getDisabledNameservices(null, proto);
       return new GetDisabledNameservicesResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -315,7 +326,8 @@ public RefreshMountTableEntriesResponse refreshMountTableEntries(
           rpcProxy.refreshMountTableEntries(null, proto);
       return new RefreshMountTableEntriesResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -330,7 +342,8 @@ public GetDestinationResponse getDestination(
           rpcProxy.getDestination(null, proto);
       return new GetDestinationResponsePBImpl(response);
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }

@@ -344,7 +357,8 @@ public boolean refreshSuperUserGroupsConfiguration() throws IOException {
       return new RefreshSuperUserGroupsConfigurationResponsePBImpl(response)
           .getStatus();
     } catch (ServiceException e) {
-      throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(
+          getRemoteException(e).getMessage());
     }
   }
 }
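
This translator keeps its explicit try/catch blocks and only swaps in the statically imported helper, since each handler wraps just the message text in a fresh IOException. A sketch of the conversion that import presumably provides, inferred from the call sites above rather than from ShadedProtobufHelper itself:

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    final class GetRemoteExceptionSketch {
      /** Return (rather than throw) the IOException behind a ServiceException. */
      static IOException getRemoteException(ServiceException e) {
        Throwable cause = e.getCause();
        return (cause instanceof IOException)
            ? (IOException) cause
            : new IOException(e);
      }
    }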

View File

@@ -211,9 +211,9 @@ public RouterAdminServer(Configuration conf, Router router)
         RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService.
             newReflectiveBlockingService(refreshCallQueueXlator);

-    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
         genericRefreshService, adminServer);
-    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
         refreshCallQueueService, adminServer);

     registerRefreshFairnessPolicyControllerHandler();

View File

@@ -341,11 +341,11 @@ public RouterRpcServer(Configuration conf, Router router,
         .build();

     // Add all the RPC protocols that the Router implements
-    DFSUtil.addPBProtocol(
+    DFSUtil.addInternalPBProtocol(
         conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
         refreshUserMappingService, this.rpcServer);
-    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
         getUserMappingService, this.rpcServer);

     // Set service-level authorization security policy

View File

@@ -196,7 +196,7 @@ private void setupRPCServer(final Configuration conf) throws IOException {
     BlockingService nnProtoPbService =
         NamenodeProtocolService.newReflectiveBlockingService(
             nnProtoXlator);
-    DFSUtil.addPBProtocol(
+    DFSUtil.addInternalPBProtocol(
         conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);

     DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
@@ -204,7 +204,7 @@ private void setupRPCServer(final Configuration conf) throws IOException {
     BlockingService dnProtoPbService =
         DatanodeProtocolService.newReflectiveBlockingService(
             dnProtoPbXlator);
-    DFSUtil.addPBProtocol(
+    DFSUtil.addInternalPBProtocol(
         conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);

     HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
@@ -212,7 +212,7 @@ private void setupRPCServer(final Configuration conf) throws IOException {
     BlockingService haProtoPbService =
         HAServiceProtocolService.newReflectiveBlockingService(
             haServiceProtoXlator);
-    DFSUtil.addPBProtocol(
+    DFSUtil.addInternalPBProtocol(
         conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);

     this.rpcServer.addTerseExceptions(

View File

@@ -130,7 +130,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <scope>compile</scope>
+      <scope>${transient.protobuf2.scope}</scope>
     </dependency>
     <dependency>
       <groupId>javax.servlet</groupId>

View File

@@ -67,6 +67,7 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -1361,7 +1362,30 @@ static URI trimUri(URI uri) {
   }

   /**
-   * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
+   * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}.
+   * This method is for exclusive use by the hadoop libraries, as its signature
+   * changes with the version of the shaded protobuf library it has been built with.
+   * @param conf configuration
+   * @param protocol Protocol interface
+   * @param service service that implements the protocol
+   * @param server RPC server to which the protocol &amp; implementation is
+   *               added to
+   * @throws IOException failure
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static void addInternalPBProtocol(Configuration conf,
+      Class<?> protocol,
+      BlockingService service,
+      RPC.Server server) throws IOException {
+    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
+    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+  }
+
+  /**
+   * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}.
+   * Deprecated as it will only reliably compile if an unshaded protobuf library
+   * is also on the classpath.
    * @param conf configuration
    * @param protocol Protocol interface
    * @param service service that implements the protocol
@@ -1369,17 +1393,17 @@ static URI trimUri(URI uri) {
    *               added to
    * @throws IOException
    */
+  @Deprecated
   public static void addPBProtocol(Configuration conf, Class<?> protocol,
       BlockingService service, RPC.Server server) throws IOException {
-    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
-    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+    addInternalPBProtocol(conf, protocol, service, server);
   }

   /**
    * Add protobuf based protocol to the {@link RPC.Server}.
    * This engine uses Protobuf 2.5.0. Recommended to upgrade to
    * Protobuf 3.x from hadoop-thirdparty and use
-   * {@link DFSUtil#addPBProtocol(Configuration, Class, BlockingService,
+   * {@link DFSUtil#addInternalPBProtocol(Configuration, Class, BlockingService,
    * RPC.Server)}.
    * @param conf configuration
    * @param protocol Protocol interface
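
A hedged usage sketch of the new registration helper; conf, the protocol class, the blocking service and rpcServer below are placeholders standing in for whatever the calling server class already constructs (compare the RouterRpcServer change above):

    // inside some RPC server setup method; all four arguments are
    // placeholders for objects the server already holds
    DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class,
        nnPbService, rpcServer);

Downstream code that still calls the deprecated addPBProtocol behaves as before, since that method now simply forwards to addInternalPBProtocol.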

View File

@@ -29,7 +29,6 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -38,7 +37,8 @@
 import org.apache.hadoop.security.UserGroupInformation;

 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
@@ -96,11 +96,7 @@ public void sendLifeline(DatanodeRegistration registration,
       builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
           volumeFailureSummary));
     }
-    try {
-      rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build()));
   }

   @Override // ProtocolMetaInterface

View File

@@ -61,7 +61,6 @@
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -71,10 +70,11 @@
 import org.apache.hadoop.classification.VisibleForTesting;

 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;

 import javax.annotation.Nonnull;

+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+
 /**
  * This class is the client side translator to translate the requests made on
  * {@link DatanodeProtocol} interfaces to the RPC server implementing
@@ -123,11 +123,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
     RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration));
     RegisterDatanodeResponseProto resp;
-    try {
-      resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build()));
     return PBHelper.convert(resp.getRegistration());
   }

@@ -164,11 +161,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
     }

     HeartbeatResponseProto resp;
-    try {
-      resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()));
     DatanodeCommand[] cmds = new DatanodeCommand[resp.getCmdsList().size()];
     int index = 0;
     for (DatanodeCommandProto p : resp.getCmdsList()) {
@@ -215,11 +209,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
     }
     builder.setContext(PBHelper.convert(context));
     BlockReportResponseProto resp;
-    try {
-      resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> rpcProxy.blockReport(NULL_CONTROLLER, builder.build()));
     return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
   }

@@ -235,11 +225,7 @@ public DatanodeCommand cacheReport(DatanodeRegistration registration,
     }

     CacheReportResponseProto resp;
-    try {
-      resp = rpcProxy.cacheReport(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    resp = ipc(() -> rpcProxy.cacheReport(NULL_CONTROLLER, builder.build()));
     if (resp.hasCmd()) {
       return PBHelper.convert(resp.getCmd());
     }
@@ -264,11 +250,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration registration,
       }
       builder.addBlocks(repBuilder.build());
     }
-    try {
-      rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build()));
   }

   @Override
@@ -277,21 +259,13 @@ public void errorReport(DatanodeRegistration registration, int errorCode,
     ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
         .setRegistartion(PBHelper.convert(registration))
         .setErrorCode(errorCode).setMsg(msg).build();
-    try {
-      rpcProxy.errorReport(NULL_CONTROLLER, req);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.errorReport(NULL_CONTROLLER, req));
   }

   @Override
   public NamespaceInfo versionRequest() throws IOException {
-    try {
-      return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
-          VOID_VERSION_REQUEST).getInfo());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return PBHelper.convert(ipc(() -> rpcProxy.versionRequest(NULL_CONTROLLER,
+        VOID_VERSION_REQUEST).getInfo()));
   }

   @Override
@@ -302,11 +276,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
       builder.addBlocks(i, PBHelperClient.convertLocatedBlock(blocks[i]));
     }
     ReportBadBlocksRequestProto req = builder.build();
-    try {
-      rpcProxy.reportBadBlocks(NULL_CONTROLLER, req);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.reportBadBlocks(NULL_CONTROLLER, req));
   }

   @Override
@@ -327,11 +297,7 @@ public void commitBlockSynchronization(ExtendedBlock block,
       builder.addNewTargetStorages(newtargetstorages[i]);
     }
     CommitBlockSynchronizationRequestProto req = builder.build();
-    try {
-      rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req));
   }

   @Override // ProtocolMetaInterface

View File

@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;

-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@
 import org.apache.hadoop.hdfs.server.common.FileRegion;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.slf4j.Logger;
@@ -54,6 +52,7 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
 import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate requests made to the
@@ -136,29 +135,24 @@ public InMemoryAliasMap.IterationResult list(Optional<Block> marker)
       builder.setMarker(PBHelperClient.convert(marker.get()));
     }
     ListRequestProto request = builder.build();
-    try {
-      ListResponseProto response = rpcProxy.list(null, request);
-      List<KeyValueProto> fileRegionsList = response.getFileRegionsList();
-      List<FileRegion> fileRegions = fileRegionsList
-          .stream()
-          .map(kv -> new FileRegion(
-              PBHelperClient.convert(kv.getKey()),
-              PBHelperClient.convert(kv.getValue())
-          ))
-          .collect(Collectors.toList());
-      BlockProto nextMarker = response.getNextMarker();
-      if (nextMarker.isInitialized()) {
-        return new InMemoryAliasMap.IterationResult(fileRegions,
-            Optional.of(PBHelperClient.convert(nextMarker)));
-      } else {
-        return new InMemoryAliasMap.IterationResult(fileRegions,
-            Optional.empty());
-      }
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ListResponseProto response = ipc(() -> rpcProxy.list(null, request));
+    List<KeyValueProto> fileRegionsList = response.getFileRegionsList();
+    List<FileRegion> fileRegions = fileRegionsList
+        .stream()
+        .map(kv -> new FileRegion(
+            PBHelperClient.convert(kv.getKey()),
+            PBHelperClient.convert(kv.getValue())
+        ))
+        .collect(Collectors.toList());
+    BlockProto nextMarker = response.getNextMarker();
+    if (nextMarker.isInitialized()) {
+      return new InMemoryAliasMap.IterationResult(fileRegions,
+          Optional.of(PBHelperClient.convert(nextMarker)));
+    } else {
+      return new InMemoryAliasMap.IterationResult(fileRegions,
+          Optional.empty());
+    }
   }

@@ -175,19 +169,15 @@ public Optional<ProvidedStorageLocation> read(@Nonnull Block block)
         .newBuilder()
         .setKey(PBHelperClient.convert(block))
         .build();
-    try {
-      ReadResponseProto response = rpcProxy.read(null, request);
-      ProvidedStorageLocationProto providedStorageLocation =
-          response.getValue();
-      if (providedStorageLocation.isInitialized()) {
-        return Optional.of(PBHelperClient.convert(providedStorageLocation));
-      }
-      return Optional.empty();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ReadResponseProto response = ipc(() -> rpcProxy.read(null, request));
+    ProvidedStorageLocationProto providedStorageLocation =
+        response.getValue();
+    if (providedStorageLocation.isInitialized()) {
+      return Optional.of(PBHelperClient.convert(providedStorageLocation));
+    }
+    return Optional.empty();
   }

   @Override
@@ -206,22 +196,14 @@ public void write(@Nonnull Block block,
             .build())
         .build();

-    try {
-      rpcProxy.write(null, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.write(null, request));
   }

   @Override
   public String getBlockPoolId() throws IOException {
-    try {
-      BlockPoolResponseProto response = rpcProxy.getBlockPoolId(null,
-          BlockPoolRequestProto.newBuilder().build());
-      return response.getBlockPoolId();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    BlockPoolResponseProto response = ipc(() -> rpcProxy.getBlockPoolId(null,
+        BlockPoolRequestProto.newBuilder().build()));
+    return response.getBlockPoolId();
   }

   @Override

View File

@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -42,7 +41,8 @@
 import org.apache.hadoop.security.UserGroupInformation;

 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
@@ -79,11 +79,7 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
     InitReplicaRecoveryRequestProto req = InitReplicaRecoveryRequestProto
         .newBuilder().setBlock(PBHelper.convert(rBlock)).build();
     InitReplicaRecoveryResponseProto resp;
-    try {
-      resp = rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    resp = ipc(() -> rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req));
     if (!resp.getReplicaFound()) {
       // No replica found on the remote node.
       return null;
@@ -108,12 +104,9 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
         .setBlock(PBHelperClient.convert(oldBlock))
         .setNewLength(newLength).setNewBlockId(newBlockId)
         .setRecoveryId(recoveryId).build();
-    try {
-      return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
-          ).getStorageUuid();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return ipc(() -> rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req)
+        .getStorageUuid());
   }

   @Override

View File

@@ -29,13 +29,12 @@
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;

 import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

 /**
  * This class is the client side translator to translate the requests made on
@@ -69,11 +68,17 @@ public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
         .setNumTxns(numTxns)
         .setRecords(PBHelperClient.getByteString(records))
         .build();
-    try {
-      rpcProxy.journal(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.journal(NULL_CONTROLLER, req));
+  }
+
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+        .setJournalInfo(PBHelper.convert(journalInfo)).build();
+    FenceResponseProto resp = ipc(() -> rpcProxy.fence(NULL_CONTROLLER, req));
+    return new FenceResponse(resp.getPreviousEpoch(),
+        resp.getLastTransactionId(), resp.getInSync());
   }

   @Override
@@ -84,25 +89,7 @@ public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
         .setEpoch(epoch)
         .setTxid(txid)
         .build();
-    try {
-      rpcProxy.startLogSegment(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  @Override
-  public FenceResponse fence(JournalInfo journalInfo, long epoch,
-      String fencerInfo) throws IOException {
-    FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
-        .setJournalInfo(PBHelper.convert(journalInfo)).build();
-    try {
-      FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
-      return new FenceResponse(resp.getPreviousEpoch(),
-          resp.getLastTransactionId(), resp.getInSync());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.startLogSegment(NULL_CONTROLLER, req));
   }

   @Override

View File

@ -51,14 +51,13 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/** /**
* This class is the client side translator to translate the requests made on * This class is the client side translator to translate the requests made on
@ -107,63 +106,39 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
.setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build(); .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
try { return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) .getBlocks()));
.getBlocks());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public ExportedBlockKeys getBlockKeys() throws IOException { public ExportedBlockKeys getBlockKeys() throws IOException {
try { GetBlockKeysResponseProto rsp = ipc(() -> rpcProxy.getBlockKeys(NULL_CONTROLLER,
GetBlockKeysResponseProto rsp = rpcProxy.getBlockKeys(NULL_CONTROLLER, VOID_GET_BLOCKKEYS_REQUEST));
VOID_GET_BLOCKKEYS_REQUEST); return rsp.hasKeys() ? PBHelper.convert(rsp.getKeys()) : null;
return rsp.hasKeys() ? PBHelper.convert(rsp.getKeys()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public long getTransactionID() throws IOException { public long getTransactionID() throws IOException {
try { return ipc(() -> rpcProxy.getTransactionId(NULL_CONTROLLER,
return rpcProxy.getTransactionId(NULL_CONTROLLER, VOID_GET_TRANSACTIONID_REQUEST).getTxId());
VOID_GET_TRANSACTIONID_REQUEST).getTxId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public long getMostRecentCheckpointTxId() throws IOException { public long getMostRecentCheckpointTxId() throws IOException {
try { return ipc(() -> rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER, GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId());
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public CheckpointSignature rollEditLog() throws IOException { public CheckpointSignature rollEditLog() throws IOException {
try { return PBHelper.convert(ipc(() -> rpcProxy.rollEditLog(NULL_CONTROLLER,
return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER, VOID_ROLL_EDITLOG_REQUEST).getSignature()));
VOID_ROLL_EDITLOG_REQUEST).getSignature());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
} }
@Override @Override
public NamespaceInfo versionRequest() throws IOException { public NamespaceInfo versionRequest() throws IOException {
try { return PBHelper.convert(ipc(() -> rpcProxy.versionRequest(NULL_CONTROLLER,
-      return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
-          VOID_VERSION_REQUEST).getInfo());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return PBHelper.convert(ipc(() -> rpcProxy.versionRequest(NULL_CONTROLLER,
+        VOID_VERSION_REQUEST).getInfo()));
  }

  @Override
@@ -172,11 +147,7 @@ public void errorReport(NamenodeRegistration registration, int errorCode,
    ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
        .setErrorCode(errorCode).setMsg(msg)
        .setRegistration(PBHelper.convert(registration)).build();
-    try {
-      rpcProxy.errorReport(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.errorReport(NULL_CONTROLLER, req));
  }

  @Override
@@ -184,13 +155,9 @@ public NamenodeRegistration registerSubordinateNamenode(
      NamenodeRegistration registration) throws IOException {
    RegisterRequestProto req = RegisterRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration)).build();
-    try {
-      return PBHelper.convert(
-          rpcProxy.registerSubordinateNamenode(NULL_CONTROLLER, req)
-          .getRegistration());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return PBHelper.convert(
+        ipc(() -> rpcProxy.registerSubordinateNamenode(NULL_CONTROLLER, req)
+            .getRegistration()));
  }

  @Override
@@ -199,11 +166,7 @@ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
    StartCheckpointRequestProto req = StartCheckpointRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration)).build();
    NamenodeCommandProto cmd;
-    try {
-      cmd = rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    cmd = ipc(() -> rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand());
    return PBHelper.convert(cmd);
  }

@@ -213,11 +176,7 @@ public void endCheckpoint(NamenodeRegistration registration,
    EndCheckpointRequestProto req = EndCheckpointRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration))
        .setSignature(PBHelper.convert(sig)).build();
-    try {
-      rpcProxy.endCheckpoint(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.endCheckpoint(NULL_CONTROLLER, req));
  }

  @Override
@@ -225,12 +184,8 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
      throws IOException {
    GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
        .newBuilder().setSinceTxId(sinceTxId).build();
-    try {
-      return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
-          .getManifest());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return PBHelper.convert(ipc(() -> rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
+        .getManifest()));
  }

  @Override
@@ -244,38 +199,26 @@ public boolean isMethodSupported(String methodName) throws IOException {
  public boolean isUpgradeFinalized() throws IOException {
    IsUpgradeFinalizedRequestProto req = IsUpgradeFinalizedRequestProto
        .newBuilder().build();
-    try {
-      IsUpgradeFinalizedResponseProto response = rpcProxy.isUpgradeFinalized(
-          NULL_CONTROLLER, req);
-      return response.getIsUpgradeFinalized();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    IsUpgradeFinalizedResponseProto response = ipc(() -> rpcProxy.isUpgradeFinalized(
+        NULL_CONTROLLER, req));
+    return response.getIsUpgradeFinalized();
  }

  @Override
  public boolean isRollingUpgrade() throws IOException {
    IsRollingUpgradeRequestProto req = IsRollingUpgradeRequestProto
        .newBuilder().build();
-    try {
-      IsRollingUpgradeResponseProto response = rpcProxy.isRollingUpgrade(
-          NULL_CONTROLLER, req);
-      return response.getIsRollingUpgrade();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    IsRollingUpgradeResponseProto response = ipc(() -> rpcProxy.isRollingUpgrade(
+        NULL_CONTROLLER, req));
+    return response.getIsRollingUpgrade();
  }

  @Override
  public Long getNextSPSPath() throws IOException {
    GetNextSPSPathRequestProto req =
        GetNextSPSPathRequestProto.newBuilder().build();
-    try {
-      GetNextSPSPathResponseProto nextSPSPath =
-          rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
-      return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetNextSPSPathResponseProto nextSPSPath =
+        ipc(() -> rpcProxy.getNextSPSPath(NULL_CONTROLLER, req));
+    return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
  }
}
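Every conversion above funnels through the new helper. As a rough sketch of the mechanism (illustrative only: apart from the ipc and getRemoteException names, which this diff itself uses, the types and signatures here are assumptions rather than the verified contents of org.apache.hadoop.ipc.internal.ShadedProtobufHelper):

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    public final class ShadedProtobufHelperSketch {
      private ShadedProtobufHelperSketch() {
      }

      /** An RPC invocation that may raise a shaded ServiceException. */
      @FunctionalInterface
      public interface IpcCall<T> {
        T call() throws ServiceException;
      }

      /**
       * Invoke the supplied lambda and convert any shaded protobuf
       * ServiceException raised during the call into the IOException
       * that the protocol method declares.
       */
      public static <T> T ipc(IpcCall<T> call) throws IOException {
        try {
          return call.call();
        } catch (ServiceException e) {
          throw getRemoteException(e);
        }
      }

      /** Unwrap an IOException cause if there is one, otherwise wrap. */
      public static IOException getRemoteException(ServiceException e) {
        Throwable cause = e.getCause();
        return cause instanceof IOException
            ? (IOException) cause
            : new IOException(e);
      }
    }

Because ipc() returns the lambda's result, it works both in expression position (return PBHelper.convert(ipc(() -> ...))) and as a bare statement, where the returned response proto is simply discarded.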
View File
@@ -20,14 +20,12 @@
package org.apache.hadoop.hdfs.qjournal.protocolPB;

import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -35,6 +33,8 @@
import java.io.Closeable;
import java.io.IOException;

+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

/**
 * This class is the client side translator to translate the requests made on
 * {@link InterQJournalProtocol} interfaces to the RPC server implementing
@@ -63,21 +63,16 @@ public void close() {
  public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
      String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
      throws IOException {
-    try {
-      GetEditLogManifestRequestProto.Builder req;
-      req = GetEditLogManifestRequestProto.newBuilder()
-          .setJid(convertJournalId(jid))
-          .setSinceTxId(sinceTxId)
-          .setInProgressOk(inProgressOk);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      return rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
-          req.build()
-      );
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetEditLogManifestRequestProto.Builder req;
+    req = GetEditLogManifestRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setSinceTxId(sinceTxId)
+        .setInProgressOk(inProgressOk);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
+        req.build()));
  }

  private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
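A detail worth noting in the converted method above: the request builder is created, and conditionally mutated, before the ipc() call, and is then captured by the lambda. Java only lets a lambda capture effectively final locals; mutating the builder through its setters is fine, reassigning the variable would not be. The shape, reusing the names from getEditLogManifestFromJournal above:

    GetEditLogManifestRequestProto.Builder req =
        GetEditLogManifestRequestProto.newBuilder()
            .setJid(convertJournalId(jid))
            .setSinceTxId(sinceTxId)
            .setInProgressOk(inProgressOk);
    if (nameServiceId != null) {
      req.setNameServiceId(nameServiceId);    // mutating the builder is fine
    }
    // req = GetEditLogManifestRequestProto.newBuilder();
    //     ^ reassignment would make req no longer effectively final
    //       and the lambda below would not compile
    return ipc(() -> rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
        req.build()));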
View File
@@ -63,13 +63,12 @@
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

/**
 * This class is the client side translator to translate the requests made on
@@ -97,36 +96,28 @@ public void close() {
  @Override
  public boolean isFormatted(String journalId,
                             String nameServiceId) throws IOException {
-    try {
-      IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
-          .setJid(convertJournalId(journalId));
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      IsFormattedResponseProto resp = rpcProxy.isFormatted(
-          NULL_CONTROLLER, req.build());
-      return resp.getIsFormatted();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
+        .setJid(convertJournalId(journalId));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    IsFormattedResponseProto resp = ipc(() -> rpcProxy.isFormatted(
+        NULL_CONTROLLER, req.build()));
+    return resp.getIsFormatted();
  }

  @Override
  public GetJournalStateResponseProto getJournalState(String jid,
                                                      String nameServiceId)
      throws IOException {
-    try {
-      GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
-          .newBuilder()
-          .setJid(convertJournalId(jid));
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      return rpcProxy.getJournalState(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
+        .newBuilder()
+        .setJid(convertJournalId(jid));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getJournalState(NULL_CONTROLLER, req.build()));
  }

  private JournalIdProto convertJournalId(String jid) {
@@ -140,19 +131,15 @@ public void format(String jid,
      String nameServiceId,
      NamespaceInfo nsInfo,
      boolean force) throws IOException {
-    try {
-      FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
-          .setJid(convertJournalId(jid))
-          .setNsInfo(PBHelper.convert(nsInfo))
-          .setForce(force);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      rpcProxy.format(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setNsInfo(PBHelper.convert(nsInfo))
+        .setForce(force);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    ipc(() -> rpcProxy.format(NULL_CONTROLLER, req.build()));
  }

  @Override
@@ -160,20 +147,16 @@ public NewEpochResponseProto newEpoch(String jid,
      String nameServiceId,
      NamespaceInfo nsInfo,
      long epoch) throws IOException {
-    try {
-      NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
-          .setJid(convertJournalId(jid))
-          .setNsInfo(PBHelper.convert(nsInfo))
-          .setEpoch(epoch);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      return rpcProxy.newEpoch(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setNsInfo(PBHelper.convert(nsInfo))
+        .setEpoch(epoch);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.newEpoch(NULL_CONTROLLER, req.build()));
  }

  @Override
@@ -187,22 +170,14 @@ public void journal(RequestInfo reqInfo,
        .setNumTxns(numTxns)
        .setRecords(PBHelperClient.getByteString(records))
        .build();
-    try {
-      rpcProxy.journal(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.journal(NULL_CONTROLLER, req));
  }

  @Override
  public void heartbeat(RequestInfo reqInfo) throws IOException {
-    try {
-      rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
-          .setReqInfo(convert(reqInfo))
-          .build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .build()));
  }

  private QJournalProtocolProtos.RequestInfoProto convert(
@@ -227,11 +202,7 @@ public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
        .setReqInfo(convert(reqInfo))
        .setTxid(txid).setLayoutVersion(layoutVersion)
        .build();
-    try {
-      rpcProxy.startLogSegment(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.startLogSegment(NULL_CONTROLLER, req));
  }

  @Override
@@ -243,11 +214,7 @@ public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
        .setStartTxId(startTxId)
        .setEndTxId(endTxId)
        .build();
-    try {
-      rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req));
  }

  @Override
@@ -257,79 +224,58 @@ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
        .setReqInfo(convert(reqInfo))
        .setMinTxIdToKeep(minTxIdToKeep)
        .build();
-    try {
-      rpcProxy.purgeLogs(NULL_CONTROLLER, req);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.purgeLogs(NULL_CONTROLLER, req));
  }

  @Override
  public GetEditLogManifestResponseProto getEditLogManifest(
      String jid, String nameServiceId,
      long sinceTxId, boolean inProgressOk) throws IOException {
-    try {
-      GetEditLogManifestRequestProto.Builder req;
-      req = GetEditLogManifestRequestProto.newBuilder()
-          .setJid(convertJournalId(jid))
-          .setSinceTxId(sinceTxId)
-          .setInProgressOk(inProgressOk);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
-          req.build()
-      );
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetEditLogManifestRequestProto.Builder req;
+    req = GetEditLogManifestRequestProto.newBuilder()
+        .setJid(convertJournalId(jid))
+        .setSinceTxId(sinceTxId)
+        .setInProgressOk(inProgressOk);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+        req.build()));
  }

  @Override
  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
-    try {
-      GetJournaledEditsRequestProto.Builder req =
-          GetJournaledEditsRequestProto.newBuilder()
-              .setJid(convertJournalId(jid))
-              .setSinceTxId(sinceTxId)
-              .setMaxTxns(maxTxns);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    GetJournaledEditsRequestProto.Builder req =
+        GetJournaledEditsRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .setSinceTxId(sinceTxId)
+            .setMaxTxns(maxTxns);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    return ipc(() -> rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build()));
  }

  @Override
  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
      long segmentTxId) throws IOException {
-    try {
-      return rpcProxy.prepareRecovery(NULL_CONTROLLER,
-          PrepareRecoveryRequestProto.newBuilder()
-              .setReqInfo(convert(reqInfo))
-              .setSegmentTxId(segmentTxId)
-              .build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    return ipc(() -> rpcProxy.prepareRecovery(NULL_CONTROLLER,
+        PrepareRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setSegmentTxId(segmentTxId)
+            .build()));
  }

  @Override
  public void acceptRecovery(RequestInfo reqInfo,
      SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
-    try {
-      rpcProxy.acceptRecovery(NULL_CONTROLLER,
-          AcceptRecoveryRequestProto.newBuilder()
-              .setReqInfo(convert(reqInfo))
-              .setStateToAccept(stateToAccept)
-              .setFromURL(fromUrl.toExternalForm())
-              .build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.acceptRecovery(NULL_CONTROLLER,
+        AcceptRecoveryRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .setStateToAccept(stateToAccept)
+            .setFromURL(fromUrl.toExternalForm())
+            .build()));
  }

  public boolean isMethodSupported(String methodName) throws IOException {
@@ -340,42 +286,30 @@ public boolean isMethodSupported(String methodName) throws IOException {
  @Override
  public void doPreUpgrade(String jid) throws IOException {
-    try {
-      DoPreUpgradeRequestProto.Builder req;
-      req = DoPreUpgradeRequestProto.newBuilder()
-          .setJid(convertJournalId(jid));
-      rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    DoPreUpgradeRequestProto.Builder req;
+    req = DoPreUpgradeRequestProto.newBuilder()
+        .setJid(convertJournalId(jid));
+    ipc(() -> rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build()));
  }

  @Override
  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
-    try {
-      rpcProxy.doUpgrade(NULL_CONTROLLER,
-          DoUpgradeRequestProto.newBuilder()
-              .setJid(convertJournalId(journalId))
-              .setSInfo(PBHelper.convert(sInfo))
-              .build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ipc(() -> rpcProxy.doUpgrade(NULL_CONTROLLER,
+        DoUpgradeRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .setSInfo(PBHelper.convert(sInfo))
+            .build()));
  }

  @Override
  public void doFinalize(String jid, String nameServiceId) throws IOException {
-    try {
-      DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
-          .newBuilder()
-          .setJid(convertJournalId(jid));
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      rpcProxy.doFinalize(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
+        .newBuilder()
+        .setJid(convertJournalId(jid));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    ipc(() -> rpcProxy.doFinalize(NULL_CONTROLLER, req.build()));
  }

  @Override
@@ -384,37 +318,29 @@ public Boolean canRollBack(String journalId,
      StorageInfo storage,
      StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
-    try {
-      CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
-          .setJid(convertJournalId(journalId))
-          .setStorage(PBHelper.convert(storage))
-          .setPrevStorage(PBHelper.convert(prevStorage))
-          .setTargetLayoutVersion(targetLayoutVersion);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      CanRollBackResponseProto response = rpcProxy.canRollBack(
-          NULL_CONTROLLER, req.build());
-      return response.getCanRollBack();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
+        .setJid(convertJournalId(journalId))
+        .setStorage(PBHelper.convert(storage))
+        .setPrevStorage(PBHelper.convert(prevStorage))
+        .setTargetLayoutVersion(targetLayoutVersion);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    CanRollBackResponseProto response = ipc(() -> rpcProxy.canRollBack(
+        NULL_CONTROLLER, req.build()));
+    return response.getCanRollBack();
  }

  @Override
  public void doRollback(String journalId,
      String nameServiceId) throws IOException {
-    try {
-      DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
-          .setJid(convertJournalId(journalId));
-      if (nameServiceId != null) {
-        req.setNameserviceId(nameServiceId);
-      }
-      rpcProxy.doRollback(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
+        .setJid(convertJournalId(journalId));
+    if (nameServiceId != null) {
+      req.setNameserviceId(nameServiceId);
+    }
+    ipc(() -> rpcProxy.doRollback(NULL_CONTROLLER, req.build()));
  }

  @Override
@@ -422,37 +348,28 @@ public void discardSegments(String journalId,
      String nameServiceId,
      long startTxId)
      throws IOException {
-    try {
-      DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
-          .newBuilder()
-          .setJid(convertJournalId(journalId)).setStartTxId(startTxId);
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      rpcProxy.discardSegments(NULL_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
+        .newBuilder()
+        .setJid(convertJournalId(journalId)).setStartTxId(startTxId);
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    ipc(() -> rpcProxy.discardSegments(NULL_CONTROLLER, req.build()));
  }

  @Override
  public Long getJournalCTime(String journalId,
      String nameServiceId) throws IOException {
-    try {
-      GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
-          .newBuilder()
-          .setJid(convertJournalId(journalId));
-      if (nameServiceId != null) {
-        req.setNameServiceId(nameServiceId);
-      }
-      GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
-          NULL_CONTROLLER, req.build());
-      return response.getResultCTime();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
+        .newBuilder()
+        .setJid(convertJournalId(journalId));
+    if (nameServiceId != null) {
+      req.setNameServiceId(nameServiceId);
+    }
+    GetJournalCTimeResponseProto response = ipc(() -> rpcProxy.getJournalCTime(
+        NULL_CONTROLLER, req.build()));
+    return response.getResultCTime();
  }
}
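This file shows both call shapes side by side. Where the old code returned the proxy's result, ipc() sits inside the return expression; where the old code invoked the proxy purely for its side effect, ipc() stands alone as a statement and the generated response proto (every protobuf stub returns one) is dropped. Condensed from the methods above:

    // Expression position: the response is consumed.
    IsFormattedResponseProto resp = ipc(() -> rpcProxy.isFormatted(
        NULL_CONTROLLER, req.build()));
    return resp.getIsFormatted();

    // Statement position: the FormatResponseProto result is discarded.
    ipc(() -> rpcProxy.format(NULL_CONTROLLER, req.build()));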
View File
@@ -125,7 +125,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
    BlockingService interQJournalProtocolService = InterQJournalProtocolService
        .newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB);

-    DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(confCopy, InterQJournalProtocolPB.class,
        interQJournalProtocolService, server);
View File
@@ -1528,14 +1528,14 @@ private void initIpcServer() throws IOException {
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
    service = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);
-    DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
+    DFSUtil.addInternalPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
        ipcServer);

    InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
        new InterDatanodeProtocolServerSideTranslatorPB(this);
    service = InterDatanodeProtocolService
        .newReflectiveBlockingService(interDatanodeProtocolXlator);
-    DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
+    DFSUtil.addInternalPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
        ipcServer);

    LOG.info("Opened IPC server at {}", ipcServer.getListenerAddress());
View File
@@ -246,7 +246,7 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn)
        new JournalProtocolServerSideTranslatorPB(this);
    BlockingService service = JournalProtocolService
        .newReflectiveBlockingService(journalProtocolTranslator);
-    DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
+    DFSUtil.addInternalPBProtocol(conf, JournalProtocolPB.class, service,
        this.clientRpcServer);
  }
View File
@@ -371,24 +371,24 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
        .build();

    // Add all the RPC protocols that the namenode implements
-    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
+    DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
        reconfigurationPbService, serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
+    DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
+    DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, serviceRpcServer);
    // We support Refreshing call queue here in case the client RPC queue is full
-    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, serviceRpcServer);
-    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, serviceRpcServer);

    // Update the address with the correct port
@@ -431,7 +431,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

-    DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, DatanodeLifelineProtocolPB.class,
        lifelineProtoPbService, lifelineRpcServer);

    // Update the address with the correct port
@@ -474,23 +474,23 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
        .build();

    // Add all the RPC protocols that the namenode implements
-    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
+    DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
-    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
        reconfigurationPbService, clientRpcServer);
-    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
+    DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
-    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
+    DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, clientRpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, clientRpcServer);
-    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, clientRpcServer);
-    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, clientRpcServer);
-    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+    DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
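The blanket rename from addPBProtocol to addInternalPBProtocol marks these registrations as Hadoop-internal protocols running on the shaded protobuf 3 engine. The helper's body is not part of this hunk; a plausible sketch, assuming it simply pairs engine selection with protocol registration the way the old method did (not the verified DFSUtil source):

    // Sketch only: assumed shape of DFSUtil.addInternalPBProtocol().
    public static void addInternalPBProtocol(Configuration conf,
        Class<?> protocol, BlockingService service, RPC.Server server)
        throws IOException {
      // Bind the protocol to the shaded-protobuf RPC engine...
      RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
      // ...and register the reflective blocking service with the server.
      server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
    }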
View File
@@ -36,6 +36,10 @@
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
View File
@@ -22,7 +22,6 @@
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -34,7 +33,8 @@
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;

import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

@Private
public class HSAdminRefreshProtocolClientSideTranslatorPB implements
@@ -73,43 +73,27 @@ public void close() throws IOException {
  @Override
  public void refreshAdminAcls() throws IOException {
-    try {
-      rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
-          VOID_REFRESH_ADMIN_ACLS_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
+        VOID_REFRESH_ADMIN_ACLS_REQUEST));
  }

  @Override
  public void refreshLoadedJobCache() throws IOException {
-    try {
-      rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
-          VOID_REFRESH_LOADED_JOB_CACHE_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
+        VOID_REFRESH_LOADED_JOB_CACHE_REQUEST));
  }

  @Override
  public void refreshJobRetentionSettings() throws IOException {
-    try {
-      rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
-          VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
+        VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST));
  }

  @Override
  public void refreshLogRetentionSettings() throws IOException {
-    try {
-      rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
-          VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
+    ipc(() -> rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
+        VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST));
  }

  @Override
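Stripped of diff markup, the recipe applied to all four refresh methods is the same import-and-wrap change; using refreshAdminAcls() as the example:

    // Before: catch the shaded ServiceException at every call site.
    try {
      rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
          VOID_REFRESH_ADMIN_ACLS_REQUEST);
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }

    // After: delegate exception translation to the statically imported helper.
    ipc(() -> rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
        VOID_REFRESH_ADMIN_ACLS_REQUEST));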
View File
@@ -39,6 +39,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
View File
@@ -84,8 +84,14 @@
    <!-- com.google.re2j version -->
    <re2j.version>1.1</re2j.version>

-    <!--Protobuf version for backward compatibility-->
+    <!-- Protobuf version for backward compatibility -->
+    <!-- This is used in hadoop-common for compilation only -->
    <protobuf.version>2.5.0</protobuf.version>
+    <!-- Protobuf scope in hadoop common -->
+    <!-- set to "provided" and protobuf2 will no longer be exported as a dependency -->
+    <common.protobuf2.scope>compile</common.protobuf2.scope>
+    <!-- Protobuf scope in other modules which explicitly import the library -->
+    <transient.protobuf2.scope>${common.protobuf2.scope}</transient.protobuf2.scope>
    <!-- ProtocolBuffer version, actually used in Hadoop -->
    <hadoop.protobuf.version>3.7.1</hadoop.protobuf.version>
    <protoc.path>${env.HADOOP_PROTOC_PATH}</protoc.path>
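Since transient.protobuf2.scope defaults to ${common.protobuf2.scope}, a single property switch re-scopes the protobuf-2.5 dependency everywhere it is declared. Standard Maven property precedence also allows the two knobs to be set independently on the command line; a hypothetical invocation that re-scopes only the modules using the transient property would be:

    mvn package -DskipTests -Dtransient.protobuf2.scope=provided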
View File
@@ -191,6 +191,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
View File
@@ -132,6 +132,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.bouncycastle</groupId>
View File
@@ -24,7 +24,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -126,6 +125,8 @@
import org.apache.hadoop.thirdparty.protobuf.ServiceException;

+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.getRemoteException;
+
@Private
public class ResourceManagerAdministrationProtocolPBClientImpl implements ResourceManagerAdministrationProtocol, Closeable {
@@ -243,7 +244,7 @@ public String[] getGroupsForUser(String user) throws IOException {
      return (String[]) responseProto.getGroupsList().toArray(
          new String[responseProto.getGroupsCount()]);
    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+      throw getRemoteException(e);
    }
  }
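Here only the exception-translation call changes: getGroupsForUser() post-processes the response inside the same try block, so the patch keeps the explicit catch and merely swaps in the statically imported getRemoteException. A later migration to the lambda form could look like this (hypothetical: the proxy field name, request construction, and response type are assumptions, not taken from this diff):

    GetGroupsForUserResponseProto responseProto =
        ipc(() -> proxy.getGroupsForUser(null, requestProto));
    return responseProto.getGroupsList()
        .toArray(new String[responseProto.getGroupsCount()]);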
View File
@@ -38,6 +38,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>${hadoop.protobuf.version}</version>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
View File
@@ -89,6 +89,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
View File
@@ -81,6 +81,7 @@
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
+      <scope>${transient.protobuf2.scope}</scope>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>