YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in ApplicationClientProtcol, ResourceManagerAdministrationProtocol and ResourceTrackerProtocol so that they work in HA scenario. Contributed by Xuan Gong
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581678 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a9ae9e3ec
commit
71c2b159ab
@ -580,6 +580,11 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1867. Fixed a bug in ResourceManager that was causing invalid ACL checks
|
||||
in the web-services after fail-over. (Vinod Kumar Vavilapalli)
|
||||
|
||||
YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in
|
||||
ApplicationClientProtcol, ResourceManagerAdministrationProtocol and
|
||||
ResourceTrackerProtocol so that they work in HA scenario. (Xuan Gong
|
||||
via jianhe)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -104,6 +104,7 @@ public interface ApplicationClientProtocol {
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -133,6 +134,10 @@ public GetNewApplicationResponse getNewApplication(
|
||||
* it encounters the {@link ApplicationNotFoundException} on the
|
||||
* {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
|
||||
*
|
||||
* <p>During the submission process, it checks whether the application
|
||||
* already exists. If the application exists, it will simply return
|
||||
* SubmitApplicationResponse</p>
|
||||
*
|
||||
* <p> In secure mode,the <code>ResourceManager</code> verifies access to
|
||||
* queues etc. before accepting the application submission.</p>
|
||||
*
|
||||
@ -147,6 +152,7 @@ public GetNewApplicationResponse getNewApplication(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -173,6 +179,7 @@ public SubmitApplicationResponse submitApplication(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -231,6 +238,7 @@ public GetApplicationReportResponse getApplicationReport(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -258,6 +266,7 @@ public GetClusterMetricsResponse getClusterMetrics(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetApplicationsResponse getApplications(
|
||||
GetApplicationsRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -277,6 +286,7 @@ public GetApplicationsResponse getApplications(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetClusterNodesResponse getClusterNodes(
|
||||
GetClusterNodesRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -298,6 +308,7 @@ public GetClusterNodesResponse getClusterNodes(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetQueueInfoResponse getQueueInfo(
|
||||
GetQueueInfoRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -317,6 +328,7 @@ public GetQueueInfoResponse getQueueInfo(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -335,6 +347,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -349,6 +362,7 @@ public GetDelegationTokenResponse getDelegationToken(
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public RenewDelegationTokenResponse renewDelegationToken(
|
||||
RenewDelegationTokenRequest request) throws YarnException,
|
||||
IOException;
|
||||
@ -363,6 +377,7 @@ public RenewDelegationTokenResponse renewDelegationToken(
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
CancelDelegationTokenRequest request) throws YarnException,
|
||||
IOException;
|
||||
@ -377,6 +392,7 @@ public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
|
||||
|
||||
@ -422,6 +438,7 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException;
|
||||
@ -453,6 +470,7 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException;
|
||||
|
||||
@ -486,6 +504,7 @@ public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException;
|
||||
|
||||
@ -520,6 +539,7 @@ public GetContainerReportResponse getContainerReport(
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
@ -50,16 +51,19 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshSuperUserGroupsConfigurationResponse
|
||||
refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
@ -67,18 +71,21 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request)
|
||||
throws YarnException, IOException;
|
||||
@ -99,6 +106,7 @@ public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
@ -0,0 +1,721 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
|
||||
public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
|
||||
protected static final HAServiceProtocol.StateChangeRequestInfo req =
|
||||
new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
protected static final String RM1_NODE_ID = "rm1";
|
||||
protected static final int RM1_PORT_BASE = 10000;
|
||||
protected static final String RM2_NODE_ID = "rm2";
|
||||
protected static final int RM2_PORT_BASE = 20000;
|
||||
|
||||
protected Configuration conf;
|
||||
protected MiniYARNClusterForHATesting cluster;
|
||||
|
||||
protected Thread failoverThread = null;
|
||||
private volatile boolean keepRunning;
|
||||
|
||||
private void setConfForRM(String rmId, String prefix, String value) {
|
||||
conf.set(HAUtil.addSuffix(prefix, rmId), value);
|
||||
}
|
||||
|
||||
private void setRpcAddressForRM(String rmId, int base) {
|
||||
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
|
||||
(base + YarnConfiguration.DEFAULT_RM_PORT));
|
||||
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
|
||||
(base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
|
||||
setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
|
||||
(base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
||||
setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
"0.0.0.0:" + (base + YarnConfiguration
|
||||
.DEFAULT_RM_RESOURCE_TRACKER_PORT));
|
||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
|
||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
|
||||
setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
|
||||
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
failoverThread = null;
|
||||
keepRunning = true;
|
||||
conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
|
||||
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
|
||||
|
||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
||||
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
keepRunning = false;
|
||||
if (failoverThread != null) {
|
||||
failoverThread.interrupt();
|
||||
try {
|
||||
failoverThread.join();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Error joining with failover thread", ex);
|
||||
}
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
|
||||
protected AdminService getAdminService(int index) {
|
||||
return cluster.getResourceManager(index).getRMContext()
|
||||
.getRMAdminService();
|
||||
}
|
||||
|
||||
protected void explicitFailover() throws IOException {
|
||||
int activeRMIndex = cluster.getActiveRMIndex();
|
||||
int newActiveRMIndex = (activeRMIndex + 1) % 2;
|
||||
getAdminService(activeRMIndex).transitionToStandby(req);
|
||||
getAdminService(newActiveRMIndex).transitionToActive(req);
|
||||
assertEquals("Failover failed", newActiveRMIndex,
|
||||
cluster.getActiveRMIndex());
|
||||
}
|
||||
|
||||
protected YarnClient createAndStartYarnClient(Configuration conf) {
|
||||
Configuration configuration = new YarnConfiguration(conf);
|
||||
YarnClient client = YarnClient.createYarnClient();
|
||||
client.init(configuration);
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
|
||||
protected void verifyConnections() throws InterruptedException,
|
||||
YarnException {
|
||||
assertTrue("NMs failed to connect to the RM",
|
||||
cluster.waitForNodeManagersToConnect(20000));
|
||||
verifyClientConnection();
|
||||
}
|
||||
|
||||
protected void verifyClientConnection() {
|
||||
int numRetries = 3;
|
||||
while(numRetries-- > 0) {
|
||||
Configuration conf = new YarnConfiguration(this.conf);
|
||||
YarnClient client = createAndStartYarnClient(conf);
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
client.getApplications();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
fail("Client couldn't connect to the Active RM");
|
||||
}
|
||||
|
||||
protected Thread createAndStartFailoverThread() {
|
||||
Thread failoverThread = new Thread() {
|
||||
public void run() {
|
||||
keepRunning = true;
|
||||
while (keepRunning) {
|
||||
if (cluster.getStartFailoverFlag()) {
|
||||
try {
|
||||
explicitFailover();
|
||||
keepRunning = false;
|
||||
cluster.resetFailoverTriggeredFlag(true);
|
||||
} catch (Exception e) {
|
||||
// Do Nothing
|
||||
} finally {
|
||||
keepRunning = false;
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// DO NOTHING
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
failoverThread.start();
|
||||
return failoverThread;
|
||||
}
|
||||
|
||||
protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
|
||||
boolean overrideRTS) throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
cluster =
|
||||
new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
|
||||
numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
|
||||
cluster.resetStartFailoverFlag(false);
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
getAdminService(0).transitionToActive(req);
|
||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
||||
verifyConnections();
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
verifyConnections();
|
||||
|
||||
failoverThread = createAndStartFailoverThread();
|
||||
|
||||
}
|
||||
|
||||
protected ResourceManager getActiveRM() {
|
||||
return cluster.getResourceManager(cluster.getActiveRMIndex());
|
||||
}
|
||||
|
||||
public class MiniYARNClusterForHATesting extends MiniYARNCluster {
|
||||
|
||||
private boolean overrideClientRMService;
|
||||
private boolean overrideRTS;
|
||||
private final AtomicBoolean startFailover = new AtomicBoolean(false);
|
||||
private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
|
||||
|
||||
public MiniYARNClusterForHATesting(String testName,
|
||||
int numResourceManagers, int numNodeManagers, int numLocalDirs,
|
||||
int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
|
||||
boolean overrideRTS) {
|
||||
super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
|
||||
numLogDirs, enableAHS);
|
||||
this.overrideClientRMService = overrideClientRMService;
|
||||
this.overrideRTS = overrideRTS;
|
||||
}
|
||||
|
||||
public boolean getStartFailoverFlag() {
|
||||
return startFailover.get();
|
||||
}
|
||||
|
||||
public void resetStartFailoverFlag(boolean flag) {
|
||||
startFailover.set(flag);
|
||||
}
|
||||
|
||||
public void resetFailoverTriggeredFlag(boolean flag) {
|
||||
failoverTriggered.set(flag);
|
||||
}
|
||||
|
||||
private boolean waittingForFailOver() {
|
||||
int maximumWaittingTime = 50;
|
||||
int count = 0;
|
||||
while (!failoverTriggered.get() && count >= maximumWaittingTime) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
// DO NOTHING
|
||||
}
|
||||
count++;
|
||||
}
|
||||
if (count >= maximumWaittingTime) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResourceManager createResourceManager() {
|
||||
return new ResourceManager() {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Don't try to login using keytab in the testcases.
|
||||
}
|
||||
@Override
|
||||
protected ClientRMService createClientRMService() {
|
||||
if (overrideClientRMService) {
|
||||
return new CustomedClientRMService(this.rmContext, this.scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager,
|
||||
this.queueACLsManager,
|
||||
this.rmContext.getRMDelegationTokenSecretManager());
|
||||
}
|
||||
return super.createClientRMService();
|
||||
}
|
||||
@Override
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
if (overrideRTS) {
|
||||
return new CustomedResourceTrackerService(this.rmContext,
|
||||
this.nodesListManager, this.nmLivelinessMonitor,
|
||||
this.rmContext.getContainerTokenSecretManager(),
|
||||
this.rmContext.getNMTokenSecretManager());
|
||||
}
|
||||
return super.createResourceTrackerService();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private class CustomedClientRMService extends ClientRMService {
|
||||
public CustomedClientRMService(RMContext rmContext,
|
||||
YarnScheduler scheduler, RMAppManager rmAppManager,
|
||||
ApplicationACLsManager applicationACLsManager,
|
||||
QueueACLsManager queueACLsManager,
|
||||
RMDelegationTokenSecretManager rmDTSecretManager) {
|
||||
super(rmContext, scheduler, rmAppManager, applicationACLsManager,
|
||||
queueACLsManager, rmDTSecretManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// create the GetNewApplicationResponse with fake applicationId
|
||||
GetNewApplicationResponse response =
|
||||
GetNewApplicationResponse.newInstance(
|
||||
createFakeAppId(), null, null);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// create a fake application report
|
||||
ApplicationReport report = createFakeAppReport();
|
||||
GetApplicationReportResponse response =
|
||||
GetApplicationReportResponse.newInstance(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// create GetClusterMetricsResponse with fake YarnClusterMetrics
|
||||
GetClusterMetricsResponse response =
|
||||
GetClusterMetricsResponse.newInstance(
|
||||
createFakeYarnClusterMetrics());
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationsResponse getApplications(
|
||||
GetApplicationsRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// create GetApplicationsResponse with fake applicationList
|
||||
GetApplicationsResponse response =
|
||||
GetApplicationsResponse.newInstance(createFakeAppReports());
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodesResponse getClusterNodes(
|
||||
GetClusterNodesRequest request)
|
||||
throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// create GetClusterNodesResponse with fake ClusterNodeLists
|
||||
GetClusterNodesResponse response =
|
||||
GetClusterNodesResponse.newInstance(createFakeNodeReports());
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
||||
throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake QueueInfo
|
||||
return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake queueUserAcls
|
||||
return GetQueueUserAclsInfoResponse
|
||||
.newInstance(createFakeQueueUserACLInfoList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake ApplicationAttemptReport
|
||||
return GetApplicationAttemptReportResponse
|
||||
.newInstance(createFakeApplicationAttemptReport());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException,
|
||||
IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake ApplicationAttemptReports
|
||||
return GetApplicationAttemptsResponse
|
||||
.newInstance(createFakeApplicationAttemptReports());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake containerReport
|
||||
return GetContainerReportResponse
|
||||
.newInstance(createFakeContainerReport());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
// return fake ContainerReports
|
||||
return GetContainersResponse.newInstance(createFakeContainerReports());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return super.submitApplication(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return KillApplicationResponse.newInstance(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return GetDelegationTokenResponse.newInstance(createFakeToken());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RenewDelegationTokenResponse renewDelegationToken(
|
||||
RenewDelegationTokenRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return RenewDelegationTokenResponse
|
||||
.newInstance(createNextExpirationTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
CancelDelegationTokenRequest request) throws YarnException {
|
||||
resetStartFailoverFlag(true);
|
||||
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
|
||||
return CancelDelegationTokenResponse.newInstance();
|
||||
}
|
||||
}
|
||||
|
||||
public ApplicationReport createFakeAppReport() {
|
||||
ApplicationId appId = ApplicationId.newInstance(1000l, 1);
|
||||
ApplicationAttemptId attemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
// create a fake application report
|
||||
ApplicationReport report =
|
||||
ApplicationReport.newInstance(appId, attemptId, "fakeUser",
|
||||
"fakeQueue", "fakeApplicationName", "localhost", 0, null,
|
||||
YarnApplicationState.FAILED, "fake an application report", "",
|
||||
1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
|
||||
"fakeApplicationType", null);
|
||||
return report;
|
||||
}
|
||||
|
||||
public List<ApplicationReport> createFakeAppReports() {
|
||||
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
|
||||
reports.add(createFakeAppReport());
|
||||
return reports;
|
||||
}
|
||||
|
||||
public ApplicationId createFakeAppId() {
|
||||
return ApplicationId.newInstance(1000l, 1);
|
||||
}
|
||||
|
||||
public ApplicationAttemptId createFakeApplicationAttemptId() {
|
||||
return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
|
||||
}
|
||||
|
||||
public ContainerId createFakeContainerId() {
|
||||
return ContainerId.newInstance(createFakeApplicationAttemptId(), 0);
|
||||
}
|
||||
|
||||
public YarnClusterMetrics createFakeYarnClusterMetrics() {
|
||||
return YarnClusterMetrics.newInstance(1);
|
||||
}
|
||||
|
||||
public List<NodeReport> createFakeNodeReports() {
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 0);
|
||||
NodeReport report =
|
||||
NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
|
||||
"rack1", null, null, 4, null, 1000l);
|
||||
List<NodeReport> reports = new ArrayList<NodeReport>();
|
||||
reports.add(report);
|
||||
return reports;
|
||||
}
|
||||
|
||||
public QueueInfo createFakeQueueInfo() {
|
||||
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
|
||||
createFakeAppReports(), QueueState.RUNNING);
|
||||
}
|
||||
|
||||
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
|
||||
List<QueueACL> queueACL = new ArrayList<QueueACL>();
|
||||
queueACL.add(QueueACL.SUBMIT_APPLICATIONS);
|
||||
QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL);
|
||||
List<QueueUserACLInfo> infos = new ArrayList<QueueUserACLInfo>();
|
||||
infos.add(info);
|
||||
return infos;
|
||||
}
|
||||
|
||||
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
|
||||
return ApplicationAttemptReport.newInstance(
|
||||
createFakeApplicationAttemptId(), "localhost", 0, "", "",
|
||||
YarnApplicationAttemptState.RUNNING, createFakeContainerId());
|
||||
}
|
||||
|
||||
public List<ApplicationAttemptReport>
|
||||
createFakeApplicationAttemptReports() {
|
||||
List<ApplicationAttemptReport> reports =
|
||||
new ArrayList<ApplicationAttemptReport>();
|
||||
reports.add(createFakeApplicationAttemptReport());
|
||||
return reports;
|
||||
}
|
||||
|
||||
public ContainerReport createFakeContainerReport() {
|
||||
return ContainerReport.newInstance(createFakeContainerId(), null,
|
||||
NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
|
||||
ContainerState.COMPLETE);
|
||||
}
|
||||
|
||||
public List<ContainerReport> createFakeContainerReports() {
|
||||
List<ContainerReport> reports =
|
||||
new ArrayList<ContainerReport>();
|
||||
reports.add(createFakeContainerReport());
|
||||
return reports;
|
||||
}
|
||||
|
||||
public Token createFakeToken() {
|
||||
String identifier = "fake Token";
|
||||
String password = "fake token passwd";
|
||||
Token token = Token.newInstance(
|
||||
identifier.getBytes(), " ", password.getBytes(), " ");
|
||||
return token;
|
||||
}
|
||||
|
||||
public long createNextExpirationTime() {
|
||||
return "fake Token".getBytes().length;
|
||||
}
|
||||
|
||||
private class CustomedResourceTrackerService extends
|
||||
ResourceTrackerService {
|
||||
public CustomedResourceTrackerService(RMContext rmContext,
|
||||
NodesListManager nodesListManager,
|
||||
NMLivelinessMonitor nmLivelinessMonitor,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager) {
|
||||
super(rmContext, nodesListManager, nmLivelinessMonitor,
|
||||
containerTokenSecretManager, nmTokenSecretManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterNodeManagerResponse registerNodeManager(
|
||||
RegisterNodeManagerRequest request) throws YarnException,
|
||||
IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
return super.registerNodeManager(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
throws YarnException, IOException {
|
||||
resetStartFailoverFlag(true);
|
||||
// make sure failover has been triggered
|
||||
Assert.assertTrue(waittingForFailOver());
|
||||
return super.nodeHeartbeat(request);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,211 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
|
||||
private YarnClient client = null;
|
||||
|
||||
@Before
|
||||
public void initiate() throws Exception {
|
||||
startHACluster(1, true, false);
|
||||
Configuration conf = new YarnConfiguration(this.conf);
|
||||
client = createAndStartYarnClient(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDown() {
|
||||
if (client != null) {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetApplicationReportOnHA() throws Exception {
|
||||
ApplicationReport report =
|
||||
client.getApplicationReport(cluster.createFakeAppId());
|
||||
Assert.assertTrue(report != null);
|
||||
Assert.assertEquals(cluster.createFakeAppReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetNewApplicationOnHA() throws Exception {
|
||||
ApplicationId appId =
|
||||
client.createApplication().getApplicationSubmissionContext()
|
||||
.getApplicationId();
|
||||
Assert.assertTrue(appId != null);
|
||||
Assert.assertEquals(cluster.createFakeAppId(), appId);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetClusterMetricsOnHA() throws Exception {
|
||||
YarnClusterMetrics clusterMetrics =
|
||||
client.getYarnClusterMetrics();
|
||||
Assert.assertTrue(clusterMetrics != null);
|
||||
Assert.assertEquals(cluster.createFakeYarnClusterMetrics(),
|
||||
clusterMetrics);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetApplicationsOnHA() throws Exception {
|
||||
List<ApplicationReport> reports =
|
||||
client.getApplications();
|
||||
Assert.assertTrue(reports != null && !reports.isEmpty());
|
||||
Assert.assertEquals(cluster.createFakeAppReports(),
|
||||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetClusterNodesOnHA() throws Exception {
|
||||
List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
|
||||
Assert.assertTrue(reports != null && !reports.isEmpty());
|
||||
Assert.assertEquals(cluster.createFakeNodeReports(),
|
||||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetQueueInfoOnHA() throws Exception {
|
||||
QueueInfo queueInfo = client.getQueueInfo("root");
|
||||
Assert.assertTrue(queueInfo != null);
|
||||
Assert.assertEquals(cluster.createFakeQueueInfo(),
|
||||
queueInfo);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetQueueUserAclsOnHA() throws Exception {
|
||||
List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
|
||||
Assert.assertTrue(queueUserAclsList != null
|
||||
&& !queueUserAclsList.isEmpty());
|
||||
Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(),
|
||||
queueUserAclsList);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetApplicationAttemptReportOnHA() throws Exception {
|
||||
ApplicationAttemptReport report =
|
||||
client.getApplicationAttemptReport(cluster
|
||||
.createFakeApplicationAttemptId());
|
||||
Assert.assertTrue(report != null);
|
||||
Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetApplicationAttemptsOnHA() throws Exception {
|
||||
List<ApplicationAttemptReport> reports =
|
||||
client.getApplicationAttempts(cluster.createFakeAppId());
|
||||
Assert.assertTrue(reports != null && !reports.isEmpty());
|
||||
Assert.assertEquals(cluster.createFakeApplicationAttemptReports(),
|
||||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetContainerReportOnHA() throws Exception {
|
||||
ContainerReport report =
|
||||
client.getContainerReport(cluster.createFakeContainerId());
|
||||
Assert.assertTrue(report != null);
|
||||
Assert.assertEquals(cluster.createFakeContainerReport(), report);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetContainersOnHA() throws Exception {
|
||||
List<ContainerReport> reports =
|
||||
client.getContainers(cluster.createFakeApplicationAttemptId());
|
||||
Assert.assertTrue(reports != null && !reports.isEmpty());
|
||||
Assert.assertEquals(cluster.createFakeContainerReports(),
|
||||
reports);
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testSubmitApplicationOnHA() throws Exception {
|
||||
ApplicationSubmissionContext appContext =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
appContext.setApplicationId(cluster.createFakeAppId());
|
||||
ContainerLaunchContext amContainer =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
appContext.setAMContainerSpec(amContainer);
|
||||
Resource capability = Records.newRecord(Resource.class);
|
||||
capability.setMemory(10);
|
||||
capability.setVirtualCores(1);
|
||||
appContext.setResource(capability);
|
||||
ApplicationId appId = client.submitApplication(appContext);
|
||||
Assert.assertTrue(getActiveRM().getRMContext().getRMApps()
|
||||
.containsKey(appId));
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
|
||||
client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testForceKillApplicationOnHA() throws Exception {
|
||||
client.killApplication(cluster.createFakeAppId());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testGetDelegationTokenOnHA() throws Exception {
|
||||
Token token = client.getRMDelegationToken(new Text(" "));
|
||||
Assert.assertEquals(token, cluster.createFakeToken());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testRenewDelegationTokenOnHA() throws Exception {
|
||||
RenewDelegationTokenRequest request =
|
||||
RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
|
||||
long newExpirationTime =
|
||||
ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
|
||||
.renewDelegationToken(request).getNextExpirationTime();
|
||||
Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime());
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testCancelDelegationTokenOnHA() throws Exception {
|
||||
CancelDelegationTokenRequest request =
|
||||
CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
|
||||
ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
|
||||
.cancelDelegationToken(request);
|
||||
}
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
||||
|
||||
private ResourceTracker resourceTracker = null;
|
||||
|
||||
@Before
|
||||
public void initiate() throws Exception {
|
||||
startHACluster(0, false, true);
|
||||
this.resourceTracker = getRMClient();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDown() {
|
||||
if(this.resourceTracker != null) {
|
||||
RPC.stopProxy(this.resourceTracker);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testResourceTrackerOnHA() throws Exception {
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 0);
|
||||
Resource resource = Resource.newInstance(2048, 4);
|
||||
|
||||
// make sure registerNodeManager works when failover happens
|
||||
RegisterNodeManagerRequest request =
|
||||
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
|
||||
YarnVersionInfo.getVersion(), null);
|
||||
resourceTracker.registerNodeManager(request);
|
||||
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
|
||||
|
||||
// restart the failover thread, and make sure nodeHeartbeat works
|
||||
failoverThread = createAndStartFailoverThread();
|
||||
NodeStatus status =
|
||||
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
||||
null, null);
|
||||
NodeHeartbeatRequest request2 =
|
||||
NodeHeartbeatRequest.newInstance(status, null, null);
|
||||
resourceTracker.nodeHeartbeat(request2);
|
||||
}
|
||||
|
||||
private ResourceTracker getRMClient() throws IOException {
|
||||
return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
|
||||
}
|
||||
|
||||
private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
|
||||
throws Exception {
|
||||
for (int i = 0; i < timeout / 100; i++) {
|
||||
if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
|
||||
return true;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -19,6 +19,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
@ -27,10 +29,12 @@
|
||||
|
||||
public interface ResourceTracker {
|
||||
|
||||
@Idempotent
|
||||
public RegisterNodeManagerResponse registerNodeManager(
|
||||
RegisterNodeManagerRequest request) throws YarnException,
|
||||
IOException;
|
||||
|
||||
@AtMostOnce
|
||||
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
|
@ -505,16 +505,11 @@ public SubmitApplicationResponse submitApplication(
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
// Though duplication will checked again when app is put into rmContext,
|
||||
// but it is good to fail the invalid submission as early as possible.
|
||||
// Check whether app has already been put into rmContext,
|
||||
// If it is, simply return the response
|
||||
if (rmContext.getRMApps().get(applicationId) != null) {
|
||||
String message = "Application with id " + applicationId +
|
||||
" is already present! Cannot add a duplicate!";
|
||||
LOG.warn(message);
|
||||
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||
message, "ClientRMService", "Exception in submitting application",
|
||||
applicationId);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
LOG.info("This is an earlier submitted application: " + applicationId);
|
||||
return SubmitApplicationResponse.newInstance();
|
||||
}
|
||||
|
||||
if (submissionContext.getQueue() == null) {
|
||||
|
@ -718,7 +718,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
}
|
||||
|
||||
// TODO: Write out change to state store (YARN-1558)
|
||||
|
||||
// Also take care of RM failover
|
||||
moveEvent.getResult().set(null);
|
||||
}
|
||||
}
|
||||
|
@ -589,10 +589,8 @@ public void handle(Event event) {}
|
||||
// duplicate appId
|
||||
try {
|
||||
rmService.submitApplication(submitRequest2);
|
||||
Assert.fail("Exception is expected.");
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue("The thrown exception is not expected.",
|
||||
e.getMessage().contains("Cannot add a duplicate!"));
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
|
||||
GetApplicationsRequest getAllAppsRequest =
|
||||
|
@ -219,4 +219,89 @@ public void testGetApplicationReportIdempotent() throws Exception{
|
||||
Assert.assertEquals(appReport3.getYarnApplicationState(),
|
||||
appReport4.getYarnApplicationState());
|
||||
}
|
||||
|
||||
// There are two scenarios when RM failover happens
|
||||
// during SubmitApplication Call:
|
||||
// 1) RMStateStore already saved the ApplicationState when failover happens
|
||||
// 2) RMStateStore did not save the ApplicationState when failover happens
|
||||
@Test (timeout = 5000)
|
||||
public void
|
||||
testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState()
|
||||
throws Exception {
|
||||
// Test scenario 1 when RM failover happens
|
||||
// druing SubmitApplication Call:
|
||||
// RMStateStore already saved the ApplicationState when failover happens
|
||||
startRMs();
|
||||
|
||||
// Submit Application
|
||||
// After submission, the applicationState will be saved in RMStateStore.
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
|
||||
// Since the applicationState has already been saved in RMStateStore
|
||||
// before failover happens, the current active rm can load the previous
|
||||
// applicationState.
|
||||
// This RMApp should exist in the RMContext of current active RM
|
||||
Assert.assertTrue(rm2.getRMContext().getRMApps()
|
||||
.containsKey(app0.getApplicationId()));
|
||||
|
||||
// When we re-submit the application with same applicationId, it will
|
||||
// check whether this application has been exist. If yes, just simply
|
||||
// return submitApplicationResponse.
|
||||
RMApp app1 =
|
||||
rm2.submitApp(200, "", UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), null, false, null,
|
||||
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
false, false, true, app0.getApplicationId());
|
||||
|
||||
Assert.assertEquals(app1.getApplicationId(), app0.getApplicationId());
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
public void
|
||||
testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState()
|
||||
throws Exception {
|
||||
// Test scenario 2 when RM failover happens
|
||||
// during SubmitApplication Call:
|
||||
// RMStateStore did not save the ApplicationState when failover happens.
|
||||
// Using customized RMAppManager.
|
||||
startRMsWithCustomizedRMAppManager();
|
||||
|
||||
// Submit Application
|
||||
// After submission, the applicationState will
|
||||
// not be saved in RMStateStore
|
||||
RMApp app0 =
|
||||
rm1.submitApp(200, "", UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), null, false, null,
|
||||
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
false, false);
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
|
||||
// When failover happens, the RMStateStore has not saved applicationState.
|
||||
// The applicationState of this RMApp is lost.
|
||||
// We should not find the RMApp in the RMContext of current active rm.
|
||||
Assert.assertFalse(rm2.getRMContext().getRMApps()
|
||||
.containsKey(app0.getApplicationId()));
|
||||
|
||||
// Submit the application with previous ApplicationId to current active RM
|
||||
// This will mimic the similar behavior of ApplicationClientProtocol#
|
||||
// submitApplication() when failover happens during the submission process
|
||||
// because the submitApplication api is marked as idempotent
|
||||
RMApp app1 =
|
||||
rm2.submitApp(200, "", UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), null, false, null,
|
||||
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
false, false, true, app0.getApplicationId());
|
||||
|
||||
verifySubmitApp(rm2, app1, app0.getApplicationId());
|
||||
Assert.assertTrue(rm2.getRMContext().getRMApps()
|
||||
.containsKey(app0.getApplicationId()));
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,6 @@
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -243,12 +242,7 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
}
|
||||
|
||||
for (int i = 0; i < resourceManagers.length; i++) {
|
||||
resourceManagers[i] = new ResourceManager() {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Don't try to login using keytab in the testcases.
|
||||
}
|
||||
};
|
||||
resourceManagers[i] = createResourceManager();
|
||||
if (!useFixedPorts) {
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
setHARMConfiguration(i, conf);
|
||||
@ -676,7 +670,7 @@ public boolean waitForNodeManagersToConnect(long timeout)
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private class ApplicationHistoryServerWrapper extends AbstractService {
|
||||
public ApplicationHistoryServerWrapper() {
|
||||
super(ApplicationHistoryServerWrapper.class.getName());
|
||||
@ -736,4 +730,13 @@ protected synchronized void serviceStop() throws Exception {
|
||||
public ApplicationHistoryServer getApplicationHistoryServer() {
|
||||
return this.appHistoryServer;
|
||||
}
|
||||
|
||||
protected ResourceManager createResourceManager() {
|
||||
return new ResourceManager(){
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Don't try to login using keytab in the testcases.
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user