YARN-3634. TestMRTimelineEventHandling and TestApplication are broken. Contributed by Sangjin Lee.

(cherry picked from commit b059dd4882fd759e4762cc11c019be4b68fb74c1)
This commit is contained in:
Junping Du 2015-05-13 11:54:24 -07:00 committed by Sangjin Lee
parent 51d092faef
commit d275677e24
5 changed files with 42 additions and 24 deletions

View File

@ -1053,6 +1053,13 @@
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId> <artifactId>hbase-client</artifactId>
<version>${hbase.version}</version> <version>${hbase.version}</version>
<exclusions>
<!-- exclude jdk.tools (1.7) as we're not managing it -->
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.phoenix</groupId> <groupId>org.apache.phoenix</groupId>
@ -1091,6 +1098,11 @@
<scope>test</scope> <scope>test</scope>
<optional>true</optional> <optional>true</optional>
<exclusions> <exclusions>
<!-- exclude jdk.tools (1.7) as we're not managing it -->
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion> <exclusion>
<groupId>org.jruby</groupId> <groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId> <artifactId>jruby-complete</artifactId>

View File

@ -81,6 +81,11 @@ protected void serviceStart() throws Exception {
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT)); YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
server.start(); server.start();
collectorServerAddress = conf.updateConnectAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
server.getListenerAddress());
// start remaining services // start remaining services
super.serviceStart(); super.serviceStart();
LOG.info("NMCollectorService started at " + collectorServerAddress); LOG.info("NMCollectorService started at " + collectorServerAddress);

View File

@ -962,7 +962,7 @@ protected void startContainerInternal(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L; long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.valueOf(flowRunIdStr); flowRunId = Long.parseLong(flowRunIdStr);
} }
Application application = new ApplicationImpl(dispatcher, user, Application application = new ApplicationImpl(dispatcher, user,
flowName, flowVersion, flowRunId, applicationID, credentials, context); flowName, flowVersion, flowRunId, applicationID, credentials, context);

View File

@ -539,6 +539,7 @@ private class WrappedApplication {
new ApplicationACLsManager(conf)); new ApplicationACLsManager(conf));
when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr);
when(context.getNMStateStore()).thenReturn(stateStoreService); when(context.getNMStateStore()).thenReturn(stateStoreService);
when(context.getConf()).thenReturn(conf);
// Setting master key // Setting master key
MasterKey masterKey = new MasterKeyPBImpl(); MasterKey masterKey = new MasterKeyPBImpl();

View File

@ -69,9 +69,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private String timelineRestServerBindAddress; private String timelineRestServerBindAddress;
private CollectorNodemanagerProtocol nmCollectorService; private volatile CollectorNodemanagerProtocol nmCollectorService;
private InetSocketAddress nmCollectorServiceAddress;
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
@ -84,19 +82,8 @@ protected NodeTimelineCollectorManager() {
super(NodeTimelineCollectorManager.class.getName()); super(NodeTimelineCollectorManager.class.getName());
} }
@Override
public void serviceInit(Configuration conf) throws Exception {
this.nmCollectorServiceAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
super.serviceInit(conf);
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
nmCollectorService = getNMCollectorService();
startWebApp(); startWebApp();
super.serviceStart(); super.serviceStart();
} }
@ -176,7 +163,7 @@ private void reportNewCollectorToNM(ApplicationId appId)
this.timelineRestServerBindAddress); this.timelineRestServerBindAddress);
LOG.info("Report a new collector for application: " + appId + LOG.info("Report a new collector for application: " + appId +
" to the NM Collector Service."); " to the NM Collector Service.");
nmCollectorService.reportNewCollectorInfo(request); getNMCollectorService().reportNewCollectorInfo(request);
} }
private void updateTimelineCollectorContext( private void updateTimelineCollectorContext(
@ -186,7 +173,7 @@ private void updateTimelineCollectorContext(
GetTimelineCollectorContextRequest.newInstance(appId); GetTimelineCollectorContextRequest.newInstance(appId);
LOG.info("Get timeline collector context for " + appId); LOG.info("Get timeline collector context for " + appId);
GetTimelineCollectorContextResponse response = GetTimelineCollectorContextResponse response =
nmCollectorService.getTimelineCollectorContext(request); getNMCollectorService().getTimelineCollectorContext(request);
String userId = response.getUserId(); String userId = response.getUserId();
if (userId != null && !userId.isEmpty()) { if (userId != null && !userId.isEmpty()) {
collector.getTimelineEntityContext().setUserId(userId); collector.getTimelineEntityContext().setUserId(userId);
@ -207,13 +194,26 @@ private void updateTimelineCollectorContext(
@VisibleForTesting @VisibleForTesting
protected CollectorNodemanagerProtocol getNMCollectorService() { protected CollectorNodemanagerProtocol getNMCollectorService() {
Configuration conf = getConfig(); if (nmCollectorService == null) {
final YarnRPC rpc = YarnRPC.create(conf); synchronized (this) {
if (nmCollectorService == null) {
Configuration conf = getConfig();
InetSocketAddress nmCollectorServiceAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
LOG.info("nmCollectorServiceAddress: " + nmCollectorServiceAddress);
final YarnRPC rpc = YarnRPC.create(conf);
// TODO Security settings. // TODO Security settings.
return (CollectorNodemanagerProtocol) rpc.getProxy( nmCollectorService = (CollectorNodemanagerProtocol) rpc.getProxy(
CollectorNodemanagerProtocol.class, CollectorNodemanagerProtocol.class,
nmCollectorServiceAddress, conf); nmCollectorServiceAddress, conf);
}
}
}
return nmCollectorService;
} }
@VisibleForTesting @VisibleForTesting