YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep Ambati

This commit is contained in:
Jason Lowe 2018-09-13 13:28:54 -05:00
parent e1b242a984
commit 250b50018e
5 changed files with 418 additions and 158 deletions

View File

@ -308,63 +308,66 @@ public void recoverLocalizedResources(RecoveredLocalizationState state)
String user = userEntry.getKey(); String user = userEntry.getKey();
RecoveredUserResources userResources = userEntry.getValue(); RecoveredUserResources userResources = userEntry.getValue();
trackerState = userResources.getPrivateTrackerState(); trackerState = userResources.getPrivateTrackerState();
if (!trackerState.isEmpty()) { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, null, dispatcher, true, super.getConfig(), stateStore,
null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
dirsHandler); LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker);
tracker); if (oldTracker != null) {
if (oldTracker != null) { tracker = oldTracker;
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
} }
recoverTrackerResources(tracker, trackerState);
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry : for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
userResources.getAppTrackerStates().entrySet()) { userResources.getAppTrackerStates().entrySet()) {
trackerState = appEntry.getValue(); trackerState = appEntry.getValue();
if (!trackerState.isEmpty()) { ApplicationId appId = appEntry.getKey();
ApplicationId appId = appEntry.getKey(); String appIdStr = appId.toString();
String appIdStr = appId.toString(); LocalResourcesTracker tracker1 = new LocalResourcesTrackerImpl(user,
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, appId, dispatcher, false, super.getConfig(), stateStore,
appId, dispatcher, false, super.getConfig(), stateStore, dirsHandler);
dirsHandler); LocalResourcesTracker oldTracker1 = appRsrc.putIfAbsent(appIdStr,
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, tracker1);
tracker); if (oldTracker1 != null) {
if (oldTracker != null) { tracker1 = oldTracker1;
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
} }
recoverTrackerResources(tracker1, trackerState);
} }
} }
} }
} }
private void recoverTrackerResources(LocalResourcesTracker tracker, private void recoverTrackerResources(LocalResourcesTracker tracker,
LocalResourceTrackerState state) throws URISyntaxException { LocalResourceTrackerState state) throws URISyntaxException, IOException {
for (LocalizedResourceProto proto : state.getLocalizedResources()) { try (RecoveryIterator<LocalizedResourceProto> it =
LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); state.getCompletedResourcesIterator()) {
LocalResourceRequest req = new LocalResourceRequest(rsrc); while (it != null && it.hasNext()) {
if (LOG.isDebugEnabled()) { LocalizedResourceProto proto = it.next();
LOG.debug("Recovering localized resource " + req + " at " LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+ proto.getLocalPath()); LocalResourceRequest req = new LocalResourceRequest(rsrc);
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering localized resource " + req + " at "
+ proto.getLocalPath());
}
tracker.handle(new ResourceRecoveredEvent(req,
new Path(proto.getLocalPath()), proto.getSize()));
} }
tracker.handle(new ResourceRecoveredEvent(req,
new Path(proto.getLocalPath()), proto.getSize()));
} }
for (Map.Entry<LocalResourceProto, Path> entry : try (RecoveryIterator<Map.Entry<LocalResourceProto, Path>> it =
state.getInProgressResources().entrySet()) { state.getStartedResourcesIterator()) {
LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); while (it != null && it.hasNext()) {
LocalResourceRequest req = new LocalResourceRequest(rsrc); Map.Entry<LocalResourceProto, Path> entry = it.next();
Path localPath = entry.getValue(); LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); LocalResourceRequest req = new LocalResourceRequest(rsrc);
Path localPath = entry.getValue();
tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
// delete any in-progress localizations, containers will request again // delete any in-progress localizations, containers will request again
LOG.info("Deleting in-progress localization for " + req + " at " LOG.info("Deleting in-progress localization for " + req + " at "
+ localPath); + localPath);
tracker.remove(tracker.getLocalizedResource(req), delService); tracker.remove(tracker.getLocalizedResource(req), delService);
}
} }
// TODO: remove untracked directories in local filesystem // TODO: remove untracked directories in local filesystem

View File

@ -67,6 +67,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -154,6 +155,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
/**
* The Local Tracker State DB key locations - "completed" and "started".
* To seek through app tracker states in RecoveredUserResources
* we need to move from one app tracker state to another using key "zzz".
* zzz comes later in lexicographical order than started.
* Similarly to move one user to another in RLS,we can use "zzz",
* as RecoveredUserResources uses two keys appcache and filecache.
*/
private static final String BEYOND_ENTRIES_SUFFIX = "zzz/";
private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX = private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
"/assignedResources_"; "/assignedResources_";
@ -862,112 +873,154 @@ private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizatio
public RecoveredLocalizationState loadLocalizationState() public RecoveredLocalizationState loadLocalizationState()
throws IOException { throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState(); RecoveredLocalizationState state = new RecoveredLocalizationState();
LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); state.publicTrackerState = loadResourceTrackerState(
state.publicTrackerState = loadResourceTrackerState(it,
LOCALIZATION_PUBLIC_KEY_PREFIX); LOCALIZATION_PUBLIC_KEY_PREFIX);
state.it = new UserResourcesIterator(); state.it = new UserResourcesIterator();
return state; return state;
} }
private LocalResourceTrackerState loadResourceTrackerState( private LocalResourceTrackerState loadResourceTrackerState(String keyPrefix)
LeveldbIterator iter, String keyPrefix) throws IOException { throws IOException {
final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX; final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX;
final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
LocalResourceTrackerState state = new LocalResourceTrackerState();
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
if (key.startsWith(completedPrefix)) { RecoveryIterator<LocalizedResourceProto> crIt =
state.localizedResources = loadCompletedResources(iter, new CompletedResourcesIterator(completedPrefix);
completedPrefix); RecoveryIterator<Entry<LocalResourceProto, Path>> srIt =
} else if (key.startsWith(startedPrefix)) { new StartedResourcesIterator(startedPrefix);
state.inProgressResources = loadStartedResources(iter, startedPrefix);
} else {
throw new IOException("Unexpected key in resource tracker state: "
+ key);
}
}
return state; return new LocalResourceTrackerState(crIt, srIt);
} }
private List<LocalizedResourceProto> loadCompletedResources( private class CompletedResourcesIterator extends
BaseRecoveryIterator<LocalizedResourceProto> {
private String startKey;
CompletedResourcesIterator(String startKey) throws IOException {
super(startKey);
this.startKey = startKey;
}
@Override
protected LocalizedResourceProto getNextItem(LeveldbIterator it)
throws IOException {
return getNextCompletedResource(it, startKey);
}
}
private LocalizedResourceProto getNextCompletedResource(
LeveldbIterator iter, String keyPrefix) throws IOException { LeveldbIterator iter, String keyPrefix) throws IOException {
List<LocalizedResourceProto> rsrcs = LocalizedResourceProto nextCompletedResource = null;
new ArrayList<LocalizedResourceProto>(); if (iter.hasNext()){
while (iter.hasNext()) { Entry<byte[], byte[]> entry = iter.next();
Entry<byte[],byte[]> entry = iter.peekNext();
String key = asString(entry.getKey()); String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) { if (!key.startsWith(keyPrefix)) {
break; return null;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading completed resource from " + key); LOG.debug("Loading completed resource from " + key);
} }
rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue())); nextCompletedResource = LocalizedResourceProto.parseFrom(
iter.next(); entry.getValue());
} }
return nextCompletedResource;
return rsrcs;
} }
private Map<LocalResourceProto, Path> loadStartedResources( private class StartedResourcesIterator extends
BaseRecoveryIterator<Entry<LocalResourceProto, Path>> {
private String startKey;
StartedResourcesIterator(String startKey) throws IOException {
super(startKey);
this.startKey = startKey;
}
@Override
protected Entry<LocalResourceProto, Path> getNextItem(LeveldbIterator it)
throws IOException {
return getNextStartedResource(it, startKey);
}
}
private Entry<LocalResourceProto, Path> getNextStartedResource(
LeveldbIterator iter, String keyPrefix) throws IOException { LeveldbIterator iter, String keyPrefix) throws IOException {
Map<LocalResourceProto, Path> rsrcs = Entry<LocalResourceProto, Path> nextStartedResource = null;
new HashMap<LocalResourceProto, Path>(); if (iter.hasNext()){
while (iter.hasNext()) { Entry<byte[], byte[]> entry = iter.next();
Entry<byte[],byte[]> entry = iter.peekNext();
String key = asString(entry.getKey()); String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) { if (!key.startsWith(keyPrefix)) {
break; return null;
} }
Path localPath = new Path(key.substring(keyPrefix.length())); Path localPath = new Path(key.substring(keyPrefix.length()));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading in-progress resource at " + localPath); LOG.debug("Loading in-progress resource at " + localPath);
} }
rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath); nextStartedResource = new SimpleEntry<LocalResourceProto, Path>(
iter.next(); LocalResourceProto.parseFrom(entry.getValue()), localPath);
} }
return nextStartedResource;
}
return rsrcs; private void seekPastPrefix(LeveldbIterator iter, String keyPrefix)
throws IOException {
try{
iter.seek(bytes(keyPrefix + BEYOND_ENTRIES_SUFFIX));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (key.startsWith(keyPrefix)) {
iter.next();
} else {
break;
}
}
} catch (DBException e) {
throw new IOException(e);
}
} }
private RecoveredUserResources loadUserLocalizedResources( private RecoveredUserResources loadUserLocalizedResources(
LeveldbIterator iter, String keyPrefix) throws IOException { LeveldbIterator iter, String keyPrefix) throws IOException {
RecoveredUserResources userResources = new RecoveredUserResources(); RecoveredUserResources userResources = new RecoveredUserResources();
// seek through App cache
String appCachePrefix = keyPrefix + LOCALIZATION_APPCACHE_SUFFIX;
iter.seek(bytes(appCachePrefix));
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.peekNext(); Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey()); String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
if (!key.startsWith(appCachePrefix)) {
break; break;
} }
if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) { int appIdStartPos = appCachePrefix.length();
userResources.privateTrackerState = loadResourceTrackerState(iter, int appIdEndPos = key.indexOf('/', appIdStartPos);
keyPrefix + LOCALIZATION_FILECACHE_SUFFIX); if (appIdEndPos < 0) {
} else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX, throw new IOException("Unable to determine appID in resource key: "
keyPrefix.length())) { + key);
int appIdStartPos = keyPrefix.length() +
LOCALIZATION_APPCACHE_SUFFIX.length();
int appIdEndPos = key.indexOf('/', appIdStartPos);
if (appIdEndPos < 0) {
throw new IOException("Unable to determine appID in resource key: "
+ key);
}
ApplicationId appId = ApplicationId.fromString(
key.substring(appIdStartPos, appIdEndPos));
userResources.appTrackerStates.put(appId,
loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1)));
} else {
throw new IOException("Unexpected user resource key " + key);
} }
ApplicationId appId = ApplicationId.fromString(
key.substring(appIdStartPos, appIdEndPos));
String trackerStateKey = key.substring(0, appIdEndPos+1);
userResources.appTrackerStates.put(appId,
loadResourceTrackerState(trackerStateKey));
// Seek to next application
seekPastPrefix(iter, trackerStateKey);
} }
// File Cache
String fileCachePrefix = keyPrefix + LOCALIZATION_FILECACHE_SUFFIX;
iter.seek(bytes(fileCachePrefix));
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (key.startsWith(fileCachePrefix)) {
userResources.privateTrackerState =
loadResourceTrackerState(fileCachePrefix);
}
// seek to Next User.
seekPastPrefix(iter, keyPrefix);
return userResources; return userResources;
} }

View File

@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -220,27 +219,31 @@ public void setResourceMappings(ResourceMappings mappings) {
} }
public static class LocalResourceTrackerState { public static class LocalResourceTrackerState {
List<LocalizedResourceProto> localizedResources = final private RecoveryIterator<LocalizedResourceProto>
new ArrayList<LocalizedResourceProto>(); completedResourcesIterator;
Map<LocalResourceProto, Path> inProgressResources = final private RecoveryIterator<Entry<LocalResourceProto, Path>>
new HashMap<LocalResourceProto, Path>(); startedResourcesIterator;
public List<LocalizedResourceProto> getLocalizedResources() { LocalResourceTrackerState(RecoveryIterator<LocalizedResourceProto> crIt,
return localizedResources; RecoveryIterator<Entry<LocalResourceProto, Path>> srIt) {
this.completedResourcesIterator = crIt;
this.startedResourcesIterator = srIt;
} }
public Map<LocalResourceProto, Path> getInProgressResources() { public RecoveryIterator<LocalizedResourceProto>
return inProgressResources; getCompletedResourcesIterator() {
return completedResourcesIterator;
} }
public boolean isEmpty() { public RecoveryIterator<Entry<LocalResourceProto, Path>>
return localizedResources.isEmpty() && inProgressResources.isEmpty(); getStartedResourcesIterator() {
return startedResourcesIterator;
} }
} }
public static class RecoveredUserResources { public static class RecoveredUserResources {
LocalResourceTrackerState privateTrackerState = LocalResourceTrackerState privateTrackerState =
new LocalResourceTrackerState(); new LocalResourceTrackerState(null, null);
Map<ApplicationId, LocalResourceTrackerState> appTrackerStates = Map<ApplicationId, LocalResourceTrackerState> appTrackerStates =
new HashMap<ApplicationId, LocalResourceTrackerState>(); new HashMap<ApplicationId, LocalResourceTrackerState>();
@ -256,7 +259,7 @@ public LocalResourceTrackerState getPrivateTrackerState() {
public static class RecoveredLocalizationState { public static class RecoveredLocalizationState {
LocalResourceTrackerState publicTrackerState = LocalResourceTrackerState publicTrackerState =
new LocalResourceTrackerState(); new LocalResourceTrackerState(null, null);
RecoveryIterator<Entry<String, RecoveredUserResources>> it = null; RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
public LocalResourceTrackerState getPublicTrackerState() { public LocalResourceTrackerState getPublicTrackerState() {

View File

@ -292,13 +292,23 @@ private RecoveredContainerState getRecoveredContainerState(
} }
private LocalResourceTrackerState loadTrackerState(TrackerState ts) { private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
LocalResourceTrackerState result = new LocalResourceTrackerState(); List<LocalizedResourceProto> completedResources =
result.localizedResources.addAll(ts.localizedResources.values()); new ArrayList<LocalizedResourceProto>(ts.localizedResources.values());
RecoveryIterator<LocalizedResourceProto> crIt =
new NMMemoryRecoveryIterator<LocalizedResourceProto>(
completedResources.iterator());
Map<LocalResourceProto, Path> inProgressMap =
new HashMap<LocalResourceProto, Path>();
for (Map.Entry<Path, LocalResourceProto> entry : for (Map.Entry<Path, LocalResourceProto> entry :
ts.inProgressMap.entrySet()) { ts.inProgressMap.entrySet()) {
result.inProgressResources.put(entry.getValue(), entry.getKey()); inProgressMap.put(entry.getValue(), entry.getKey());
} }
return result; RecoveryIterator<Map.Entry<LocalResourceProto, Path>> srIt =
new NMMemoryRecoveryIterator<Map.Entry<LocalResourceProto, Path>>(
inProgressMap.entrySet().iterator());
return new LocalResourceTrackerState(crIt, srIt);
} }
private TrackerState getTrackerState(TrackerKey key) { private TrackerState getTrackerState(TrackerKey key) {

View File

@ -192,6 +192,28 @@ private Map<ContainerId, Long> loadContainerTokens(
return containerTokens; return containerTokens;
} }
private List<LocalizedResourceProto> loadCompletedResources(
RecoveryIterator<LocalizedResourceProto> it) throws IOException {
List<LocalizedResourceProto> completedResources =
new ArrayList<LocalizedResourceProto>();
while (it != null && it.hasNext()) {
completedResources.add(it.next());
}
return completedResources;
}
private Map<LocalResourceProto, Path> loadStartedResources(
RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
throws IOException {
Map<LocalResourceProto, Path> startedResources =
new HashMap<LocalResourceProto, Path>();
while (it != null &&it.hasNext()) {
Map.Entry<LocalResourceProto, Path> entry = it.next();
startedResources.put(entry.getKey(), entry.getValue());
}
return startedResources;
}
private void restartStateStore() throws IOException { private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock // need to close so leveldb releases database lock
if (stateStore != null) { if (stateStore != null) {
@ -207,8 +229,10 @@ private void verifyEmptyState() throws IOException {
assertNotNull(state); assertNotNull(state);
LocalResourceTrackerState pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertNotNull(pubts); assertNotNull(pubts);
assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator())
assertTrue(pubts.getInProgressResources().isEmpty()); .isEmpty());
assertTrue(loadStartedResources(pubts.getStartedResourcesIterator())
.isEmpty());
assertTrue(loadUserResources(state.getIterator()).isEmpty()); assertTrue(loadUserResources(state.getIterator()).isEmpty());
} }
@ -541,6 +565,111 @@ private StartContainerRequest createContainerRequestInternal(ContainerId
return StartContainerRequest.newInstance(clc, containerToken); return StartContainerRequest.newInstance(clc, containerToken);
} }
@Test
public void testLocalTrackerStateIterator() throws IOException {
String user1 = "somebody";
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
ApplicationId appId2 = ApplicationId.newInstance(2, 2);
String user2 = "someone";
ApplicationId appId3 = ApplicationId.newInstance(3, 3);
// start and finish local resource for applications
Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath1),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");
Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath2),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");
Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath3),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2");
stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
appRsrcLocalPath1);
stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
appRsrcLocalPath2);
stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
appRsrcLocalPath3);
LocalizedResourceProto appLocalizedProto1 =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto1)
.setLocalPath(appRsrcLocalPath1.toString())
.setSize(1234567L)
.build();
LocalizedResourceProto appLocalizedProto2 =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto2)
.setLocalPath(appRsrcLocalPath2.toString())
.setSize(1234567L)
.build();
LocalizedResourceProto appLocalizedProto3 =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto3)
.setLocalPath(appRsrcLocalPath3.toString())
.setSize(1234567L)
.build();
stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);
List<LocalizedResourceProto> completedResources =
new ArrayList<LocalizedResourceProto>();
Map<LocalResourceProto, Path> startedResources =
new HashMap<LocalResourceProto, Path>();
// restart and verify two users exist and two apps completed for user1.
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(2, userResources.size());
RecoveredUserResources uResource = userResources.get(user1);
assertEquals(2, uResource.getAppTrackerStates().size());
LocalResourceTrackerState app1ts =
uResource.getAppTrackerStates().get(appId1);
assertNotNull(app1ts);
completedResources = loadCompletedResources(
app1ts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
app1ts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto1,
completedResources.iterator().next());
LocalResourceTrackerState app2ts =
uResource.getAppTrackerStates().get(appId2);
assertNotNull(app2ts);
completedResources = loadCompletedResources(
app2ts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
app2ts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto2,
completedResources.iterator().next());
}
@Test @Test
public void testStartResourceLocalization() throws IOException { public void testStartResourceLocalization() throws IOException {
String user = "somebody"; String user = "somebody";
@ -558,27 +687,44 @@ public void testStartResourceLocalization() throws IOException {
stateStore.startResourceLocalization(user, appId, appRsrcProto, stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath); appRsrcLocalPath);
List<LocalizedResourceProto> completedResources =
new ArrayList<LocalizedResourceProto>();
Map<LocalResourceProto, Path> startedResources =
new HashMap<LocalResourceProto, Path>();
// restart and verify only app resource is marked in-progress // restart and verify only app resource is marked in-progress
restartStateStore(); restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState(); RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertTrue(pubts.getInProgressResources().isEmpty()); pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertTrue(startedResources.isEmpty());
Map<String, RecoveredUserResources> userResources = Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator()); loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user); RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState(); LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertTrue(privts.getInProgressResources().isEmpty()); privts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
privts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts); assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(1, appts.getInProgressResources().size()); appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath, assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto)); startedResources.get(appRsrcProto));
// start some public and private resources // start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@ -613,28 +759,40 @@ public void testStartResourceLocalization() throws IOException {
restartStateStore(); restartStateStore();
state = stateStore.loadLocalizationState(); state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState(); pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(2, pubts.getInProgressResources().size()); pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(2, startedResources.size());
assertEquals(pubRsrcLocalPath1, assertEquals(pubRsrcLocalPath1,
pubts.getInProgressResources().get(pubRsrcProto1)); startedResources.get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2, assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2)); startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator()); userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
rur = userResources.get(user); rur = userResources.get(user);
privts = rur.getPrivateTrackerState(); privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(1, privts.getInProgressResources().size()); privts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
privts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size());
assertEquals(privRsrcLocalPath, assertEquals(privRsrcLocalPath,
privts.getInProgressResources().get(privRsrcProto)); startedResources.get(privRsrcProto));
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId); appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts); assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(1, appts.getInProgressResources().size()); appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath, assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto)); startedResources.get(appRsrcProto));
} }
@Test @Test
@ -661,27 +819,44 @@ public void testFinishResourceLocalization() throws IOException {
.build(); .build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto); stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
List<LocalizedResourceProto> completedResources =
new ArrayList<LocalizedResourceProto>();
Map<LocalResourceProto, Path> startedResources =
new HashMap<LocalResourceProto, Path>();
// restart and verify only app resource is completed // restart and verify only app resource is completed
restartStateStore(); restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState(); RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertTrue(pubts.getInProgressResources().isEmpty()); pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertTrue(startedResources.isEmpty());
Map<String, RecoveredUserResources> userResources = Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator()); loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user); RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState(); LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty()); completedResources = loadCompletedResources(
assertTrue(privts.getInProgressResources().isEmpty()); privts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
privts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts); assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(1, appts.getLocalizedResources().size()); appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto, assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next()); completedResources.iterator().next());
// start some public and private resources // start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@ -732,28 +907,40 @@ public void testFinishResourceLocalization() throws IOException {
restartStateStore(); restartStateStore();
state = stateStore.loadLocalizationState(); state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState(); pubts = state.getPublicTrackerState();
assertEquals(1, pubts.getLocalizedResources().size()); completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1, assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next()); completedResources.iterator().next());
assertEquals(1, pubts.getInProgressResources().size()); assertEquals(1, startedResources.size());
assertEquals(pubRsrcLocalPath2, assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2)); startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator()); userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size()); assertEquals(1, userResources.size());
rur = userResources.get(user); rur = userResources.get(user);
privts = rur.getPrivateTrackerState(); privts = rur.getPrivateTrackerState();
assertNotNull(privts); assertNotNull(privts);
assertEquals(1, privts.getLocalizedResources().size()); completedResources = loadCompletedResources(
privts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
privts.getStartedResourcesIterator());
assertEquals(1, completedResources.size());
assertEquals(privLocalizedProto, assertEquals(privLocalizedProto,
privts.getLocalizedResources().iterator().next()); completedResources.iterator().next());
assertTrue(privts.getInProgressResources().isEmpty()); assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size()); assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId); appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts); assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty()); completedResources = loadCompletedResources(
assertEquals(1, appts.getLocalizedResources().size()); appts.getCompletedResourcesIterator());
startedResources = loadStartedResources(
appts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto, assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next()); completedResources.iterator().next());
} }
@Test @Test
@ -841,10 +1028,14 @@ public void testRemoveLocalizedResource() throws IOException {
restartStateStore(); restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState(); RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState(); LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getInProgressResources().isEmpty()); List<LocalizedResourceProto> completedResources =
assertEquals(1, pubts.getLocalizedResources().size()); loadCompletedResources(pubts.getCompletedResourcesIterator());
Map<LocalResourceProto, Path> startedResources =
loadStartedResources(pubts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1, assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next()); completedResources.iterator().next());
Map<String, RecoveredUserResources> userResources = Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator()); loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty()); assertTrue(userResources.isEmpty());