From ac5f21dbef0f0ad4210e4027f53877760fa606a5 Mon Sep 17 00:00:00 2001 From: Eric Badger Date: Fri, 24 Jul 2020 22:35:16 +0000 Subject: [PATCH] YARN-4771. Some containers can be skipped during log aggregation after NM restart. Contributed by Jason Lowe and Jim Brennan. --- .../nodemanager/NodeStatusUpdaterImpl.java | 11 +++++---- .../nodemanager/TestNodeStatusUpdater.java | 24 +++++++++---------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0725d42309..37da31a322 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -777,8 +777,13 @@ public void removeVeryOldStoppedContainersFromCache() { while (i.hasNext()) { Entry mapEntry = i.next(); ContainerId cid = mapEntry.getKey(); - if (mapEntry.getValue() < currentTime) { - if (!context.getContainers().containsKey(cid)) { + if (mapEntry.getValue() >= currentTime) { + break; + } + if (!context.getContainers().containsKey(cid)) { + ApplicationId appId = + cid.getApplicationAttemptId().getApplicationId(); + if (isApplicationStopped(appId)) { i.remove(); try { context.getNMStateStore().removeContainer(cid); @@ -786,8 +791,6 @@ public void removeVeryOldStoppedContainersFromCache() { LOG.error("Unable to remove container " + cid + " in store", e); } } - } else { - break; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index c0831ee022..2477af2512 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -931,9 +931,8 @@ public void deleteBaseDir() throws IOException { public void testRecentlyFinishedContainers() throws Exception { NodeManager nm = new NodeManager(); YarnConfiguration conf = new YarnConfiguration(); - conf.set( - NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, - "10000"); + conf.setInt(NodeStatusUpdaterImpl. + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 1); nm.init(conf); NodeStatusUpdaterImpl nodeStatusUpdater = (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); @@ -948,18 +947,17 @@ public void testRecentlyFinishedContainers() throws Exception { nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + // verify container remains even after expiration if app + // is still active nm.getNMContext().getContainers().remove(cId); - long time1 = System.currentTimeMillis(); - int waitInterval = 15; - while (waitInterval-- > 0 - && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { - nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); - Thread.sleep(1000); - } - long time2 = System.currentTimeMillis(); - // By this time the container will be removed from cache. need to verify. + Thread.sleep(10); + nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); + Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + + // complete the application and verify container is removed + nm.getNMContext().getApplications().remove(appId); + nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000); } @Test(timeout = 90000)