Merge branch 'trunk' into HADOOP-12756
This commit is contained in:
commit
7d4431c93b
@ -260,7 +260,7 @@ public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void done(TaskAttemptID taskAttemptID) throws IOException {
|
public void done(TaskAttemptID taskAttemptID) throws IOException {
|
||||||
LOG.info("Done acknowledgement from " + taskAttemptID.toString());
|
LOG.info("Done acknowledgment from " + taskAttemptID.toString());
|
||||||
|
|
||||||
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
|
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
|
||||||
TypeConverter.toYarn(taskAttemptID);
|
TypeConverter.toYarn(taskAttemptID);
|
||||||
|
@ -400,7 +400,7 @@ protected void serviceStop() throws Exception {
|
|||||||
}
|
}
|
||||||
mi.shutDownTimer();
|
mi.shutDownTimer();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Exception while cancelling delayed flush timer. "
|
LOG.info("Exception while canceling delayed flush timer. "
|
||||||
+ "Likely caused by a failed flush " + e.getMessage());
|
+ "Likely caused by a failed flush " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1312,7 +1312,7 @@ void taskCleanup(TaskUmbilicalProtocol umbilical)
|
|||||||
setPhase(TaskStatus.Phase.CLEANUP);
|
setPhase(TaskStatus.Phase.CLEANUP);
|
||||||
getProgress().setStatus("cleanup");
|
getProgress().setStatus("cleanup");
|
||||||
statusUpdate(umbilical);
|
statusUpdate(umbilical);
|
||||||
LOG.info("Runnning cleanup for the task");
|
LOG.info("Running cleanup for the task");
|
||||||
// do the cleanup
|
// do the cleanup
|
||||||
committer.abortTask(taskContext);
|
committer.abortTask(taskContext);
|
||||||
}
|
}
|
||||||
|
@ -672,7 +672,7 @@ private void checkVersion() throws IOException {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
||||||
LOG.info("Storing state DB schedma version info " + getCurrentVersion());
|
LOG.info("Storing state DB schema version info " + getCurrentVersion());
|
||||||
storeVersion();
|
storeVersion();
|
||||||
} else {
|
} else {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
|
@ -20,10 +20,13 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
@ -37,6 +40,10 @@
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import javax.net.ssl.HttpsURLConnection;
|
||||||
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.cache.Cache;
|
import com.google.common.cache.Cache;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
@ -50,7 +57,12 @@
|
|||||||
import org.apache.commons.lang.time.DurationFormatUtils;
|
import org.apache.commons.lang.time.DurationFormatUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.HttpConfig.Policy;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||||
|
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
|
||||||
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||||
@ -60,12 +72,17 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.QueueStatistics;
|
import org.apache.hadoop.yarn.api.records.QueueStatistics;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
|
|
||||||
public class TopCLI extends YarnCLI {
|
public class TopCLI extends YarnCLI {
|
||||||
|
|
||||||
|
private static final String CLUSTER_INFO_URL = "/ws/v1/cluster/info";
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TopCLI.class);
|
private static final Log LOG = LogFactory.getLog(TopCLI.class);
|
||||||
private String CLEAR = "\u001b[2J";
|
private String CLEAR = "\u001b[2J";
|
||||||
private String CLEAR_LINE = "\u001b[2K";
|
private String CLEAR_LINE = "\u001b[2K";
|
||||||
@ -742,18 +759,12 @@ protected QueueMetrics getQueueMetrics() {
|
|||||||
|
|
||||||
long getRMStartTime() {
|
long getRMStartTime() {
|
||||||
try {
|
try {
|
||||||
URL url =
|
// connect with url
|
||||||
new URL("http://"
|
URL url = getClusterUrl();
|
||||||
+ client.getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS)
|
if (null == url) {
|
||||||
+ "/ws/v1/cluster/info");
|
return -1;
|
||||||
URLConnection conn = url.openConnection();
|
}
|
||||||
conn.connect();
|
JSONObject clusterInfo = getJSONObject(connect(url));
|
||||||
InputStream in = conn.getInputStream();
|
|
||||||
String encoding = conn.getContentEncoding();
|
|
||||||
encoding = encoding == null ? "UTF-8" : encoding;
|
|
||||||
String body = IOUtils.toString(in, encoding);
|
|
||||||
JSONObject obj = new JSONObject(body);
|
|
||||||
JSONObject clusterInfo = obj.getJSONObject("clusterInfo");
|
|
||||||
return clusterInfo.getLong("startedOn");
|
return clusterInfo.getLong("startedOn");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Could not fetch RM start time", e);
|
LOG.error("Could not fetch RM start time", e);
|
||||||
@ -761,6 +772,80 @@ long getRMStartTime() {
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private JSONObject getJSONObject(URLConnection conn)
|
||||||
|
throws IOException, JSONException {
|
||||||
|
InputStream in = conn.getInputStream();
|
||||||
|
String encoding = conn.getContentEncoding();
|
||||||
|
encoding = encoding == null ? "UTF-8" : encoding;
|
||||||
|
String body = IOUtils.toString(in, encoding);
|
||||||
|
JSONObject obj = new JSONObject(body);
|
||||||
|
JSONObject clusterInfo = obj.getJSONObject("clusterInfo");
|
||||||
|
return clusterInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private URL getClusterUrl() throws Exception {
|
||||||
|
URL url = null;
|
||||||
|
Configuration conf = getConf();
|
||||||
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
|
Collection<String> haids = HAUtil.getRMHAIds(conf);
|
||||||
|
for (String rmhid : haids) {
|
||||||
|
try {
|
||||||
|
url = getHAClusterUrl(conf, rmhid);
|
||||||
|
if (isActive(url)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (ConnectException e) {
|
||||||
|
// ignore and try second one when one of RM is down
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
url = new URL(
|
||||||
|
WebAppUtils.getRMWebAppURLWithScheme(conf) + CLUSTER_INFO_URL);
|
||||||
|
}
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isActive(URL url) throws Exception {
|
||||||
|
URLConnection connect = connect(url);
|
||||||
|
JSONObject clusterInfo = getJSONObject(connect);
|
||||||
|
return clusterInfo.getString("haState").equals("ACTIVE");
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public URL getHAClusterUrl(Configuration conf, String rmhid)
|
||||||
|
throws MalformedURLException {
|
||||||
|
return new URL(WebAppUtils.getHttpSchemePrefix(conf)
|
||||||
|
+ WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(conf,
|
||||||
|
YarnConfiguration.useHttps(conf) ? Policy.HTTPS_ONLY
|
||||||
|
: Policy.HTTP_ONLY,
|
||||||
|
rmhid)
|
||||||
|
+ CLUSTER_INFO_URL);
|
||||||
|
}
|
||||||
|
|
||||||
|
private URLConnection connect(URL url) throws Exception {
|
||||||
|
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
||||||
|
AuthenticatedURL authUrl;
|
||||||
|
SSLFactory clientSslFactory;
|
||||||
|
URLConnection connection;
|
||||||
|
// If https is chosen, configures SSL client.
|
||||||
|
if (YarnConfiguration.useHttps(getConf())) {
|
||||||
|
clientSslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, getConf());
|
||||||
|
clientSslFactory.init();
|
||||||
|
SSLSocketFactory sslSocktFact = clientSslFactory.createSSLSocketFactory();
|
||||||
|
|
||||||
|
authUrl =
|
||||||
|
new AuthenticatedURL(new KerberosAuthenticator(), clientSslFactory);
|
||||||
|
connection = authUrl.openConnection(url, token);
|
||||||
|
HttpsURLConnection httpsConn = (HttpsURLConnection) connection;
|
||||||
|
httpsConn.setSSLSocketFactory(sslSocktFact);
|
||||||
|
} else {
|
||||||
|
authUrl = new AuthenticatedURL(new KerberosAuthenticator());
|
||||||
|
connection = authUrl.openConnection(url, token);
|
||||||
|
}
|
||||||
|
connection.connect();
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
|
String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
|
||||||
StringBuilder ret = new StringBuilder();
|
StringBuilder ret = new StringBuilder();
|
||||||
String queue = "root";
|
String queue = "root";
|
||||||
@ -768,7 +853,10 @@ String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
|
|||||||
queue = StringUtils.join(queues, ",");
|
queue = StringUtils.join(queues, ",");
|
||||||
}
|
}
|
||||||
long now = Time.now();
|
long now = Time.now();
|
||||||
long uptime = now - rmStartTime;
|
long uptime = 0L;
|
||||||
|
if (rmStartTime != -1) {
|
||||||
|
uptime = now - rmStartTime;
|
||||||
|
}
|
||||||
long days = TimeUnit.MILLISECONDS.toDays(uptime);
|
long days = TimeUnit.MILLISECONDS.toDays(uptime);
|
||||||
long hours =
|
long hours =
|
||||||
TimeUnit.MILLISECONDS.toHours(uptime)
|
TimeUnit.MILLISECONDS.toHours(uptime)
|
||||||
|
@ -0,0 +1,106 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client.cli;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for TopCli.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TestTopCLI {
|
||||||
|
|
||||||
|
private static final String RM1_NODE_ID = "rm1";
|
||||||
|
private static final String RM2_NODE_ID = "rm2";
|
||||||
|
|
||||||
|
private static List<String> dummyHostNames =
|
||||||
|
Arrays.asList("host1", "host2", "host3");
|
||||||
|
|
||||||
|
private static Map<String, String> savedStaticResolution = new HashMap<>();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void initializeDummyHostnameResolution() throws Exception {
|
||||||
|
String previousIpAddress;
|
||||||
|
for (String hostName : dummyHostNames) {
|
||||||
|
previousIpAddress = NetUtils.getStaticResolution(hostName);
|
||||||
|
if (null != previousIpAddress) {
|
||||||
|
savedStaticResolution.put(hostName, previousIpAddress);
|
||||||
|
}
|
||||||
|
NetUtils.addStaticResolution(hostName, "10.20.30.40");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void restoreDummyHostnameResolution() throws Exception {
|
||||||
|
for (Map.Entry<String, String> hostnameToIpEntry : savedStaticResolution
|
||||||
|
.entrySet()) {
|
||||||
|
NetUtils.addStaticResolution(hostnameToIpEntry.getKey(),
|
||||||
|
hostnameToIpEntry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHAClusterInfoURL() throws IOException, InterruptedException {
|
||||||
|
TopCLI topcli = new TopCLI();
|
||||||
|
// http
|
||||||
|
String rm1Address = "host2:8088";
|
||||||
|
String rm2Address = "host3:8088";
|
||||||
|
Configuration conf = topcli.getConf();
|
||||||
|
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + "." + RM1_NODE_ID,
|
||||||
|
rm1Address);
|
||||||
|
topcli.getConf().set(
|
||||||
|
YarnConfiguration.RM_WEBAPP_ADDRESS + "." + RM2_NODE_ID, rm2Address);
|
||||||
|
topcli.getConf().setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
topcli.getConf().set(YarnConfiguration.RM_HA_IDS,
|
||||||
|
RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
|
URL clusterUrl = topcli.getHAClusterUrl(conf, RM1_NODE_ID);
|
||||||
|
Assert.assertEquals("http", clusterUrl.getProtocol());
|
||||||
|
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
|
||||||
|
clusterUrl = topcli.getHAClusterUrl(conf, RM2_NODE_ID);
|
||||||
|
Assert.assertEquals("http", clusterUrl.getProtocol());
|
||||||
|
Assert.assertEquals(rm2Address, clusterUrl.getAuthority());
|
||||||
|
// https
|
||||||
|
rm1Address = "host2:9088";
|
||||||
|
rm2Address = "host3:9088";
|
||||||
|
conf = topcli.getConf();
|
||||||
|
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + "." + RM1_NODE_ID,
|
||||||
|
rm1Address);
|
||||||
|
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + "." + RM2_NODE_ID,
|
||||||
|
rm2Address);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
|
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
|
||||||
|
clusterUrl = topcli.getHAClusterUrl(conf, RM1_NODE_ID);
|
||||||
|
Assert.assertEquals("https", clusterUrl.getProtocol());
|
||||||
|
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
|
||||||
|
}
|
||||||
|
}
|
@ -142,7 +142,7 @@ public void setDrainEventsOnStop() {
|
|||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
if (drainEventsOnStop) {
|
if (drainEventsOnStop) {
|
||||||
blockNewEvents = true;
|
blockNewEvents = true;
|
||||||
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
|
||||||
long endTime = System.currentTimeMillis() + getConfig()
|
long endTime = System.currentTimeMillis() + getConfig()
|
||||||
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
|
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
|
||||||
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
|
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
|
||||||
|
@ -54,7 +54,7 @@ public static YarnAuthorizationProvider getInstance(Configuration conf) {
|
|||||||
(YarnAuthorizationProvider) ReflectionUtils.newInstance(
|
(YarnAuthorizationProvider) ReflectionUtils.newInstance(
|
||||||
authorizerClass, conf);
|
authorizerClass, conf);
|
||||||
authorizer.init(conf);
|
authorizer.init(conf);
|
||||||
LOG.info(authorizerClass.getName() + " is instiantiated.");
|
LOG.info(authorizerClass.getName() + " is instantiated.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return authorizer;
|
return authorizer;
|
||||||
|
@ -40,7 +40,6 @@
|
|||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.RMHAUtils;
|
import org.apache.hadoop.yarn.util.RMHAUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
@ -183,32 +182,32 @@ public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf,
|
|||||||
|
|
||||||
public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf,
|
public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf,
|
||||||
Policy httpPolicy) {
|
Policy httpPolicy) {
|
||||||
InetSocketAddress address = null;
|
|
||||||
String rmId = null;
|
String rmId = null;
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
// If HA enabled, pick one of the RM-IDs and rely on redirect to go to
|
// If HA enabled, pick one of the RM-IDs and rely on redirect to go to
|
||||||
// the Active RM
|
// the Active RM
|
||||||
rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0];
|
rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0];
|
||||||
}
|
}
|
||||||
|
return getResolvedRemoteRMWebAppURLWithoutScheme(conf, httpPolicy, rmId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getResolvedRemoteRMWebAppURLWithoutScheme(
|
||||||
|
Configuration conf, Policy httpPolicy, String rmId) {
|
||||||
|
InetSocketAddress address = null;
|
||||||
|
|
||||||
if (httpPolicy == Policy.HTTPS_ONLY) {
|
if (httpPolicy == Policy.HTTPS_ONLY) {
|
||||||
address =
|
address = conf.getSocketAddr(
|
||||||
conf.getSocketAddr(
|
rmId == null ? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
|
||||||
rmId == null
|
: HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
|
||||||
? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
|
rmId),
|
||||||
: HAUtil.addSuffix(
|
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
|
||||||
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, rmId),
|
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
|
||||||
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
|
|
||||||
} else {
|
} else {
|
||||||
address =
|
address = conf.getSocketAddr(
|
||||||
conf.getSocketAddr(
|
rmId == null ? YarnConfiguration.RM_WEBAPP_ADDRESS
|
||||||
rmId == null
|
: HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, rmId),
|
||||||
? YarnConfiguration.RM_WEBAPP_ADDRESS
|
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
|
||||||
: HAUtil.addSuffix(
|
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
|
||||||
YarnConfiguration.RM_WEBAPP_ADDRESS, rmId),
|
|
||||||
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
|
|
||||||
}
|
}
|
||||||
return getResolvedAddress(address);
|
return getResolvedAddress(address);
|
||||||
}
|
}
|
||||||
|
@ -405,7 +405,7 @@ public Map<ContainerId, ContainerHistoryData> getContainers(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Completed reading history information of all conatiners"
|
LOG.info("Completed reading history information of all containers"
|
||||||
+ " of application attempt " + appAttemptId);
|
+ " of application attempt " + appAttemptId);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Error when reading history information of some containers"
|
LOG.info("Error when reading history information of some containers"
|
||||||
|
@ -216,7 +216,7 @@ private boolean isEnabled() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!(isPmemCheckEnabled() || isVmemCheckEnabled())) {
|
if (!(isPmemCheckEnabled() || isVmemCheckEnabled())) {
|
||||||
LOG.info("Neither virutal-memory nor physical-memory monitoring is " +
|
LOG.info("Neither virtual-memory nor physical-memory monitoring is " +
|
||||||
"needed. Not running the monitor-thread");
|
"needed. Not running the monitor-thread");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -386,8 +387,8 @@ public Object answer(InvocationOnMock invocationOnMock)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}}).when(mockUtil).copy(any(Path.class), any(Path.class),
|
||||||
}).when(mockUtil).copy(any(Path.class), any(Path.class));
|
anyBoolean(), anyBoolean());
|
||||||
|
|
||||||
doAnswer(new Answer() {
|
doAnswer(new Answer() {
|
||||||
@Override
|
@Override
|
||||||
@ -478,7 +479,8 @@ public ContainerLocalizer createContainerLocalizer(String user,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Verify that the calls happen the expected number of times
|
// Verify that the calls happen the expected number of times
|
||||||
verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class));
|
verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class),
|
||||||
|
anyBoolean(), anyBoolean());
|
||||||
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
|
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,7 +415,7 @@ protected Plan initializePlan(String planQueueName) throws YarnException {
|
|||||||
minAllocation, maxAllocation, planQueueName,
|
minAllocation, maxAllocation, planQueueName,
|
||||||
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
|
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
|
||||||
.getMoveOnExpiry(planQueuePath), rmContext);
|
.getMoveOnExpiry(planQueuePath), rmContext);
|
||||||
LOG.info("Intialized plan {} based on reservable queue {}",
|
LOG.info("Initialized plan {} based on reservable queue {}",
|
||||||
plan.toString(), planQueueName);
|
plan.toString(), planQueueName);
|
||||||
return plan;
|
return plan;
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
|
|||||||
LOG.info("Initializing " + queueName + "\n" +
|
LOG.info("Initializing " + queueName + "\n" +
|
||||||
"capacity = " + queueCapacities.getCapacity() +
|
"capacity = " + queueCapacities.getCapacity() +
|
||||||
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
||||||
"asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
|
"absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
|
||||||
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
||||||
"maxCapacity = " + queueCapacities.getMaximumCapacity() +
|
"maxCapacity = " + queueCapacities.getMaximumCapacity() +
|
||||||
" [= configuredMaxCapacity ]" + "\n" +
|
" [= configuredMaxCapacity ]" + "\n" +
|
||||||
|
@ -125,9 +125,9 @@ synchronized void setupQueueConfigs(Resource clusterResource)
|
|||||||
|
|
||||||
LOG.info(queueName +
|
LOG.info(queueName +
|
||||||
", capacity=" + this.queueCapacities.getCapacity() +
|
", capacity=" + this.queueCapacities.getCapacity() +
|
||||||
", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
|
", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
|
||||||
", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
|
", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
|
||||||
", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
|
", absoluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
|
||||||
", state=" + state +
|
", state=" + state +
|
||||||
", acls=" + aclsString +
|
", acls=" + aclsString +
|
||||||
", labels=" + labelStrBuilder.toString() + "\n" +
|
", labels=" + labelStrBuilder.toString() + "\n" +
|
||||||
|
@ -944,7 +944,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
|
|||||||
FSAppAttempt application = getSchedulerApp(appAttemptId);
|
FSAppAttempt application = getSchedulerApp(appAttemptId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
LOG.info("Calling allocate on removed " +
|
LOG.info("Calling allocate on removed " +
|
||||||
"or non existant application " + appAttemptId);
|
"or non existent application " + appAttemptId);
|
||||||
return EMPTY_ALLOCATION;
|
return EMPTY_ALLOCATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user