YARN-8972. [Router] Add support to prevent DoS attack over ApplicationSubmissionContext size. (#5382)

This commit is contained in:
slfan1989 2023-03-09 05:29:30 +08:00 committed by GitHub
parent 487368c4b9
commit b406060c6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 711 additions and 6 deletions

View File

@ -4249,6 +4249,13 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
/**
* ApplicationSubmissionContextInterceptor configurations.
*
* Configuration key for the size limit of an ApplicationSubmissionContext.
* The key is ROUTER_PREFIX followed by "asc-interceptor-max-size" and its
* value is a storage-size string with a unit suffix (for example "1MB").
**/
public static final String ROUTER_ASC_INTERCEPTOR_MAX_SIZE =
ROUTER_PREFIX + "asc-interceptor-max-size";
/** Default value of {@link #ROUTER_ASC_INTERCEPTOR_MAX_SIZE}: "1MB". */
public static final String DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE = "1MB";
/**
* The interceptor class used in FederationInterceptorREST should return
* partial AppReports.

View File

@ -5117,6 +5117,16 @@
</description>
</property>
<property>
<name>yarn.router.asc-interceptor-max-size</name>
<value>1MB</value>
<description>
Defines the size limit of the ApplicationSubmissionContext.
If the size of the ApplicationSubmissionContext reaches or exceeds this value,
an exception is thrown. The default value is 1MB.
</description>
</property>
<property>
<description>
The number of threads to use for the Router scheduled executor service.

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.server.router;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.protobuf.GeneratedMessageV3;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
@ -32,10 +35,18 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -43,6 +54,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -624,4 +637,118 @@ public final class RouterServerUtil {
return definition;
}
/**
 * Checks if the ApplicationSubmissionContext submitted with the application
 * is valid.
 *
 * Current checks:
 * - if its serialized protobuf size is below the limit configured through
 *   {@link YarnConfiguration#ROUTER_ASC_INTERCEPTOR_MAX_SIZE}. The limit is
 *   inclusive: a context whose size equals the limit is rejected as well.
 *
 * A {@code null} context is accepted without any check.
 *
 * @param appContext the app context to check, may be {@code null}.
 * @param conf the configuration the size limit is read from.
 * @throws IOException if an IO error occurred while logging the rejected context.
 * @throws YarnException if the context reaches or exceeds the size limit.
 */
@Public
@Unstable
public static void checkAppSubmissionContext(ApplicationSubmissionContextPBImpl appContext,
    Configuration conf) throws IOException, YarnException {
  // Prevents DoS over the ApplicationClientProtocol by checking the context
  // the application was submitted with for any excessively large fields.
  double bytesOfMaxAscSize = conf.getStorageSize(
      YarnConfiguration.ROUTER_ASC_INTERCEPTOR_MAX_SIZE,
      YarnConfiguration.DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE, StorageUnit.BYTES);
  if (appContext == null) {
    return;
  }

  int bytesOfSerializedSize = appContext.getProto().getSerializedSize();
  if (bytesOfSerializedSize < bytesOfMaxAscSize) {
    return;
  }

  // Dump the per-field sizes first so the oversized field can be identified.
  logContainerLaunchContext(appContext);

  // String.valueOf tolerates a missing application id; calling toString() on
  // it directly would replace the intended YarnException with a
  // NullPointerException for an oversized context that carries no id.
  String applicationId = String.valueOf(appContext.getApplicationId());
  String limit = StringUtils.byteDesc((long) bytesOfMaxAscSize);
  String appContentSize = StringUtils.byteDesc(bytesOfSerializedSize);
  String errMsg = String.format(
      "The size of the ApplicationSubmissionContext of the application %s is " +
      "above the limit %s, size = %s.", applicationId, limit, appContentSize);
  LOG.error(errMsg);
  throw new YarnException(errMsg);
}
/**
 * Diagnostic helper for checkAppSubmissionContext: writes the size of the AM
 * ContainerLaunchContext and of each of its parts to the log at WARN level.
 *
 * Nothing is logged when the context is null or when its AM container spec
 * is not a ContainerLaunchContextPBImpl.
 *
 * @param appContext the app context.
 * @throws IOException if an IO error occurred.
 */
@Private
@Unstable
private static void logContainerLaunchContext(ApplicationSubmissionContextPBImpl appContext)
    throws IOException {
  if (appContext == null) {
    return;
  }
  ContainerLaunchContext amSpec = appContext.getAMContainerSpec();
  // instanceof is false for null, so this also covers a missing AM spec.
  if (!(amSpec instanceof ContainerLaunchContextPBImpl)) {
    return;
  }
  ContainerLaunchContextPBImpl launchContextPB = (ContainerLaunchContextPBImpl) amSpec;

  LOG.warn("ContainerLaunchContext size: {}.",
      launchContextPB.getProto().getSerializedSize());

  // The ContainerLaunchContext is made of:
  // 1) Map<String, LocalResource> localResources,
  logContainerLaunchContext("LocalResource size: {}. Length: {}.",
      launchContextPB.getProto().getLocalResourcesList());

  // 2) Map<String, String> environment, List<String> commands,
  logContainerLaunchContext("Environment size: {}. Length: {}.",
      launchContextPB.getProto().getEnvironmentList());

  List<String> commands = launchContextPB.getCommands();
  if (commands != null && !commands.isEmpty()) {
    // Commands are plain strings, so their byte length is measured through
    // Java serialization of the whole list.
    LOG.warn("Commands size: {}. Length: {}.", commands.size(), serialize(commands).length);
  }

  // 3) Map<String, ByteBuffer> serviceData,
  logContainerLaunchContext("ServiceData size: {}. Length: {}.",
      launchContextPB.getProto().getServiceDataList());

  // 4) Map<ApplicationAccessType, String> acls
  logContainerLaunchContext("ACLs size: {}. Length: {}.",
      launchContextPB.getProto().getApplicationACLsList());
}
/**
 * Logs, at WARN level, the number of entries in a list of protobuf messages
 * together with the sum of their serialized sizes. Null or empty lists are
 * silently skipped.
 *
 * @param format logging format with two placeholders: the entry count first,
 *               the summed serialized size second.
 * @param lists data list.
 * @param <R> generic type R.
 */
private static <R extends GeneratedMessageV3> void logContainerLaunchContext(String format,
    List<R> lists) {
  if (lists == null || lists.isEmpty()) {
    return;
  }
  int totalSerializedSize = lists.stream()
      .mapToInt(entry -> entry.getSerializedSize())
      .sum();
  LOG.warn(format, lists.size(), totalSerializedSize);
}
/**
 * Converts an object into its Java-serialized byte representation.
 *
 * @param obj the object to serialize.
 * @return the serialized bytes of obj.
 * @throws IOException if an IO error occurred.
 */
@Private
@Unstable
private static byte[] serialize(Object obj) throws IOException {
  // Closing a ByteArrayOutputStream has no effect, so only the object stream
  // is managed by try-with-resources.
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  try (ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
    objectStream.writeObject(obj);
  }
  // The object stream is closed (and therefore flushed) before the bytes are read.
  return buffer.toByteArray();
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.router.RouterAuditLogger;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
/**
 * Interceptor protecting the ApplicationClientProtocol against DoS attacks.
 * At present it validates the size of the ApplicationSubmissionContext of a
 * submitted application; a context above the limit can cause Zookeeper
 * failures.
 */
public class ApplicationSubmissionContextInterceptor extends PassThroughClientRequestInterceptor {

  /**
   * Validates the submission request and its ApplicationSubmissionContext
   * before handing the request to the next interceptor in the chain.
   *
   * @param request the application submission request.
   * @return the response produced by the next interceptor.
   * @throws YarnException if the request is incomplete or the context is too large.
   * @throws IOException if an IO error occurred.
   */
  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {

    // The request, its context and the application id are all mandatory.
    boolean missingInformation = request == null
        || request.getApplicationSubmissionContext() == null
        || request.getApplicationSubmissionContext().getApplicationId() == null;

    if (missingInformation) {
      RouterMetrics.getMetrics().incrAppsFailedSubmitted();
      String errMsg =
          "Missing submitApplication request or applicationSubmissionContext information.";
      RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
          TARGET_CLIENT_RM_SERVICE, errMsg);
      RouterServerUtil.logAndThrowException(errMsg, null);
    }

    ApplicationSubmissionContext submittedContext = request.getApplicationSubmissionContext();
    ApplicationSubmissionContextPBImpl submittedContextPB =
        (ApplicationSubmissionContextPBImpl) submittedContext;

    // Rejects the submission with an exception when the context is too large.
    RouterServerUtil.checkAppSubmissionContext(submittedContextPB, getConf());

    // The context passed the check: continue with the rest of the pipeline.
    return getNextInterceptor().submitApplication(request);
  }
}

View File

@ -0,0 +1,315 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Interceptor that does not do anything other than forwarding it to the next
* Interceptor in the chain.
*
* Every method below hands the unmodified request to the same-named method of
* {@code getNextInterceptor()} and returns its result; exceptions thrown by
* the next interceptor propagate unchanged. Subclasses can override only the
* calls they want to inspect and inherit plain forwarding for the rest.
*/
public class PassThroughClientRequestInterceptor extends AbstractClientRequestInterceptor {
// ---- Application submission and termination ----
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
return getNextInterceptor().getNewApplication(request);
}
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
return getNextInterceptor().submitApplication(request);
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
return getNextInterceptor().forceKillApplication(request);
}
// ---- Cluster and queue information ----
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
return getNextInterceptor().getClusterMetrics(request);
}
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
return getNextInterceptor().getClusterNodes(request);
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
return getNextInterceptor().getQueueInfo(request);
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
return getNextInterceptor().getQueueUserAcls(request);
}
@Override
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request)
throws YarnException, IOException {
return getNextInterceptor().moveApplicationAcrossQueues(request);
}
// ---- Reservations ----
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
return getNextInterceptor().getNewReservation(request);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return getNextInterceptor().submitReservation(request);
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
return getNextInterceptor().listReservations(request);
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return getNextInterceptor().updateReservation(request);
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return getNextInterceptor().deleteReservation(request);
}
// ---- Node labels ----
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
return getNextInterceptor().getNodeToLabels(request);
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
return getNextInterceptor().getLabelsToNodes(request);
}
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
return getNextInterceptor().getClusterNodeLabels(request);
}
// ---- Application, attempt and container reports ----
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
return getNextInterceptor().getApplicationReport(request);
}
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
return getNextInterceptor().getApplications(request);
}
@Override
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
return getNextInterceptor().getApplicationAttemptReport(request);
}
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
return getNextInterceptor().getApplicationAttempts(request);
}
@Override
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
return getNextInterceptor().getContainerReport(request);
}
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
return getNextInterceptor().getContainers(request);
}
// ---- Delegation tokens ----
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
return getNextInterceptor().getDelegationToken(request);
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
return getNextInterceptor().renewDelegationToken(request);
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
return getNextInterceptor().cancelDelegationToken(request);
}
// ---- Updates to running applications and containers ----
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
return getNextInterceptor().failApplicationAttempt(request);
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
return getNextInterceptor().updateApplicationPriority(request);
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return getNextInterceptor().signalToContainer(request);
}
@Override
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
return getNextInterceptor().updateApplicationTimeouts(request);
}
// ---- Resource profiles and resource types ----
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
return getNextInterceptor().getResourceProfiles(request);
}
@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
return getNextInterceptor().getResourceProfile(request);
}
@Override
public GetAllResourceTypeInfoResponse getResourceTypeInfo(
GetAllResourceTypeInfoRequest request) throws YarnException, IOException {
return getNextInterceptor().getResourceTypeInfo(request);
}
// ---- Node attributes ----
@Override
public GetAttributesToNodesResponse getAttributesToNodes(
GetAttributesToNodesRequest request) throws YarnException, IOException {
return getNextInterceptor().getAttributesToNodes(request);
}
@Override
public GetClusterNodeAttributesResponse getClusterNodeAttributes(
GetClusterNodeAttributesRequest request)
throws YarnException, IOException {
return getNextInterceptor().getClusterNodeAttributes(request);
}
@Override
public GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException {
return getNextInterceptor().getNodesToAttributes(request);
}
}

View File

@ -112,6 +112,7 @@ public final class RouterWebServiceUtil {
* @param additionalParam the query parameters as input for a specific REST
* call in case the call has no servlet request
* @param client same client used to reduce number of clients created
* @param conf configuration
* @return the retrieved entity from the REST call
*/
protected static <T> T genericForward(final String webApp,

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.junit.Test;
/**
 * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
 * use the {@code RouterClientRMService} pipeline test cases for testing the
 * {@code ApplicationSubmissionContextInterceptor} class. The tests for
 * {@code RouterClientRMService} have been written so that they can be
 * reused to validate different request interceptor chains.
 */
public class TestApplicationSubmissionContextInterceptor extends BaseRouterClientRMTest {

  /**
   * Builds a configuration whose interceptor pipeline contains the
   * {@code ApplicationSubmissionContextInterceptor} and whose size limit is
   * lowered to 512B so that the limit is easy to exceed in a test.
   *
   * @return the configuration used by the tests of this class.
   */
  @Override
  protected YarnConfiguration createConfiguration() {
    YarnConfiguration conf = new YarnConfiguration();
    String mockPassThroughInterceptorClass =
        PassThroughClientRequestInterceptor.class.getName();

    // Create a request interceptor pipeline for testing. The first one simply
    // forwards the request, the second one is the application submission
    // context interceptor that checks for exceeded submission context size,
    // and the last one in the chain is the mock client request interceptor.
    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
        mockPassThroughInterceptorClass + "," +
        ApplicationSubmissionContextInterceptor.class.getName() + "," +
        MockClientRequestInterceptor.class.getName());

    // Lower the max application context size
    conf.set(YarnConfiguration.ROUTER_ASC_INTERCEPTOR_MAX_SIZE, "512B");

    return conf;
  }

  /**
   * This test validates the correctness of SubmitApplication in case of empty
   * request, and in case of a request whose context has no ApplicationId.
   * @throws Exception error occur.
   */
  @Test
  public void testSubmitApplicationEmptyRequest() throws Exception {

    MockRouterClientRMService rmService = getRouterClientRMService();
    // Null request
    LambdaTestUtils.intercept(YarnException.class,
        "Missing submitApplication request or applicationSubmissionContext information.",
        () -> rmService.submitApplication(null));

    // Request whose ApplicationSubmissionContext has a null ApplicationId.
    // The request built here must be the one submitted; passing null again
    // would only repeat the previous assertion.
    ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
        null, "", "", null, null, false, false, -1, null, null);
    SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
    LambdaTestUtils.intercept(YarnException.class,
        "Missing submitApplication request or applicationSubmissionContext information.",
        () -> rmService.submitApplication(request));
  }

  /**
   * This test validates the correctness of SubmitApplication by setting up
   * null, valid, and large ContainerLaunchContexts.
   * @throws Exception error occur.
   */
  @Test
  public void testCLCExceedSize() throws Exception {

    ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
        ApplicationId.newInstance(1, 1), "test", "default",
        Priority.newInstance(0), null, false, true, 2,
        Resource.newInstance(10, 2), "test");

    LocalResource localResource = LocalResource.newInstance(
        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
        1234567890L);

    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put("rsrc", localResource);

    Map<String, String> env = new HashMap<>();
    env.put("somevar", "someval");

    List<String> containerCmds = new ArrayList<>();
    containerCmds.add("somecmd");
    containerCmds.add("somearg");

    Map<String, ByteBuffer> serviceData = new HashMap<>();
    serviceData.put("someservice", ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
    ByteBuffer containerTokens = ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});

    Map<ApplicationAccessType, String> acls = new HashMap<>();
    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");

    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
        localResources, env, containerCmds, serviceData, containerTokens, acls);
    ApplicationSubmissionContextPBImpl appSubmissionContextPB =
        (ApplicationSubmissionContextPBImpl) context;
    Configuration configuration = getConf();

    // Null ApplicationSubmissionContext
    RouterServerUtil.checkAppSubmissionContext(null, configuration);

    // Null ContainerLaunchContext
    RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration);

    // Valid ContainerLaunchContext
    context.setAMContainerSpec(clc);
    RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration);

    // ContainerLaunchContext exceeds the 512B limit set in createConfiguration()
    for (int i = 0; i < 1000; i++) {
      localResources.put("rsrc" + i, localResource);
    }
    ContainerLaunchContext clcExceedSize = ContainerLaunchContext.newInstance(
        localResources, env, containerCmds, serviceData, containerTokens, acls);
    context.setAMContainerSpec(clcExceedSize);
    LambdaTestUtils.intercept(YarnException.class,
        "The size of the ApplicationSubmissionContext of the application",
        () -> RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration));
  }
}

View File

@ -91,8 +91,7 @@ of the desirable properties of balance, optimal cluster utilization and global i
This part of the federation system is part of future work in [YARN-5597](https://issues.apache.org/jira/browse/YARN-5597).
###Federation State-Store
### Federation State-Store
The Federation State defines the additional state that needs to be maintained to loosely couple multiple individual sub-clusters into a single large federated cluster. This includes the following information:
####Sub-cluster Membership
@ -255,10 +254,30 @@ Optional:
These are extra configurations that should appear in the **conf/yarn-site.xml** at each Router.
| Property | Example | Description |
|:---- |:---- |:---- |
|`yarn.router.bind-host` | `0.0.0.0` | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. |
| `yarn.router.clientrm.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. |
| Property | Example | Description |
|:--------------------------------------------------|:----------------------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `yarn.router.bind-host` | `0.0.0.0` | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. |
| `yarn.router.clientrm.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. |
> Enable ApplicationSubmissionContextInterceptor
- If the `FederationStateStore` is configured with `ZooKeeper` storage, the app information will be stored in `ZooKeeper`. If the size of the app information exceeds `1MB`, `ZooKeeper` may fail. `ApplicationSubmissionContextInterceptor` checks the size of the `ApplicationSubmissionContext`; if the size exceeds the limit (default 1MB), an exception is thrown.
- Example of the resulting error message: `The size of the ApplicationSubmissionContext of the application application_123456789_0001 is above the limit 1 MB, size = 1.02 MB.`
- The required configuration is as follows:
```
<property>
<name>yarn.router.clientrm.interceptor-class.pipeline</name>
<value>org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor,
org.apache.hadoop.yarn.server.router.clientrm.ApplicationSubmissionContextInterceptor,
org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor</value>
</property>
<property>
<name>yarn.router.asc-interceptor-max-size</name>
<value>1MB</value>
</property>
```
Optional: