YARN-7133. Clean up lock-try order in fair scheduler. (Szilard Nemeth via Haibo Chen)
This commit is contained in:
parent
e673dd1d4d
commit
ea2c6c8c9a
@ -467,8 +467,8 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
||||||
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
@ -550,8 +550,8 @@ protected void addApplicationAttempt(
|
|||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt,
|
boolean transferStateFromPreviousAttempt,
|
||||||
boolean isAttemptRecovering) {
|
boolean isAttemptRecovering) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
SchedulerApplication<FSAppAttempt> application = applications.get(
|
SchedulerApplication<FSAppAttempt> application = applications.get(
|
||||||
applicationAttemptId.getApplicationId());
|
applicationAttemptId.getApplicationId());
|
||||||
String user = application.getUser();
|
String user = application.getUser();
|
||||||
@ -653,8 +653,8 @@ private void removeApplication(ApplicationId applicationId,
|
|||||||
private void removeApplicationAttempt(
|
private void removeApplicationAttempt(
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
LOG.info("Application " + applicationAttemptId + " is done. finalState="
|
LOG.info("Application " + applicationAttemptId + " is done. finalState="
|
||||||
+ rmAppAttemptFinalState);
|
+ rmAppAttemptFinalState);
|
||||||
FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
|
FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
|
||||||
@ -720,8 +720,8 @@ private void removeApplicationAttempt(
|
|||||||
protected void completedContainerInternal(
|
protected void completedContainerInternal(
|
||||||
RMContainer rmContainer, ContainerStatus containerStatus,
|
RMContainer rmContainer, ContainerStatus containerStatus,
|
||||||
RMContainerEventType event) {
|
RMContainerEventType event) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
Container container = rmContainer.getContainer();
|
Container container = rmContainer.getContainer();
|
||||||
|
|
||||||
// Get the application for the finished container
|
// Get the application for the finished container
|
||||||
@ -768,8 +768,8 @@ protected void completedContainerInternal(
|
|||||||
|
|
||||||
private void addNode(List<NMContainerStatus> containerReports,
|
private void addNode(List<NMContainerStatus> containerReports,
|
||||||
RMNode node) {
|
RMNode node) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
|
FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
|
||||||
usePortForNodeName);
|
usePortForNodeName);
|
||||||
nodeTracker.addNode(schedulerNode);
|
nodeTracker.addNode(schedulerNode);
|
||||||
@ -790,8 +790,8 @@ private void addNode(List<NMContainerStatus> containerReports,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void removeNode(RMNode rmNode) {
|
private void removeNode(RMNode rmNode) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
NodeId nodeId = rmNode.getNodeID();
|
NodeId nodeId = rmNode.getNodeID();
|
||||||
FSSchedulerNode node = nodeTracker.getNode(nodeId);
|
FSSchedulerNode node = nodeTracker.getNode(nodeId);
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
@ -988,8 +988,8 @@ private List<MaxResourceValidationResult> validateResourceRequests(
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeUpdate(RMNode nm) {
|
protected void nodeUpdate(RMNode nm) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
long start = getClock().getTime();
|
long start = getClock().getTime();
|
||||||
super.nodeUpdate(nm);
|
super.nodeUpdate(nm);
|
||||||
|
|
||||||
@ -1089,8 +1089,8 @@ static void assignPreemptedContainers(FSSchedulerNode node) {
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void attemptScheduling(FSSchedulerNode node) {
|
void attemptScheduling(FSSchedulerNode node) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
|
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
|
||||||
.isSchedulerReadyForAllocatingContainers()) {
|
.isSchedulerReadyForAllocatingContainers()) {
|
||||||
return;
|
return;
|
||||||
@ -1305,8 +1305,8 @@ public void handle(SchedulerEvent event) {
|
|||||||
private String resolveReservationQueueName(String queueName,
|
private String resolveReservationQueueName(String queueName,
|
||||||
ApplicationId applicationId, ReservationId reservationID,
|
ApplicationId applicationId, ReservationId reservationID,
|
||||||
boolean isRecovering) {
|
boolean isRecovering) {
|
||||||
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
|
||||||
FSQueue queue = queueMgr.getQueue(queueName);
|
FSQueue queue = queueMgr.getQueue(queueName);
|
||||||
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
|
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
|
||||||
return queueName;
|
return queueName;
|
||||||
@ -1372,8 +1372,8 @@ public void setRMContext(RMContext rmContext) {
|
|||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void initScheduler(Configuration conf) throws IOException {
|
private void initScheduler(Configuration conf) throws IOException {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
this.conf = new FairSchedulerConfiguration(conf);
|
this.conf = new FairSchedulerConfiguration(conf);
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
authorizer = YarnAuthorizationProvider.getInstance(conf);
|
authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
@ -1464,8 +1464,8 @@ private void updateReservationThreshold() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void startSchedulerThreads() {
|
private void startSchedulerThreads() {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
|
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
|
||||||
if (continuousSchedulingEnabled) {
|
if (continuousSchedulingEnabled) {
|
||||||
Preconditions.checkNotNull(schedulingThread,
|
Preconditions.checkNotNull(schedulingThread,
|
||||||
@ -1499,8 +1499,8 @@ public void serviceStart() throws Exception {
|
|||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
public void serviceStop() throws Exception {
|
public void serviceStop() throws Exception {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
if (continuousSchedulingEnabled) {
|
if (continuousSchedulingEnabled) {
|
||||||
if (schedulingThread != null) {
|
if (schedulingThread != null) {
|
||||||
schedulingThread.interrupt();
|
schedulingThread.interrupt();
|
||||||
@ -1562,8 +1562,8 @@ public int getNumClusterNodes() {
|
|||||||
@Override
|
@Override
|
||||||
public boolean checkAccess(UserGroupInformation callerUGI,
|
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||||
QueueACL acl, String queueName) {
|
QueueACL acl, String queueName) {
|
||||||
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
|
||||||
FSQueue queue = getQueueManager().getQueue(queueName);
|
FSQueue queue = getQueueManager().getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -1691,8 +1691,8 @@ public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
|
|||||||
@Override
|
@Override
|
||||||
public String moveApplication(ApplicationId appId,
|
public String moveApplication(ApplicationId appId,
|
||||||
String queueName) throws YarnException {
|
String queueName) throws YarnException {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
|
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
|
||||||
if (app == null) {
|
if (app == null) {
|
||||||
throw new YarnException("App to be moved " + appId + " not found.");
|
throw new YarnException("App to be moved " + appId + " not found.");
|
||||||
@ -1700,8 +1700,8 @@ public String moveApplication(ApplicationId appId,
|
|||||||
FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
|
FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
|
||||||
// To serialize with FairScheduler#allocate, synchronize on app attempt
|
// To serialize with FairScheduler#allocate, synchronize on app attempt
|
||||||
|
|
||||||
|
attempt.getWriteLock().lock();
|
||||||
try {
|
try {
|
||||||
attempt.getWriteLock().lock();
|
|
||||||
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
||||||
// Check if the attempt is already stopped: don't move stopped app
|
// Check if the attempt is already stopped: don't move stopped app
|
||||||
// attempt. The attempt has already been removed from all queues.
|
// attempt. The attempt has already been removed from all queues.
|
||||||
@ -1737,8 +1737,8 @@ public String moveApplication(ApplicationId appId,
|
|||||||
@Override
|
@Override
|
||||||
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
|
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
|
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
|
||||||
if (app == null) {
|
if (app == null) {
|
||||||
throw new YarnException("App to be moved " + appId + " not found.");
|
throw new YarnException("App to be moved " + appId + " not found.");
|
||||||
@ -1747,8 +1747,8 @@ public void preValidateMoveApplication(ApplicationId appId, String newQueue)
|
|||||||
FSAppAttempt attempt = app.getCurrentAppAttempt();
|
FSAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
// To serialize with FairScheduler#allocate, synchronize on app attempt
|
// To serialize with FairScheduler#allocate, synchronize on app attempt
|
||||||
|
|
||||||
|
attempt.getWriteLock().lock();
|
||||||
try {
|
try {
|
||||||
attempt.getWriteLock().lock();
|
|
||||||
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
||||||
String destQueueName = handleMoveToPlanQueue(newQueue);
|
String destQueueName = handleMoveToPlanQueue(newQueue);
|
||||||
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
||||||
@ -1869,8 +1869,8 @@ FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
|
|||||||
@Override
|
@Override
|
||||||
public void updateNodeResource(RMNode nm,
|
public void updateNodeResource(RMNode nm,
|
||||||
ResourceOption resourceOption) {
|
ResourceOption resourceOption) {
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
super.updateNodeResource(nm, resourceOption);
|
super.updateNodeResource(nm, resourceOption);
|
||||||
updateRootQueueMetrics();
|
updateRootQueueMetrics();
|
||||||
queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
|
queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
|
||||||
|
Loading…
Reference in New Issue
Block a user