YARN-6811. [ATS1.5] All history logs should be kept under its own User Directory. Contributed by Rohith Sharma K S.

This commit is contained in:
Junping Du 2017-08-04 16:03:56 -07:00
parent bbc6d254c8
commit f44b349b81
6 changed files with 224 additions and 33 deletions

View File

@ -2069,6 +2069,10 @@ public static boolean isAclEnabled(Configuration conf) {
= TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
/**
* Settings for timeline service v2.0
*/

View File

@ -145,10 +145,13 @@ public FileSystemTimelineWriter(Configuration conf,
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl,
timerTaskTTL);
this.isAppendSupported =
conf.getBoolean(
this.isAppendSupported = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
boolean storeInsideUserDir = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
false);
objMapper = createObjectMapper();
int attemptDirCacheSize = conf.getInt(
@ -157,8 +160,8 @@ public FileSystemTimelineWriter(Configuration conf,
YarnConfiguration
.DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
attemptDirCache =
new AttemptDirCache(attemptDirCacheSize, fs, activePath);
attemptDirCache = new AttemptDirCache(attemptDirCacheSize, fs, activePath,
authUgi, storeInsideUserDir);
if (LOG.isDebugEnabled()) {
StringBuilder debugMSG = new StringBuilder();
@ -171,6 +174,8 @@ public FileSystemTimelineWriter(Configuration conf,
+ "=" + ttl + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ "=" + isAppendSupported + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR
+ "=" + storeInsideUserDir + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ "=" + activePath);
@ -946,8 +951,11 @@ private static class AttemptDirCache {
private final Map<ApplicationAttemptId, Path> attemptDirCache;
private final FileSystem fs;
private final Path activePath;
private final UserGroupInformation authUgi;
private final boolean storeInsideUserDir;
public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath,
UserGroupInformation ugi, boolean storeInsideUserDir) {
this.attemptDirCacheSize = cacheSize;
this.attemptDirCache =
new LinkedHashMap<ApplicationAttemptId, Path>(
@ -961,6 +969,8 @@ protected boolean removeEldestEntry(
};
this.fs = fs;
this.activePath = activePath;
this.authUgi = ugi;
this.storeInsideUserDir = storeInsideUserDir;
}
public Path getAppAttemptDir(ApplicationAttemptId attemptId)
@ -993,8 +1003,8 @@ private Path createAttemptDir(ApplicationAttemptId appAttemptId)
}
private Path createApplicationDir(ApplicationId appId) throws IOException {
Path appDir =
new Path(activePath, appId.toString());
Path appRootDir = getAppRootDir(authUgi.getShortUserName());
Path appDir = new Path(appRootDir, appId.toString());
if (FileSystem.mkdirs(fs, appDir,
new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
if (LOG.isDebugEnabled()) {
@ -1003,5 +1013,19 @@ private Path createApplicationDir(ApplicationId appId) throws IOException {
}
return appDir;
}
private Path getAppRootDir(String user) throws IOException {
if (!storeInsideUserDir) {
return activePath;
}
Path userDir = new Path(activePath, user);
if (FileSystem.mkdirs(fs, userDir,
new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
if (LOG.isDebugEnabled()) {
LOG.debug("New user directory created - " + userDir);
}
}
return userDir;
}
}
}

View File

@ -3244,4 +3244,14 @@
<value>0.0.0.0:8091</value>
</property>
<property>
<description>
It is TimelineClient 1.5 configuration whether to store active
applications timeline data with in user directory i.e
${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
</description>
<name>yarn.timeline-service.entity-group-fs-store.with-user-dir</name>
<value>false</value>
</property>
</configuration>

View File

@ -59,6 +59,7 @@ public class TestTimelineClientForATS1_5 {
private static FileContext localFS;
private static File localActiveDir;
private TimelineWriter spyTimelineWriter;
private UserGroupInformation authUgi;
@Before
public void setup() throws Exception {
@ -69,6 +70,10 @@ public void setup() throws Exception {
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
localActiveDir.mkdir();
LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
authUgi = UserGroupInformation.getCurrentUser();
}
private YarnConfiguration getConfigurations() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
@ -77,7 +82,7 @@ public void setup() throws Exception {
conf.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"summary_type");
client = createTimelineClient(conf);
return conf;
}
@After
@ -90,6 +95,21 @@ public void tearDown() throws Exception {
@Test
public void testPostEntities() throws Exception {
client = createTimelineClient(getConfigurations());
verifyForPostEntities(false);
}
@Test
public void testPostEntitiesToKeepUnderUserDir() throws Exception {
YarnConfiguration conf = getConfigurations();
conf.setBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
true);
client = createTimelineClient(conf);
verifyForPostEntities(true);
}
private void verifyForPostEntities(boolean storeInsideUserDir) {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
TimelineEntityGroupId groupId =
@ -118,7 +138,8 @@ public void testPostEntities() throws Exception {
entityTDB[0] = entities[0];
verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId1), "summarylog-"
new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
"summarylog-"
+ attemptId1.toString())));
reset(spyTimelineWriter);
@ -132,13 +153,16 @@ public void testPostEntities() throws Exception {
verify(spyTimelineWriter, times(0)).putEntities(
any(TimelineEntity[].class));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "summarylog-"
new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
"summarylog-"
+ attemptId2.toString())));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "entitylog-"
new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
"entitylog-"
+ groupId.toString())));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "entitylog-"
new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
"entitylog-"
+ groupId2.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
@ -148,6 +172,21 @@ public void testPostEntities() throws Exception {
@Test
public void testPutDomain() {
client = createTimelineClient(getConfigurations());
verifyForPutDomain(false);
}
@Test
public void testPutDomainToKeepUnderUserDir() {
YarnConfiguration conf = getConfigurations();
conf.setBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
true);
client = createTimelineClient(conf);
verifyForPutDomain(true);
}
private void verifyForPutDomain(boolean storeInsideUserDir) {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId attemptId1 =
@ -161,23 +200,33 @@ public void testPutDomain() {
client.putDomain(attemptId1, domain);
verify(spyTimelineWriter, times(0)).putDomain(domain);
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId1), "domainlog-"
+ attemptId1.toString())));
Assert.assertTrue(localFS.util()
.exists(new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
"domainlog-" + attemptId1.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
Assert.fail("Exception is not expected." + e);
}
}
private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
Path appDir =
new Path(localActiveDir.getAbsolutePath(), appAttemptId
.getApplicationId().toString());
private Path getAppAttemptDir(ApplicationAttemptId appAttemptId,
boolean storeInsideUserDir) {
Path userDir = getUserDir(appAttemptId, storeInsideUserDir);
Path appDir = new Path(userDir, appAttemptId.getApplicationId().toString());
Path attemptDir = new Path(appDir, appAttemptId.toString());
return attemptDir;
}
private Path getUserDir(ApplicationAttemptId appAttemptId,
boolean storeInsideUserDir) {
if (!storeInsideUserDir) {
return new Path(localActiveDir.getAbsolutePath());
}
Path userDir =
new Path(localActiveDir.getAbsolutePath(), authUgi.getShortUserName());
return userDir;
}
private static TimelineEntity generateEntity(String type) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity id");

View File

@ -356,7 +356,13 @@ protected void serviceStop() throws Exception {
@VisibleForTesting
int scanActiveLogs() throws IOException {
long startTime = Time.monotonicNow();
RemoteIterator<FileStatus> iter = list(activeRootPath);
int logsToScanCount = scanActiveLogs(activeRootPath);
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount;
}
int scanActiveLogs(Path dir) throws IOException {
RemoteIterator<FileStatus> iter = list(dir);
int logsToScanCount = 0;
while (iter.hasNext()) {
FileStatus stat = iter.next();
@ -368,10 +374,9 @@ int scanActiveLogs() throws IOException {
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
} else {
LOG.debug("Unable to parse entry {}", name);
logsToScanCount += scanActiveLogs(stat.getPath());
}
}
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount;
}
@ -418,6 +423,18 @@ private AppLogs getAndSetAppLogs(ApplicationId applicationId)
appDirPath = getActiveAppPath(applicationId);
if (fs.exists(appDirPath)) {
appState = AppState.ACTIVE;
} else {
// check for user directory inside active path
RemoteIterator<FileStatus> iter = list(activeRootPath);
while (iter.hasNext()) {
Path child = new Path(iter.next().getPath().getName(),
applicationId.toString());
appDirPath = new Path(activeRootPath, child);
if (fs.exists(appDirPath)) {
appState = AppState.ACTIVE;
break;
}
}
}
}
if (appState != AppState.UNKNOWN) {

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.AfterClass;
@ -58,7 +60,6 @@
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@ -91,6 +92,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static ApplicationId mainTestAppId;
private static Path mainTestAppDirPath;
private static Path testDoneDirPath;
private static Path testActiveDirPath;
private static String mainEntityLogFileName;
private EntityGroupFSTimelineStore store;
@ -125,22 +127,27 @@ public static void setupClass() throws Exception {
+ i);
sampleAppIds.add(appId);
}
testActiveDirPath = getTestRootPath("active");
// Among all sample applicationIds, choose the first one for most of the
// tests.
mainTestAppId = sampleAppIds.get(0);
mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString());
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
testDoneDirPath = getTestRootPath("done");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
testDoneDirPath.toString());
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
testActiveDirPath.toString());
}
@Before
public void setup() throws Exception {
for (ApplicationId appId : sampleAppIds) {
Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
Path attemotDirPath =
new Path(new Path(testActiveDirPath, appId.toString()),
getAttemptDirName(appId));
createTestFiles(appId, attemotDirPath);
}
@ -178,7 +185,7 @@ public void setup() throws Exception {
public void tearDown() throws Exception {
store.stop();
for (ApplicationId appId : sampleAppIds) {
fs.delete(getTestRootPath(appId.toString()), true);
fs.delete(new Path(testActiveDirPath,appId.toString()), true);
}
if (testJar != null) {
testJar.delete();
@ -414,8 +421,88 @@ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
}
@Test
public void testGetEntityPluginRead() throws Exception {
EntityGroupFSTimelineStore store = null;
ApplicationId appId =
ApplicationId.fromString("application_1501509265053_0001");
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path userBase = new Path(testActiveDirPath, user);
Path userAppRoot = new Path(userBase, appId.toString());
Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
try {
store = createAndStartTimelineStore(AppState.ACTIVE);
String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
createTestFiles(appId, attemotDirPath, logFileName);
TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
entityNew.getEntityType(), EnumSet.allOf(Field.class));
assertNotNull(entity);
assertEquals(entityNew.getEntityId(), entity.getEntityId());
assertEquals(entityNew.getEntityType(), entity.getEntityType());
} finally {
if (store != null) {
store.stop();
}
fs.delete(userBase, true);
}
}
@Test
public void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
EntityGroupFSTimelineStore store = null;
ApplicationId appId =
ApplicationId.fromString("application_1501509265053_0002");
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path userBase = new Path(testActiveDirPath, user);
Path userAppRoot = new Path(userBase, appId.toString());
Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
try {
store = createAndStartTimelineStore(AppState.COMPLETED);
String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
createTestFiles(appId, attemotDirPath, logFileName);
store.scanActiveLogs();
TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
entityNew.getEntityType(), EnumSet.allOf(Field.class));
assertNotNull(entity);
assertEquals(entityNew.getEntityId(), entity.getEntityId());
assertEquals(entityNew.getEntityType(), entity.getEntityType());
} finally {
if (store != null) {
store.stop();
}
fs.delete(userBase, true);
}
}
private EntityGroupFSTimelineStore createAndStartTimelineStore(
AppState appstate) {
// stop before creating new store to get the lock
store.stop();
EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
@Override
protected AppState getAppState(ApplicationId appId) throws IOException {
return appstate;
}
};
newStore.init(config);
newStore.setFs(fs);
newStore.start();
return newStore;
}
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
throws IOException {
createTestFiles(appId, attemptDirPath, mainEntityLogFileName);
}
private void createTestFiles(ApplicationId appId, Path attemptDirPath,
String logPath) throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
@ -429,7 +516,7 @@ private void createTestFiles(ApplicationId appId, Path attemptDirPath)
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
new Path(attemptDirPath, mainEntityLogFileName), fs);
new Path(attemptDirPath, logPath), fs);
FSDataOutputStream out = fs.create(
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));