YARN-6683. Invalid event: COLLECTOR_UPDATE at KILLED. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2017-06-05 13:15:08 -07:00
parent 6aeda55bb8
commit 7311015ace
3 changed files with 2 additions and 80 deletions

View File

@ -69,7 +69,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -651,10 +650,7 @@ private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
String previousCollectorAddr = rmApp.getCollectorAddr();
if (previousCollectorAddr == null
|| !previousCollectorAddr.equals(collectorAddr)) {
// sending collector update event.
RMAppCollectorUpdateEvent event =
new RMAppCollectorUpdateEvent(appId, collectorAddr);
rmContext.getDispatcher().getEventHandler().handle(event);
rmApp.setCollectorAddr(collectorAddr);
}
}
}

View File

@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* Event used for updating collector address in RMApp on node heartbeat.
*/
public class RMAppCollectorUpdateEvent extends RMAppEvent {
private final String appCollectorAddr;
public RMAppCollectorUpdateEvent(ApplicationId appId,
String appCollectorAddr) {
super(appId, RMAppEventType.COLLECTOR_UPDATE);
this.appCollectorAddr = appCollectorAddr;
}
public String getAppCollectorAddr(){
return this.appCollectorAddr;
}
}

View File

@ -165,7 +165,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long storedFinishTime = 0;
private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1;
private String collectorAddr;
private volatile String collectorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@ -217,8 +217,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@ -235,8 +233,6 @@ RMAppEventType.RECOVER, new RMAppRecoveredTransition())
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@ -251,8 +247,6 @@ RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@ -267,8 +261,6 @@ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
@ -294,8 +286,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@ -325,8 +315,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@ -338,8 +326,6 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@ -351,8 +337,6 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@ -1020,24 +1004,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
private static final class RMAppCollectorUpdateTransition
extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
LOG.info("Updating collector info for app: " + app.getApplicationId());
RMAppCollectorUpdateEvent appCollectorUpdateEvent =
(RMAppCollectorUpdateEvent) event;
// Update collector address
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
}
};
}
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;