YARN-4794. Deadlock in NMClientImpl. Contributed by Jian He.
This commit is contained in:
parent
44bbc50d91
commit
ff722bbbdf
@ -171,8 +171,6 @@ private void addStartingContainer(StartedContainer startedContainer)
|
|||||||
throw RPCUtil.getRemoteException("Container "
|
throw RPCUtil.getRemoteException("Container "
|
||||||
+ startedContainer.containerId.toString() + " is already started");
|
+ startedContainer.containerId.toString() + " is already started");
|
||||||
}
|
}
|
||||||
startedContainers
|
|
||||||
.put(startedContainer.getContainerId(), startedContainer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -182,7 +180,8 @@ public Map<String, ByteBuffer> startContainer(
|
|||||||
// Do synchronization on StartedContainer to prevent race condition
|
// Do synchronization on StartedContainer to prevent race condition
|
||||||
// between startContainer and stopContainer only when startContainer is
|
// between startContainer and stopContainer only when startContainer is
|
||||||
// in progress for a given container.
|
// in progress for a given container.
|
||||||
StartedContainer startingContainer = createStartedContainer(container);
|
StartedContainer startingContainer =
|
||||||
|
new StartedContainer(container.getId(), container.getNodeId());
|
||||||
synchronized (startingContainer) {
|
synchronized (startingContainer) {
|
||||||
addStartingContainer(startingContainer);
|
addStartingContainer(startingContainer);
|
||||||
|
|
||||||
@ -210,18 +209,14 @@ public Map<String, ByteBuffer> startContainer(
|
|||||||
}
|
}
|
||||||
allServiceResponse = response.getAllServicesMetaData();
|
allServiceResponse = response.getAllServicesMetaData();
|
||||||
startingContainer.state = ContainerState.RUNNING;
|
startingContainer.state = ContainerState.RUNNING;
|
||||||
} catch (YarnException e) {
|
} catch (YarnException | IOException e) {
|
||||||
startingContainer.state = ContainerState.COMPLETE;
|
startingContainer.state = ContainerState.COMPLETE;
|
||||||
// Remove the started container if it failed to start
|
// Remove the started container if it failed to start
|
||||||
removeStartedContainer(startingContainer);
|
startedContainers.remove(startingContainer.containerId);
|
||||||
throw e;
|
|
||||||
} catch (IOException e) {
|
|
||||||
startingContainer.state = ContainerState.COMPLETE;
|
|
||||||
removeStartedContainer(startingContainer);
|
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
startingContainer.state = ContainerState.COMPLETE;
|
startingContainer.state = ContainerState.COMPLETE;
|
||||||
removeStartedContainer(startingContainer);
|
startedContainers.remove(startingContainer.containerId);
|
||||||
throw RPCUtil.getRemoteException(t);
|
throw RPCUtil.getRemoteException(t);
|
||||||
} finally {
|
} finally {
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
@ -263,7 +258,7 @@ public void increaseContainerResource(Container container)
|
|||||||
@Override
|
@Override
|
||||||
public void stopContainer(ContainerId containerId, NodeId nodeId)
|
public void stopContainer(ContainerId containerId, NodeId nodeId)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
StartedContainer startedContainer = getStartedContainer(containerId);
|
StartedContainer startedContainer = startedContainers.get(containerId);
|
||||||
|
|
||||||
// Only allow one request of stopping the container to move forward
|
// Only allow one request of stopping the container to move forward
|
||||||
// When entering the block, check whether the precursor has already stopped
|
// When entering the block, check whether the precursor has already stopped
|
||||||
@ -276,7 +271,7 @@ public void stopContainer(ContainerId containerId, NodeId nodeId)
|
|||||||
stopContainerInternal(containerId, nodeId);
|
stopContainerInternal(containerId, nodeId);
|
||||||
// Only after successful
|
// Only after successful
|
||||||
startedContainer.state = ContainerState.COMPLETE;
|
startedContainer.state = ContainerState.COMPLETE;
|
||||||
removeStartedContainer(startedContainer);
|
startedContainers.remove(startedContainer.containerId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stopContainerInternal(containerId, nodeId);
|
stopContainerInternal(containerId, nodeId);
|
||||||
@ -334,23 +329,6 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized StartedContainer createStartedContainer(
|
|
||||||
Container container) throws YarnException, IOException {
|
|
||||||
StartedContainer startedContainer = new StartedContainer(container.getId(),
|
|
||||||
container.getNodeId());
|
|
||||||
return startedContainer;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected synchronized void
|
|
||||||
removeStartedContainer(StartedContainer container) {
|
|
||||||
startedContainers.remove(container.containerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected synchronized StartedContainer getStartedContainer(
|
|
||||||
ContainerId containerId) {
|
|
||||||
return startedContainers.get(containerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public AtomicBoolean getCleanupRunningContainers() {
|
public AtomicBoolean getCleanupRunningContainers() {
|
||||||
return cleanupRunningContainers;
|
return cleanupRunningContainers;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user