From b406060c6b04a6115a920ad6800ee4a52a2f291f Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Thu, 9 Mar 2023 05:29:30 +0800
Subject: [PATCH] YARN-8972. [Router] Add support to prevent DoS attack over
ApplicationSubmissionContext size. (#5382)
---
.../hadoop/yarn/conf/YarnConfiguration.java | 7 +
.../src/main/resources/yarn-default.xml | 10 +
.../yarn/server/router/RouterServerUtil.java | 127 +++++++
...plicationSubmissionContextInterceptor.java | 66 ++++
.../PassThroughClientRequestInterceptor.java | 315 ++++++++++++++++++
.../router/webapp/RouterWebServiceUtil.java | 1 +
...plicationSubmissionContextInterceptor.java | 160 +++++++++
.../src/site/markdown/Federation.md | 31 +-
8 files changed, 711 insertions(+), 6 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ApplicationSubmissionContextInterceptor.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestApplicationSubmissionContextInterceptor.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 699059f068..eb7d3143ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4249,6 +4249,13 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
+ /**
+ * ApplicationSubmissionContextInterceptor configurations.
+ **/
+ public static final String ROUTER_ASC_INTERCEPTOR_MAX_SIZE =
+ ROUTER_PREFIX + "asc-interceptor-max-size";
+ public static final String DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE = "1MB";
+
/**
* The interceptor class used in FederationInterceptorREST should return
* partial AppReports.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dc58f2f828..ab42233078 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5117,6 +5117,16 @@
+
+ yarn.router.asc-interceptor-max-size
+ 1MB
+
+ We define the size limit of ApplicationSubmissionContext.
+ If the size of the ApplicationSubmissionContext is larger than this value,
+ We will throw an exception. the default value is 1MB.
+
+
+
The number of threads to use for the Router scheduled executor service.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 8fa6ca2f05..0dbead33f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.server.router;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.thirdparty.protobuf.GeneratedMessageV3;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
@@ -32,10 +35,18 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -43,6 +54,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -624,4 +637,118 @@ public final class RouterServerUtil {
return definition;
}
+
+ /**
+ * Checks if the ApplicationSubmissionContext submitted with the application
+ * is valid.
+ *
+ * Current checks:
+ * - if its size is within limits.
+ *
+ * @param appContext the app context to check.
+ * @throws IOException if an IO error occurred.
+ * @throws YarnException yarn exception.
+ */
+ @Public
+ @Unstable
+ public static void checkAppSubmissionContext(ApplicationSubmissionContextPBImpl appContext,
+ Configuration conf) throws IOException, YarnException {
+ // Prevents DoS over the ApplicationClientProtocol by checking the context
+ // the application was submitted with for any excessively large fields.
+ double bytesOfMaxAscSize = conf.getStorageSize(
+ YarnConfiguration.ROUTER_ASC_INTERCEPTOR_MAX_SIZE,
+ YarnConfiguration.DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE, StorageUnit.BYTES);
+ if (appContext != null) {
+ int bytesOfSerializedSize = appContext.getProto().getSerializedSize();
+ if (bytesOfSerializedSize >= bytesOfMaxAscSize) {
+ logContainerLaunchContext(appContext);
+ String applicationId = appContext.getApplicationId().toString();
+ String limit = StringUtils.byteDesc((long) bytesOfMaxAscSize);
+ String appContentSize = StringUtils.byteDesc(bytesOfSerializedSize);
+ String errMsg = String.format(
+ "The size of the ApplicationSubmissionContext of the application %s is " +
+ "above the limit %s, size = %s.", applicationId, limit, appContentSize);
+ LOG.error(errMsg);
+ throw new YarnException(errMsg);
+ }
+ }
+ }
+
+ /**
+ * Private helper for checkAppSubmissionContext that logs the fields in the
+ * context for debugging.
+ *
+ * @param appContext the app context.
+ * @throws IOException if an IO error occurred.
+ */
+ @Private
+ @Unstable
+ private static void logContainerLaunchContext(ApplicationSubmissionContextPBImpl appContext)
+ throws IOException {
+ if (appContext == null || appContext.getAMContainerSpec() == null ||
+ !(appContext.getAMContainerSpec() instanceof ContainerLaunchContextPBImpl)) {
+ return;
+ }
+
+ ContainerLaunchContext launchContext = appContext.getAMContainerSpec();
+ ContainerLaunchContextPBImpl clc = (ContainerLaunchContextPBImpl) launchContext;
+ LOG.warn("ContainerLaunchContext size: {}.", clc.getProto().getSerializedSize());
+
+ // ContainerLaunchContext contains:
+ // 1) Map localResources,
+ List lrs = clc.getProto().getLocalResourcesList();
+ logContainerLaunchContext("LocalResource size: {}. Length: {}.", lrs);
+
+ // 2) Map environment, List commands,
+ List envs = clc.getProto().getEnvironmentList();
+ logContainerLaunchContext("Environment size: {}. Length: {}.", envs);
+
+ List cmds = clc.getCommands();
+ if (CollectionUtils.isNotEmpty(cmds)) {
+ LOG.warn("Commands size: {}. Length: {}.", cmds.size(), serialize(cmds).length);
+ }
+
+ // 3) Map serviceData,
+ List serviceData = clc.getProto().getServiceDataList();
+ logContainerLaunchContext("ServiceData size: {}. Length: {}.", serviceData);
+
+ // 4) Map acls
+ List acls = clc.getProto().getApplicationACLsList();
+ logContainerLaunchContext("ACLs size: {}. Length: {}.", acls);
+ }
+
+ /**
+ * Log ContainerLaunchContext Data SerializedSize.
+ *
+ * @param format format of logging.
+ * @param lists data list.
+ * @param generic type R.
+ */
+ private static void logContainerLaunchContext(String format,
+ List lists) {
+ if (CollectionUtils.isNotEmpty(lists)) {
+ int sumLength = 0;
+ for (R item : lists) {
+ sumLength += item.getSerializedSize();
+ }
+ LOG.warn(format, lists.size(), sumLength);
+ }
+ }
+
+ /**
+ * Serialize an object in ByteArray.
+ *
+ * @return obj ByteArray.
+ * @throws IOException if an IO error occurred.
+ */
+ @Private
+ @Unstable
+ private static byte[] serialize(Object obj) throws IOException {
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
+ try (ObjectOutputStream o = new ObjectOutputStream(b)) {
+ o.writeObject(obj);
+ }
+ return b.toByteArray();
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ApplicationSubmissionContextInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ApplicationSubmissionContextInterceptor.java
new file mode 100644
index 0000000000..6ec3fe334d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ApplicationSubmissionContextInterceptor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.router.RouterAuditLogger;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
+
+/**
+ * It prevents DoS attack over the ApplicationClientProtocol. Currently, it
+ * checks the size of the ApplicationSubmissionContext. If it exceeds the limit
+ * it can cause Zookeeper failures.
+ */
+public class ApplicationSubmissionContextInterceptor extends PassThroughClientRequestInterceptor {
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+
+ if (request == null || request.getApplicationSubmissionContext() == null ||
+ request.getApplicationSubmissionContext().getApplicationId() == null) {
+ RouterMetrics.getMetrics().incrAppsFailedSubmitted();
+ String errMsg =
+ "Missing submitApplication request or applicationSubmissionContext information.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg);
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+
+ ApplicationSubmissionContext appContext = request.getApplicationSubmissionContext();
+ ApplicationSubmissionContextPBImpl asc = (ApplicationSubmissionContextPBImpl) appContext;
+
+ // Check for excessively large fields, throw exception if found
+ RouterServerUtil.checkAppSubmissionContext(asc, getConf());
+
+ // Check succeeded - app submit will be passed on to the next interceptor
+ return getNextInterceptor().submitApplication(request);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
new file mode 100644
index 0000000000..fa830d3881
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Interceptor that does not do anything other than forwarding it to the next
+ * Interceptor in the chain.
+ */
+public class PassThroughClientRequestInterceptor extends AbstractClientRequestInterceptor {
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return getNextInterceptor().listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return getNextInterceptor().updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return getNextInterceptor().deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return getNextInterceptor().failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return getNextInterceptor().signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationTimeouts(request);
+ }
+
+ @Override
+ public GetAllResourceProfilesResponse getResourceProfiles(
+ GetAllResourceProfilesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getResourceProfiles(request);
+ }
+
+ @Override
+ public GetResourceProfileResponse getResourceProfile(
+ GetResourceProfileRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getResourceProfile(request);
+ }
+
+ @Override
+ public GetAllResourceTypeInfoResponse getResourceTypeInfo(
+ GetAllResourceTypeInfoRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getResourceTypeInfo(request);
+ }
+
+ @Override
+ public GetAttributesToNodesResponse getAttributesToNodes(
+ GetAttributesToNodesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getAttributesToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeAttributesResponse getClusterNodeAttributes(
+ GetClusterNodeAttributesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodeAttributes(request);
+ }
+
+ @Override
+ public GetNodesToAttributesResponse getNodesToAttributes(
+ GetNodesToAttributesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNodesToAttributes(request);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
index e33ce15507..7af470dc58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
@@ -112,6 +112,7 @@ public final class RouterWebServiceUtil {
* @param additionalParam the query parameters as input for a specific REST
* call in case the call has no servlet request
* @param client same client used to reduce number of clients created
+ * @param conf configuration
* @return the retrieved entity from the REST call
*/
protected static T genericForward(final String webApp,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestApplicationSubmissionContextInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestApplicationSubmissionContextInterceptor.java
new file mode 100644
index 0000000000..d3cf6de4ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestApplicationSubmissionContextInterceptor.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.junit.Test;
+
+/**
+ * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
+ * use the {@code RouterClientRMService} pipeline test cases for testing the
+ * {@code ApplicationSubmissionContextInterceptor} class. The tests for
+ * {@code RouterClientRMService} has been written cleverly so that it can be
+ * reused to validate different request interceptor chains.
+ */
+public class TestApplicationSubmissionContextInterceptor extends BaseRouterClientRMTest {
+
+ @Override
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration conf = new YarnConfiguration();
+ String mockPassThroughInterceptorClass =
+ PassThroughClientRequestInterceptor.class.getName();
+
+ // Create a request interceptor pipeline for testing. The last one in the
+ // chain is the application submission context interceptor that checks
+ // for exceeded submission context size
+ // The others in the chain will simply forward it to the next one in the
+ // chain
+ conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," +
+ ApplicationSubmissionContextInterceptor.class.getName() + "," +
+ MockClientRequestInterceptor.class.getName());
+
+ // Lower the max application context size
+ conf.set(YarnConfiguration.ROUTER_ASC_INTERCEPTOR_MAX_SIZE, "512B");
+
+ return conf;
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case of empty
+ * request.
+ * @throws Exception error occur.
+ */
+ @Test
+ public void testSubmitApplicationEmptyRequest() throws Exception {
+
+ MockRouterClientRMService rmService = getRouterClientRMService();
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitApplication request or applicationSubmissionContext information.",
+ () -> rmService.submitApplication(null));
+
+ ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
+ null, "", "", null, null, false, false, -1, null, null);
+ SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitApplication request or applicationSubmissionContext information.",
+ () -> rmService.submitApplication(null));
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication by setting up
+ * null, valid, and large ContainerLaunchContexts.
+ * @throws Exception error occur.
+ */
+ @Test
+ public void testCLCExceedSize() throws Exception {
+
+ ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
+ ApplicationId.newInstance(1, 1), "test", "default",
+ Priority.newInstance(0), null, false, true, 2,
+ Resource.newInstance(10, 2), "test");
+
+ LocalResource localResource = LocalResource.newInstance(
+ URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+ 1234567890L);
+
+ Map localResources = new HashMap<>();
+ localResources.put("rsrc", localResource);
+
+ Map env = new HashMap<>();
+ env.put("somevar", "someval");
+
+ List containerCmds = new ArrayList<>();
+ containerCmds.add("somecmd");
+ containerCmds.add("somearg");
+
+ Map serviceData = new HashMap<>();
+ serviceData.put("someservice", ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
+ ByteBuffer containerTokens = ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
+
+ Map acls = new HashMap<>();
+ acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+ acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ localResources, env, containerCmds, serviceData, containerTokens, acls);
+ ApplicationSubmissionContextPBImpl appSubmissionContextPB =
+ (ApplicationSubmissionContextPBImpl) context;
+ Configuration configuration = getConf();
+
+ // Null ApplicationSubmissionContext
+ RouterServerUtil.checkAppSubmissionContext(null, configuration);
+
+ // Null ContainerLaunchContext
+ RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration);
+
+ // Valid ContainerLaunchContext
+ context.setAMContainerSpec(clc);
+ RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration);
+
+ // ContainerLaunchContext exceeds 1MB
+ for (int i = 0; i < 1000; i++) {
+ localResources.put("rsrc" + i, localResource);
+ }
+ ContainerLaunchContext clcExceedSize = ContainerLaunchContext.newInstance(
+ localResources, env, containerCmds, serviceData, containerTokens, acls);
+ context.setAMContainerSpec(clcExceedSize);
+ LambdaTestUtils.intercept(YarnException.class,
+ "The size of the ApplicationSubmissionContext of the application",
+ () -> RouterServerUtil.checkAppSubmissionContext(appSubmissionContextPB, configuration));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index f547e8d6b7..3f7acee2d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -91,8 +91,7 @@ of the desirable properties of balance, optimal cluster utilization and global i
This part of the federation system is part of future work in [YARN-5597](https://issues.apache.org/jira/browse/YARN-5597).
-
-###Federation State-Store
+### Federation State-Store
The Federation State defines the additional state that needs to be maintained to loosely couple multiple individual sub-clusters into a single large federated cluster. This includes the following information:
####Sub-cluster Membership
@@ -255,10 +254,30 @@ Optional:
These are extra configurations that should appear in the **conf/yarn-site.xml** at each Router.
-| Property | Example | Description |
-|:---- |:---- |:---- |
-|`yarn.router.bind-host` | `0.0.0.0` | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. |
-| `yarn.router.clientrm.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. |
+| Property | Example | Description |
+|:--------------------------------------------------|:----------------------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `yarn.router.bind-host` | `0.0.0.0` | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. |
+| `yarn.router.clientrm.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. |
+
+> Enable ApplicationSubmissionContextInterceptor
+
+- If the `FederationStateStore` is configured with `Zookpeer` storage, the app information will be stored in `Zookpeer`. If the size of the app information exceeds `1MB`, `Zookpeer` may fail. `ApplicationSubmissionContextInterceptor` will check the size of `ApplicationSubmissionContext`, if the size exceeds the limit(default 1MB), an exception will be thrown.
+ - The size of the ApplicationSubmissionContext of the application application_123456789_0001 is above the limit. Size = 1.02 MB.
+
+- The required configuration is as follows:
+
+```
+
+ yarn.router.clientrm.interceptor-class.pipeline
+ org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor,
+ org.apache.hadoop.yarn.server.router.clientrm.ApplicationSubmissionContextInterceptor,
+ org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor
+
+
+ yarn.router.asc-interceptor-max-size
+ 1MB
+
+```
Optional: