From 250b50018e8c94d8ca83ff981b01f26bb68c0842 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 13 Sep 2018 13:28:54 -0500 Subject: [PATCH] YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep Ambati --- .../ResourceLocalizationService.java | 85 +++--- .../recovery/NMLeveldbStateStoreService.java | 175 ++++++++---- .../recovery/NMStateStoreService.java | 29 +- .../recovery/NMMemoryStateStoreService.java | 18 +- .../TestNMLeveldbStateStoreService.java | 269 +++++++++++++++--- 5 files changed, 418 insertions(+), 158 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d9b887f56d..71f48acb37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -308,63 +308,66 @@ public void recoverLocalizedResources(RecoveredLocalizationState state) String user = userEntry.getKey(); RecoveredUserResources userResources = userEntry.getValue(); trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, - dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + null, dispatcher, true, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, + tracker); + if (oldTracker != null) { + tracker = oldTracker; } + recoverTrackerResources(tracker, trackerState); for (Map.Entry appEntry : userResources.getAppTrackerStates().entrySet()) { trackerState = appEntry.getValue(); - if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, - dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker1 = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker1 = appRsrc.putIfAbsent(appIdStr, + tracker1); + if (oldTracker1 != null) { + tracker1 = oldTracker1; } + recoverTrackerResources(tracker1, trackerState); } } } } private void recoverTrackerResources(LocalResourcesTracker tracker, - LocalResourceTrackerState state) throws URISyntaxException { - for (LocalizedResourceProto proto : state.getLocalizedResources()) { - LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); - LocalResourceRequest req = new LocalResourceRequest(rsrc); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering localized resource " + req + " at " - + proto.getLocalPath()); + LocalResourceTrackerState state) throws URISyntaxException, IOException { + try (RecoveryIterator it = + state.getCompletedResourcesIterator()) { + while (it != null && it.hasNext()) { + LocalizedResourceProto proto = it.next(); + LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering localized resource " + req + " at " + + proto.getLocalPath()); + } + tracker.handle(new ResourceRecoveredEvent(req, + new Path(proto.getLocalPath()), proto.getSize())); } - tracker.handle(new ResourceRecoveredEvent(req, - new Path(proto.getLocalPath()), proto.getSize())); } - for (Map.Entry entry : - state.getInProgressResources().entrySet()) { - LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); - LocalResourceRequest req = new LocalResourceRequest(rsrc); - Path localPath = entry.getValue(); - tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); + try (RecoveryIterator> it = + state.getStartedResourcesIterator()) { + while (it != null && it.hasNext()) { + Map.Entry entry = it.next(); + LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + Path localPath = entry.getValue(); + tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); - // delete any in-progress localizations, containers will request again - LOG.info("Deleting in-progress localization for " + req + " at " - + localPath); - tracker.remove(tracker.getLocalizedResource(req), delService); + // delete any in-progress localizations, containers will request again + LOG.info("Deleting in-progress localization for " + req + " at " + + localPath); + tracker.remove(tracker.getLocalizedResource(req), delService); + } } // TODO: remove untracked directories in local filesystem diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5d4253db9d..1d7771a9e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -67,6 +67,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.AbstractMap; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -154,6 +155,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + /** + * The Local Tracker State DB key locations - "completed" and "started". + * To seek through app tracker states in RecoveredUserResources + * we need to move from one app tracker state to another using key "zzz". + * zzz comes later in lexicographical order than started. + * Similarly to move one user to another in RLS,we can use "zzz", + * as RecoveredUserResources uses two keys appcache and filecache. + */ + private static final String BEYOND_ENTRIES_SUFFIX = "zzz/"; + private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX = "/assignedResources_"; @@ -862,112 +873,154 @@ private Entry getNextRecoveredPrivateLocalizatio public RecoveredLocalizationState loadLocalizationState() throws IOException { RecoveredLocalizationState state = new RecoveredLocalizationState(); - LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); - state.publicTrackerState = loadResourceTrackerState(it, + state.publicTrackerState = loadResourceTrackerState( LOCALIZATION_PUBLIC_KEY_PREFIX); state.it = new UserResourcesIterator(); return state; } - private LocalResourceTrackerState loadResourceTrackerState( - LeveldbIterator iter, String keyPrefix) throws IOException { + private LocalResourceTrackerState loadResourceTrackerState(String keyPrefix) + throws IOException { final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX; final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; - LocalResourceTrackerState state = new LocalResourceTrackerState(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); - String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; - } - if (key.startsWith(completedPrefix)) { - state.localizedResources = loadCompletedResources(iter, - completedPrefix); - } else if (key.startsWith(startedPrefix)) { - state.inProgressResources = loadStartedResources(iter, startedPrefix); - } else { - throw new IOException("Unexpected key in resource tracker state: " - + key); - } - } + RecoveryIterator crIt = + new CompletedResourcesIterator(completedPrefix); + RecoveryIterator> srIt = + new StartedResourcesIterator(startedPrefix); - return state; + return new LocalResourceTrackerState(crIt, srIt); } - private List loadCompletedResources( + private class CompletedResourcesIterator extends + BaseRecoveryIterator { + private String startKey; + CompletedResourcesIterator(String startKey) throws IOException { + super(startKey); + this.startKey = startKey; + } + + @Override + protected LocalizedResourceProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextCompletedResource(it, startKey); + } + } + + private LocalizedResourceProto getNextCompletedResource( LeveldbIterator iter, String keyPrefix) throws IOException { - List rsrcs = - new ArrayList(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + LocalizedResourceProto nextCompletedResource = null; + if (iter.hasNext()){ + Entry entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { - break; + return null; } if (LOG.isDebugEnabled()) { LOG.debug("Loading completed resource from " + key); } - rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue())); - iter.next(); + nextCompletedResource = LocalizedResourceProto.parseFrom( + entry.getValue()); } - - return rsrcs; + return nextCompletedResource; } - private Map loadStartedResources( + private class StartedResourcesIterator extends + BaseRecoveryIterator> { + private String startKey; + StartedResourcesIterator(String startKey) throws IOException { + super(startKey); + this.startKey = startKey; + } + + @Override + protected Entry getNextItem(LeveldbIterator it) + throws IOException { + return getNextStartedResource(it, startKey); + } + } + + private Entry getNextStartedResource( LeveldbIterator iter, String keyPrefix) throws IOException { - Map rsrcs = - new HashMap(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry nextStartedResource = null; + if (iter.hasNext()){ + Entry entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { - break; + return null; } Path localPath = new Path(key.substring(keyPrefix.length())); if (LOG.isDebugEnabled()) { LOG.debug("Loading in-progress resource at " + localPath); } - rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath); - iter.next(); + nextStartedResource = new SimpleEntry( + LocalResourceProto.parseFrom(entry.getValue()), localPath); } + return nextStartedResource; + } - return rsrcs; + private void seekPastPrefix(LeveldbIterator iter, String keyPrefix) + throws IOException { + try{ + iter.seek(bytes(keyPrefix + BEYOND_ENTRIES_SUFFIX)); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (key.startsWith(keyPrefix)) { + iter.next(); + } else { + break; + } + } + } catch (DBException e) { + throw new IOException(e); + } } private RecoveredUserResources loadUserLocalizedResources( LeveldbIterator iter, String keyPrefix) throws IOException { RecoveredUserResources userResources = new RecoveredUserResources(); + + // seek through App cache + String appCachePrefix = keyPrefix + LOCALIZATION_APPCACHE_SUFFIX; + iter.seek(bytes(appCachePrefix)); while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry entry = iter.peekNext(); String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { + + if (!key.startsWith(appCachePrefix)) { break; } - if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) { - userResources.privateTrackerState = loadResourceTrackerState(iter, - keyPrefix + LOCALIZATION_FILECACHE_SUFFIX); - } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX, - keyPrefix.length())) { - int appIdStartPos = keyPrefix.length() + - LOCALIZATION_APPCACHE_SUFFIX.length(); - int appIdEndPos = key.indexOf('/', appIdStartPos); - if (appIdEndPos < 0) { - throw new IOException("Unable to determine appID in resource key: " - + key); - } - ApplicationId appId = ApplicationId.fromString( - key.substring(appIdStartPos, appIdEndPos)); - userResources.appTrackerStates.put(appId, - loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1))); - } else { - throw new IOException("Unexpected user resource key " + key); + int appIdStartPos = appCachePrefix.length(); + int appIdEndPos = key.indexOf('/', appIdStartPos); + if (appIdEndPos < 0) { + throw new IOException("Unable to determine appID in resource key: " + + key); } + ApplicationId appId = ApplicationId.fromString( + key.substring(appIdStartPos, appIdEndPos)); + String trackerStateKey = key.substring(0, appIdEndPos+1); + userResources.appTrackerStates.put(appId, + loadResourceTrackerState(trackerStateKey)); + // Seek to next application + seekPastPrefix(iter, trackerStateKey); } + + // File Cache + String fileCachePrefix = keyPrefix + LOCALIZATION_FILECACHE_SUFFIX; + iter.seek(bytes(fileCachePrefix)); + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (key.startsWith(fileCachePrefix)) { + userResources.privateTrackerState = + loadResourceTrackerState(fileCachePrefix); + } + + // seek to Next User. + seekPastPrefix(iter, keyPrefix); return userResources; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 35caec9a47..c6d8a1368a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -220,27 +219,31 @@ public void setResourceMappings(ResourceMappings mappings) { } public static class LocalResourceTrackerState { - List localizedResources = - new ArrayList(); - Map inProgressResources = - new HashMap(); + final private RecoveryIterator + completedResourcesIterator; + final private RecoveryIterator> + startedResourcesIterator; - public List getLocalizedResources() { - return localizedResources; + LocalResourceTrackerState(RecoveryIterator crIt, + RecoveryIterator> srIt) { + this.completedResourcesIterator = crIt; + this.startedResourcesIterator = srIt; } - public Map getInProgressResources() { - return inProgressResources; + public RecoveryIterator + getCompletedResourcesIterator() { + return completedResourcesIterator; } - public boolean isEmpty() { - return localizedResources.isEmpty() && inProgressResources.isEmpty(); + public RecoveryIterator> + getStartedResourcesIterator() { + return startedResourcesIterator; } } public static class RecoveredUserResources { LocalResourceTrackerState privateTrackerState = - new LocalResourceTrackerState(); + new LocalResourceTrackerState(null, null); Map appTrackerStates = new HashMap(); @@ -256,7 +259,7 @@ public LocalResourceTrackerState getPrivateTrackerState() { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = - new LocalResourceTrackerState(); + new LocalResourceTrackerState(null, null); RecoveryIterator> it = null; public LocalResourceTrackerState getPublicTrackerState() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 9658ecdf63..54842a9146 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -292,13 +292,23 @@ private RecoveredContainerState getRecoveredContainerState( } private LocalResourceTrackerState loadTrackerState(TrackerState ts) { - LocalResourceTrackerState result = new LocalResourceTrackerState(); - result.localizedResources.addAll(ts.localizedResources.values()); + List completedResources = + new ArrayList(ts.localizedResources.values()); + RecoveryIterator crIt = + new NMMemoryRecoveryIterator( + completedResources.iterator()); + + Map inProgressMap = + new HashMap(); for (Map.Entry entry : ts.inProgressMap.entrySet()) { - result.inProgressResources.put(entry.getValue(), entry.getKey()); + inProgressMap.put(entry.getValue(), entry.getKey()); } - return result; + RecoveryIterator> srIt = + new NMMemoryRecoveryIterator>( + inProgressMap.entrySet().iterator()); + + return new LocalResourceTrackerState(crIt, srIt); } private TrackerState getTrackerState(TrackerKey key) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index fcbbc52a3c..87208f7649 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -192,6 +192,28 @@ private Map loadContainerTokens( return containerTokens; } + private List loadCompletedResources( + RecoveryIterator it) throws IOException { + List completedResources = + new ArrayList(); + while (it != null && it.hasNext()) { + completedResources.add(it.next()); + } + return completedResources; + } + + private Map loadStartedResources( + RecoveryIterator > it) + throws IOException { + Map startedResources = + new HashMap(); + while (it != null &&it.hasNext()) { + Map.Entry entry = it.next(); + startedResources.put(entry.getKey(), entry.getValue()); + } + return startedResources; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -207,8 +229,10 @@ private void verifyEmptyState() throws IOException { assertNotNull(state); LocalResourceTrackerState pubts = state.getPublicTrackerState(); assertNotNull(pubts); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator()) + .isEmpty()); + assertTrue(loadStartedResources(pubts.getStartedResourcesIterator()) + .isEmpty()); assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @@ -541,6 +565,111 @@ private StartContainerRequest createContainerRequestInternal(ContainerId return StartContainerRequest.newInstance(clc, containerToken); } + @Test + public void testLocalTrackerStateIterator() throws IOException { + String user1 = "somebody"; + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + ApplicationId appId2 = ApplicationId.newInstance(2, 2); + + String user2 = "someone"; + ApplicationId appId3 = ApplicationId.newInstance(3, 3); + + // start and finish local resource for applications + Path appRsrcPath1 = new Path("hdfs://some/app/resource1"); + LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath1), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto1 = rsrcPb1.getProto(); + Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1"); + Path appRsrcPath2 = new Path("hdfs://some/app/resource2"); + LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath2), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto2 = rsrcPb2.getProto(); + Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2"); + Path appRsrcPath3 = new Path("hdfs://some/app/resource3"); + LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath3), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto3 = rsrcPb3.getProto(); + Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2"); + + stateStore.startResourceLocalization(user1, appId1, appRsrcProto1, + appRsrcLocalPath1); + stateStore.startResourceLocalization(user1, appId2, appRsrcProto2, + appRsrcLocalPath2); + stateStore.startResourceLocalization(user2, appId3, appRsrcProto3, + appRsrcLocalPath3); + + LocalizedResourceProto appLocalizedProto1 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto1) + .setLocalPath(appRsrcLocalPath1.toString()) + .setSize(1234567L) + .build(); + LocalizedResourceProto appLocalizedProto2 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto2) + .setLocalPath(appRsrcLocalPath2.toString()) + .setSize(1234567L) + .build(); + LocalizedResourceProto appLocalizedProto3 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto3) + .setLocalPath(appRsrcLocalPath3.toString()) + .setSize(1234567L) + .build(); + + + stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1); + stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2); + stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3); + + + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + + // restart and verify two users exist and two apps completed for user1. + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(2, userResources.size()); + + RecoveredUserResources uResource = userResources.get(user1); + assertEquals(2, uResource.getAppTrackerStates().size()); + LocalResourceTrackerState app1ts = + uResource.getAppTrackerStates().get(appId1); + assertNotNull(app1ts); + completedResources = loadCompletedResources( + app1ts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + app1ts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(appLocalizedProto1, + completedResources.iterator().next()); + LocalResourceTrackerState app2ts = + uResource.getAppTrackerStates().get(appId2); + assertNotNull(app2ts); + completedResources = loadCompletedResources( + app2ts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + app2ts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(appLocalizedProto2, + completedResources.iterator().next()); + } + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; @@ -558,27 +687,44 @@ public void testStartResourceLocalization() throws IOException { stateStore.startResourceLocalization(user, appId, appRsrcProto, appRsrcLocalPath); + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + // restart and verify only app resource is marked in-progress restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); Map userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertTrue(privts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getLocalizedResources().isEmpty()); - assertEquals(1, appts.getInProgressResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, - appts.getInProgressResources().get(appRsrcProto)); + startedResources.get(appRsrcProto)); // start some public and private resources Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); @@ -613,28 +759,40 @@ public void testStartResourceLocalization() throws IOException { restartStateStore(); state = stateStore.loadLocalizationState(); pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertEquals(2, pubts.getInProgressResources().size()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(2, startedResources.size()); assertEquals(pubRsrcLocalPath1, - pubts.getInProgressResources().get(pubRsrcProto1)); + startedResources.get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, - pubts.getInProgressResources().get(pubRsrcProto2)); + startedResources.get(pubRsrcProto2)); userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertEquals(1, privts.getInProgressResources().size()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(privRsrcLocalPath, - privts.getInProgressResources().get(privRsrcProto)); + startedResources.get(privRsrcProto)); assertEquals(1, rur.getAppTrackerStates().size()); appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getLocalizedResources().isEmpty()); - assertEquals(1, appts.getInProgressResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, - appts.getInProgressResources().get(appRsrcProto)); + startedResources.get(appRsrcProto)); } @Test @@ -661,27 +819,44 @@ public void testFinishResourceLocalization() throws IOException { .build(); stateStore.finishResourceLocalization(user, appId, appLocalizedProto); + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + // restart and verify only app resource is completed restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); Map userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertTrue(privts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getInProgressResources().isEmpty()); - assertEquals(1, appts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, - appts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); // start some public and private resources Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); @@ -732,28 +907,40 @@ public void testFinishResourceLocalization() throws IOException { restartStateStore(); state = stateStore.loadLocalizationState(); pubts = state.getPublicTrackerState(); - assertEquals(1, pubts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); assertEquals(pubLocalizedProto1, - pubts.getLocalizedResources().iterator().next()); - assertEquals(1, pubts.getInProgressResources().size()); + completedResources.iterator().next()); + assertEquals(1, startedResources.size()); assertEquals(pubRsrcLocalPath2, - pubts.getInProgressResources().get(pubRsrcProto2)); + startedResources.get(pubRsrcProto2)); userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertEquals(1, privts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); assertEquals(privLocalizedProto, - privts.getLocalizedResources().iterator().next()); - assertTrue(privts.getInProgressResources().isEmpty()); + completedResources.iterator().next()); + assertTrue(startedResources.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getInProgressResources().isEmpty()); - assertEquals(1, appts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, - appts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); } @Test @@ -841,10 +1028,14 @@ public void testRemoveLocalizedResource() throws IOException { restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getInProgressResources().isEmpty()); - assertEquals(1, pubts.getLocalizedResources().size()); + List completedResources = + loadCompletedResources(pubts.getCompletedResourcesIterator()); + Map startedResources = + loadStartedResources(pubts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(pubLocalizedProto1, - pubts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); Map userResources = loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty());