YARN-11577. Improve FederationInterceptorREST Method Result. (#6190) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
53c3ae1c89
commit
616e381c9f
@ -105,8 +105,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object
|
|||||||
throws YarnException {
|
throws YarnException {
|
||||||
String msg = String.format(errMsgFormat, args);
|
String msg = String.format(errMsgFormat, args);
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(msg, t);
|
String newErrMsg = getErrorMsg(msg, t);
|
||||||
throw new YarnException(msg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
throw new YarnException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
throw new YarnException(msg);
|
throw new YarnException(msg);
|
||||||
@ -234,8 +235,9 @@ private static List<String> getInterceptorClassNames(Configuration conf,
|
|||||||
public static void logAndThrowIOException(String errMsg, Throwable t)
|
public static void logAndThrowIOException(String errMsg, Throwable t)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(errMsg, t);
|
String newErrMsg = getErrorMsg(errMsg, t);
|
||||||
throw new IOException(errMsg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
throw new IOException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(errMsg);
|
LOG.error(errMsg);
|
||||||
throw new IOException(errMsg);
|
throw new IOException(errMsg);
|
||||||
@ -256,8 +258,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
String msg = String.format(errMsgFormat, args);
|
String msg = String.format(errMsgFormat, args);
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(msg, t);
|
String newErrMsg = getErrorMsg(msg, t);
|
||||||
throw new IOException(msg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
throw new IOException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
throw new IOException(msg);
|
throw new IOException(msg);
|
||||||
@ -276,8 +279,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
|
|||||||
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
|
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
|
||||||
throws RuntimeException {
|
throws RuntimeException {
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(errMsg, t);
|
String newErrMsg = getErrorMsg(errMsg, t);
|
||||||
throw new RuntimeException(errMsg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
throw new RuntimeException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(errMsg);
|
LOG.error(errMsg);
|
||||||
throw new RuntimeException(errMsg);
|
throw new RuntimeException(errMsg);
|
||||||
@ -298,8 +302,9 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
|
|||||||
throws RuntimeException {
|
throws RuntimeException {
|
||||||
String msg = String.format(errMsgFormat, args);
|
String msg = String.format(errMsgFormat, args);
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(msg, t);
|
String newErrMsg = getErrorMsg(msg, t);
|
||||||
throw new RuntimeException(msg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
throw new RuntimeException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
throw new RuntimeException(msg);
|
throw new RuntimeException(msg);
|
||||||
@ -320,8 +325,9 @@ public static RuntimeException logAndReturnRunTimeException(
|
|||||||
Throwable t, String errMsgFormat, Object... args) {
|
Throwable t, String errMsgFormat, Object... args) {
|
||||||
String msg = String.format(errMsgFormat, args);
|
String msg = String.format(errMsgFormat, args);
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(msg, t);
|
String newErrMsg = getErrorMsg(msg, t);
|
||||||
return new RuntimeException(msg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
return new RuntimeException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
return new RuntimeException(msg);
|
return new RuntimeException(msg);
|
||||||
@ -356,8 +362,9 @@ public static YarnRuntimeException logAndReturnYarnRunTimeException(
|
|||||||
Throwable t, String errMsgFormat, Object... args) {
|
Throwable t, String errMsgFormat, Object... args) {
|
||||||
String msg = String.format(errMsgFormat, args);
|
String msg = String.format(errMsgFormat, args);
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
LOG.error(msg, t);
|
String newErrMsg = getErrorMsg(msg, t);
|
||||||
return new YarnRuntimeException(msg, t);
|
LOG.error(newErrMsg, t);
|
||||||
|
return new YarnRuntimeException(newErrMsg, t);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
return new YarnRuntimeException(msg);
|
return new YarnRuntimeException(msg);
|
||||||
|
@ -341,6 +341,7 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String app
|
|||||||
|
|
||||||
// Get homeSubCluster By appId
|
// Get homeSubCluster By appId
|
||||||
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
|
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
|
||||||
|
LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId());
|
||||||
return getOrCreateInterceptorForSubCluster(subClusterInfo);
|
return getOrCreateInterceptorForSubCluster(subClusterInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -827,7 +828,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (apps.getApps().isEmpty()) {
|
if (apps.getApps().isEmpty()) {
|
||||||
return null;
|
return new AppsInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge all the application reports got from all the available YARN RMs
|
// Merge all the application reports got from all the available YARN RMs
|
||||||
@ -1135,7 +1136,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
|
|||||||
} catch (YarnException | IllegalArgumentException e) {
|
} catch (YarnException | IllegalArgumentException e) {
|
||||||
LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
|
LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
|
||||||
}
|
}
|
||||||
return null;
|
return new AppState();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -3371,17 +3372,19 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
|
|||||||
}
|
}
|
||||||
|
|
||||||
Exception exception = result.getException();
|
Exception exception = result.getException();
|
||||||
|
if (exception != null) {
|
||||||
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
|
|
||||||
// an exception will be thrown directly.
|
|
||||||
if (!allowPartialResult && exception != null) {
|
|
||||||
throw exception;
|
throw exception;
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
String subClusterId = subClusterInfo != null ?
|
String subClusterId = subClusterInfo != null ?
|
||||||
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
|
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
|
||||||
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
|
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
|
||||||
throw new YarnRuntimeException(e.getCause().getMessage(), e);
|
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
|
||||||
|
// an exception will be thrown directly.
|
||||||
|
if (!allowPartialResult) {
|
||||||
|
throw new YarnException("SubCluster " + subClusterId +
|
||||||
|
" failed to " + request.getMethodName() + " report.", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@
|
|||||||
* main difference with AMRMProxyService is the protocol they implement.
|
* main difference with AMRMProxyService is the protocol they implement.
|
||||||
**/
|
**/
|
||||||
@Singleton
|
@Singleton
|
||||||
@Path("/ws/v1/cluster")
|
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||||
public class RouterWebServices implements RMWebServiceProtocol {
|
public class RouterWebServices implements RMWebServiceProtocol {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
@ -424,7 +424,7 @@ public BulkActivitiesInfo getBulkActivities(
|
|||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
@Override
|
@Override
|
||||||
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
|
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
|
||||||
@QueryParam(RMWSConsts.APP_ID) String appId,
|
@PathParam(RMWSConsts.APPID) String appId,
|
||||||
@QueryParam(RMWSConsts.MAX_TIME) String time,
|
@QueryParam(RMWSConsts.MAX_TIME) String time,
|
||||||
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
|
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
|
||||||
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
|
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
@ -39,6 +40,13 @@
|
|||||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.router.webapp.HTTPMethods;
|
||||||
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
|
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -48,11 +56,20 @@
|
|||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||||
|
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
|
||||||
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
|
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
|
||||||
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
|
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
@ -190,6 +207,8 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
|
|||||||
final String queryValue) throws IOException, InterruptedException {
|
final String queryValue) throws IOException, InterruptedException {
|
||||||
|
|
||||||
Client clientToRouter = Client.create();
|
Client clientToRouter = Client.create();
|
||||||
|
clientToRouter.setReadTimeout(5000);
|
||||||
|
clientToRouter.setConnectTimeout(5000);
|
||||||
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);
|
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);
|
||||||
|
|
||||||
final WebResource.Builder toRouterBuilder;
|
final WebResource.Builder toRouterBuilder;
|
||||||
@ -207,4 +226,120 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
|
|||||||
return response.getEntity(returnType);
|
return response.getEntity(returnType);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ClientResponse performCall(final String routerAddress, final String webAddress,
|
||||||
|
final String queryKey, final String queryValue, final Object context,
|
||||||
|
final HTTPMethods method) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
return UserGroupInformation.createRemoteUser(userName).doAs(
|
||||||
|
(PrivilegedExceptionAction<ClientResponse>) () -> {
|
||||||
|
Client clientToRouter = Client.create();
|
||||||
|
WebResource toRouter = clientToRouter.resource(routerAddress).path(webAddress);
|
||||||
|
|
||||||
|
WebResource toRouterWR = toRouter;
|
||||||
|
if (queryKey != null && queryValue != null) {
|
||||||
|
toRouterWR = toRouterWR.queryParam(queryKey, queryValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
WebResource.Builder builder;
|
||||||
|
if (context != null) {
|
||||||
|
builder = toRouterWR.entity(context, APPLICATION_JSON);
|
||||||
|
builder = builder.accept(APPLICATION_JSON);
|
||||||
|
} else {
|
||||||
|
builder = toRouterWR.accept(APPLICATION_JSON);
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientResponse response = null;
|
||||||
|
|
||||||
|
switch (method) {
|
||||||
|
case DELETE:
|
||||||
|
response = builder.delete(ClientResponse.class);
|
||||||
|
break;
|
||||||
|
case POST:
|
||||||
|
response = builder.post(ClientResponse.class);
|
||||||
|
break;
|
||||||
|
case PUT:
|
||||||
|
response = builder.put(ClientResponse.class);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeId(String rmAddress) {
|
||||||
|
Client clientToRM = Client.create();
|
||||||
|
clientToRM.setConnectTimeout(3000);
|
||||||
|
clientToRM.setReadTimeout(3000);
|
||||||
|
WebResource toRM = clientToRM.resource(rmAddress).path(RM_WEB_SERVICE_PATH + NODES);
|
||||||
|
ClientResponse response =
|
||||||
|
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
|
||||||
|
NodesInfo ci = response.getEntity(NodesInfo.class);
|
||||||
|
List<NodeInfo> nodes = ci.getNodes();
|
||||||
|
if (nodes.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
clientToRM.destroy();
|
||||||
|
return nodes.get(0).getNodeId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public NewApplication getNewApplicationId(String routerAddress) {
|
||||||
|
Client clientToRM = Client.create();
|
||||||
|
clientToRM.setConnectTimeout(3000);
|
||||||
|
clientToRM.setReadTimeout(3000);
|
||||||
|
WebResource toRM = clientToRM.resource(routerAddress).path(
|
||||||
|
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION);
|
||||||
|
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
|
||||||
|
clientToRM.destroy();
|
||||||
|
return response.getEntity(NewApplication.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String submitApplication(String routerAddress) {
|
||||||
|
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
|
||||||
|
String appId = getNewApplicationId(routerAddress).getApplicationId();
|
||||||
|
context.setApplicationId(appId);
|
||||||
|
Client clientToRouter = Client.create();
|
||||||
|
clientToRouter.setConnectTimeout(3000);
|
||||||
|
clientToRouter.setReadTimeout(3000);
|
||||||
|
WebResource toRM = clientToRouter.resource(routerAddress).path(
|
||||||
|
RM_WEB_SERVICE_PATH + APPS);
|
||||||
|
toRM.entity(context, APPLICATION_XML).accept(APPLICATION_XML).post(ClientResponse.class);
|
||||||
|
clientToRouter.destroy();
|
||||||
|
return appId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NewReservation getNewReservationId(String routerAddress) {
|
||||||
|
Client clientToRM = Client.create();
|
||||||
|
clientToRM.setConnectTimeout(3000);
|
||||||
|
clientToRM.setReadTimeout(3000);
|
||||||
|
WebResource toRM = clientToRM.resource(routerAddress).
|
||||||
|
path(RM_WEB_SERVICE_PATH + RESERVATION_NEW);
|
||||||
|
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
|
||||||
|
return response.getEntity(NewReservation.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String addNodeLabel(String routerAddress) {
|
||||||
|
Client clientToRM = Client.create();
|
||||||
|
clientToRM.setConnectTimeout(3000);
|
||||||
|
clientToRM.setReadTimeout(3000);
|
||||||
|
WebResource toRM = clientToRM.resource(routerAddress)
|
||||||
|
.path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS);
|
||||||
|
List<NodeLabel> nodeLabels = new ArrayList<>();
|
||||||
|
nodeLabels.add(NodeLabel.newInstance("default"));
|
||||||
|
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
|
||||||
|
ClientResponse response = toRM
|
||||||
|
.entity(context, APPLICATION_XML)
|
||||||
|
.accept(APPLICATION_XML)
|
||||||
|
.post(ClientResponse.class);
|
||||||
|
return response.getEntity(String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String format(String format, Object... args) {
|
||||||
|
Pattern p = Pattern.compile("\\{.*?}");
|
||||||
|
Matcher m = p.matcher(format);
|
||||||
|
String newFormat = m.replaceAll("%s");
|
||||||
|
return String.format(newFormat, args);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,21 +17,93 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.router.subcluster.capacity;
|
package org.apache.hadoop.yarn.server.router.subcluster.capacity;
|
||||||
|
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED;
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Sets;
|
import org.apache.hadoop.util.Sets;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
|
||||||
import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster;
|
import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster;
|
||||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.CLUSTER_USER_INFO;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster.format;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -41,6 +113,8 @@ public class TestYarnFederationWithCapacityScheduler {
|
|||||||
private static TestFederationSubCluster testFederationSubCluster;
|
private static TestFederationSubCluster testFederationSubCluster;
|
||||||
private static Set<String> subClusters;
|
private static Set<String> subClusters;
|
||||||
private static final String ROUTER_WEB_ADDRESS = "http://localhost:18089";
|
private static final String ROUTER_WEB_ADDRESS = "http://localhost:18089";
|
||||||
|
private static final String SC1_RM_WEB_ADDRESS = "http://localhost:18088";
|
||||||
|
private static final String SC2_RM_WEB_ADDRESS = "http://localhost:28088";
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp()
|
public static void setUp()
|
||||||
@ -73,4 +147,469 @@ public void testGetClusterInfo() throws InterruptedException, IOException {
|
|||||||
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfo() throws InterruptedException, IOException {
|
||||||
|
FederationClusterInfo federationClusterInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + INFO,
|
||||||
|
FederationClusterInfo.class, null, null);
|
||||||
|
List<ClusterInfo> clusterInfos = federationClusterInfo.getList();
|
||||||
|
assertNotNull(clusterInfos);
|
||||||
|
assertEquals(2, clusterInfos.size());
|
||||||
|
for (ClusterInfo clusterInfo : clusterInfos) {
|
||||||
|
assertNotNull(clusterInfo);
|
||||||
|
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClusterUserInfo() throws Exception {
|
||||||
|
FederationClusterUserInfo federationClusterUserInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + CLUSTER_USER_INFO,
|
||||||
|
FederationClusterUserInfo.class, null, null);
|
||||||
|
List<ClusterUserInfo> clusterUserInfos = federationClusterUserInfo.getList();
|
||||||
|
assertNotNull(clusterUserInfos);
|
||||||
|
assertEquals(2, clusterUserInfos.size());
|
||||||
|
for (ClusterUserInfo clusterUserInfo : clusterUserInfos) {
|
||||||
|
assertNotNull(clusterUserInfo);
|
||||||
|
assertTrue(subClusters.contains(clusterUserInfo.getSubClusterId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricsInfo() throws Exception {
|
||||||
|
// It takes time to start the sub-cluster.
|
||||||
|
// We need to wait for the sub-cluster to be completely started,
|
||||||
|
// so we need to set the waiting time.
|
||||||
|
// The resources of the two sub-clusters we registered are 24C and 12G,
|
||||||
|
// so the resources that the Router should collect are 48C and 24G.
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
ClusterMetricsInfo clusterMetricsInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null);
|
||||||
|
assertNotNull(clusterMetricsInfo);
|
||||||
|
return (48 == clusterMetricsInfo.getTotalVirtualCores() &&
|
||||||
|
24576 == clusterMetricsInfo.getTotalMB());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulerInfo() throws Exception {
|
||||||
|
FederationSchedulerTypeInfo schedulerTypeInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + SCHEDULER, FederationSchedulerTypeInfo.class, null, null);
|
||||||
|
assertNotNull(schedulerTypeInfo);
|
||||||
|
List<SchedulerTypeInfo> schedulerTypeInfos = schedulerTypeInfo.getList();
|
||||||
|
assertNotNull(schedulerTypeInfos);
|
||||||
|
assertEquals(2, schedulerTypeInfos.size());
|
||||||
|
for (SchedulerTypeInfo schedulerTypeInfoItem : schedulerTypeInfos) {
|
||||||
|
assertNotNull(schedulerTypeInfoItem);
|
||||||
|
assertTrue(subClusters.contains(schedulerTypeInfoItem.getSubClusterId()));
|
||||||
|
CapacitySchedulerInfo schedulerInfo =
|
||||||
|
(CapacitySchedulerInfo) schedulerTypeInfoItem.getSchedulerInfo();
|
||||||
|
assertNotNull(schedulerInfo);
|
||||||
|
assertEquals(3, schedulerInfo.getQueues().getQueueInfoList().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodesEmpty() throws Exception {
|
||||||
|
// We are in 2 sub-clusters, each with 3 nodes, so our Router should correspond to 6 nodes.
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
NodesInfo nodesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null);
|
||||||
|
assertNotNull(nodesInfo);
|
||||||
|
ArrayList<NodeInfo> nodes = nodesInfo.getNodes();
|
||||||
|
assertNotNull(nodes);
|
||||||
|
return (6 == nodes.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodesLost() throws Exception {
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
NodesInfo nodesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST");
|
||||||
|
assertNotNull(nodesInfo);
|
||||||
|
ArrayList<NodeInfo> nodes = nodesInfo.getNodes();
|
||||||
|
assertNotNull(nodes);
|
||||||
|
return nodes.isEmpty();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNode() throws Exception {
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
NodeInfo nodeInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertNotNull(nodeInfo);
|
||||||
|
assertEquals(rm1NodeId, nodeInfo.getNodeId());
|
||||||
|
|
||||||
|
String rm2NodeId = testFederationSubCluster.getNodeId(SC2_RM_WEB_ADDRESS);
|
||||||
|
NodeInfo nodeInfo2 =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm2NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertNotNull(nodeInfo2);
|
||||||
|
assertEquals(rm2NodeId, nodeInfo2.getNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateNodeResource() throws Exception {
|
||||||
|
// wait until a node shows up and check the resources
|
||||||
|
GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null,
|
||||||
|
100, 5 * 1000);
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
|
||||||
|
// assert memory and default vcores
|
||||||
|
NodeInfo nodeInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertEquals(4096, nodeInfo.getTotalResource().getMemorySize());
|
||||||
|
assertEquals(8, nodeInfo.getTotalResource().getvCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testActivies() throws Exception {
|
||||||
|
// wait until a node shows up and check the resources
|
||||||
|
GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null,
|
||||||
|
100, 5 * 1000);
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
|
||||||
|
ActivitiesInfo activitiesInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, "nodeId", rm1NodeId);
|
||||||
|
|
||||||
|
assertNotNull(activitiesInfo);
|
||||||
|
assertEquals(rm1NodeId, activitiesInfo.getNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppActivitiesXML() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppActivitiesInfo appActivitiesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + "/scheduler/app-activities/" + appId,
|
||||||
|
AppActivitiesInfo.class, null, null);
|
||||||
|
assertNotNull(appActivitiesInfo);
|
||||||
|
assertEquals(appId, appActivitiesInfo.getApplicationId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppStatistics() throws Exception {
|
||||||
|
ApplicationStatisticsInfo applicationStatisticsInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APP_STATISTICS, ApplicationStatisticsInfo.class, STATES, "RUNNING");
|
||||||
|
assertNotNull(applicationStatisticsInfo);
|
||||||
|
ArrayList<StatisticsItemInfo> statItems = applicationStatisticsInfo.getStatItems();
|
||||||
|
assertNotNull(statItems);
|
||||||
|
assertEquals(1, statItems.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewApplication() throws Exception {
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null,
|
||||||
|
null, null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
NewApplication ci = response.getEntity(NewApplication.class);
|
||||||
|
assertNotNull(ci);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitApplicationXML() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApps() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppsInfo appsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null);
|
||||||
|
assertNotNull(appsInfo);
|
||||||
|
assertEquals(1, appsInfo.getApps().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApp() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppInfo appInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID, appId),
|
||||||
|
AppInfo.class, null, null);
|
||||||
|
assertNotNull(appInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppAttempt() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppAttemptsInfo appAttemptsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId),
|
||||||
|
AppAttemptsInfo.class, null, null);
|
||||||
|
assertNotNull(appAttemptsInfo);
|
||||||
|
ArrayList<AppAttemptInfo> attempts = appAttemptsInfo.getAttempts();
|
||||||
|
assertNotNull(attempts);
|
||||||
|
assertEquals(1, attempts.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppState() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppState appState = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId),
|
||||||
|
AppState.class, null, null);
|
||||||
|
assertNotNull(appState);
|
||||||
|
String state = appState.getState();
|
||||||
|
assertNotNull(state);
|
||||||
|
assertEquals("ACCEPTED", state);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppState() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppState appState = new AppState("KILLED");
|
||||||
|
String pathApp = RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
pathApp, null, null, appState, PUT);
|
||||||
|
assertNotNull(response);
|
||||||
|
assertEquals(SC_ACCEPTED, response.getStatus());
|
||||||
|
AppState appState1 = response.getEntity(AppState.class);
|
||||||
|
assertNotNull(appState1);
|
||||||
|
assertNotNull(appState1.getState());
|
||||||
|
assertEquals("KILLING", appState1.getState());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppPriority() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppPriority appPriority = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId),
|
||||||
|
AppPriority.class, null, null);
|
||||||
|
assertNotNull(appPriority);
|
||||||
|
assertEquals(-1, appPriority.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppPriority() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppPriority appPriority = new AppPriority(1);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId),
|
||||||
|
null, null, appPriority, PUT);
|
||||||
|
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
AppPriority ci = response.getEntity(AppPriority.class);
|
||||||
|
assertNotNull(ci);
|
||||||
|
assertNotNull(ci.getPriority());
|
||||||
|
assertEquals(1, ci.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppQueue() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppQueue appQueue = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId),
|
||||||
|
AppQueue.class, null, null);
|
||||||
|
assertNotNull(appQueue);
|
||||||
|
String queue = appQueue.getQueue();
|
||||||
|
assertEquals("root.default", queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppQueue() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppQueue appQueue = new AppQueue("root.default");
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId),
|
||||||
|
null, null, appQueue, PUT);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
AppQueue appQueue1 = response.getEntity(AppQueue.class);
|
||||||
|
assertNotNull(appQueue1);
|
||||||
|
String queue1 = appQueue1.getQueue();
|
||||||
|
assertEquals("root.default", queue1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppTimeouts() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppTimeoutsInfo appTimeoutsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId),
|
||||||
|
AppTimeoutsInfo.class, null, null);
|
||||||
|
assertNotNull(appTimeoutsInfo);
|
||||||
|
ArrayList<AppTimeoutInfo> appTimeouts = appTimeoutsInfo.getAppTimeouts();
|
||||||
|
assertNotNull(appTimeouts);
|
||||||
|
assertEquals(1, appTimeouts.size());
|
||||||
|
AppTimeoutInfo appTimeoutInfo = appTimeouts.get(0);
|
||||||
|
assertNotNull(appTimeoutInfo);
|
||||||
|
assertEquals(ApplicationTimeoutType.LIFETIME, appTimeoutInfo.getTimeoutType());
|
||||||
|
assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppTimeout() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId);
|
||||||
|
AppTimeoutInfo appTimeoutInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null);
|
||||||
|
assertNotNull(appTimeoutInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppTimeouts() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo();
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId),
|
||||||
|
null, null, appTimeoutInfo, PUT);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewReservation() throws Exception {
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_NEW,
|
||||||
|
null, null, null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
NewReservation ci = response.getEntity(NewReservation.class);
|
||||||
|
assertNotNull(ci);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitReservation() throws Exception {
|
||||||
|
ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo();
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
context.setReservationId(newReservationId.getReservationId());
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateReservation() throws Exception {
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
String reservationId = newReservationId.getReservationId();
|
||||||
|
ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo();
|
||||||
|
context.setReservationId(reservationId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteReservation() throws Exception {
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
String reservationId = newReservationId.getReservationId();
|
||||||
|
ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo();
|
||||||
|
context.setReservationId(reservationId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST);
|
||||||
|
assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetClusterNodeLabels() throws Exception {
|
||||||
|
NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsOnNode() throws Exception {
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, rm1NodeId),
|
||||||
|
NodeLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsMappingEmpty() throws Exception {
|
||||||
|
LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls(
|
||||||
|
ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS,
|
||||||
|
LabelsToNodesInfo.class, null, null);
|
||||||
|
assertNotNull(labelsToNodesInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsMapping() throws Exception {
|
||||||
|
LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls(
|
||||||
|
ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS,
|
||||||
|
LabelsToNodesInfo.class, LABELS, "label1");
|
||||||
|
assertNotNull(labelsToNodesInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddToClusterNodeLabels() throws Exception {
|
||||||
|
List<NodeLabel> nodeLabels = new ArrayList<>();
|
||||||
|
nodeLabels.add(NodeLabel.newInstance("default"));
|
||||||
|
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNodeToLabels() throws Exception {
|
||||||
|
NodeToLabelsInfo nodeToLabelsInfo = TestFederationSubCluster.performGetCalls(
|
||||||
|
ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS,
|
||||||
|
NodeToLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeToLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveFromClusterNodeLabels() throws Exception {
|
||||||
|
testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS,
|
||||||
|
LABELS, "default", null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplaceLabelsOnNodes() throws Exception {
|
||||||
|
testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS);
|
||||||
|
NodeToLabelsEntryList context = new NodeToLabelsEntryList();
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS,
|
||||||
|
null, null, context, POST);
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,21 +17,99 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.router.subcluster.fair;
|
package org.apache.hadoop.yarn.server.router.subcluster.fair;
|
||||||
|
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED;
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Sets;
|
import org.apache.hadoop.util.Sets;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
|
||||||
import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster;
|
import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster;
|
||||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||||
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.CLUSTER_USER_INFO;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_REPLACE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_ID;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODE_RESOURCE;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster.format;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST;
|
||||||
|
import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT;
|
||||||
|
import static org.apache.http.HttpStatus.SC_OK;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -40,6 +118,8 @@ public class TestYarnFederationWithFairScheduler {
|
|||||||
private static TestFederationSubCluster testFederationSubCluster;
|
private static TestFederationSubCluster testFederationSubCluster;
|
||||||
private static Set<String> subClusters;
|
private static Set<String> subClusters;
|
||||||
private static final String ROUTER_WEB_ADDRESS = "http://localhost:28089";
|
private static final String ROUTER_WEB_ADDRESS = "http://localhost:28089";
|
||||||
|
private static final String SC1_RM_WEB_ADDRESS = "http://localhost:38088";
|
||||||
|
private static final String SC2_RM_WEB_ADDRESS = "http://localhost:48088";
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp()
|
public static void setUp()
|
||||||
@ -72,4 +152,488 @@ public void testGetClusterInfo() throws InterruptedException, IOException {
|
|||||||
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfo() throws InterruptedException, IOException {
|
||||||
|
FederationClusterInfo federationClusterInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + INFO,
|
||||||
|
FederationClusterInfo.class, null, null);
|
||||||
|
List<ClusterInfo> clusterInfos = federationClusterInfo.getList();
|
||||||
|
assertNotNull(clusterInfos);
|
||||||
|
assertEquals(2, clusterInfos.size());
|
||||||
|
for (ClusterInfo clusterInfo : clusterInfos) {
|
||||||
|
assertNotNull(clusterInfo);
|
||||||
|
assertTrue(subClusters.contains(clusterInfo.getSubClusterId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClusterUserInfo() throws Exception {
|
||||||
|
FederationClusterUserInfo federationClusterUserInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + CLUSTER_USER_INFO,
|
||||||
|
FederationClusterUserInfo.class, null, null);
|
||||||
|
List<ClusterUserInfo> clusterUserInfos = federationClusterUserInfo.getList();
|
||||||
|
assertNotNull(clusterUserInfos);
|
||||||
|
assertEquals(2, clusterUserInfos.size());
|
||||||
|
for (ClusterUserInfo clusterUserInfo : clusterUserInfos) {
|
||||||
|
assertNotNull(clusterUserInfo);
|
||||||
|
assertTrue(subClusters.contains(clusterUserInfo.getSubClusterId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricsInfo() throws Exception {
|
||||||
|
// It takes time to start the sub-cluster.
|
||||||
|
// We need to wait for the sub-cluster to be completely started,
|
||||||
|
// so we need to set the waiting time.
|
||||||
|
// The resources of the two sub-clusters we registered are 24C and 12G,
|
||||||
|
// so the resources that the Router should collect are 48C and 24G.
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
ClusterMetricsInfo clusterMetricsInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null);
|
||||||
|
assertNotNull(clusterMetricsInfo);
|
||||||
|
return (48 == clusterMetricsInfo.getTotalVirtualCores() &&
|
||||||
|
24576 == clusterMetricsInfo.getTotalMB());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulerInfo() throws Exception {
|
||||||
|
FederationSchedulerTypeInfo schedulerTypeInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + SCHEDULER, FederationSchedulerTypeInfo.class, null, null);
|
||||||
|
assertNotNull(schedulerTypeInfo);
|
||||||
|
List<SchedulerTypeInfo> schedulerTypeInfos = schedulerTypeInfo.getList();
|
||||||
|
assertNotNull(schedulerTypeInfos);
|
||||||
|
assertEquals(2, schedulerTypeInfos.size());
|
||||||
|
for (SchedulerTypeInfo schedulerTypeInfoItem : schedulerTypeInfos) {
|
||||||
|
assertNotNull(schedulerTypeInfoItem);
|
||||||
|
assertTrue(subClusters.contains(schedulerTypeInfoItem.getSubClusterId()));
|
||||||
|
FairSchedulerQueueInfo rootQueueInfo =
|
||||||
|
((FairSchedulerInfo) schedulerTypeInfoItem.getSchedulerInfo()).getRootQueueInfo();
|
||||||
|
assertNotNull(rootQueueInfo);
|
||||||
|
assertEquals("fair", rootQueueInfo.getSchedulingPolicy());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodesEmpty() throws Exception {
|
||||||
|
// We are in 2 sub-clusters, each with 3 nodes, so our Router should correspond to 6 nodes.
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
NodesInfo nodesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null);
|
||||||
|
assertNotNull(nodesInfo);
|
||||||
|
ArrayList<NodeInfo> nodes = nodesInfo.getNodes();
|
||||||
|
assertNotNull(nodes);
|
||||||
|
return (6 == nodes.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodesLost() throws Exception {
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
NodesInfo nodesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST");
|
||||||
|
assertNotNull(nodesInfo);
|
||||||
|
ArrayList<NodeInfo> nodes = nodesInfo.getNodes();
|
||||||
|
assertNotNull(nodes);
|
||||||
|
return nodes.isEmpty();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 5000, 50 * 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNode() throws Exception {
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
NodeInfo nodeInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertNotNull(nodeInfo);
|
||||||
|
assertEquals(rm1NodeId, nodeInfo.getNodeId());
|
||||||
|
|
||||||
|
String rm2NodeId = testFederationSubCluster.getNodeId(SC2_RM_WEB_ADDRESS);
|
||||||
|
NodeInfo nodeInfo2 =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm2NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertNotNull(nodeInfo2);
|
||||||
|
assertEquals(rm2NodeId, nodeInfo2.getNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateNodeResource() throws Exception {
|
||||||
|
// wait until a node shows up and check the resources
|
||||||
|
GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null,
|
||||||
|
100, 5 * 1000);
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
|
||||||
|
// assert memory and default vcores
|
||||||
|
NodeInfo nodeInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertEquals(4096, nodeInfo.getTotalResource().getMemorySize());
|
||||||
|
assertEquals(8, nodeInfo.getTotalResource().getvCores());
|
||||||
|
|
||||||
|
Resource resource = Resource.newInstance(4096, 5);
|
||||||
|
ResourceOptionInfo resourceOption = new ResourceOptionInfo(
|
||||||
|
ResourceOption.newInstance(resource, 1000));
|
||||||
|
ClientResponse routerResponse = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODE_RESOURCE, rm1NodeId),
|
||||||
|
null, null, resourceOption, POST);
|
||||||
|
JSONObject json = routerResponse.getEntity(JSONObject.class);
|
||||||
|
JSONObject totalResource = json.getJSONObject("resourceInfo");
|
||||||
|
assertEquals(resource.getMemorySize(), totalResource.getLong("memory"));
|
||||||
|
assertEquals(resource.getVirtualCores(), totalResource.getLong("vCores"));
|
||||||
|
|
||||||
|
// assert updated memory and cores
|
||||||
|
NodeInfo nodeInfo1 = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID, rm1NodeId),
|
||||||
|
NodeInfo.class, null, null);
|
||||||
|
assertEquals(4096, nodeInfo1.getTotalResource().getMemorySize());
|
||||||
|
assertEquals(5, nodeInfo1.getTotalResource().getvCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testActivies() throws Exception {
|
||||||
|
// wait until a node shows up and check the resources
|
||||||
|
GenericTestUtils.waitFor(() -> testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS) != null,
|
||||||
|
100, 5 * 1000);
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
|
||||||
|
ActivitiesInfo activitiesInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, "nodeId", rm1NodeId);
|
||||||
|
|
||||||
|
assertNotNull(activitiesInfo);
|
||||||
|
assertEquals(rm1NodeId, activitiesInfo.getNodeId());
|
||||||
|
assertEquals("Not Capacity Scheduler", activitiesInfo.getDiagnostic());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppActivities() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppActivitiesInfo appActivitiesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + "/scheduler/app-activities/" + appId,
|
||||||
|
AppActivitiesInfo.class, APP_ID, appId);
|
||||||
|
assertNotNull(appActivitiesInfo);
|
||||||
|
assertEquals(appId, appActivitiesInfo.getApplicationId());
|
||||||
|
assertEquals("Not Capacity Scheduler", appActivitiesInfo.getDiagnostic());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppStatistics() throws Exception {
|
||||||
|
ApplicationStatisticsInfo applicationStatisticsInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APP_STATISTICS, ApplicationStatisticsInfo.class, STATES, "RUNNING");
|
||||||
|
assertNotNull(applicationStatisticsInfo);
|
||||||
|
ArrayList<StatisticsItemInfo> statItems = applicationStatisticsInfo.getStatItems();
|
||||||
|
assertNotNull(statItems);
|
||||||
|
assertEquals(1, statItems.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewApplication() throws Exception {
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null,
|
||||||
|
null, null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
NewApplication ci = response.getEntity(NewApplication.class);
|
||||||
|
assertNotNull(ci);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitApplication() {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApps() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppsInfo appsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null);
|
||||||
|
assertNotNull(appsInfo);
|
||||||
|
assertEquals(1, appsInfo.getApps().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppAttempt() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppAttemptsInfo appAttemptsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId),
|
||||||
|
AppAttemptsInfo.class, null, null);
|
||||||
|
assertNotNull(appAttemptsInfo);
|
||||||
|
ArrayList<AppAttemptInfo> attempts = appAttemptsInfo.getAttempts();
|
||||||
|
assertNotNull(attempts);
|
||||||
|
assertEquals(1, attempts.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppState() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppState appState = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId),
|
||||||
|
AppState.class, null, null);
|
||||||
|
assertNotNull(appState);
|
||||||
|
String state = appState.getState();
|
||||||
|
assertNotNull(state);
|
||||||
|
assertEquals("ACCEPTED", state);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppState() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppState appState = new AppState("KILLED");
|
||||||
|
String pathApp = RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
pathApp, null, null, appState, PUT);
|
||||||
|
assertNotNull(response);
|
||||||
|
assertEquals(SC_ACCEPTED, response.getStatus());
|
||||||
|
AppState appState1 = response.getEntity(AppState.class);
|
||||||
|
assertNotNull(appState1);
|
||||||
|
assertNotNull(appState1.getState());
|
||||||
|
assertEquals("KILLING", appState1.getState());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppPriority() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
assertNotNull(appId);
|
||||||
|
AppPriority appPriority = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId),
|
||||||
|
AppPriority.class, null, null);
|
||||||
|
assertNotNull(appPriority);
|
||||||
|
assertEquals(0, appPriority.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppPriority() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppPriority appPriority = new AppPriority(1);
|
||||||
|
// FairScheduler does not support Update Application Priority.
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId),
|
||||||
|
null, null, appPriority, PUT);
|
||||||
|
assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppQueue() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppQueue appQueue = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId),
|
||||||
|
AppQueue.class, null, null);
|
||||||
|
assertNotNull(appQueue);
|
||||||
|
String queue = appQueue.getQueue();
|
||||||
|
assertEquals("root.dr_dot_who", queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppQueue() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppQueue appQueue = new AppQueue("root.a");
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId),
|
||||||
|
null, null, appQueue, PUT);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
AppQueue appQueue1 = response.getEntity(AppQueue.class);
|
||||||
|
assertNotNull(appQueue1);
|
||||||
|
String queue1 = appQueue1.getQueue();
|
||||||
|
assertEquals("root.a", queue1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppTimeouts() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppTimeoutsInfo appTimeoutsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId),
|
||||||
|
AppTimeoutsInfo.class, null, null);
|
||||||
|
assertNotNull(appTimeoutsInfo);
|
||||||
|
ArrayList<AppTimeoutInfo> appTimeouts = appTimeoutsInfo.getAppTimeouts();
|
||||||
|
assertNotNull(appTimeouts);
|
||||||
|
assertEquals(1, appTimeouts.size());
|
||||||
|
AppTimeoutInfo appTimeoutInfo = appTimeouts.get(0);
|
||||||
|
assertNotNull(appTimeoutInfo);
|
||||||
|
assertEquals(ApplicationTimeoutType.LIFETIME, appTimeoutInfo.getTimeoutType());
|
||||||
|
assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppTimeout() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId);
|
||||||
|
AppTimeoutInfo appTimeoutInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null);
|
||||||
|
assertNotNull(appTimeoutInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppTimeouts() throws Exception {
|
||||||
|
String appId = testFederationSubCluster.submitApplication(ROUTER_WEB_ADDRESS);
|
||||||
|
AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo();
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId),
|
||||||
|
null, null, appTimeoutInfo, PUT);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewReservation() throws Exception {
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_NEW,
|
||||||
|
null, null, null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
NewReservation ci = response.getEntity(NewReservation.class);
|
||||||
|
assertNotNull(ci);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitReservation() throws Exception {
|
||||||
|
ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo();
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
context.setReservationId(newReservationId.getReservationId());
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateReservation() throws Exception {
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
String reservationId = newReservationId.getReservationId();
|
||||||
|
ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo();
|
||||||
|
context.setReservationId(reservationId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST);
|
||||||
|
assertEquals(SC_BAD_REQUEST, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteReservation() throws Exception {
|
||||||
|
NewReservation newReservationId =
|
||||||
|
testFederationSubCluster.getNewReservationId(ROUTER_WEB_ADDRESS);
|
||||||
|
String reservationId = newReservationId.getReservationId();
|
||||||
|
ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo();
|
||||||
|
context.setReservationId(reservationId);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST);
|
||||||
|
assertEquals(SC_SERVICE_UNAVAILABLE, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetClusterNodeLabels() throws Exception {
|
||||||
|
NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsOnNode() throws Exception {
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
NodeLabelsInfo nodeLabelsInfo = TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, rm1NodeId),
|
||||||
|
NodeLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsMappingEmpty() throws Exception {
|
||||||
|
LabelsToNodesInfo labelsToNodesInfo =
|
||||||
|
TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, LabelsToNodesInfo.class, null, null);
|
||||||
|
assertNotNull(labelsToNodesInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLabelsMapping() throws Exception {
|
||||||
|
LabelsToNodesInfo labelsToNodesInfo = TestFederationSubCluster.performGetCalls(
|
||||||
|
ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + LABEL_MAPPINGS,
|
||||||
|
LabelsToNodesInfo.class, LABELS, "label1");
|
||||||
|
assertNotNull(labelsToNodesInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddToClusterNodeLabels() throws Exception {
|
||||||
|
List<NodeLabel> nodeLabels = new ArrayList<>();
|
||||||
|
nodeLabels.add(NodeLabel.newInstance("default"));
|
||||||
|
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNodeToLabels() throws Exception {
|
||||||
|
NodeToLabelsInfo nodeToLabelsInfo = TestFederationSubCluster.performGetCalls(
|
||||||
|
ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS,
|
||||||
|
NodeToLabelsInfo.class, null, null);
|
||||||
|
assertNotNull(nodeToLabelsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveFromClusterNodeLabels() throws Exception {
|
||||||
|
testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS,
|
||||||
|
LABELS, "default", null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplaceLabelsOnNodes() throws Exception {
|
||||||
|
testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS);
|
||||||
|
NodeToLabelsEntryList context = new NodeToLabelsEntryList();
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS,
|
||||||
|
null, null, context, POST);
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplaceLabelsOnNode() throws Exception {
|
||||||
|
String rm1NodeId = testFederationSubCluster.getNodeId(SC1_RM_WEB_ADDRESS);
|
||||||
|
String pathNode = RM_WEB_SERVICE_PATH +
|
||||||
|
format(NODES_NODEID_REPLACE_LABELS, rm1NodeId);
|
||||||
|
testFederationSubCluster.addNodeLabel(ROUTER_WEB_ADDRESS);
|
||||||
|
ClientResponse response = TestFederationSubCluster.performCall(ROUTER_WEB_ADDRESS,
|
||||||
|
pathNode, LABELS, "default", null, POST);
|
||||||
|
assertEquals(SC_OK, response.getStatus());
|
||||||
|
String entity = response.getEntity(String.class);
|
||||||
|
assertNotNull(entity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,4 +47,8 @@
|
|||||||
<name>yarn.resourcemanager.cluster-id</name>
|
<name>yarn.resourcemanager.cluster-id</name>
|
||||||
<value>local-cluster</value>
|
<value>local-cluster</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>yarn.router.interceptor.allow-partial-result.enable</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
Loading…
Reference in New Issue
Block a user