YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.

This commit is contained in:
Junping Du 2016-05-27 06:58:32 -07:00
parent bde819abbb
commit 705286ccae
4 changed files with 326 additions and 91 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -30,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cache item for timeline server v1.5 reader cache. Each cache item has a
@ -40,12 +43,17 @@ public class EntityCacheItem {
= LoggerFactory.getLogger(EntityCacheItem.class);
private TimelineStore store;
private TimelineEntityGroupId groupId;
private EntityGroupFSTimelineStore.AppLogs appLogs;
private long lastRefresh;
private Configuration config;
private FileSystem fs;
private int refCount = 0;
private static AtomicInteger activeStores = new AtomicInteger(0);
public EntityCacheItem(Configuration config, FileSystem fs) {
public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
FileSystem fs) {
this.groupId = gId;
this.config = config;
this.fs = fs;
}
@ -70,17 +78,24 @@ public synchronized void setAppLogs(
/**
* @return The timeline store, either loaded or unloaded, of this cache item.
* This method will not hold the storage from being reclaimed.
*/
public synchronized TimelineStore getStore() {
return store;
}
/**
* @return The number of currently active stores in all CacheItems.
*/
public static int getActiveStores() {
return activeStores.get();
}
/**
* Refresh this cache item if it needs refresh. This will enforce an appLogs
* rescan and then load new data. The refresh process is synchronized with
* other operations on the same cache item.
*
* @param groupId Group id of the cache
* @param aclManager ACL manager for the timeline storage
* @param jsonFactory JSON factory for the storage
* @param objMapper Object mapper for the storage
@ -89,10 +104,9 @@ public synchronized TimelineStore getStore() {
* object filled with all entities in the group.
* @throws IOException
*/
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
TimelineACLsManager aclManager, JsonFactory jsonFactory,
ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
throws IOException {
public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
JsonFactory jsonFactory, ObjectMapper objMapper,
EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
if (needRefresh()) {
long startTime = Time.monotonicNow();
// If an application is not finished, we only update summary logs (and put
@ -105,6 +119,7 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
}
if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) {
activeStores.getAndIncrement();
store = new LevelDBCacheTimelineStore(groupId.toString(),
"LeveldbCache." + groupId);
store.init(config);
@ -148,11 +163,35 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
}
/**
* Release the cache item for the given group id.
*
* @param groupId the group id that the cache should release
* Increase the number of references to this cache item by 1.
*/
public synchronized void releaseCache(TimelineEntityGroupId groupId) {
public synchronized void incrRefs() {
refCount++;
}
/**
* Unregister a reader. Try to release the cache if the reader to current
* cache reaches 0.
*
* @return true if the cache has been released, otherwise false
*/
public synchronized boolean tryRelease() {
refCount--;
// Only reclaim the storage if there is no reader.
if (refCount > 0) {
LOG.debug("{} references left for cached group {}, skipping the release",
refCount, groupId);
return false;
}
forceRelease();
return true;
}
/**
* Force releasing the cache item for the given group id, even though there
* may be active references.
*/
public synchronized void forceRelease() {
try {
if (store != null) {
store.close();
@ -161,12 +200,21 @@ public synchronized void releaseCache(TimelineEntityGroupId groupId) {
LOG.warn("Error closing timeline store", e);
}
store = null;
activeStores.getAndDecrement();
refCount = 0;
// reset offsets so next time logs are re-parsed
for (LogInfo log : appLogs.getDetailLogs()) {
if (log.getFilename().contains(groupId.toString())) {
log.setOffset(0);
}
}
LOG.debug("Cache for group {} released. ", groupId);
}
@InterfaceAudience.Private
@VisibleForTesting
synchronized int getRefCount() {
return refCount;
}
private boolean needRefresh() {

View File

@ -108,6 +108,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
+ "%04d" + Path.SEPARATOR // app num / 1,000,000
+ "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
+ "%s" + Path.SEPARATOR; // full app id
// Indicates when to force release a cache item even if there are active
// readers. Enlarge this factor may increase memory usage for the reader since
// there may be more cache items "hanging" in memory but not in cache.
private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2;
private YarnClient yarnClient;
private TimelineStore summaryStore;
@ -172,7 +176,15 @@ protected boolean removeEldestEntry(
TimelineEntityGroupId groupId = eldest.getKey();
LOG.debug("Evicting {} due to space limitations", groupId);
EntityCacheItem cacheItem = eldest.getValue();
cacheItem.releaseCache(groupId);
int activeStores = EntityCacheItem.getActiveStores();
if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
LOG.debug("Force release cache {} since {} stores are active",
groupId, activeStores);
cacheItem.forceRelease();
} else {
LOG.debug("Try release cache {}", groupId);
cacheItem.tryRelease();
}
if (cacheItem.getAppLogs().isDone()) {
appIdLogMap.remove(groupId.getApplicationId());
}
@ -826,17 +838,19 @@ void setFs(FileSystem incomingFs) {
@InterfaceAudience.Private
@VisibleForTesting
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
cacheItem.incrRefs();
cachedLogs.put(groupId, cacheItem);
}
private List<TimelineStore> getTimelineStoresFromCacheIds(
Set<TimelineEntityGroupId> groupIds, String entityType)
Set<TimelineEntityGroupId> groupIds, String entityType,
List<EntityCacheItem> cacheItems)
throws IOException {
List<TimelineStore> stores = new LinkedList<TimelineStore>();
// For now we just handle one store in a context. We return the first
// non-null storage for the group ids.
for (TimelineEntityGroupId groupId : groupIds) {
TimelineStore storeForId = getCachedStore(groupId);
TimelineStore storeForId = getCachedStore(groupId, cacheItems);
if (storeForId != null) {
LOG.debug("Adding {} as a store for the query", storeForId.getName());
stores.add(storeForId);
@ -851,8 +865,9 @@ private List<TimelineStore> getTimelineStoresFromCacheIds(
return stores;
}
private List<TimelineStore> getTimelineStoresForRead(String entityId,
String entityType) throws IOException {
protected List<TimelineStore> getTimelineStoresForRead(String entityId,
String entityType, List<EntityCacheItem> cacheItems)
throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
LOG.debug("Trying plugin {} for id {} and type {}",
@ -871,12 +886,12 @@ private List<TimelineStore> getTimelineStoresForRead(String entityId,
cacheIdPlugin.getClass().getName());
}
}
return getTimelineStoresFromCacheIds(groupIds, entityType);
return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
private List<TimelineStore> getTimelineStoresForRead(String entityType,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
throws IOException {
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
List<EntityCacheItem> cacheItems) throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
Set<TimelineEntityGroupId> idsFromPlugin =
@ -888,24 +903,26 @@ private List<TimelineStore> getTimelineStoresForRead(String entityType,
groupIds.addAll(idsFromPlugin);
}
}
return getTimelineStoresFromCacheIds(groupIds, entityType);
return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
// find a cached timeline store or null if it cannot be located
private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
throws IOException {
private TimelineStore getCachedStore(TimelineEntityGroupId groupId,
List<EntityCacheItem> cacheItems) throws IOException {
EntityCacheItem cacheItem;
synchronized (this.cachedLogs) {
// Note that the content in the cache log storage may be stale.
cacheItem = this.cachedLogs.get(groupId);
if (cacheItem == null) {
LOG.debug("Set up new cache item for id {}", groupId);
cacheItem = new EntityCacheItem(getConfig(), fs);
cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
if (appLogs != null) {
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
cacheItem.setAppLogs(appLogs);
this.cachedLogs.put(groupId, cacheItem);
// Add the reference by the cache
cacheItem.incrRefs();
} else {
LOG.warn("AppLogs for groupId {} is set to null!", groupId);
}
@ -915,30 +932,43 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
if (cacheItem.getAppLogs() != null) {
AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
objMapper, metrics);
// Add the reference by the store
cacheItem.incrRefs();
cacheItems.add(cacheItem);
store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
metrics);
} else {
LOG.warn("AppLogs for group id {} is null", groupId);
}
return store;
}
protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
for (EntityCacheItem item : relatedCacheItems) {
item.tryRelease();
}
}
@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores = getTimelineStoresForRead(entityType,
primaryFilter, secondaryFilters);
primaryFilter, secondaryFilters, relatedCacheItems);
TimelineEntities returnEntities = new TimelineEntities();
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {} for the request", store.getName());
returnEntities.addEntities(
store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
checkAcl).getEntities());
TimelineEntities entities = store.getEntities(entityType, limit,
windowStart, windowEnd, fromId, fromTs, primaryFilter,
secondaryFilters, fieldsToRetrieve, checkAcl);
if (entities != null) {
returnEntities.addEntities(entities.getEntities());
}
}
tryReleaseCacheItems(relatedCacheItems);
return returnEntities;
}
@ -946,17 +976,21 @@ public TimelineEntities getEntities(String entityType, Long limit,
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
LOG.debug("getEntity type={} id={}", entityType, entityId);
List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType,
relatedCacheItems);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
tryReleaseCacheItems(relatedCacheItems);
return e;
}
}
LOG.debug("getEntity: Found nothing");
tryReleaseCacheItems(relatedCacheItems);
return null;
}
@ -966,10 +1000,11 @@ public TimelineEvents getEntityTimelines(String entityType,
Long windowEnd, Set<String> eventTypes) throws IOException {
LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
TimelineEvents returnEvents = new TimelineEvents();
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
for (String entityId : entityIds) {
LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
List<TimelineStore> stores
= getTimelineStoresForRead(entityId, entityType);
= getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
@ -978,9 +1013,12 @@ public TimelineEvents getEntityTimelines(String entityType,
TimelineEvents events =
store.getEntityTimelines(entityType, entityIdSet, limit,
windowStart, windowEnd, eventTypes);
returnEvents.addEvents(events.getAllEvents());
if (events != null) {
returnEvents.addEvents(events.getAllEvents());
}
}
}
tryReleaseCacheItems(relatedCacheItems);
return returnEvents;
}

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import java.util.Collection;
import java.util.Set;
@ -26,31 +28,32 @@
class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
private static TimelineEntityGroupId timelineEntityGroupId
= TimelineEntityGroupId.newInstance(
TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
static final String APP_ID_FILTER_NAME = "appid";
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters) {
return Sets.newHashSet(timelineEntityGroupId);
ApplicationId appId
= ConverterUtils.toApplicationId(primaryFilter.getValue().toString());
return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
String entityType) {
return Sets.newHashSet(timelineEntityGroupId);
ApplicationId appId = ConverterUtils.toApplicationId(entityId);
return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
SortedSet<String> entityIds,
Set<String> eventTypes) {
return Sets.newHashSet(timelineEntityGroupId);
return Sets.newHashSet();
}
static TimelineEntityGroupId getStandardTimelineGroupId() {
return timelineEntityGroupId;
static TimelineEntityGroupId getStandardTimelineGroupId(ApplicationId appId) {
return TimelineEntityGroupId.newInstance(appId, "test");
}
}

View File

@ -44,8 +44,17 @@
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
@ -53,24 +62,15 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static final String SAMPLE_APP_NAME = "1234_5678";
private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
private static final int CACHE_TEST_CACHE_SIZE = 5;
static final ApplicationId TEST_APPLICATION_ID
= ConverterUtils.toApplicationId(
ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
private static final String TEST_APP_DIR_NAME
= TEST_APPLICATION_ID.toString();
private static final String TEST_ATTEMPT_DIR_NAME
= ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
private static final String TEST_SUMMARY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
private static final String TEST_ENTITY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId();
private static final String TEST_DOMAIN_LOG_FILE_NAME
= EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
@ -78,9 +78,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestEntityGroupFSTimelineStore.class.getSimpleName());
private static Path testAppDirPath;
private static Path testAttemptDirPath;
private static Path testDoneDirPath;
private static Configuration config = new YarnConfiguration();
private static MiniDFSCluster hdfsCluster;
@ -88,7 +85,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static FileContext fc;
private static FileContextTestHelper fileContextTestHelper =
new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
private EntityGroupFSTimelineStore store;
private static List<ApplicationId> sampleAppIds;
private static ApplicationId mainTestAppId;
private static Path mainTestAppDirPath;
private static Path testDoneDirPath;
private static String mainEntityLogFileName;
private EntityGroupFSTimelineStoreForTest store;
private TimelineEntity entityNew;
@Rule
@ -101,23 +105,44 @@ public static void setupClass() throws Exception {
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
config.setInt(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
CACHE_TEST_CACHE_SIZE);
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster
= new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
testAppDirPath = getTestRootPath(TEST_APPLICATION_ID.toString());
testAttemptDirPath = new Path(testAppDirPath, TEST_ATTEMPT_DIR_NAME);
testDoneDirPath = getTestRootPath("done");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, testDoneDirPath.toString());
sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
ApplicationId appId = ConverterUtils.toApplicationId(
ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+ i);
sampleAppIds.add(appId);
}
// Among all sample applicationIds, choose the first one for most of the
// tests.
mainTestAppId = sampleAppIds.get(0);
mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
testDoneDirPath = getTestRootPath("done");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
testDoneDirPath.toString());
}
@Before
public void setup() throws Exception {
createTestFiles();
store = new EntityGroupFSTimelineStore();
for (ApplicationId appId : sampleAppIds) {
Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
getAttemptDirName(appId));
createTestFiles(appId, attemotDirPath);
}
store = new EntityGroupFSTimelineStoreForTest();
if (currTestName.getMethodName().contains("Plugin")) {
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
@ -130,7 +155,9 @@ public void setup() throws Exception {
@After
public void tearDown() throws Exception {
store.stop();
fs.delete(testAppDirPath, true);
for (ApplicationId appId : sampleAppIds) {
fs.delete(getTestRootPath(appId.toString()), true);
}
}
@AfterClass
@ -144,7 +171,7 @@ public static void tearDownClass() throws Exception {
@Test
public void testAppLogsScanLogs() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
@ -160,14 +187,14 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
for (LogInfo log : detailLogs) {
String fileName = log.getFilename();
assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
assertEquals(fileName, mainEntityLogFileName);
}
}
@Test
public void testMoveToDone() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
Path pathBefore = appLogs.getAppDirPath();
appLogs.moveToDone();
@ -182,7 +209,7 @@ public void testParseSummaryLogs() throws Exception {
MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
long beforeScan = scanned.value();
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);
@ -194,6 +221,9 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
public void testCleanLogs() throws Exception {
// Create test dirs and files
// Irrelevant file, should not be reclaimed
String appDirName = mainTestAppId.toString();
String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+ appDirName + "_1";
Path irrelevantFilePath = new Path(
testDoneDirPath, "irrelevant.log");
FSDataOutputStream stream = fs.create(irrelevantFilePath);
@ -204,29 +234,29 @@ public void testCleanLogs() throws Exception {
Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
// First application, untouched after creation
Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
Path appDirClean = new Path(doneAppHomeDir, appDirName);
Path attemptDirClean = new Path(appDirClean, attemptDirName);
fs.mkdirs(attemptDirClean);
Path filePath = new Path(attemptDirClean, "test.log");
stream = fs.create(filePath);
stream.close();
// Second application, one file touched after creation
Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
Path attemptDirHoldByFile
= new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
= new Path(appDirHoldByFile, attemptDirName);
fs.mkdirs(attemptDirHoldByFile);
Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
stream = fs.create(filePathHold);
stream.close();
// Third application, one dir touched after creation
Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
fs.mkdirs(attemptDirHoldByDir);
Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
fs.mkdirs(dirPathHold);
// Fourth application, empty dirs
Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
fs.mkdirs(attemptDirEmpty);
Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
fs.mkdirs(dirPathEmpty);
@ -274,12 +304,15 @@ public void testPluginRead() throws Exception {
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config, fs);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
cacheItem);
MutableCounterLong detailLogEntityRead =
store.metrics.getGetEntityToDetailOps();
MutableStat cacheRefresh = store.metrics.getCacheRefresh();
@ -291,16 +324,20 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
= PluginStoreTestUtils.getTdmWithStore(config, store);
// Verify single entity read
TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
assertEquals(1, cacheItem.getRefCount());
assertEquals(1, EntityCacheItem.getActiveStores());
// Verify multiple entities read
TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
NameValuePair primaryFilter = new NameValuePair(
EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
assertEquals(1, entities.getEntities().size());
for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime());
}
@ -309,11 +346,79 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
}
@Test(timeout = 90000L)
public void testMultiplePluginRead() throws Exception {
Thread mainThread = Thread.currentThread();
mainThread.setName("testMain");
// Verify precondition
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Prepare timeline store by making cache items
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
final EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config, fs);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
cacheItem);
// Launch the blocking read call in a future
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
FutureTask<TimelineEntity> blockingReader =
new FutureTask<>(new Callable<TimelineEntity>() {
public TimelineEntity call() throws Exception {
Thread currThread = Thread.currentThread();
currThread.setName("blockingReader");
return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
EnumSet.allOf(TimelineReader.Field.class));
}});
threadExecutor.execute(blockingReader);
try {
while (!store.testCacheReferenced) {
Thread.sleep(300);
}
} catch (InterruptedException e) {
fail("Interrupted on exception " + e);
}
// Try refill the cache after the first cache item is referenced
for (ApplicationId appId : sampleAppIds) {
// Skip the first appId since it's already in cache
if (appId.equals(mainTestAppId)) {
continue;
}
EntityGroupFSTimelineStore.AppLogs currAppLog =
store.new AppLogs(appId, getTestRootPath(appId.toString()),
AppState.COMPLETED);
EntityCacheItem item = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
config, fs);
item.setAppLogs(currAppLog);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
item);
}
// At this time, the cache item of the blocking reader should be evicted.
assertEquals(1, cacheItem.getRefCount());
store.testCanProceed = true;
TimelineEntity entity3 = blockingReader.get();
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
assertEquals(0, cacheItem.getRefCount());
threadExecutor.shutdownNow();
}
@Test
public void testSummaryRead() throws Exception {
// Load data
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
MutableCounterLong summaryLogEntityRead
= store.metrics.getGetEntityToSummaryOps();
@ -331,28 +436,32 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
assertEquals((Long) 123l, entity.getStartTime());
assertEquals((Long) 123L, entity.getStartTime());
}
// Verify metrics
assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
}
private void createTestFiles() throws IOException {
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
new Path(testAttemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
Map<String, Set<Object>> primaryFilters = new HashMap<>();
Set<Object> appSet = new HashSet<Object>();
appSet.add(appId.toString());
primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
.createEntity(appId.toString(), "type_3", 789L, null, null,
primaryFilters, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
new Path(testAttemptDirPath, TEST_ENTITY_LOG_FILE_NAME), fs);
new Path(attemptDirPath, mainEntityLogFileName), fs);
FSDataOutputStream out = fs.create(
new Path(testAttemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
out.close();
}
@ -360,4 +469,41 @@ private static Path getTestRootPath(String pathString) {
return fileContextTestHelper.getTestRootPath(fc, pathString);
}
private static String getAttemptDirName(ApplicationId appId) {
return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
}
private static class EntityGroupFSTimelineStoreForTest
extends EntityGroupFSTimelineStore {
// Flags used for the concurrent testing environment
private volatile boolean testCanProceed = false;
private volatile boolean testCacheReferenced = false;
TimelineEntity getEntityBlocking(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores = getTimelineStoresForRead(entityId,
entityType, relatedCacheItems);
testCacheReferenced = true;
try {
while (!testCanProceed) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
fail("Interrupted " + e);
}
for (TimelineStore store : stores) {
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
tryReleaseCacheItems(relatedCacheItems);
return e;
}
}
tryReleaseCacheItems(relatedCacheItems);
return null;
}
}
}