YARN-10767. Yarn Logs Command retrying on Standby RM for 30 times. Contributed by D M Murali Krishna Reddy.

This commit is contained in:
Jim Brennan 2021-06-15 18:58:42 +00:00
parent a77bf7cf07
commit 9a6a11c452
2 changed files with 12 additions and 17 deletions

View File

@ -23,6 +23,7 @@
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -35,7 +36,7 @@
@Unstable
public class RMHAUtils {
public static String findActiveRMHAId(YarnConfiguration conf) {
public static String findActiveRMHAId(Configuration conf) {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
Collection<String> rmIds =
yarnConf.getStringCollection(YarnConfiguration.RM_HA_IDS);

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.yarn.util.StringHelper.PATH_JOINER;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@ -97,24 +98,17 @@ public static void setNMWebAppHostNameAndPort(Configuration conf,
*/
public static <T, R> R execOnActiveRM(Configuration conf,
ThrowingBiFunction<String, T, R> func, T arg) throws Exception {
String rm1Address = getRMWebAppURLWithScheme(conf, 0);
try {
return func.apply(rm1Address, arg);
} catch (Exception e) {
if (HAUtil.isHAEnabled(conf)) {
int rms = HAUtil.getRMHAIds(conf).size();
for (int i=1; i<rms; i++) {
try {
rm1Address = getRMWebAppURLWithScheme(conf, i);
return func.apply(rm1Address, arg);
} catch (Exception e1) {
// ignore and try next one when RM is down
e = e1;
}
}
int haIndex = 0;
if (HAUtil.isHAEnabled(conf)) {
String activeRMId = RMHAUtils.findActiveRMHAId(conf);
if (activeRMId != null) {
haIndex = new ArrayList<>(HAUtil.getRMHAIds(conf)).indexOf(activeRMId);
} else {
throw new ConnectException("No Active RM available");
}
throw e;
}
String rm1Address = getRMWebAppURLWithScheme(conf, haIndex);
return func.apply(rm1Address, arg);
}
/** A BiFunction which throws on Exception. */