YARN-9519. TFile log aggregation file format is not working for yarn.log-aggregation.TFile.remote-app-log-dir config. Contributed by Adam Antal.

This commit is contained in:
Sunil G 2019-05-14 10:48:08 -07:00
parent 6bcc1dce52
commit 7d831eca64
4 changed files with 222 additions and 108 deletions

View File

@ -134,6 +134,10 @@ public void initialize(Configuration conf, String controllerName) {
this.retentionSize = configuredRetentionSize;
}
this.fileControllerName = controllerName;
extractRemoteRootLogDir();
extractRemoteRootLogDirSuffix();
initInternal(conf);
}
@ -250,6 +254,45 @@ public abstract String getApplicationOwner(Path aggregatedLogPath,
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath, ApplicationId appId) throws IOException;
/**
* Sets the remoteRootLogDirSuffix class variable extracting
* {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT}
* from the configuration, or
* {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR_SUFFIX} appended by the
* FileController's name, if the former is not set.
*/
private void extractRemoteRootLogDirSuffix() {
String suffix = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
fileControllerName);
remoteRootLogDirSuffix = conf.get(suffix);
if (remoteRootLogDirSuffix == null
|| remoteRootLogDirSuffix.isEmpty()) {
remoteRootLogDirSuffix = conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ "-" + fileControllerName.toLowerCase();
}
}
/**
* Sets the remoteRootLogDir class variable extracting
* {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT}
* from the configuration or {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR},
* if the former is not set.
*/
private void extractRemoteRootLogDir() {
String remoteDirStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
fileControllerName);
String remoteDir = conf.get(remoteDirStr);
if (remoteDir == null || remoteDir.isEmpty()) {
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
}
remoteRootLogDir = new Path(remoteDir);
}
/**
* Verify and create the remote log directory.
*/

View File

@ -145,26 +145,6 @@ public void initInternal(Configuration conf) {
+ " use LogAggregationIndexedFileController when the FileSystem "
+ "support append operations.");
}
String remoteDirStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
this.fileControllerName);
String remoteDir = conf.get(remoteDirStr);
if (remoteDir == null || remoteDir.isEmpty()) {
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
}
this.remoteRootLogDir = new Path(remoteDir);
String suffix = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
this.fileControllerName);
this.remoteRootLogDirSuffix = conf.get(suffix);
if (this.remoteRootLogDirSuffix == null
|| this.remoteRootLogDirSuffix.isEmpty()) {
this.remoteRootLogDirSuffix = conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ "-ifile";
}
String compressName = conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);

View File

@ -41,7 +41,6 @@
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
@ -77,12 +76,7 @@ public LogAggregationTFileController(){}
@Override
public void initInternal(Configuration conf) {
this.remoteRootLogDir = new Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
// do nothing
}
@Override

View File

@ -18,17 +18,26 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import static org.junit.Assert.*;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -38,120 +47,208 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test LogAggregationFileControllerFactory.
*
*/
public class TestLogAggregationFileControllerFactory {
public class TestLogAggregationFileControllerFactory extends Configured {
private static final Logger LOG = LoggerFactory.getLogger(
TestLogAggregationFileControllerFactory.class);
@Test(timeout = 10000)
public void testLogAggregationFileControllerFactory() throws Exception {
ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
String appOwner = "test";
String remoteLogRootDir = "target/app-logs/";
private static final String REMOTE_LOG_ROOT = "target/app-logs/";
private static final String REMOTE_DEFAULT_DIR = "default/";
private static final String APP_OWNER = "test";
private static final String WRONG_ROOT_LOG_DIR_MSG =
"Wrong remote root log directory found.";
private static final String WRONG_ROOT_LOG_DIR_SUFFIX_MSG =
"Wrong remote root log directory suffix found.";
private static final List<Class<? extends LogAggregationFileController>>
ALL_FILE_CONTROLLERS = Arrays.asList(
TestLogAggregationFileController.class,
LogAggregationIndexedFileController.class,
LogAggregationTFileController.class);
private static final List<String> ALL_FILE_CONTROLLER_NAMES =
Arrays.asList("TestLogAggregationFileController", "IFile", "TFile");
private ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
@Before
public void setup() throws IOException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
REMOTE_DEFAULT_DIR);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
FileSystem fs = FileSystem.get(conf);
setConf(conf);
}
private void verifyFileControllerInstance(
LogAggregationFileControllerFactory factory,
Class<? extends LogAggregationFileController> className)
throws IOException {
List<LogAggregationFileController> fileControllers =
factory.getConfiguredLogAggregationFileControllerList();
FileSystem fs = FileSystem.get(getConf());
Path logPath = fileControllers.get(0).getRemoteAppLogDir(appId, APP_OWNER);
LOG.debug("Checking " + logPath);
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(conf);
LinkedList<LogAggregationFileController> list = factory
.getConfiguredLogAggregationFileControllerList();
assertTrue(list.size() == 1);
assertTrue(list.getFirst() instanceof LogAggregationTFileController);
assertTrue(factory.getFileControllerForWrite()
instanceof LogAggregationTFileController);
Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
try {
if (fs.exists(logPath)) {
fs.delete(logPath, true);
}
assertTrue(fs.mkdirs(logPath));
Writer writer =
new FileWriter(new File(logPath.toString(), "testLog"));
writer.write("test");
writer.close();
assertTrue(factory.getFileControllerForRead(appId, appOwner)
instanceof LogAggregationTFileController);
try (Writer writer =
new FileWriter(new File(logPath.toString(), "testLog"))) {
writer.write("test");
}
assertTrue("The used LogAggregationFileController is not instance of "
+ className.getSimpleName(), className.isInstance(
factory.getFileControllerForRead(appId, APP_OWNER)));
} finally {
fs.delete(logPath, true);
}
}
@Test
public void testDefaultLogAggregationFileControllerFactory()
throws IOException {
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(getConf());
List<LogAggregationFileController> list = factory
.getConfiguredLogAggregationFileControllerList();
assertEquals("Only one LogAggregationFileController is expected!", 1,
list.size());
assertTrue("TFile format is expected to be the first " +
"LogAggregationFileController!", list.get(0) instanceof
LogAggregationTFileController);
assertTrue("TFile format is expected to be used for writing!",
factory.getFileControllerForWrite() instanceof
LogAggregationTFileController);
verifyFileControllerInstance(factory, LogAggregationTFileController.class);
}
@Test(expected = Exception.class)
public void testLogAggregationFileControllerFactoryClassNotSet() {
Configuration conf = getConf();
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
"TestLogAggregationFileController");
// Did not set class for TestLogAggregationFileController,
// should get the exception.
try {
factory =
new LogAggregationFileControllerFactory(conf);
fail();
} catch (Exception ex) {
// should get exception
}
new LogAggregationFileControllerFactory(conf);
fail("TestLogAggregationFileController's class was not set, " +
"but the factory creation did not fail.");
}
private void enableFileControllers(
List<Class<? extends LogAggregationFileController>> fileControllers,
List<String> fileControllerNames) {
Configuration conf = getConf();
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
"TestLogAggregationFileController,TFile");
conf.setClass(
"yarn.log-aggregation.file-controller.TestLogAggregationFileController"
+ ".class", TestLogAggregationFileController.class,
LogAggregationFileController.class);
StringUtils.join(fileControllerNames, ","));
for (int i = 0; i < fileControllers.size(); i++) {
Class<? extends LogAggregationFileController> fileController =
fileControllers.get(i);
String controllerName = fileControllerNames.get(i);
conf.set(
"yarn.log-aggregation.TestLogAggregationFileController"
+ ".remote-app-log-dir", remoteLogRootDir);
conf.set(
"yarn.log-aggregation.TestLogAggregationFileController"
+ ".remote-app-log-dir-suffix", "testLog");
factory = new LogAggregationFileControllerFactory(conf);
list = factory.getConfiguredLogAggregationFileControllerList();
assertTrue(list.size() == 2);
assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
assertTrue(list.getLast() instanceof LogAggregationTFileController);
assertTrue(factory.getFileControllerForWrite()
instanceof TestLogAggregationFileController);
logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
try {
if (fs.exists(logPath)) {
fs.delete(logPath, true);
}
assertTrue(fs.mkdirs(logPath));
Writer writer =
new FileWriter(new File(logPath.toString(), "testLog"));
writer.write("test");
writer.close();
assertTrue(factory.getFileControllerForRead(appId, appOwner)
instanceof TestLogAggregationFileController);
} finally {
fs.delete(logPath, true);
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
controllerName), fileController, LogAggregationFileController.class);
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
controllerName), REMOTE_LOG_ROOT + controllerName + "/");
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
controllerName), controllerName);
}
}
@Test
public void testLogAggregationFileControllerFactory() throws Exception {
enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(getConf());
List<LogAggregationFileController> list =
factory.getConfiguredLogAggregationFileControllerList();
assertEquals("The expected number of LogAggregationFileController " +
"is not 3!", 3, list.size());
assertTrue("Test format is expected to be the first " +
"LogAggregationFileController!", list.get(0) instanceof
TestLogAggregationFileController);
assertTrue("IFile format is expected to be the second " +
"LogAggregationFileController!", list.get(1) instanceof
LogAggregationIndexedFileController);
assertTrue("TFile format is expected to be the first " +
"LogAggregationFileController!", list.get(2) instanceof
LogAggregationTFileController);
assertTrue("Test format is expected to be used for writing!",
factory.getFileControllerForWrite() instanceof
TestLogAggregationFileController);
verifyFileControllerInstance(factory,
TestLogAggregationFileController.class);
}
@Test
public void testClassConfUsed() {
enableFileControllers(Collections.singletonList(
LogAggregationTFileController.class),
Collections.singletonList("TFile"));
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(getConf());
LogAggregationFileController fc = factory.getFileControllerForWrite();
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/TFile",
fc.getRemoteRootLogDir().toString());
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "TFile",
fc.getRemoteRootLogDirSuffix());
}
@Test
public void testNodemanagerConfigurationIsUsed() {
Configuration conf = getConf();
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(conf);
LogAggregationFileController fc = factory.getFileControllerForWrite();
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/default",
fc.getRemoteRootLogDir().toString());
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "log-tfile",
fc.getRemoteRootLogDirSuffix());
}
@Test
public void testDefaultConfUsed() {
Configuration conf = getConf();
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(getConf());
LogAggregationFileController fc = factory.getFileControllerForWrite();
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "/tmp/logs",
fc.getRemoteRootLogDir().toString());
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "logs-tfile",
fc.getRemoteRootLogDirSuffix());
}
private static class TestLogAggregationFileController
extends LogAggregationFileController {
@Override
public void initInternal(Configuration conf) {
String remoteDirStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
this.fileControllerName);
this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
String suffix = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
this.fileControllerName);
this.remoteRootLogDirSuffix = conf.get(suffix);
// Do Nothing
}
@Override