YARN-4155. TestLogAggregationService.testLogAggregationServiceWithInterval failing. (Bibin A Chundatt via stevel)
This commit is contained in:
parent
bafeb6c7bc
commit
e2d59e2c7b
@ -957,6 +957,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart.
|
YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart.
|
||||||
(Varun Saxena via jianhe)
|
(Varun Saxena via jianhe)
|
||||||
|
|
||||||
|
YARN-4155. TestLogAggregationService.testLogAggregationServiceWithInterval failing
|
||||||
|
(Bibin A Chundatt via stevel)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -248,7 +249,7 @@ protected DeletionService createDeletionService() {
|
|||||||
public void delete(String user, Path subDir, Path... baseDirs) {
|
public void delete(String user, Path subDir, Path... baseDirs) {
|
||||||
// Don't do any deletions.
|
// Don't do any deletions.
|
||||||
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
|
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
|
||||||
+ ", baseDirs - " + baseDirs);
|
+ ", baseDirs - " + Arrays.asList(baseDirs));
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -788,7 +788,9 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
|
|||||||
// ContainerLogDir should be created
|
// ContainerLogDir should be created
|
||||||
String containerStr = ConverterUtils.toString(containerId);
|
String containerStr = ConverterUtils.toString(containerId);
|
||||||
File containerLogDir = new File(appLogDir, containerStr);
|
File containerLogDir = new File(appLogDir, containerStr);
|
||||||
containerLogDir.mkdir();
|
boolean created = containerLogDir.mkdirs();
|
||||||
|
LOG.info("Created Dir:" + containerLogDir.getAbsolutePath() + " status :"
|
||||||
|
+ created);
|
||||||
for (String fileType : fileName) {
|
for (String fileType : fileName) {
|
||||||
Writer writer11 = new FileWriter(new File(containerLogDir, fileType));
|
Writer writer11 = new FileWriter(new File(containerLogDir, fileType));
|
||||||
writer11.write(containerStr + " Hello " + fileType + "!");
|
writer11.write(containerStr + " Hello " + fileType + "!");
|
||||||
@ -1926,7 +1928,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
|
|||||||
//configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
|
//configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
|
||||||
//have fully qualified path
|
//have fully qualified path
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
"file://" + this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.toURI().toString());
|
||||||
this.conf.setLong(
|
this.conf.setLong(
|
||||||
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
||||||
3600);
|
3600);
|
||||||
@ -1943,7 +1945,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
|
|||||||
// again in next cycle.
|
// again in next cycle.
|
||||||
this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
|
this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
|
||||||
|
|
||||||
ApplicationId application = BuilderUtils.newApplicationId(123456, 1);
|
ApplicationId application =
|
||||||
|
BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
BuilderUtils.newApplicationAttemptId(application, 1);
|
BuilderUtils.newApplicationAttemptId(application, 1);
|
||||||
ContainerId container = createContainer(appAttemptId, 1,
|
ContainerId container = createContainer(appAttemptId, 1,
|
||||||
@ -2009,8 +2012,10 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
|
|||||||
|
|
||||||
// Same logs will not be aggregated again.
|
// Same logs will not be aggregated again.
|
||||||
// Only one aggregated log file in Remote file directory.
|
// Only one aggregated log file in Remote file directory.
|
||||||
Assert.assertEquals(numOfLogsAvailable(logAggregationService,
|
Assert.assertTrue(
|
||||||
application, true, null), 1);
|
"Only one aggregated log file in Remote file directory expected",
|
||||||
|
waitAndCheckLogNum(logAggregationService, application, 50, 1, true,
|
||||||
|
null));
|
||||||
|
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
@ -2134,6 +2139,7 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
|||||||
FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf)
|
FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf)
|
||||||
.listStatus(appLogDir);
|
.listStatus(appLogDir);
|
||||||
} catch (FileNotFoundException fnf) {
|
} catch (FileNotFoundException fnf) {
|
||||||
|
LOG.info("Context file not vailable: " + fnf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int count = 0;
|
int count = 0;
|
||||||
@ -2143,13 +2149,17 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
|||||||
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
|
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
|
||||||
|| (lastLogFile != null && filename.contains(lastLogFile)
|
|| (lastLogFile != null && filename.contains(lastLogFile)
|
||||||
&& sizeLimited)) {
|
&& sizeLimited)) {
|
||||||
|
LOG.info("fileName :" + filename);
|
||||||
|
LOG.info("lastLogFile :" + lastLogFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (filename.contains(LogAggregationUtils
|
if (filename.contains(LogAggregationUtils
|
||||||
.getNodeString(logAggregationService.getNodeId()))) {
|
.getNodeString(logAggregationService.getNodeId()))) {
|
||||||
|
LOG.info("Node list filename :" + filename);
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info("File Count :" + count);
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2158,13 +2168,17 @@ private boolean waitAndCheckLogNum(
|
|||||||
int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
|
int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
int logFiles=numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
||||||
lastLogFile) != expectNum && count <= maxAttempts) {
|
lastLogFile);
|
||||||
|
while ((logFiles != expectNum)
|
||||||
|
&& (count <= maxAttempts)) {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
count++;
|
count++;
|
||||||
|
logFiles =
|
||||||
|
numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
||||||
|
lastLogFile);
|
||||||
}
|
}
|
||||||
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
return (logFiles == expectNum);
|
||||||
lastLogFile) == expectNum;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LogFileStatusInLastCycle {
|
private static class LogFileStatusInLastCycle {
|
||||||
|
Loading…
Reference in New Issue
Block a user