YARN-7697. NM goes down with OOM due to leak in log-aggregation. (Xuan Gong via wangda)

Change-Id: Ie4fc7979d834e25f37a033c314f3efceeb8f4a9e
This commit is contained in:
Wangda Tan 2018-02-12 10:28:35 +08:00
parent 789a185c16
commit d4c98579e3
6 changed files with 79 additions and 30 deletions

View File

@ -226,10 +226,12 @@ public abstract void renderAggregatedLogsBlock(Block html,
* Returns the owner of the application. * Returns the owner of the application.
* *
* @param aggregatedLogPath the aggregatedLog path * @param aggregatedLogPath the aggregatedLog path
* @param appId the ApplicationId
* @return the application owner * @return the application owner
* @throws IOException if we can not get the application owner * @throws IOException if we can not get the application owner
*/ */
public abstract String getApplicationOwner(Path aggregatedLogPath) public abstract String getApplicationOwner(Path aggregatedLogPath,
ApplicationId appId)
throws IOException; throws IOException;
/** /**
@ -237,11 +239,12 @@ public abstract String getApplicationOwner(Path aggregatedLogPath)
* found. * found.
* *
* @param aggregatedLogPath the aggregatedLog path. * @param aggregatedLogPath the aggregatedLog path.
* @param appId the ApplicationId
* @return a map of the Application ACLs. * @return a map of the Application ACLs.
* @throws IOException if we can not get the application acls * @throws IOException if we can not get the application acls
*/ */
public abstract Map<ApplicationAccessType, String> getApplicationAcls( public abstract Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException; Path aggregatedLogPath, ApplicationId appId) throws IOException;
/** /**
* Verify and create the remote log directory. * Verify and create the remote log directory.

View File

@ -135,7 +135,7 @@ protected void render(Block html) {
IndexedLogsMeta indexedLogsMeta = null; IndexedLogsMeta indexedLogsMeta = null;
try { try {
indexedLogsMeta = fileController.loadIndexedLogsMeta( indexedLogsMeta = fileController.loadIndexedLogsMeta(
thisNodeFile.getPath(), endIndex); thisNodeFile.getPath(), endIndex, appId);
} catch (Exception ex) { } catch (Exception ex) {
// DO NOTHING // DO NOTHING
LOG.warn("Can not load log meta from the log file:" LOG.warn("Can not load log meta from the log file:"

View File

@ -284,16 +284,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
currentRemoteLogFile.getName())) { currentRemoteLogFile.getName())) {
overwriteCheckSum = false; overwriteCheckSum = false;
long endIndex = checksumFileInputStream.readLong(); long endIndex = checksumFileInputStream.readLong();
IndexedLogsMeta recoveredLogsMeta = null; IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta(
try { currentRemoteLogFile, endIndex, appId);
truncateFileWithRetries(fc, currentRemoteLogFile,
endIndex);
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile);
} catch (Exception ex) {
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile, endIndex);
}
if (recoveredLogsMeta != null) { if (recoveredLogsMeta != null) {
indexedLogsMeta = recoveredLogsMeta; indexedLogsMeta = recoveredLogsMeta;
} }
@ -524,11 +516,11 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
IndexedLogsMeta indexedLogsMeta = null; IndexedLogsMeta indexedLogsMeta = null;
try { try {
indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(), indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(),
endIndex); endIndex, appId);
} catch (Exception ex) { } catch (Exception ex) {
// DO NOTHING // DO NOTHING
LOG.warn("Can not load log meta from the log file:" LOG.warn("Can not load log meta from the log file:"
+ thisNodeFile.getPath()); + thisNodeFile.getPath() + "\n" + ex.getMessage());
continue; continue;
} }
if (indexedLogsMeta == null) { if (indexedLogsMeta == null) {
@ -636,14 +628,14 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
endIndex = checkSumIndex.longValue(); endIndex = checkSumIndex.longValue();
} }
IndexedLogsMeta current = loadIndexedLogsMeta( IndexedLogsMeta current = loadIndexedLogsMeta(
thisNodeFile.getPath(), endIndex); thisNodeFile.getPath(), endIndex, appId);
if (current != null) { if (current != null) {
listOfLogsMeta.add(current); listOfLogsMeta.add(current);
} }
} catch (IOException ex) { } catch (IOException ex) {
// DO NOTHING // DO NOTHING
LOG.warn("Can not get log meta from the log file:" LOG.warn("Can not get log meta from the log file:"
+ thisNodeFile.getPath()); + thisNodeFile.getPath() + "\n" + ex.getMessage());
} }
} }
for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) { for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) {
@ -721,6 +713,7 @@ public boolean apply(FileStatus next) {
checkSumFiles.put(nodeName, Long.valueOf(index)); checkSumFiles.put(nodeName, Long.valueOf(index));
} }
} catch (IOException ex) { } catch (IOException ex) {
LOG.warn(ex.getMessage());
continue; continue;
} finally { } finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
@ -773,25 +766,26 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
} }
@Override @Override
public String getApplicationOwner(Path aggregatedLogPath) public String getApplicationOwner(Path aggregatedLogPath,
ApplicationId appId)
throws IOException { throws IOException {
if (this.cachedIndexedLogsMeta == null if (this.cachedIndexedLogsMeta == null
|| !this.cachedIndexedLogsMeta.getRemoteLogPath() || !this.cachedIndexedLogsMeta.getRemoteLogPath()
.equals(aggregatedLogPath)) { .equals(aggregatedLogPath)) {
this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); loadIndexedLogsMeta(aggregatedLogPath, appId), aggregatedLogPath);
} }
return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser(); return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser();
} }
@Override @Override
public Map<ApplicationAccessType, String> getApplicationAcls( public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException { Path aggregatedLogPath, ApplicationId appId) throws IOException {
if (this.cachedIndexedLogsMeta == null if (this.cachedIndexedLogsMeta == null
|| !this.cachedIndexedLogsMeta.getRemoteLogPath() || !this.cachedIndexedLogsMeta.getRemoteLogPath()
.equals(aggregatedLogPath)) { .equals(aggregatedLogPath)) {
this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); loadIndexedLogsMeta(aggregatedLogPath, appId), aggregatedLogPath);
} }
return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls(); return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls();
} }
@ -804,8 +798,8 @@ public Path getRemoteAppLogDir(ApplicationId appId, String user)
} }
@Private @Private
public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end,
throws IOException { ApplicationId appId) throws IOException {
FileContext fileContext = FileContext fileContext =
FileContext.getFileContext(remoteLogPath.toUri(), conf); FileContext.getFileContext(remoteLogPath.toUri(), conf);
FSDataInputStream fsDataIStream = null; FSDataInputStream fsDataIStream = null;
@ -816,8 +810,36 @@ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end)
} }
long fileLength = end < 0 ? fileContext.getFileStatus( long fileLength = end < 0 ? fileContext.getFileStatus(
remoteLogPath).getLen() : end; remoteLogPath).getLen() : end;
fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
int offset = fsDataIStream.readInt(); int offset = fsDataIStream.readInt();
// If the offset/log meta size is larger than 64M,
// output a warn message for better debug.
if (offset > 64 * 1024 * 1024) {
LOG.warn("The log meta size read from " + remoteLogPath
+ " is " + offset);
}
// Load UUID and make sure the UUID is correct.
byte[] uuidRead = new byte[UUID_LENGTH];
int uuidReadLen = fsDataIStream.read(uuidRead);
if (this.uuid == null) {
this.uuid = createUUID(appId);
}
if (uuidReadLen != UUID_LENGTH || !Arrays.equals(this.uuid, uuidRead)) {
if (LOG.isDebugEnabled()) {
LOG.debug("the length of loaded UUID:" + uuidReadLen);
LOG.debug("the loaded UUID:" + new String(uuidRead,
Charset.forName("UTF-8")));
LOG.debug("the expected UUID:" + new String(this.uuid,
Charset.forName("UTF-8")));
}
throw new IOException("The UUID from "
+ remoteLogPath + " is not correct. The offset of loaded UUID is "
+ (fileLength - UUID_LENGTH));
}
// Load Log Meta
byte[] array = new byte[offset]; byte[] array = new byte[offset];
fsDataIStream.seek( fsDataIStream.seek(
fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH);
@ -833,9 +855,9 @@ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end)
} }
} }
private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath) private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath,
throws IOException { ApplicationId appId) throws IOException {
return loadIndexedLogsMeta(remoteLogPath, -1); return loadIndexedLogsMeta(remoteLogPath, -1, appId);
} }
/** /**
@ -1040,6 +1062,7 @@ private static class IndexedFileOutputStreamState {
this.out = compressAlgo.createCompressionStream( this.out = compressAlgo.createCompressionStream(
fsBufferedOutput, compressor, 0); fsBufferedOutput, compressor, 0);
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage());
compressAlgo.returnCompressor(compressor); compressAlgo.returnCompressor(compressor);
throw e; throw e;
} }

View File

@ -335,14 +335,15 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
} }
@Override @Override
public String getApplicationOwner(Path aggregatedLog) throws IOException { public String getApplicationOwner(Path aggregatedLog, ApplicationId appId)
throws IOException {
createTFileLogReader(aggregatedLog); createTFileLogReader(aggregatedLog);
return this.tfReader.getLogReader().getApplicationOwner(); return this.tfReader.getLogReader().getApplicationOwner();
} }
@Override @Override
public Map<ApplicationAccessType, String> getApplicationAcls( public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLog) throws IOException { Path aggregatedLog, ApplicationId appId) throws IOException {
createTFileLogReader(aggregatedLog); createTFileLogReader(aggregatedLog);
return this.tfReader.getLogReader().getApplicationAcls(); return this.tfReader.getLogReader().getApplicationAcls();
} }

View File

@ -194,14 +194,15 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) {
} }
@Override @Override
public String getApplicationOwner(Path aggregatedLogPath) public String getApplicationOwner(Path aggregatedLogPath,
ApplicationId appId)
throws IOException { throws IOException {
return null; return null;
} }
@Override @Override
public Map<ApplicationAccessType, String> getApplicationAcls( public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException { Path aggregatedLogPath, ApplicationId appId) throws IOException {
return null; return null;
} }
} }

View File

@ -55,7 +55,9 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After; import org.junit.After;
@ -219,6 +221,25 @@ public boolean isRollover(final FileContext fc,
} }
sysOutStream.reset(); sysOutStream.reset();
Configuration factoryConf = new Configuration(conf);
factoryConf.set("yarn.log-aggregation.file-formats", "Indexed");
factoryConf.set("yarn.log-aggregation.file-controller.Indexed.class",
"org.apache.hadoop.yarn.logaggregation.filecontroller.ifile"
+ ".LogAggregationIndexedFileController");
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(factoryConf);
LogAggregationFileController fileController = factory
.getFileControllerForRead(appId, USER_UGI.getShortUserName());
Assert.assertTrue(fileController instanceof
LogAggregationIndexedFileController);
foundLogs = fileController.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : logTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
// create a checksum file // create a checksum file
Path checksumFile = new Path(fileFormat.getRemoteAppLogDir( Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
appId, USER_UGI.getShortUserName()), appId, USER_UGI.getShortUserName()),