Fix synchronization issues of AbstractYarnScheduler#nodeUpdate and its implementations. (Naganarasimha G R via wangda)
This commit is contained in:
parent
88731c731a
commit
e0f2379312
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
@ -51,7 +52,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -77,14 +77,15 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
@ -395,8 +396,8 @@ private void killOrphanContainerOnNode(RMNode node,
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void recoverContainersOnNode(
|
||||
List<NMContainerStatus> containerReports, RMNode nm) {
|
||||
public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
|
||||
RMNode nm) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (!rmContext.isWorkPreservingRecoveryEnabled()
|
||||
@ -993,7 +994,7 @@ protected void updateNodeResourceUtilization(RMNode nm) {
|
||||
* Process a heartbeat update from a node.
|
||||
* @param nm The RMNode corresponding to the NodeManager
|
||||
*/
|
||||
protected synchronized void nodeUpdate(RMNode nm) {
|
||||
protected void nodeUpdate(RMNode nm) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("nodeUpdate: " + nm +
|
||||
" cluster capacity: " + getClusterResource());
|
||||
|
@ -103,9 +103,9 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
||||
@ -1016,7 +1016,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void nodeUpdate(RMNode rmNode) {
|
||||
protected void nodeUpdate(RMNode rmNode) {
|
||||
try {
|
||||
readLock.lock();
|
||||
setLastNodeUpdateTime(Time.now());
|
||||
|
@ -37,6 +37,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
@ -51,7 +52,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
@ -883,7 +883,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void nodeUpdate(RMNode nm) {
|
||||
protected void nodeUpdate(RMNode nm) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
long start = getClock().getTime();
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
@ -77,7 +78,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
@ -90,6 +90,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
@ -978,4 +979,10 @@ protected synchronized void nodeUpdate(RMNode nm) {
|
||||
|
||||
updateAvailableResourcesMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void recoverContainersOnNode(
|
||||
List<NMContainerStatus> containerReports, RMNode nm) {
|
||||
super.recoverContainersOnNode(containerReports, nm);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user