YARN-79. Implement close on all clients to YARN so that RPC clients don't throw exceptions on shut-down. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1380942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
658e7b5818
commit
ab74b1adde
@ -56,6 +56,9 @@ Release 2.1.0-alpha - Unreleased
|
||||
YARN-37. Change TestRMAppTransitions to use the DrainDispatcher.
|
||||
(Mayank Bansal via sseth)
|
||||
|
||||
YARN-79. Implement close on all clients to YARN so that RPC clients don't
|
||||
throw exceptions on shut-down. (Vinod Kumar Vavilapalli)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -33,5 +33,10 @@
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.hadoop.yarn.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestYarnClient {
|
||||
@ -27,4 +29,16 @@ public class TestYarnClient {
|
||||
// More to come later.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientStop() {
|
||||
Configuration conf = new Configuration();
|
||||
ResourceManager rm = new ResourceManager(null);
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
YarnClient client = new YarnClientImpl();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.impl.pb.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@ -46,16 +47,19 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
public class AMRMProtocolPBClientImpl implements AMRMProtocol {
|
||||
public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
|
||||
|
||||
private AMRMProtocolPB proxy;
|
||||
|
||||
public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
|
||||
public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
|
||||
Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, AMRMProtocolPB.class, ProtobufRpcEngine.class);
|
||||
proxy = (AMRMProtocolPB)RPC.getProxy(
|
||||
AMRMProtocolPB.class, clientVersion, addr, conf);
|
||||
proxy =
|
||||
(AMRMProtocolPB) RPC.getProxy(AMRMProtocolPB.class, clientVersion,
|
||||
addr, conf);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.proxy != null) {
|
||||
RPC.stopProxy(this.proxy);
|
||||
@ -65,7 +69,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol {
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnRemoteException {
|
||||
AllocateRequestProto requestProto = ((AllocateRequestPBImpl)request).getProto();
|
||||
AllocateRequestProto requestProto =
|
||||
((AllocateRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
@ -73,14 +78,14 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request) throws YarnRemoteException {
|
||||
FinishApplicationMasterRequestProto requestProto = ((FinishApplicationMasterRequestPBImpl)request).getProto();
|
||||
FinishApplicationMasterRequestProto requestProto =
|
||||
((FinishApplicationMasterRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new FinishApplicationMasterResponsePBImpl(proxy.finishApplicationMaster(null, requestProto));
|
||||
return new FinishApplicationMasterResponsePBImpl(
|
||||
proxy.finishApplicationMaster(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -89,9 +94,11 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol {
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnRemoteException {
|
||||
RegisterApplicationMasterRequestProto requestProto = ((RegisterApplicationMasterRequestPBImpl)request).getProto();
|
||||
RegisterApplicationMasterRequestProto requestProto =
|
||||
((RegisterApplicationMasterRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new RegisterApplicationMasterResponsePBImpl(proxy.registerApplicationMaster(null, requestProto));
|
||||
return new RegisterApplicationMasterResponsePBImpl(
|
||||
proxy.registerApplicationMaster(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.impl.pb.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@ -81,22 +82,35 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestPr
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
|
||||
Closeable {
|
||||
|
||||
private ClientRMProtocolPB proxy;
|
||||
|
||||
public ClientRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ClientRMProtocolPB.class, ProtobufRpcEngine.class);
|
||||
proxy = (ClientRMProtocolPB)RPC.getProxy(
|
||||
ClientRMProtocolPB.class, clientVersion, addr, conf);
|
||||
|
||||
public ClientRMProtocolPBClientImpl(long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ClientRMProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
proxy =
|
||||
(ClientRMProtocolPB) RPC.getProxy(ClientRMProtocolPB.class,
|
||||
clientVersion, addr, conf);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.proxy != null) {
|
||||
RPC.stopProxy(this.proxy);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnRemoteException {
|
||||
KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto();
|
||||
KillApplicationRequestProto requestProto =
|
||||
((KillApplicationRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto));
|
||||
return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -105,9 +119,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
@Override
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnRemoteException {
|
||||
GetApplicationReportRequestProto requestProto = ((GetApplicationReportRequestPBImpl)request).getProto();
|
||||
GetApplicationReportRequestProto requestProto =
|
||||
((GetApplicationReportRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport(null, requestProto));
|
||||
return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -116,9 +132,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
@Override
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request) throws YarnRemoteException {
|
||||
GetClusterMetricsRequestProto requestProto = ((GetClusterMetricsRequestPBImpl)request).getProto();
|
||||
GetClusterMetricsRequestProto requestProto =
|
||||
((GetClusterMetricsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null, requestProto));
|
||||
return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -127,9 +145,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnRemoteException {
|
||||
GetNewApplicationRequestProto requestProto = ((GetNewApplicationRequestPBImpl)request).getProto();
|
||||
GetNewApplicationRequestProto requestProto =
|
||||
((GetNewApplicationRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, requestProto));
|
||||
return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -138,9 +158,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnRemoteException {
|
||||
SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();
|
||||
SubmitApplicationRequestProto requestProto =
|
||||
((SubmitApplicationRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
|
||||
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -149,24 +171,25 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
@Override
|
||||
public GetAllApplicationsResponse getAllApplications(
|
||||
GetAllApplicationsRequest request) throws YarnRemoteException {
|
||||
GetAllApplicationsRequestProto requestProto =
|
||||
((GetAllApplicationsRequestPBImpl)request).getProto();
|
||||
GetAllApplicationsRequestProto requestProto =
|
||||
((GetAllApplicationsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetAllApplicationsResponsePBImpl(
|
||||
proxy.getAllApplications(null, requestProto));
|
||||
return new GetAllApplicationsResponsePBImpl(proxy.getAllApplications(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodesResponse getClusterNodes(
|
||||
GetClusterNodesRequest request) throws YarnRemoteException {
|
||||
public GetClusterNodesResponse
|
||||
getClusterNodes(GetClusterNodesRequest request)
|
||||
throws YarnRemoteException {
|
||||
GetClusterNodesRequestProto requestProto =
|
||||
((GetClusterNodesRequestPBImpl)request).getProto();
|
||||
((GetClusterNodesRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetClusterNodesResponsePBImpl(
|
||||
proxy.getClusterNodes(null, requestProto));
|
||||
return new GetClusterNodesResponsePBImpl(proxy.getClusterNodes(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -176,10 +199,10 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
||||
throws YarnRemoteException {
|
||||
GetQueueInfoRequestProto requestProto =
|
||||
((GetQueueInfoRequestPBImpl)request).getProto();
|
||||
((GetQueueInfoRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetQueueInfoResponsePBImpl(
|
||||
proxy.getQueueInfo(null, requestProto));
|
||||
return new GetQueueInfoResponsePBImpl(proxy.getQueueInfo(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -189,10 +212,10 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
|
||||
GetQueueUserAclsInfoRequestProto requestProto =
|
||||
((GetQueueUserAclsInfoRequestPBImpl)request).getProto();
|
||||
((GetQueueUserAclsInfoRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetQueueUserAclsInfoResponsePBImpl(
|
||||
proxy.getQueueUserAcls(null, requestProto));
|
||||
return new GetQueueUserAclsInfoResponsePBImpl(proxy.getQueueUserAcls(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -202,12 +225,12 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request) throws YarnRemoteException {
|
||||
GetDelegationTokenRequestProto requestProto =
|
||||
((GetDelegationTokenRequestPBImpl)request).getProto();
|
||||
try {
|
||||
return new GetDelegationTokenResponsePBImpl(
|
||||
proxy.getDelegationToken(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
((GetDelegationTokenRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,9 +18,9 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.impl.pb.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.io.Closeable;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
@ -58,22 +58,26 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
|
||||
+ "rpc.nm-command-timeout";
|
||||
|
||||
/**
|
||||
* Maximum of 1 minute timeout for a Node to react to the command
|
||||
* Maximum of 1 minute timeout for a Node to react to the command
|
||||
*/
|
||||
static final int DEFAULT_COMMAND_TIMEOUT = 60000;
|
||||
|
||||
private ContainerManagerPB proxy;
|
||||
|
||||
public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
|
||||
|
||||
public ContainerManagerPBClientImpl(long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ContainerManagerPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
|
||||
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
|
||||
proxy = (ContainerManagerPB)RPC.getProxy(
|
||||
ContainerManagerPB.class, clientVersion, addr, ugi, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf), expireIntvl);
|
||||
proxy =
|
||||
(ContainerManagerPB) RPC.getProxy(ContainerManagerPB.class,
|
||||
clientVersion, addr, ugi, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf), expireIntvl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.proxy != null) {
|
||||
RPC.stopProxy(this.proxy);
|
||||
@ -83,9 +87,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
|
||||
@Override
|
||||
public GetContainerStatusResponse getContainerStatus(
|
||||
GetContainerStatusRequest request) throws YarnRemoteException {
|
||||
GetContainerStatusRequestProto requestProto = ((GetContainerStatusRequestPBImpl)request).getProto();
|
||||
GetContainerStatusRequestProto requestProto =
|
||||
((GetContainerStatusRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus(null, requestProto));
|
||||
return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -94,9 +100,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
|
||||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
StartContainerRequestProto requestProto = ((StartContainerRequestPBImpl)request).getProto();
|
||||
StartContainerRequestProto requestProto =
|
||||
((StartContainerRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new StartContainerResponsePBImpl(proxy.startContainer(null, requestProto));
|
||||
return new StartContainerResponsePBImpl(proxy.startContainer(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
@ -105,11 +113,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
|
||||
@Override
|
||||
public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
StopContainerRequestProto requestProto = ((StopContainerRequestPBImpl) request)
|
||||
.getProto();
|
||||
StopContainerRequestProto requestProto =
|
||||
((StopContainerRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new StopContainerResponsePBImpl(proxy.stopContainer(null,
|
||||
requestProto));
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user