YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via jeagles)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569853 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7e8d85ebb3
commit
996acc834e
@ -22,6 +22,8 @@ Release 2.5.0 - UNRELEASED
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
||||
jeagles)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
@ -403,6 +403,15 @@ public AllocateResponse allocate(AllocateRequest request)
|
||||
return resync;
|
||||
}
|
||||
|
||||
//filter illegal progress values
|
||||
float filteredProgress = request.getProgress();
|
||||
if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
|
||||
|| filteredProgress < 0) {
|
||||
request.setProgress(0);
|
||||
} else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
|
||||
request.setProgress(1);
|
||||
}
|
||||
|
||||
// Send the status update to the appAttempt.
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptStatusupdateEvent(appAttemptId, request
|
||||
|
@ -197,6 +197,29 @@ public AllocateResponse run() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
||||
throws Exception {
|
||||
final AllocateRequest req = allocateRequest;
|
||||
req.setResponseId(++responseId);
|
||||
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||
Token<AMRMTokenIdentifier> token =
|
||||
context.getRMApps().get(attemptId.getApplicationId())
|
||||
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||
try {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse run() throws Exception {
|
||||
return amRMProtocol.allocate(req);
|
||||
}
|
||||
});
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw (Exception) e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
public void unregisterAppAttempt() throws Exception {
|
||||
waitForState(RMAppAttemptState.RUNNING);
|
||||
final FinishApplicationMasterRequest req =
|
||||
|
@ -18,23 +18,58 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyList;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestApplicationMasterService {
|
||||
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
||||
@ -73,7 +108,7 @@ public void testRMIdentifierOnContainerAllocation() throws Exception {
|
||||
nm1.nodeHeartbeat(true);
|
||||
while (alloc1Response.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(1000);
|
||||
sleep(1000);
|
||||
alloc1Response = am1.schedule();
|
||||
}
|
||||
|
||||
@ -113,7 +148,7 @@ public void testInvalidContainerReleaseRequest() throws Exception {
|
||||
nm1.nodeHeartbeat(true);
|
||||
while (alloc1Response.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(1000);
|
||||
sleep(1000);
|
||||
alloc1Response = am1.schedule();
|
||||
}
|
||||
|
||||
@ -145,4 +180,70 @@ public void testInvalidContainerReleaseRequest() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=1200000)
|
||||
public void testProgressFilter() throws Exception{
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
// Register node1
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
|
||||
// Submit an application
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
am1.setAMRMProtocol(rm.getApplicationMasterService());
|
||||
|
||||
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
allocateRequest.setReleaseList(release);
|
||||
allocateRequest.setAskList(ask);
|
||||
|
||||
allocateRequest.setProgress(Float.POSITIVE_INFINITY);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=1){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
allocateRequest.setProgress(Float.NaN);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=0){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
allocateRequest.setProgress((float)9);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=1){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
allocateRequest.setProgress(Float.NEGATIVE_INFINITY);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=0){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
allocateRequest.setProgress((float)0.5);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=0.5){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
allocateRequest.setProgress((float)-1);
|
||||
am1.allocate(allocateRequest);
|
||||
while(attempt1.getProgress()!=0){
|
||||
LOG.info("Waiting for allocate event to be handled ...");
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user