YARN-11175. Refactor LogAggregationFileControllerFactory

This commit is contained in:
9uapaw 2022-06-13 13:58:13 +02:00
parent 170668b994
commit c9a174a260

View File

@ -53,9 +53,8 @@ public class LogAggregationFileControllerFactory {
LogAggregationFileControllerFactory.class); LogAggregationFileControllerFactory.class);
private final Pattern p = Pattern.compile( private final Pattern p = Pattern.compile(
"^[A-Za-z_]+[A-Za-z0-9_]*$"); "^[A-Za-z_]+[A-Za-z0-9_]*$");
private LinkedList<LogAggregationFileController> controllers private final LinkedList<LogAggregationFileController> controllers = new LinkedList<>();
= new LinkedList<>(); private final Configuration conf;
private Configuration conf;
/** /**
* Construct the LogAggregationFileControllerFactory object. * Construct the LogAggregationFileControllerFactory object.
@ -65,40 +64,45 @@ public class LogAggregationFileControllerFactory {
this.conf = conf; this.conf = conf;
Collection<String> fileControllers = conf.getStringCollection( Collection<String> fileControllers = conf.getStringCollection(
YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS); YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
List<String> controllerClassName = new ArrayList<>();
Map<String, String> controllerChecker = new HashMap<>(); Map<String, String> controllerChecker = new HashMap<>();
for (String fileController : fileControllers) { for (String controllerName : fileControllers) {
Preconditions.checkArgument(validateAggregatedFileControllerName( validateAggregatedFileControllerName(controllerName);
fileController), "The FileControllerName: " + fileController
+ " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+" is invalid." + "The valid File Controller name should only "
+ "contain a-zA-Z0-9_ and can not start with numbers");
String remoteDirStr = String.format( validateConflictingControllers(conf, controllerChecker, controllerName);
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, DeterminedControllerClassName className =
fileController); new DeterminedControllerClassName(conf, controllerName);
String remoteDir = conf.get(remoteDirStr); LogAggregationFileController controller = createFileControllerInstance(conf,
boolean defaultRemoteDir = false; controllerName, className);
if (remoteDir == null || remoteDir.isEmpty()) { controller.initialize(conf, controllerName);
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, controllers.add(controller);
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
defaultRemoteDir = true;
} }
String suffixStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
fileController);
String suffix = conf.get(suffixStr);
boolean defaultSuffix = false;
if (suffix == null || suffix.isEmpty()) {
suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
defaultSuffix = true;
} }
String dirSuffix = remoteDir + "-" + suffix;
private LogAggregationFileController createFileControllerInstance(
Configuration conf,
String fileController, DeterminedControllerClassName className) {
Class<? extends LogAggregationFileController> clazz = conf.getClass(
className.configKey, null, LogAggregationFileController.class);
if (clazz == null) {
throw new RuntimeException("No class defined for " + fileController);
}
LogAggregationFileController instance = ReflectionUtils.newInstance(clazz, conf);
if (instance == null) {
throw new RuntimeException("No object created for " + className.value);
}
return instance;
}
private void validateConflictingControllers(
Configuration conf, Map<String, String> controllerChecker, String fileController) {
DeterminedLogAggregationRemoteDir remoteDir =
new DeterminedLogAggregationRemoteDir(conf, fileController);
DeterminedLogAggregationSuffix suffix =
new DeterminedLogAggregationSuffix(conf, fileController);
String dirSuffix = remoteDir.value + "-" + suffix.value;
if (controllerChecker.containsKey(dirSuffix)) { if (controllerChecker.containsKey(dirSuffix)) {
if (defaultRemoteDir && defaultSuffix) { if (remoteDir.usingDefault && suffix.usingDefault) {
String fileControllerStr = controllerChecker.get(dirSuffix); String fileControllerStr = controllerChecker.get(dirSuffix);
List<String> controllersList = new ArrayList<>(); List<String> controllersList = new ArrayList<>();
controllersList.add(fileControllerStr); controllersList.add(fileControllerStr);
@ -107,36 +111,13 @@ public class LogAggregationFileControllerFactory {
controllerChecker.put(dirSuffix, fileControllerStr); controllerChecker.put(dirSuffix, fileControllerStr);
} else { } else {
String conflictController = controllerChecker.get(dirSuffix); String conflictController = controllerChecker.get(dirSuffix);
throw new RuntimeException("The combined value of " + remoteDirStr throw new RuntimeException(String.format("The combined value of %s " +
+ " and " + suffixStr + " should not be the same as the value" "and %s should not be the same as the value set for %s",
+ " set for " + conflictController); remoteDir.configKey, suffix.configKey, conflictController));
} }
} else { } else {
controllerChecker.put(dirSuffix, fileController); controllerChecker.put(dirSuffix, fileController);
} }
String classKey = String.format(
YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
fileController);
String className = conf.get(classKey);
if (className == null || className.isEmpty()) {
throw new RuntimeException("No class configured for "
+ fileController);
}
controllerClassName.add(className);
Class<? extends LogAggregationFileController> sClass = conf.getClass(
classKey, null, LogAggregationFileController.class);
if (sClass == null) {
throw new RuntimeException("No class defined for " + fileController);
}
LogAggregationFileController s = ReflectionUtils.newInstance(
sClass, conf);
if (s == null) {
throw new RuntimeException("No object created for "
+ controllerClassName);
}
s.initialize(conf, fileController);
controllers.add(s);
}
} }
/** /**
@ -169,8 +150,7 @@ public class LogAggregationFileControllerFactory {
return fileController; return fileController;
} }
} catch (Exception ex) { } catch (Exception ex) {
diagnosticsMsg.append(ex.getMessage() + "\n"); diagnosticsMsg.append(ex.getMessage()).append("\n");
continue;
} }
} }
} }
@ -184,19 +164,26 @@ public class LogAggregationFileControllerFactory {
return fileController; return fileController;
} }
} catch (Exception ex) { } catch (Exception ex) {
diagnosticsMsg.append(ex.getMessage() + "\n"); diagnosticsMsg.append(ex.getMessage()).append("\n");
continue;
} }
} }
throw new IOException(diagnosticsMsg.toString()); throw new IOException(diagnosticsMsg.toString());
} }
private boolean validateAggregatedFileControllerName(String name) { private void validateAggregatedFileControllerName(String name) {
boolean valid;
if (name == null || name.trim().isEmpty()) { if (name == null || name.trim().isEmpty()) {
return false; valid = false;
} else {
valid = p.matcher(name).matches();
} }
return p.matcher(name).matches();
Preconditions.checkArgument(valid,
String.format("The FileControllerName: %s set in " +
"%s is invalid.The valid File Controller name should only contain " +
"a-zA-Z0-9_ and cannot start with numbers", name,
YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS));
} }
@Private @Private
@ -205,4 +192,64 @@ public class LogAggregationFileControllerFactory {
getConfiguredLogAggregationFileControllerList() { getConfiguredLogAggregationFileControllerList() {
return this.controllers; return this.controllers;
} }
private static class DeterminedLogAggregationRemoteDir {
private String value;
private boolean usingDefault = false;
private final String configKey;
DeterminedLogAggregationRemoteDir(Configuration conf,
String fileController) {
configKey = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
fileController);
String remoteDir = conf.get(configKey);
if (remoteDir == null || remoteDir.isEmpty()) {
this.value = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
this.usingDefault = true;
} else {
this.value = remoteDir;
}
}
}
private static class DeterminedLogAggregationSuffix {
private String value;
private boolean usingDefault = false;
private final String configKey;
DeterminedLogAggregationSuffix(Configuration conf,
String fileController) {
configKey = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
fileController);
String suffix = conf.get(configKey);
if (suffix == null || suffix.isEmpty()) {
this.value = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
this.usingDefault = true;
} else {
this.value = suffix;
}
}
}
private static class DeterminedControllerClassName {
private final String configKey;
private final String value;
DeterminedControllerClassName(Configuration conf,
String fileController) {
this.configKey = String.format(
YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
fileController);
this.value = conf.get(configKey);
if (value == null || value.isEmpty()) {
throw new RuntimeException("No class configured for "
+ fileController);
}
}
}
} }