YARN-8232. RMContainer lost queue name when RM HA happens. (Hu Ziqian via wangda)
Change-Id: Ia21e1da6871570c993bbedde76ce32929e95970f
This commit is contained in:
parent
d72c165161
commit
6b96a73bb0
@ -531,7 +531,8 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create container
|
// create container
|
||||||
RMContainer rmContainer = recoverAndCreateContainer(container, nm);
|
RMContainer rmContainer = recoverAndCreateContainer(container, nm,
|
||||||
|
schedulerApp.getQueue().getQueueName());
|
||||||
|
|
||||||
// recover RMContainer
|
// recover RMContainer
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
@ -581,7 +582,7 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private RMContainer recoverAndCreateContainer(NMContainerStatus status,
|
private RMContainer recoverAndCreateContainer(NMContainerStatus status,
|
||||||
RMNode node) {
|
RMNode node, String queueName) {
|
||||||
Container container =
|
Container container =
|
||||||
Container.newInstance(status.getContainerId(), node.getNodeID(),
|
Container.newInstance(status.getContainerId(), node.getNodeID(),
|
||||||
node.getHttpAddress(), status.getAllocatedResource(),
|
node.getHttpAddress(), status.getAllocatedResource(),
|
||||||
@ -596,6 +597,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
|
|||||||
SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(),
|
SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(),
|
||||||
applications.get(attemptId.getApplicationId()).getUser(), rmContext,
|
applications.get(attemptId.getApplicationId()).getUser(), rmContext,
|
||||||
status.getCreationTime(), status.getNodeLabelExpression());
|
status.getCreationTime(), status.getNodeLabelExpression());
|
||||||
|
((RMContainerImpl) rmContainer).setQueueName(queueName);
|
||||||
return rmContainer;
|
return rmContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,22 +30,14 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.*;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
@ -59,6 +51,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
@ -874,4 +867,51 @@ public void testUpdateThreadLifeCycle() throws Exception {
|
|||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testContainerRecoveredByNode() throws Exception {
|
||||||
|
System.out.println("Starting testContainerRecoveredByNode");
|
||||||
|
final int maxMemory = 10 * 1024;
|
||||||
|
YarnConfiguration conf = getConf();
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
|
||||||
|
conf.set(
|
||||||
|
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
try {
|
||||||
|
rm1.start();
|
||||||
|
RMApp app1 =
|
||||||
|
rm1.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default",
|
||||||
|
-1, null, "Test", false, true);
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
YarnScheduler scheduler = rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(
|
||||||
|
0, Resources.createResource(maxMemory), 1, "127.0.0.2");
|
||||||
|
ContainerId containerId = ContainerId.newContainerId(
|
||||||
|
app1.getCurrentAppAttempt().getAppAttemptId(), 2);
|
||||||
|
NMContainerStatus containerReport =
|
||||||
|
NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
|
||||||
|
Resource.newInstance(1024, 1), "recover container", 0,
|
||||||
|
Priority.newInstance(0), 0);
|
||||||
|
List<NMContainerStatus> containerReports = new ArrayList<>();
|
||||||
|
containerReports.add(containerReport);
|
||||||
|
scheduler.handle(new NodeAddedSchedulerEvent(node1, containerReports));
|
||||||
|
RMContainer rmContainer = scheduler.getRMContainer(containerId);
|
||||||
|
|
||||||
|
//verify queue name when rmContainer is recovered
|
||||||
|
Assert.assertEquals(app1.getQueue(), rmContainer.getQueueName());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
rm1.stop();
|
||||||
|
System.out.println("Stopping testContainerRecoveredByNode");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user