YARN-1686. Fixed NodeManager to properly handle any errors during re-registration after a RESYNC and thus avoid hanging. Contributed by Rohith Sharma.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1571474 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1ab2f5a916
commit
d07f855892
@ -344,6 +344,10 @@ Release 2.4.0 - UNRELEASED
|
|||||||
YARN-1742. Fixed javadoc of configuration parameter
|
YARN-1742. Fixed javadoc of configuration parameter
|
||||||
DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION. (Akira Ajisaka via vinodkv)
|
DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION. (Akira Ajisaka via vinodkv)
|
||||||
|
|
||||||
|
YARN-1686. Fixed NodeManager to properly handle any errors during
|
||||||
|
re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
|
||||||
|
vinodkv)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -224,11 +224,16 @@ public class NodeManager extends CompositeService
|
|||||||
new Thread() {
|
new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Notifying ContainerManager to block new container-requests");
|
try {
|
||||||
containerManager.setBlockNewContainerRequests(true);
|
LOG.info("Notifying ContainerManager to block new container-requests");
|
||||||
LOG.info("Cleaning up running containers on resync");
|
containerManager.setBlockNewContainerRequests(true);
|
||||||
containerManager.cleanupContainersOnNMResync();
|
LOG.info("Cleaning up running containers on resync");
|
||||||
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
|
containerManager.cleanupContainersOnNMResync();
|
||||||
|
((NodeStatusUpdaterImpl) nodeStatusUpdater).rebootNodeStatusUpdater();
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
LOG.fatal("Error while rebooting NodeStatusUpdater.", e);
|
||||||
|
shutDown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}.start();
|
}.start();
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
@ -65,6 +66,7 @@ public class TestNodeManagerResync {
|
|||||||
private FileContext localFS;
|
private FileContext localFS;
|
||||||
private CyclicBarrier syncBarrier;
|
private CyclicBarrier syncBarrier;
|
||||||
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
||||||
|
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws UnsupportedFileSystemException {
|
public void setup() throws UnsupportedFileSystemException {
|
||||||
@ -138,6 +140,30 @@ public class TestNodeManagerResync {
|
|||||||
nm.stop();
|
nm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testNMshutdownWhenResyncThrowException() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
NodeManager nm = new TestNodeManager3();
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
nm.init(conf);
|
||||||
|
nm.start();
|
||||||
|
Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount());
|
||||||
|
nm.getNMDispatcher().getEventHandler()
|
||||||
|
.handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
||||||
|
|
||||||
|
synchronized (isNMShutdownCalled) {
|
||||||
|
while (isNMShutdownCalled.get() == false) {
|
||||||
|
try {
|
||||||
|
isNMShutdownCalled.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get());
|
||||||
|
}
|
||||||
|
|
||||||
private YarnConfiguration createNMConfig() {
|
private YarnConfiguration createNMConfig() {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
|
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
|
||||||
@ -322,4 +348,44 @@ public class TestNodeManagerResync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TestNodeManager3 extends NodeManager {
|
||||||
|
|
||||||
|
private int registrationCount = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
|
return new TestNodeStatusUpdaterImpl3(context, dispatcher, healthChecker,
|
||||||
|
metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNMRegistrationCount() {
|
||||||
|
return registrationCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void shutDown() {
|
||||||
|
synchronized (isNMShutdownCalled) {
|
||||||
|
isNMShutdownCalled.set(true);
|
||||||
|
isNMShutdownCalled.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater {
|
||||||
|
|
||||||
|
public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher,
|
||||||
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
super(context, dispatcher, healthChecker, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void registerWithRM() throws YarnException, IOException {
|
||||||
|
super.registerWithRM();
|
||||||
|
registrationCount++;
|
||||||
|
if (registrationCount > 1) {
|
||||||
|
throw new YarnRuntimeException("Registration with RM failed.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user