MAPREDUCE-7151. RMContainerAllocator#handleJobPriorityChange expects application_priority always. Contributed by Bilwa S T.

This commit is contained in:
bibinchundatt 2018-10-22 15:59:14 +05:30
parent c1874046e2
commit 74a5e683fe
2 changed files with 22 additions and 6 deletions

View File

@ -1020,13 +1020,15 @@ private void handleUpdatedNodes(AllocateResponse response) {
} }
} }
private void handleJobPriorityChange(AllocateResponse response) { void handleJobPriorityChange(AllocateResponse response) {
Priority priorityFromResponse = Priority.newInstance(response Priority applicationPriority = response.getApplicationPriority();
.getApplicationPriority().getPriority()); if (null != applicationPriority) {
Priority priorityFromResponse = Priority
.newInstance(applicationPriority.getPriority());
// Update the job priority to Job directly. // Update the job priority to Job directly.
getJob().setJobPriority(priorityFromResponse); getJob().setJobPriority(priorityFromResponse);
} }
}
@Private @Private
public Resource getResourceLimit() { public Resource getResourceLimit() {

View File

@ -2160,6 +2160,20 @@ protected AllocateResponse makeRemoteRequest() throws IOException,
} }
} }
@Test
public void testIfApplicationPriorityIsNotSet() {
Job mockJob = mock(Job.class);
RMCommunicator communicator = mock(RMCommunicator.class);
ClientService service = mock(ClientService.class);
AppContext context = mock(AppContext.class);
AMPreemptionPolicy policy = mock(AMPreemptionPolicy.class);
when(communicator.getJob()).thenReturn(mockJob);
RMContainerAllocator allocator = new RMContainerAllocator(service, context,
policy);
AllocateResponse response = Records.newRecord(AllocateResponse.class);
allocator.handleJobPriorityChange(response);
}
@Test @Test
public void testReduceScheduling() throws Exception { public void testReduceScheduling() throws Exception {
int totalMaps = 10; int totalMaps = 10;