From a9a55db0650b5e6b7d88afb883b88aba7a13cd44 Mon Sep 17 00:00:00 2001 From: junping_du Date: Mon, 22 Sep 2014 22:45:06 -0700 Subject: [PATCH 01/10] YARN-2584. TestContainerManagerSecurity fails on trunk. (Contributed by Jian He) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 2 +- .../server/TestContainerManagerSecurity.java | 28 +++++++++++++------ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2bc118de5d..ec9630296e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -421,6 +421,9 @@ Release 2.6.0 - UNRELEASED YARN-2540. FairScheduler: Queue filters not working on scheduler page in RM UI. (Ashwin Shankar via kasha) + YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via + junping_du) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b9feacb6e8..b4dcf1f2d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -422,7 +422,7 @@ public void addCompletedContainer(ContainerId containerId) { @VisibleForTesting @Private public void removeCompletedContainersFromContext( - ListcontainerIds) throws IOException { + List containerIds) throws IOException { Set removedContainers = new HashSet(); // If the AM has pulled the completedContainer it can be removed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 9bb44ca54f..3f82d72abc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -27,10 +27,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.LinkedList; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteStreams; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -80,6 +79,9 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import 
com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteStreams; + @RunWith(Parameterized.class) public class TestContainerManagerSecurity extends KerberosSecurityTestcase { @@ -137,7 +139,7 @@ public TestContainerManagerSecurity(Configuration conf) { this.conf = conf; } - @Test (timeout = 1000000) + @Test (timeout = 120000) public void testContainerManager() throws Exception { try { yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class @@ -162,7 +164,7 @@ public void testContainerManager() throws Exception { } } - @Test (timeout = 500000) + @Test (timeout = 120000) public void testContainerManagerWithEpoch() throws Exception { try { yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class @@ -311,7 +313,7 @@ private void testNMTokens(Configuration conf) throws Exception { // trying to stop the container. It should not throw any exception. testStopContainer(rpc, validAppAttemptId, validNode, validContainerId, validNMToken, false); - + // Rolling over master key twice so that we can check whether older keys // are used for authentication. rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); @@ -326,7 +328,7 @@ private void testNMTokens(Configuration conf) throws Exception { sb.append(" was recently stopped on node manager"); Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, validContainerId, validNMToken, true).contains(sb.toString())); - + // Now lets remove the container from nm-memory nm.getNodeStatusUpdater().clearFinishedContainersFromCache(); @@ -355,14 +357,22 @@ private void testNMTokens(Configuration conf) throws Exception { private void waitForContainerToFinishOnNM(ContainerId containerId) { Context nmContet = yarnCluster.getNodeManager(0).getNMContext(); int interval = 4 * 60; // Max time for container token to expire. + Assert.assertNotNull(nmContet.getContainers().containsKey(containerId)); while ((interval-- > 0) - && nmContet.getContainers().containsKey(containerId)) { + && !nmContet.getContainers().get(containerId) + .cloneAndGetContainerStatus().getState() + .equals(ContainerState.COMPLETE)) { try { + LOG.info("Waiting for " + containerId + " to complete."); Thread.sleep(1000); } catch (InterruptedException e) { } } - Assert.assertFalse(nmContet.getContainers().containsKey(containerId)); + // Normally, Containers will be removed from NM context after they are + // explicitly acked by RM. Now, manually remove it for testing. + yarnCluster.getNodeManager(0).getNodeStatusUpdater() + .addCompletedContainer(containerId); + nmContet.getContainers().remove(containerId); } protected void waitForNMToReceiveNMTokenKey( From 568d3dc2bbe43b7d2833d5da2b0e6d75eb86e5dd Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Mon, 22 Sep 2014 23:49:39 -0700 Subject: [PATCH 02/10] YARN-1959. Fix headroom calculation in FairScheduler. (Anubhav Dhoot via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSAppAttempt.java | 27 ++++++++ .../scheduler/fair/SchedulingPolicy.java | 15 +++++ .../DominantResourceFairnessPolicy.java | 17 ++++- .../fair/policies/FairSharePolicy.java | 11 ++++ .../scheduler/fair/policies/FifoPolicy.java | 12 ++++ .../scheduler/fair/TestFSAppAttempt.java | 63 +++++++++++++++++++ 7 files changed, 147 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ec9630296e..b8a938d46b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -244,6 +244,9 @@ Release 2.6.0 - UNRELEASED YARN-2539. 
FairScheduler: Set the default value for maxAMShare to 0.5. (Wei Yan via kasha) + YARN-1959. Fix headroom calculation in FairScheduler. + (Anubhav Dhoot via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 825c3985c7..b9966e7f55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -171,6 +171,33 @@ private synchronized void unreserveInternal( + priority + "; currentReservation " + currentReservation); } + @Override + public synchronized Resource getHeadroom() { + final FSQueue queue = (FSQueue) this.queue; + SchedulingPolicy policy = queue.getPolicy(); + + Resource queueFairShare = queue.getFairShare(); + Resource queueUsage = queue.getResourceUsage(); + Resource clusterResource = this.scheduler.getClusterResource(); + Resource clusterUsage = this.scheduler.getRootQueueMetrics() + .getAllocatedResources(); + Resource clusterAvailableResource = Resources.subtract(clusterResource, + clusterUsage); + Resource headroom = policy.getHeadroom(queueFairShare, + queueUsage, clusterAvailableResource); + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for " + this.getName() + ":" + + "Min(" + + "(queueFairShare=" + queueFairShare + + " - queueUsage=" + queueUsage + ")," + + " clusterAvailableResource=" + clusterAvailableResource + + "(clusterResource=" + clusterResource + + " - clusterUsage=" + clusterUsage + ")" + + "Headroom=" + headroom); + } + return headroom; + } + public synchronized float getLocalityWaitFactor( Priority priority, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index ca006c580e..4f3123dffd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -175,4 +175,19 @@ public abstract boolean checkIfUsageOverFairShare( */ public abstract boolean checkIfAMResourceUsageOverLimit( Resource usage, Resource maxAMResource); + + /** + * Get headroom by calculating the min of clusterAvailable and + * (queueFairShare - queueUsage) resources that are + * applicable to this policy. For eg if only memory then leave other + * resources such as CPU to same as clusterAvailable. 
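To make the rule above concrete: per tracked resource the headroom is min(clusterAvailable, max(queueFairShare - queueUsage, 0)), and resources a policy does not track fall back to clusterAvailable. Below is a minimal, dependency-free sketch, not part of the patch; the class name and the plain int pairs standing in for YARN's Resource type are placeholders. Its expected values match the ones asserted in TestFSAppAttempt#testHeadroom later in this patch.

// Illustrative sketch only (not part of the patch). Plain ints stand in for Resource.
public class HeadroomSketch {

  // DominantResourceFairnessPolicy tracks both memory (index 0) and vcores (index 1).
  static int[] drfHeadroom(int[] fairShare, int[] usage, int[] clusterAvail) {
    int mem = Math.min(clusterAvail[0], Math.max(fairShare[0] - usage[0], 0));
    int cpu = Math.min(clusterAvail[1], Math.max(fairShare[1] - usage[1], 0));
    return new int[] {mem, cpu};
  }

  // FairSharePolicy and FifoPolicy track memory only; vcores pass through unchanged.
  static int[] memoryOnlyHeadroom(int[] fairShare, int[] usage, int[] clusterAvail) {
    int mem = Math.min(clusterAvail[0], Math.max(fairShare[0] - usage[0], 0));
    return new int[] {mem, clusterAvail[1]};
  }

  public static void main(String[] args) {
    int[] fairShare = {4096, 4}; // queue fair share: 4096 MB, 4 vcores
    int[] usage     = {1024, 1}; // queue usage:      1024 MB, 1 vcore
    int[] avail     = {2048, 6}; // cluster available: 2048 MB, 6 vcores
    int[] drf  = drfHeadroom(fairShare, usage, avail);        // {2048, 3}
    int[] fifo = memoryOnlyHeadroom(fairShare, usage, avail); // {2048, 6}
    System.out.println(drf[0] + "MB/" + drf[1] + "vcores vs "
        + fifo[0] + "MB/" + fifo[1] + "vcores");
  }
}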
+ * + * @param queueFairShare fairshare in the queue + * @param queueUsage resources used in the queue + * @param clusterAvailable available resource in cluster + * @return calculated headroom + */ + public abstract Resource getHeadroom(Resource queueFairShare, + Resource queueUsage, Resource clusterAvailable); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 42044bcaac..3f6cbd19ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -77,7 +77,7 @@ public void computeSteadyShares(Collection queues, ComputeFairShares.computeSteadyShares(queues, totalResources, type); } } - + @Override public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { return !Resources.fitsIn(usage, fairShare); @@ -88,6 +88,21 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes return !Resources.fitsIn(usage, maxAMResource); } + @Override + public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, + Resource clusterAvailable) { + int queueAvailableMemory = + Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0); + int queueAvailableCPU = + Math.max(queueFairShare.getVirtualCores() - queueUsage + .getVirtualCores(), 0); + Resource headroom = Resources.createResource( + Math.min(clusterAvailable.getMemory(), queueAvailableMemory), + Math.min(clusterAvailable.getVirtualCores(), + queueAvailableCPU)); + return headroom; + } + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 66bb88bf16..97669cb4e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -114,6 +114,17 @@ public Comparator getComparator() { return comparator; } + @Override + public Resource getHeadroom(Resource queueFairShare, + Resource queueUsage, Resource clusterAvailable) { + int queueAvailableMemory = Math.max( + queueFairShare.getMemory() - queueUsage.getMemory(), 0); + Resource headroom = Resources.createResource( + Math.min(clusterAvailable.getMemory(), queueAvailableMemory), + clusterAvailable.getVirtualCores()); + 
return headroom; + } + @Override public void computeShares(Collection schedulables, Resource totalResources) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 591ee4936b..a2e17ecb0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -107,6 +107,18 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes return usage.getMemory() > maxAMResource.getMemory(); } + @Override + public Resource getHeadroom(Resource queueFairShare, + Resource queueUsage, Resource clusterAvailable) { + int queueAvailableMemory = Math.max( + queueFairShare.getMemory() - queueUsage.getMemory(), 0); + Resource headroom = Resources.createResource( + Math.min(clusterAvailable.getMemory(), queueAvailableMemory), + clusterAvailable.getVirtualCores()); + return headroom; + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 0ab1f70147..f560690d93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import static org.junit.Assert.assertEquals; @@ -26,7 +27,12 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -185,4 +191,61 @@ public void testLocalityLevelWithoutDelays() { assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( prio, 10, -1.0, 
-1.0)); } + + @Test + public void testHeadroom() { + final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class); + Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock()); + + final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class); + final Resource queueFairShare = Resources.createResource(4096, 4); + final Resource queueUsage = Resource.newInstance(1024, 1); + final Resource clusterResource = Resources.createResource(8192, 8); + final Resource clusterUsage = Resources.createResource(6144, 2); + final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class); + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + FSAppAttempt schedulerApp = + new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue , + null, rmContext); + + Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare); + Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage); + Mockito.when(mockScheduler.getClusterResource()).thenReturn + (clusterResource); + Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn + (clusterUsage); + Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn + (fakeRootQueueMetrics); + + int minClusterAvailableMemory = 2048; + int minClusterAvailableCPU = 6; + int minQueueAvailableCPU = 3; + + // Min of Memory and CPU across cluster and queue is used in + // DominantResourceFairnessPolicy + Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy + .getInstance(DominantResourceFairnessPolicy.class)); + verifyHeadroom(schedulerApp, minClusterAvailableMemory, + minQueueAvailableCPU); + + // Fair and Fifo ignore CPU of queue, so use cluster available CPU + Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy + .getInstance(FairSharePolicy.class)); + verifyHeadroom(schedulerApp, minClusterAvailableMemory, + minClusterAvailableCPU); + + Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy + .getInstance(FifoPolicy.class)); + verifyHeadroom(schedulerApp, minClusterAvailableMemory, + minClusterAvailableCPU); + } + + protected void verifyHeadroom(FSAppAttempt schedulerApp, + int expectedMemory, int expectedCPU) { + Resource headroom = schedulerApp.getHeadroom(); + assertEquals(expectedMemory, headroom.getMemory()); + assertEquals(expectedCPU, headroom.getVirtualCores()); + } } From f5578207d2d20aa71adf12aa9f6c37b00ee9b8dc Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 23 Sep 2014 00:03:16 -0700 Subject: [PATCH 03/10] YARN-2252. Intermittent failure of TestFairScheduler.testContinuousScheduling. (Ratandeep Ratti and kasha via kasha) --- hadoop-yarn-project/CHANGES.txt | 4 ++ .../scheduler/fair/TestFairScheduler.java | 72 ++++++++++--------- 2 files changed, 43 insertions(+), 33 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b8a938d46b..a2d0536b09 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -427,6 +427,10 @@ Release 2.6.0 - UNRELEASED YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via junping_du) + YARN-2252. Intermittent failure of + TestFairScheduler.testContinuousScheduling. 
+ (Ratandeep Ratti and kasha via kasha) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a6e928a4f4..67164c6c0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -131,8 +131,14 @@ public void setUp() throws IOException { @After public void tearDown() { - scheduler = null; - resourceManager = null; + if (scheduler != null) { + scheduler.stop(); + scheduler = null; + } + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } QueueMetrics.clearQueueMetrics(); DefaultMetricsSystem.shutdown(); } @@ -140,7 +146,7 @@ public void tearDown() { @Test (timeout = 30000) public void testConfValidation() throws Exception { - FairScheduler scheduler = new FairScheduler(); + scheduler = new FairScheduler(); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); @@ -212,7 +218,7 @@ public void testLoadConfigurationOnInitialize() throws IOException { @Test public void testNonMinZeroResourcesSettings() throws IOException { - FairScheduler fs = new FairScheduler(); + scheduler = new FairScheduler(); YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1); @@ -220,17 +226,17 @@ public void testNonMinZeroResourcesSettings() throws IOException { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.init(conf); - fs.reinitialize(conf, null); - Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); - Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); - Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); - Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); + scheduler.init(conf); + scheduler.reinitialize(conf, null); + Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); } @Test public void testMinZeroResourcesSettings() throws IOException { - FairScheduler fs = new FairScheduler(); + scheduler = new FairScheduler(); YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0); @@ -238,12 +244,12 @@ public void testMinZeroResourcesSettings() throws IOException { 
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.init(conf); - fs.reinitialize(conf, null); - Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); - Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); - Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); - Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); + scheduler.init(conf); + scheduler.reinitialize(conf, null); + Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); } @Test @@ -3293,49 +3299,49 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { @Test (timeout = 10000) public void testContinuousScheduling() throws Exception { // set continuous scheduling enabled - FairScheduler fs = new FairScheduler(); + scheduler = new FairScheduler(); Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); - fs.setRMContext(resourceManager.getRMContext()); - fs.init(conf); - fs.start(); - fs.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertTrue("Continuous scheduling should be enabled.", - fs.isContinuousSchedulingEnabled()); + scheduler.isContinuousSchedulingEnabled()); // Add two nodes RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - fs.handle(nodeEvent1); + scheduler.handle(nodeEvent1); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - fs.handle(nodeEvent2); + scheduler.handle(nodeEvent2); // available resource - Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024); - Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16); + Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024); + Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16); // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); - fs.addApplicationAttempt(appAttemptId, false, false); + scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); + scheduler.addApplicationAttempt(appAttemptId, false, false); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); ask.add(request); - fs.allocate(appAttemptId, ask, new ArrayList(), null, null); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); // waiting for continuous_scheduler_sleep_time // at least one pass - Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); + Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500); - FSAppAttempt app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = 
scheduler.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -3348,7 +3354,7 @@ public void testContinuousScheduling() throws Exception { createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); ask.clear(); ask.add(request); - fs.allocate(appAttemptId, ask, new ArrayList(), null, null); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); // Wait until app gets resources while (app.getCurrentConsumption() From df52fec21dfc18c354f8b0c1ef187d7e272ad334 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 23 Sep 2014 10:54:06 +0100 Subject: [PATCH 04/10] HADOOP-11111 MiniKDC to use locale EN_US for case conversions --- .../org/apache/hadoop/minikdc/MiniKdc.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java b/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java index d3ea2e70cf..7107b75aae 100644 --- a/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java +++ b/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java @@ -70,6 +70,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -109,6 +110,11 @@ */ public class MiniKdc { + public static final String JAVA_SECURITY_KRB5_CONF = + "java.security.krb5.conf"; + public static final String SUN_SECURITY_KRB5_DEBUG = + "sun.security.krb5.debug"; + public static void main(String[] args) throws Exception { if (args.length < 4) { System.out.println("Arguments: " + @@ -266,7 +272,8 @@ public MiniKdc(Properties conf, File workDir) throws Exception { } String orgName= conf.getProperty(ORG_NAME); String orgDomain = conf.getProperty(ORG_DOMAIN); - realm = orgName.toUpperCase() + "." + orgDomain.toUpperCase(); + realm = orgName.toUpperCase(Locale.ENGLISH) + "." 
+ + orgDomain.toUpperCase(Locale.ENGLISH); } /** @@ -355,8 +362,8 @@ private void initDirectoryService() throws Exception { ds.addLast(new KeyDerivationInterceptor()); // create one partition - String orgName= conf.getProperty(ORG_NAME).toLowerCase(); - String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase(); + String orgName= conf.getProperty(ORG_NAME).toLowerCase(Locale.ENGLISH); + String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase(Locale.ENGLISH); JdbmPartition partition = new JdbmPartition(ds.getSchemaManager()); partition.setId(orgName); @@ -387,10 +394,10 @@ private void initKDCServer() throws Exception { String orgDomain = conf.getProperty(ORG_DOMAIN); String bindAddress = conf.getProperty(KDC_BIND_ADDRESS); final Map map = new HashMap(); - map.put("0", orgName.toLowerCase()); - map.put("1", orgDomain.toLowerCase()); - map.put("2", orgName.toUpperCase()); - map.put("3", orgDomain.toUpperCase()); + map.put("0", orgName.toLowerCase(Locale.ENGLISH)); + map.put("1", orgDomain.toLowerCase(Locale.ENGLISH)); + map.put("2", orgName.toUpperCase(Locale.ENGLISH)); + map.put("3", orgDomain.toUpperCase(Locale.ENGLISH)); map.put("4", bindAddress); ClassLoader cl = Thread.currentThread().getContextClassLoader(); @@ -455,9 +462,9 @@ private void initKDCServer() throws Exception { FileUtils.writeStringToFile(krb5conf, MessageFormat.format(sb.toString(), getRealm(), getHost(), Integer.toString(getPort()), System.getProperty("line.separator"))); - System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath()); + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5conf.getAbsolutePath()); - System.setProperty("sun.security.krb5.debug", conf.getProperty(DEBUG, + System.setProperty(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG, "false")); // refresh the config @@ -481,8 +488,8 @@ private void initKDCServer() throws Exception { */ public synchronized void stop() { if (kdc != null) { - System.getProperties().remove("java.security.krb5.conf"); - System.getProperties().remove("sun.security.krb5.debug"); + System.getProperties().remove(JAVA_SECURITY_KRB5_CONF); + System.getProperties().remove(SUN_SECURITY_KRB5_DEBUG); kdc.stop(); try { ds.shutdown(); @@ -520,8 +527,8 @@ public synchronized void createPrincipal(String principal, String password) throws Exception { String orgName= conf.getProperty(ORG_NAME); String orgDomain = conf.getProperty(ORG_DOMAIN); - String baseDn = "ou=users,dc=" + orgName.toLowerCase() + ",dc=" + - orgDomain.toLowerCase(); + String baseDn = "ou=users,dc=" + orgName.toLowerCase(Locale.ENGLISH) + + ",dc=" + orgDomain.toLowerCase(Locale.ENGLISH); String content = "dn: uid=" + principal + "," + baseDn + "\n" + "objectClass: top\n" + "objectClass: person\n" + From 7aa667eefa255002cf7853ba51affbbd4a490c02 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 23 Sep 2014 10:56:55 +0100 Subject: [PATCH 05/10] HADOOP-11111 MiniKDC to use locale EN_US for case conversions: hadoop-common/CHANGES.TXT --- hadoop-common-project/hadoop-common/CHANGES.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 0b3757740c..e99a19d9e3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -596,6 +596,8 @@ Release 2.6.0 - UNRELEASED HADOOP-11112. TestKMSWithZK does not use KEY_PROVIDER_URI. (tucu via wang) + HADOOP-11111 MiniKDC to use locale EN_US for case conversions. (stevel) + BUG FIXES HADOOP-10781. 
Unportable getgrouplist() usage breaks FreeBSD (Dmitry From a1fd804a314481065c849cb8e1d7c75494e24660 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Tue, 23 Sep 2014 08:21:46 -0700 Subject: [PATCH 06/10] HDFS-7126. TestEncryptionZonesWithHA assumes Unix path separator for KMS key store path. Contributed by Xiaoyu Yao. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9389d370aa..7cb19a0000 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -926,6 +926,9 @@ Release 2.6.0 - UNRELEASED HDFS-7115. TestEncryptionZones assumes Unix path separator for KMS key store path. (Xiaoyu Yao via cnauroth) + HDFS-7115. TestEncryptionZonesWithHA assumes Unix path separator for KMS key + store path. (Xiaoyu Yao via cnauroth) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java index c74f99063e..04977d4597 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java @@ -60,7 +60,8 @@ public void setupCluster() throws Exception { String testRoot = fsHelper.getTestRootDir(); testRootDir = new File(testRoot).getAbsoluteFile(); conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, - JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks" + JavaKeyStoreProvider.SCHEME_NAME + "://file" + + new Path(testRootDir.toString(), "test.jks").toUri() ); cluster = new MiniDFSCluster.Builder(conf) From 5338ac416ab8ab3e7e0a7bfb4a53151fc457f673 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Tue, 23 Sep 2014 10:36:57 -0700 Subject: [PATCH 07/10] YARN-2569. Added the log handling APIs for the long running services. Contributed by Xuan Gong. --- hadoop-yarn-project/CHANGES.txt | 3 + .../records/ApplicationSubmissionContext.java | 36 +++++ .../api/records/LogAggregationContext.java | 121 ++++++++++++++++ .../src/main/proto/yarn_protos.proto | 7 + .../ApplicationSubmissionContextPBImpl.java | 40 ++++++ .../impl/pb/LogAggregationContextPBImpl.java | 134 ++++++++++++++++++ .../hadoop/yarn/api/TestPBImplRecords.java | 1 + 7 files changed, 342 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a2d0536b09..0e4909e8af 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED YARN-1250. Generic history service should support application-acls. (Zhijie Shen via junping_du) + YARN-2569. Added the log handling APIs for the long running services. (Xuan + Gong via zjshen) + IMPROVEMENTS YARN-2197. 
Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 723a2e0908..22023807ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -54,6 +54,7 @@ * validityInterval into failure count. If failure count reaches to * maxAppAttempts, the application will be failed. * + *
  • Optional, application-specific {@link LogAggregationContext}
 *
 *
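A hedged usage sketch, not part of the patch: a client can populate the optional LogAggregationContext through the newInstance overload and the setLogAggregationContext setter added below; the class name, the patterns and the interval are arbitrary example values.

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;

public class LogAggregationContextUsage {
  // Attach rolling log-aggregation settings to an application before submission.
  static void configureLogHandling(ApplicationSubmissionContext appContext) {
    LogAggregationContext logContext = LogAggregationContext.newInstance(
        ".*\\.log",  // includePattern: only upload files ending in .log (example value)
        ".*\\.tmp",  // excludePattern: never upload files ending in .tmp (example value)
        3600L);      // rollingIntervalSeconds: upload hourly while the app runs (example value)
    appContext.setLogAggregationContext(logContext);
  }
}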

    * @@ -128,6 +129,21 @@ public static ApplicationSubmissionContext newInstance( return context; } + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, LogAggregationContext logAggregationContext) { + ApplicationSubmissionContext context = + newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers); + context.setLogAggregationContext(logAggregationContext); + return context; + } /** * Get the ApplicationId of the submitted application. * @return ApplicationId of the submitted application @@ -381,4 +397,24 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Stable public abstract void setAttemptFailuresValidityInterval( long attemptFailuresValidityInterval); + + /** + * Get LogAggregationContext of the application + * + * @return LogAggregationContext of the application + */ + @Public + @Stable + public abstract LogAggregationContext getLogAggregationContext(); + + /** + * Set LogAggregationContext for the application + * + * @param logAggregationContext + * for the application + */ + @Public + @Stable + public abstract void setLogAggregationContext( + LogAggregationContext logAggregationContext); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java new file mode 100644 index 0000000000..9a0a15774f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + *

LogAggregationContext represents all of the
+ * information needed by the NodeManager to handle
+ * the logs for an application.
+ *
+ * It includes details such as:
+ *   • includePattern. It uses Java Regex to filter the log files
+ *     which match the defined include pattern and those log files
+ *     will be uploaded.
+ *   • excludePattern. It uses Java Regex to filter the log files
+ *     which match the defined exclude pattern and those log files
+ *     will not be uploaded. If the log file name matches both the
+ *     include and the exclude pattern, this file will be excluded eventually
+ *   • rollingIntervalSeconds. The default value is -1. By default,
+ *     the logAggregationService only uploads container logs when
+ *     the application is finished. This configure defines
+ *     how often the logAggregationSerivce uploads container logs in seconds.
+ *     By setting this configure, the logAggregationSerivce can upload container
+ *     logs periodically when the application is running.
+ *

    + * + * @see ApplicationSubmissionContext + */ + +@Evolving +@Public +public abstract class LogAggregationContext { + + @Public + @Unstable + public static LogAggregationContext newInstance(String includePattern, + String excludePattern, long rollingIntervalSeconds) { + LogAggregationContext context = Records.newRecord(LogAggregationContext.class); + context.setIncludePattern(includePattern); + context.setExcludePattern(excludePattern); + context.setRollingIntervalSeconds(rollingIntervalSeconds); + return context; + } + + /** + * Get include pattern + * + * @return include pattern + */ + @Public + @Unstable + public abstract String getIncludePattern(); + + /** + * Set include pattern + * + * @param includePattern + */ + @Public + @Unstable + public abstract void setIncludePattern(String includePattern); + + /** + * Get exclude pattern + * + * @return exclude pattern + */ + @Public + @Unstable + public abstract String getExcludePattern(); + + /** + * Set exclude pattern + * + * @param excludePattern + */ + @Public + @Unstable + public abstract void setExcludePattern(String excludePattern); + + /** + * Get rollingIntervalSeconds + * + * @return the rollingIntervalSeconds + */ + @Public + @Unstable + public abstract long getRollingIntervalSeconds(); + + /** + * Set rollingIntervalSeconds + * + * @param rollingIntervalSeconds + */ + @Public + @Unstable + public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d8c42cc303..b3687466cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto { optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; optional int64 attempt_failures_validity_interval = 13 [default = -1]; + optional LogAggregationContextProto log_aggregation_context = 14; +} + +message LogAggregationContextProto { + optional string include_pattern = 1 [default = ".*"]; + optional string exclude_pattern = 2 [default = ""]; + optional int64 rolling_interval_seconds = 3 [default = -1]; } enum ApplicationAccessTypeProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 7b49a1654f..e4f183b902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import com.google.common.base.CharMatcher; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import 
org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -53,6 +56,7 @@ public class ApplicationSubmissionContextPBImpl private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; + private LogAggregationContext logAggregationContext = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -110,6 +114,10 @@ private void mergeLocalToBuilder() { builder.clearApplicationTags(); builder.addAllApplicationTags(this.applicationTags); } + if (this.logAggregationContext != null) { + builder.setLogAggregationContext( + convertToProtoFormat(this.logAggregationContext)); + } } private void mergeLocalToProto() { @@ -415,4 +423,36 @@ public void setAttemptFailuresValidityInterval( maybeInitBuilder(); builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + + private LogAggregationContextPBImpl convertFromProtoFormat( + LogAggregationContextProto p) { + return new LogAggregationContextPBImpl(p); + } + + private LogAggregationContextProto convertToProtoFormat( + LogAggregationContext t) { + return ((LogAggregationContextPBImpl) t).getProto(); + } + + @Override + public LogAggregationContext getLogAggregationContext() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.logAggregationContext != null) { + return this.logAggregationContext; + } // Else via proto + if (!p.hasLogAggregationContext()) { + return null; + } + logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext()); + return logAggregationContext; + } + + @Override + public void setLogAggregationContext( + LogAggregationContext logAggregationContext) { + maybeInitBuilder(); + if (logAggregationContext == null) + builder.clearLogAggregationContext(); + this.logAggregationContext = logAggregationContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java new file mode 100644 index 0000000000..4406ef9fce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder; +import com.google.protobuf.TextFormat; + +public class LogAggregationContextPBImpl extends LogAggregationContext{ + + LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance(); + LogAggregationContextProto.Builder builder = null; + boolean viaProto = false; + + public LogAggregationContextPBImpl() { + builder = LogAggregationContextProto.newBuilder(); + } + + public LogAggregationContextPBImpl(LogAggregationContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public LogAggregationContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogAggregationContextProto.newBuilder(proto); + } + viaProto = false; + } + + + @Override + public String getIncludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasIncludePattern()) { + return null; + } + return p.getIncludePattern(); + } + + @Override + public void setIncludePattern(String includePattern) { + maybeInitBuilder(); + if (includePattern == null) { + builder.clearIncludePattern(); + return; + } + builder.setIncludePattern(includePattern); + } + + @Override + public String getExcludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasExcludePattern()) { + return null; + } + return p.getExcludePattern(); + } + + @Override + public void setExcludePattern(String excludePattern) { + maybeInitBuilder(); + if (excludePattern == null) { + builder.clearExcludePattern(); + return; + } + builder.setExcludePattern(excludePattern); + } + + @Override + public long getRollingIntervalSeconds() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! 
p.hasRollingIntervalSeconds()) { + return -1; + } + return p.getRollingIntervalSeconds(); + } + + @Override + public void setRollingIntervalSeconds(long rollingIntervalSeconds) { + maybeInitBuilder(); + builder.setRollingIntervalSeconds(rollingIntervalSeconds); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c6572e9f38..c463452a0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -178,6 +178,7 @@ public static void setup() throws Exception { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + generateByNewInstance(LogAggregationContext.class); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); generateByNewInstance(ContainerId.class); From 3dc28e2052dd3a8e4cd5888fc4f9e7e37f8bc062 Mon Sep 17 00:00:00 2001 From: Allen Wittenauer Date: Tue, 23 Sep 2014 12:24:23 -0700 Subject: [PATCH 08/10] HADOOP-11092. hadoop shell commands should print usage if not given a class (aw) --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../hadoop-common/src/main/bin/hadoop | 6 +++--- .../hadoop-common/src/main/bin/hadoop-functions.sh | 11 +++++++++++ hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs | 6 +++--- hadoop-mapreduce-project/bin/mapred | 7 +++++-- hadoop-yarn-project/hadoop-yarn/bin/yarn | 6 +++--- 6 files changed, 28 insertions(+), 11 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e99a19d9e3..5f4ae1ac10 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -126,6 +126,9 @@ Trunk (Unreleased) HADOOP-11041. VersionInfo specifies subversion (Tsuyoshi OZAWA via aw) + HADOOP-11092. hadoop shell commands should print usage if not given a + a class (aw) + BUG FIXES HADOOP-9451. Fault single-layer config if node group topology is enabled. diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop index 64c67587dc..ad6e4ee3d4 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop @@ -162,11 +162,11 @@ case ${COMMAND} in version) CLASS=org.apache.hadoop.util.VersionInfo ;; - -*|hdfs) - hadoop_exit_with_usage 1 - ;; *) CLASS="${COMMAND}" + if ! hadoop_validate_classname "${CLASS}"; then + hadoop_exit_with_usage 1 + fi ;; esac diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh index dfdb101663..efa42f6522 100644 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh @@ -279,6 +279,17 @@ function hadoop_connect_to_hosts fi } +function hadoop_validate_classname +{ + local class=$1 + shift 1 + + if [[ ! ${class} =~ \. 
]]; then + return 1 + fi + return 0 +} + function hadoop_add_param { # diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index 22a0f0f8c0..087c67472a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -222,11 +222,11 @@ case ${COMMAND} in hadoop_debug "Appending HADOOP_ZKFC_OPTS onto HADOOP_OPTS" HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_ZKFC_OPTS}" ;; - -*) - hadoop_exit_with_usage 1 - ;; *) CLASS="${COMMAND}" + if ! hadoop_validate_classname "${CLASS}"; then + hadoop_exit_with_usage 1 + fi ;; esac diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred index 8f3063774f..2163cada29 100755 --- a/hadoop-mapreduce-project/bin/mapred +++ b/hadoop-mapreduce-project/bin/mapred @@ -118,8 +118,11 @@ case ${COMMAND} in hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" ;; - -*|*) - hadoop_exit_with_usage 1 + *) + CLASS="${COMMAND}" + if ! hadoop_validate_classname "${CLASS}"; then + hadoop_exit_with_usage 1 + fi ;; esac diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn index 12f7bb501b..207fb4a41f 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -154,11 +154,11 @@ case "${COMMAND}" in hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS" YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}" ;; - -*) - hadoop_exit_with_usage 1 - ;; *) CLASS="${COMMAND}" + if ! hadoop_validate_classname "${CLASS}"; then + hadoop_exit_with_usage 1 + fi ;; esac From f48686a1ad81823000534665a76264bba51182f4 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Tue, 23 Sep 2014 14:10:02 -0700 Subject: [PATCH 09/10] HDFS-7132. hdfs namenode -metadataVersion command does not honor configured name dirs. Contributed by Charles Lamb. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../hadoop/hdfs/server/namenode/NameNode.java | 3 ++ .../namenode/TestMetadataVersionOutput.java | 32 +++++++++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7cb19a0000..4b7e857785 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -795,6 +795,9 @@ Release 2.6.0 - UNRELEASED HDFS-7001. Tests in TestTracing depends on the order of execution (iwasakims via cmccabe) + HDFS-7132. hdfs namenode -metadataVersion command does not honor + configured name dirs. (Charles Lamb via wang) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. 
HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index bcb5a8697d..217645a689 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -1347,6 +1347,9 @@ private static void doRecovery(StartupOption startOpt, Configuration conf) */ private static boolean printMetadataVersion(Configuration conf) throws IOException { + final String nsId = DFSUtil.getNamenodeNameServiceId(conf); + final String namenodeId = HAUtil.getNameNodeId(conf, nsId); + NameNode.initializeGenericKeys(conf, nsId, namenodeId); final FSImage fsImage = new FSImage(conf); final FSNamesystem fs = new FSNamesystem(conf, fsImage, false); return fsImage.recoverTransitionRead( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java index 0e809cf9c6..03c75577ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java @@ -25,27 +25,22 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.junit.After; -import org.junit.Before; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; + public class TestMetadataVersionOutput { private MiniDFSCluster dfsCluster = null; private final Configuration conf = new Configuration(); - @Before - public void setUp() throws Exception { - dfsCluster = new MiniDFSCluster.Builder(conf). - numDataNodes(1). - checkExitOnShutdown(false). - build(); - dfsCluster.waitClusterUp(); - } - @After public void tearDown() throws Exception { if (dfsCluster != null) { @@ -54,9 +49,26 @@ public void tearDown() throws Exception { Thread.sleep(2000); } + private void initConfig() { + conf.set(DFS_NAMESERVICE_ID, "ns1"); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1"); + conf.set(DFS_HA_NAMENODE_ID_KEY, "nn1"); + conf.set(DFS_NAMENODE_NAME_DIR_KEY + ".ns1.nn1", MiniDFSCluster.getBaseDirectory() + "1"); + conf.unset(DFS_NAMENODE_NAME_DIR_KEY); + } + @Test(timeout = 30000) public void testMetadataVersionOutput() throws IOException { + initConfig(); + dfsCluster = new MiniDFSCluster.Builder(conf). + manageNameDfsDirs(false). + numDataNodes(1). + checkExitOnShutdown(false). + build(); + dfsCluster.waitClusterUp(); + dfsCluster.shutdown(false); + initConfig(); final PrintStream origOut = System.out; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final PrintStream stdOut = new PrintStream(baos); From b93d9603a25b6b93f67c69503130164eef047876 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Tue, 23 Sep 2014 14:35:33 -0700 Subject: [PATCH 10/10] HDFS-7130. 
TestDataTransferKeepalive fails intermittently on Windows. Contributed by Chris Nauroth. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../org/apache/hadoop/hdfs/TestDataTransferKeepalive.java | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4b7e857785..af6c135be8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -932,6 +932,9 @@ Release 2.6.0 - UNRELEASED HDFS-7115. TestEncryptionZonesWithHA assumes Unix path separator for KMS key store path. (Xiaoyu Yao via cnauroth) + HDFS-7130. TestDataTransferKeepalive fails intermittently on Windows. + (cnauroth) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java index 003e6be273..eae8ea7681 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java @@ -105,7 +105,7 @@ public void testDatanodeRespectsKeepAliveTimeout() throws Exception { // Sleep for a bit longer than the keepalive timeout // and make sure the xceiver died. - Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 1); + Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 50); assertXceiverCount(0); // The socket is still in the cache, because we don't @@ -149,7 +149,7 @@ public void testClientResponsesKeepAliveTimeout() throws Exception { assertXceiverCount(1); // Sleep for a bit longer than the client keepalive timeout. - Thread.sleep(CLIENT_EXPIRY_MS + 1); + Thread.sleep(CLIENT_EXPIRY_MS + 50); // Taking out a peer which is expired should give a null. Peer peer = peerCache.get(dn.getDatanodeId(), false);
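The two hunks above widen fixed sleeps from 1 ms to 50 ms past the keepalive/expiry interval, presumably so that coarse timer granularity on Windows does not race the expiration. An alternative pattern, similar in spirit to Hadoop's GenericTestUtils.waitFor, is to poll for the expected state against a deadline instead of sleeping a fixed amount; a minimal self-contained sketch under that assumption (class and method names are placeholders):

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class DeadlineWait {
  private DeadlineWait() {}

  // Polls the condition every intervalMs until it holds or timeoutMs elapses.
  public static void waitFor(BooleanSupplier condition, long intervalMs, long timeoutMs)
      throws InterruptedException, TimeoutException {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos > 0) {
        throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}

A test could then wait with, for example, DeadlineWait.waitFor(() -> xceiverCount(dn) == 0, 50, 10000), where xceiverCount is a hypothetical accessor for the datanode's transceiver count.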