diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 179b01e607..6cb8560e0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -451,6 +451,7 @@ protected void serviceInit(Configuration conf) throws Exception { // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + nmStore.setNodeStatusUpdater(nodeStatusUpdater); super.serviceInit(conf); // TODO add local dirs to del diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 34558749dc..0f659d9075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; @@ -155,6 +156,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private DB db; private boolean isNewlyCreated; + private boolean isHealthy; private Timer compactionTimer; /** @@ -169,6 +171,8 @@ public NMLeveldbStateStoreService() { @Override protected void startStorage() throws IOException { + // Assume that we're healthy when we start + isHealthy = true; } @Override @@ -187,6 +191,36 @@ public boolean isNewlyCreated() { return isNewlyCreated; } + /** + * If the state store throws an error after recovery has been performed + * then we can not trust it any more to reflect the NM state. We need to + * mark the store and node unhealthy. + * Errors during the recovery will cause a service failure and thus a NM + * start failure. Do not need to mark the store unhealthy for those. + * @param dbErr Exception + */ + private void markStoreUnHealthy(DBException dbErr) { + // Always log the error here, we might not see the error in the caller + LOG.error("Statestore exception: ", dbErr); + // We have already been marked unhealthy so no need to do it again. + if (!isHealthy) { + return; + } + // Mark unhealthy, an out of band heartbeat will be sent and the state + // will remain unhealthy (not recoverable). + // No need to close the store: does not make any difference at this point. + isHealthy = false; + // We could get here before the nodeStatusUpdater is set + NodeStatusUpdater nsu = getNodeStatusUpdater(); + if (nsu != null) { + nsu.reportException(dbErr); + } + } + + @VisibleForTesting + boolean isHealthy() { + return isHealthy; + } @Override public List loadContainersState() @@ -354,6 +388,7 @@ public void storeContainer(ContainerId containerId, int containerVersion, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -378,6 +413,7 @@ public void storeContainerQueued(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -393,6 +429,7 @@ private void removeContainerQueued(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -408,6 +445,7 @@ public void storeContainerPaused(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -424,6 +462,7 @@ public void removeContainerPaused(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -441,6 +480,7 @@ public void storeContainerDiagnostics(ContainerId containerId, try { db.put(bytes(key), bytes(diagnostics.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -459,6 +499,7 @@ public void storeContainerLaunched(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -488,6 +529,7 @@ public void storeContainerUpdateToken(ContainerId containerId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -504,6 +546,7 @@ public void storeContainerKilled(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -520,6 +563,7 @@ public void storeContainerCompleted(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(exitCode))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -532,6 +576,7 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -544,6 +589,7 @@ public void storeContainerWorkDir(ContainerId containerId, try { db.put(bytes(key), bytes(workDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -556,6 +602,7 @@ public void storeContainerLogDir(ContainerId containerId, try { db.put(bytes(key), bytes(logDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -589,6 +636,7 @@ public void removeContainer(ContainerId containerId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -638,6 +686,7 @@ public void storeApplication(ApplicationId appId, try { db.put(bytes(key), p.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -659,6 +708,7 @@ public void removeApplication(ApplicationId appId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -815,6 +865,7 @@ public void startResourceLocalization(String user, ApplicationId appId, try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -838,6 +889,7 @@ public void finishResourceLocalization(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -861,6 +913,7 @@ public void removeLocalizedResource(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -926,6 +979,7 @@ public void storeDeletionTask(int taskId, try { db.put(bytes(key), taskProto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -936,6 +990,7 @@ public void removeDeletionTask(int taskId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1009,6 +1064,7 @@ public void removeNMTokenApplicationMasterKey( try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1023,6 +1079,7 @@ private void storeMasterKey(String dbKey, MasterKey key) try { db.put(bytes(dbKey), pb.getProto().toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1096,6 +1153,7 @@ public void storeContainerToken(ContainerId containerId, Long expTime) try { db.put(bytes(key), bytes(expTime.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1107,6 +1165,7 @@ public void removeContainerToken(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1157,6 +1216,7 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1167,6 +1227,7 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1198,6 +1259,7 @@ public void storeAssignedResources(Container container, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } @@ -1361,6 +1423,7 @@ public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { try { db.delete(bytes(dbkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } return; @@ -1375,6 +1438,7 @@ public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.put(bytes(fullkey), data); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1386,6 +1450,7 @@ public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.delete(bytes(fullkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1409,6 +1474,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) candidates.add(key); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } finally { if (iter != null) { @@ -1422,6 +1488,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) db.delete(bytes(key)); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1555,6 +1622,11 @@ DB getDB() { return db; } + @VisibleForTesting + void setDB(DB testDb) { + this.db = testDb; + } + /** * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 2) Any incompatible change of state-store is a major upgrade, and any diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 598ea9e319..f9b86bf84e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -51,10 +52,20 @@ @Unstable public abstract class NMStateStoreService extends AbstractService { + private NodeStatusUpdater nodeStatusUpdater = null; + public NMStateStoreService(String name) { super(name); } + protected NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } + + public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } + public static class RecoveredApplicationsState { List applications; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 3cac5b40bc..de667d159e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestNMLeveldbStateStoreService { private static final File TMP_DIR = new File( @@ -1165,6 +1168,38 @@ public void testStateStoreForResourceMapping() throws IOException { resourceMappings.getAssignedResources("numa").equals(numaRes)); } + @Test + public void testStateStoreNodeHealth() throws IOException { + // keep the working DB clean, break a temp DB + DB keepDB = stateStore.getDB(); + DB myMocked = mock(DB.class); + stateStore.setDB(myMocked); + + ApplicationId appId = ApplicationId.newInstance(1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + DBException toThrow = new DBException(); + Mockito.doThrow(toThrow).when(myMocked). + put(any(byte[].class), any(byte[].class)); + // write some data + try { + // chosen a simple method could be any of the "void" methods + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + stateStore.storeContainerKilled(containerId); + } catch (IOException ioErr) { + // Cause should be wrapped DBException + assertTrue(ioErr.getCause() instanceof DBException); + // check the store is marked unhealthy + assertFalse("Statestore should have been unhealthy", + stateStore.isHealthy()); + return; + } finally { + // restore the working DB + stateStore.setDB(keepDB); + } + Assert.fail("Expected exception not thrown"); + } + private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException { // create a container request