From 4f758dd4c682bacbb110c51a96079a6c5d103c95 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 5 Dec 2019 21:49:00 +0100 Subject: [PATCH] YARN-9607. Auto-configuring rollover-size of IFile format for non-appendable filesystems. Contributed by Adam Antal --- .../LogAggregationFileController.java | 5 --- .../LogAggregationIndexedFileController.java | 33 +++++++++++-------- ...stLogAggregationIndexedFileController.java | 28 ++++++++++++++++ 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 001f4f53e7..ec633d6de4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -99,11 +99,6 @@ public abstract class LogAggregationFileController { protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission .createImmutable((short) (0640 ^ 0777)); - // This is temporary solution. The configuration will be deleted once we have - // the FileSystem API to check whether append operation is supported or not. - public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND - = YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append"; - protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index a27d809904..7af58d7a83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -47,12 +47,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -69,7 +72,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; @@ -135,16 +137,6 @@ public LogAggregationIndexedFileController() {} @Override public void initInternal(Configuration conf) { - // Currently, we need the underlying File System to support append - // operation. Will remove this check after we finish - // LogAggregationIndexedFileController for non-append mode. - boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true); - if (!append) { - throw new YarnRuntimeException("The configuration:" - + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only" - + " use LogAggregationIndexedFileController when the FileSystem " - + "support append operations."); - } String compressName = conf.get( YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); @@ -1139,8 +1131,23 @@ public static int getFSInputBufferSize(Configuration conf) { @Private @VisibleForTesting public long getRollOverLogMaxSize(Configuration conf) { - return 1024L * 1024 * 1024 * conf.getInt( - LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + boolean supportAppend = false; + try { + FileSystem fs = FileSystem.get(remoteRootLogDir.toUri(), conf); + if (fs instanceof LocalFileSystem || fs.hasPathCapability( + remoteRootLogDir, CommonPathCapabilities.FS_APPEND)) { + supportAppend = true; + } + } catch (Exception ioe) { + LOG.warn("Unable to determine if the filesystem supports " + + "append operation", ioe); + } + if (supportAppend) { + return 1024L * 1024 * 1024 * conf.getInt( + LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + } else { + return 0L; + } } private abstract class FSAction { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java index 098f3be4ce..73351813e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java @@ -469,4 +469,32 @@ private File createAndWriteLocalLogFile(Path localLogDir, String logType, private String logMessage(ContainerId containerId, String logType) { return "Hello " + containerId + " in " + logType + "!"; } + + @Test + public void testGetRollOverLogMaxSize() { + String fileControllerName = "testController"; + String remoteDirConf = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + fileControllerName); + Configuration conf = new Configuration(); + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + long defaultRolloverSize = 10L * 1024 * 1024 * 1024; + + // test local filesystem + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)) + .isEqualTo(defaultRolloverSize); + + // test file system supporting append + conf.set(remoteDirConf, "webhdfs://localhost/path"); + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)) + .isEqualTo(defaultRolloverSize); + + // test file system not supporting append + conf.set(remoteDirConf, "s3a://test/path"); + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero(); + } }