YARN-170. Change NodeManager stop to be reentrant. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-01-07 13:43:52 +00:00
parent b8a7a17191
commit 2cd41855d5
6 changed files with 135 additions and 40 deletions

View File

@ -92,6 +92,8 @@ Release 2.0.3-alpha - Unreleased
YARN-315. Using the common security token protobuf definition from hadoop
common. (Suresh Srinivas via vinodkv)
YARN-170. Change NodeManager stop to be reentrant. (Sandy Ryza via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,6 +45,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -54,11 +56,10 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.util.Records;
public class NodeManager extends CompositeService implements
ServiceStateChangeListener {
public class NodeManager extends CompositeService
implements EventHandler<NodeManagerEvent> {
/**
* Priority of the NodeManager shutdown hook.
@ -82,6 +83,8 @@ public class NodeManager extends CompositeService implements
private long waitForContainersOnShutdownMillis;
private AtomicBoolean isStopping = new AtomicBoolean(false);
public NodeManager() {
super(NodeManager.class.getName());
}
@ -152,7 +155,6 @@ public void init(Configuration conf) {
NodeStatusUpdater nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
@ -167,6 +169,7 @@ public void init(Configuration conf) {
addService(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
DefaultMetricsSystem.initialize("NodeManager");
@ -198,13 +201,17 @@ public void start() {
@Override
public void stop() {
if (isStopping.getAndSet(true)) {
return;
}
cleanupContainers();
super.stop();
DefaultMetricsSystem.shutdown();
}
@SuppressWarnings("unchecked")
private void cleanupContainers() {
protected void cleanupContainers() {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
@ -293,24 +300,10 @@ public NodeHealthCheckerService getNodeHealthChecker() {
return nodeHealthChecker;
}
@Override
public void stateChanged(Service service) {
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
&& STATE.STOPPED.equals(service.getServiceState())) {
boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
stop();
// Reboot the whole node-manager if NodeStatusUpdater got a reboot command
// from the RM.
if (hasToReboot) {
LOG.info("Rebooting the node manager.");
NodeManager nodeManager = createNewNodeManager();
nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
}
}
private void reboot() {
LOG.info("Rebooting the node manager.");
NodeManager nodeManager = createNewNodeManager();
nodeManager.initAndStartNodeManager(this.getConfig(), true);
}
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
@ -333,6 +326,21 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
}
}
@Override
public void handle(NodeManagerEvent event) {
switch (event.getType()) {
case SHUTDOWN:
stop();
break;
case REBOOT:
stop();
reboot();
break;
default:
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
}
}
// For testing
NodeManager createNewNodeManager() {
return new NodeManager();

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class NodeManagerEvent extends
AbstractEvent<NodeManagerEventType>{
public NodeManagerEvent(NodeManagerEventType type) {
super(type);
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
public enum NodeManagerEventType {
SHUTDOWN, REBOOT
}

View File

@ -88,8 +88,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
private boolean hasToRebootNode;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(NodeStatusUpdaterImpl.class.getName());
@ -149,18 +147,6 @@ public synchronized void stop() {
super.stop();
}
private synchronized void reboot() {
this.hasToRebootNode = true;
// Stop the status-updater. This will trigger a sub-service state change in
// the NodeManager which will then decide to reboot or not based on
// isRebooted.
this.stop();
}
synchronized boolean hasToRebootNode() {
return this.hasToRebootNode;
}
private boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
}
@ -348,13 +334,15 @@ public void run() {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
" hence shutting down.");
NodeStatusUpdaterImpl.this.stop();
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
}
if (response.getNodeAction() == NodeAction.REBOOT) {
LOG.info("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
NodeStatusUpdaterImpl.this.reboot();
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.REBOOT));
break;
}

View File

@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -97,7 +98,7 @@ public class TestNodeStatusUpdater {
public void tearDown() {
this.registeredNodes.clear();
heartBeatID = 0;
if (nm != null) {
if (nm != null && nm.getServiceState() == STATE.STARTED) {
nm.stop();
}
DefaultMetricsSystem.shutdown();
@ -446,6 +447,52 @@ public void run() {
nm.stop();
}
@Test
public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0);
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
context, dispatcher, healthChecker, metrics);
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
@Override
protected void cleanupContainers() {
super.cleanupContainers();
numCleanups.incrementAndGet();
}
};
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID < 1);
// Meanwhile call stop directly as the shutdown hook would
nm.stop();
// NM takes a while to reach the STOPPED state.
waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
Assert.assertEquals(numCleanups.get(), 1);
}
@Test
public void testNodeDecommision() throws Exception {
nm = getNodeManager(NodeAction.SHUTDOWN);