YARN-4924. NM recovery race can lead to container not cleaned up. Contributed by sandflee
This commit is contained in:
parent
a74580a4d3
commit
3150ae8108
@ -296,20 +296,8 @@ private void recover() throws IOException, URISyntaxException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recovering container with state: " + rcs);
|
||||
}
|
||||
|
||||
recoverContainer(rcs);
|
||||
}
|
||||
|
||||
String diagnostic = "Application marked finished during recovery";
|
||||
for (ApplicationId appId : appsState.getFinishedApplications()) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Application marked finished during recovery: " + appId);
|
||||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationFinishEvent(appId, diagnostic));
|
||||
}
|
||||
} else {
|
||||
LOG.info("Not a recoverable state store. Nothing to recover.");
|
||||
}
|
||||
@ -1332,11 +1320,6 @@ public void handle(ContainerManagerEvent event) {
|
||||
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
|
||||
diagnostic = "Application killed by ResourceManager";
|
||||
}
|
||||
try {
|
||||
this.context.getNMStateStore().storeFinishedApplication(appID);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to update application state in store", e);
|
||||
}
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ApplicationFinishEvent(appID,
|
||||
diagnostic));
|
||||
|
@ -84,6 +84,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||
|
||||
private static final String APPLICATIONS_KEY_PREFIX =
|
||||
"ContainerManager/applications/";
|
||||
@Deprecated
|
||||
private static final String FINISHED_APPS_KEY_PREFIX =
|
||||
"ContainerManager/finishedApps/";
|
||||
|
||||
@ -392,20 +393,6 @@ public RecoveredApplicationsState loadApplicationsState()
|
||||
state.applications.add(
|
||||
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
|
||||
}
|
||||
|
||||
state.finishedApplications = new ArrayList<ApplicationId>();
|
||||
keyPrefix = FINISHED_APPS_KEY_PREFIX;
|
||||
iter.seek(bytes(keyPrefix));
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(keyPrefix)) {
|
||||
break;
|
||||
}
|
||||
ApplicationId appId =
|
||||
ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
|
||||
state.finishedApplications.add(appId);
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
@ -414,6 +401,8 @@ public RecoveredApplicationsState loadApplicationsState()
|
||||
}
|
||||
}
|
||||
|
||||
cleanupDeprecatedFinishedApps();
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
@ -433,21 +422,6 @@ public void storeApplication(ApplicationId appId,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeFinishedApplication(ApplicationId appId)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("storeFinishedApplication.appId: " + appId);
|
||||
}
|
||||
|
||||
String key = FINISHED_APPS_KEY_PREFIX + appId;
|
||||
try {
|
||||
db.put(bytes(key), new byte[0]);
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeApplication(ApplicationId appId)
|
||||
throws IOException {
|
||||
@ -460,8 +434,6 @@ public void removeApplication(ApplicationId appId)
|
||||
try {
|
||||
String key = APPLICATIONS_KEY_PREFIX + appId;
|
||||
batch.delete(bytes(key));
|
||||
key = FINISHED_APPS_KEY_PREFIX + appId;
|
||||
batch.delete(bytes(key));
|
||||
db.write(batch);
|
||||
} finally {
|
||||
batch.close();
|
||||
@ -979,6 +951,52 @@ public void removeLogDeleter(ApplicationId appId) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void cleanupDeprecatedFinishedApps() {
|
||||
try {
|
||||
cleanupKeysWithPrefix(FINISHED_APPS_KEY_PREFIX);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("cleanup keys with prefix " + FINISHED_APPS_KEY_PREFIX +
|
||||
" from leveldb failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupKeysWithPrefix(String prefix) throws IOException {
|
||||
WriteBatch batch = null;
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
try {
|
||||
batch = db.createWriteBatch();
|
||||
iter.seek(bytes(prefix));
|
||||
while (iter.hasNext()) {
|
||||
byte[] key = iter.next().getKey();
|
||||
String keyStr = asString(key);
|
||||
if (!keyStr.startsWith(prefix)) {
|
||||
break;
|
||||
}
|
||||
batch.delete(key);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("cleanup " + keyStr + " from leveldb");
|
||||
}
|
||||
}
|
||||
db.write(batch);
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (batch != null) {
|
||||
batch.close();
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getLogDeleterKey(ApplicationId appId) {
|
||||
return LOG_DELETER_KEY_PREFIX + appId;
|
||||
}
|
||||
|
@ -58,10 +58,6 @@ public void storeApplication(ApplicationId appId,
|
||||
ContainerManagerApplicationProto p) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeFinishedApplication(ApplicationId appId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeApplication(ApplicationId appId) throws IOException {
|
||||
}
|
||||
|
@ -52,15 +52,11 @@ public NMStateStoreService(String name) {
|
||||
|
||||
public static class RecoveredApplicationsState {
|
||||
List<ContainerManagerApplicationProto> applications;
|
||||
List<ApplicationId> finishedApplications;
|
||||
|
||||
public List<ContainerManagerApplicationProto> getApplications() {
|
||||
return applications;
|
||||
}
|
||||
|
||||
public List<ApplicationId> getFinishedApplications() {
|
||||
return finishedApplications;
|
||||
}
|
||||
}
|
||||
|
||||
public enum RecoveredContainerStatus {
|
||||
@ -258,14 +254,6 @@ public abstract RecoveredApplicationsState loadApplicationsState()
|
||||
public abstract void storeApplication(ApplicationId appId,
|
||||
ContainerManagerApplicationProto p) throws IOException;
|
||||
|
||||
/**
|
||||
* Record that an application has finished
|
||||
* @param appId the application ID
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void storeFinishedApplication(ApplicationId appId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Remove records corresponding to an application
|
||||
* @param appId the application ID
|
||||
|
@ -259,6 +259,10 @@ public void testApplicationRecovery() throws Exception {
|
||||
assertEquals(1, context.getApplications().size());
|
||||
app = context.getApplications().get(appId);
|
||||
assertNotNull(app);
|
||||
// no longer saving FINISH_APP event in NM stateStore,
|
||||
// simulate by resending FINISH_APP event
|
||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||
UserGroupInformation.createRemoteUser(modUser),
|
||||
|
@ -44,7 +44,6 @@
|
||||
|
||||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
|
||||
private Set<ApplicationId> finishedApps;
|
||||
private Map<ContainerId, RecoveredContainerState> containerStates;
|
||||
private Map<TrackerKey, TrackerState> trackerStates;
|
||||
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
||||
@ -59,7 +58,6 @@ public NMMemoryStateStoreService() {
|
||||
@Override
|
||||
protected void initStorage(Configuration conf) {
|
||||
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
|
||||
finishedApps = new HashSet<ApplicationId>();
|
||||
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
|
||||
nmTokenState = new RecoveredNMTokensState();
|
||||
nmTokenState.applicationMasterKeys =
|
||||
@ -86,7 +84,6 @@ public synchronized RecoveredApplicationsState loadApplicationsState()
|
||||
RecoveredApplicationsState state = new RecoveredApplicationsState();
|
||||
state.applications = new ArrayList<ContainerManagerApplicationProto>(
|
||||
apps.values());
|
||||
state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
|
||||
return state;
|
||||
}
|
||||
|
||||
@ -98,16 +95,10 @@ public synchronized void storeApplication(ApplicationId appId,
|
||||
apps.put(appId, protoCopy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeFinishedApplication(ApplicationId appId) {
|
||||
finishedApps.add(appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeApplication(ApplicationId appId)
|
||||
throws IOException {
|
||||
apps.remove(appId);
|
||||
finishedApps.remove(appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -393,7 +384,6 @@ public synchronized void removeLogDeleter(ApplicationId appId)
|
||||
logDeleterState.remove(appId);
|
||||
}
|
||||
|
||||
|
||||
private static class TrackerState {
|
||||
Map<Path, LocalResourceProto> inProgressMap =
|
||||
new HashMap<Path, LocalResourceProto>();
|
||||
|
@ -174,7 +174,6 @@ public void testApplicationStorage() throws IOException {
|
||||
// test empty when no state
|
||||
RecoveredApplicationsState state = stateStore.loadApplicationsState();
|
||||
assertTrue(state.getApplications().isEmpty());
|
||||
assertTrue(state.getFinishedApplications().isEmpty());
|
||||
|
||||
// store an application and verify recovered
|
||||
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
||||
@ -188,10 +187,8 @@ public void testApplicationStorage() throws IOException {
|
||||
state = stateStore.loadApplicationsState();
|
||||
assertEquals(1, state.getApplications().size());
|
||||
assertEquals(appProto1, state.getApplications().get(0));
|
||||
assertTrue(state.getFinishedApplications().isEmpty());
|
||||
|
||||
// finish an application and add a new one
|
||||
stateStore.storeFinishedApplication(appId1);
|
||||
// add a new app
|
||||
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
|
||||
builder = ContainerManagerApplicationProto.newBuilder();
|
||||
builder.setId(((ApplicationIdPBImpl) appId2).getProto());
|
||||
@ -203,18 +200,13 @@ public void testApplicationStorage() throws IOException {
|
||||
assertEquals(2, state.getApplications().size());
|
||||
assertTrue(state.getApplications().contains(appProto1));
|
||||
assertTrue(state.getApplications().contains(appProto2));
|
||||
assertEquals(1, state.getFinishedApplications().size());
|
||||
assertEquals(appId1, state.getFinishedApplications().get(0));
|
||||
|
||||
// test removing an application
|
||||
stateStore.storeFinishedApplication(appId2);
|
||||
stateStore.removeApplication(appId2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadApplicationsState();
|
||||
assertEquals(1, state.getApplications().size());
|
||||
assertEquals(appProto1, state.getApplications().get(0));
|
||||
assertEquals(1, state.getFinishedApplications().size());
|
||||
assertEquals(appId1, state.getFinishedApplications().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user