diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5e4c826a42..ca71d35718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -561,6 +561,11 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000;
+ /** Maximum size, in bytes, of the data that can be stored in a znode. */
+ public static final String RM_ZK_ZNODE_SIZE_LIMIT_BYTES =
+ RM_ZK_PREFIX + "max-znode-size.bytes";
+ /** Default znode size limit: 1024 * 1024 bytes (1 MB). */
+ public static final int DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES = 1024 * 1024;
+
public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms";
public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e687eef370..d4b7bdeaaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -628,6 +628,14 @@
0
+
+ Specifies the maximum size of the data that can be stored
+ in a znode. The value should be the same as or less than jute.maxbuffer
+ configured in ZooKeeper. The default value is 1MB.
+ yarn.resourcemanager.zk-max-znode-size.bytes
+ 1048576
+
+
Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 975847cfdf..e945b597cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -48,7 +48,6 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -217,14 +216,20 @@ public RMStateStoreState transition(RMStateStore store,
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appState);
- store.notifyApplication(new RMAppEvent(appId,
- RMAppEventType.APP_NEW_SAVED));
+ store.notifyApplication(
+ new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
- isFenced = store.notifyStoreOperationFailedInternal(e);
+ if (e instanceof StoreLimitException) {
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_REJECTED, e.getMessage(), false));
+ } else {
+ isFenced = store.notifyStoreOperationFailedInternal(e);
+ }
}
return finalState(isFenced);
};
+
}
private static class UpdateAppTransition implements
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java
new file mode 100644
index 0000000000..289ea445c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Thrown when the data to be stored for an application exceeds the size
+ * limit of the RM state store.
+ */
+public class StoreLimitException extends YarnException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message detail message, passed on to {@link YarnException}
+ */
+ public StoreLimitException(String message) {
+ super(message);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index a9d08857fd..3b986d14a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -188,6 +188,7 @@ public class ZKRMStateStore extends RMStateStore {
private String fencingNodePath;
private Thread verifyActiveStatusThread;
private int zkSessionTimeout;
+ private int zknodeLimit;
/* ACL and auth info */
private List zkAcl;
@@ -283,6 +284,8 @@ public synchronized void initInternal(Configuration conf)
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+ zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
+ YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
appIdNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
@@ -746,8 +749,17 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- safeCreate(nodeCreatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ if (appStateData.length <= zknodeLimit) {
+ safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application state data size for " + appId + " is "
+ + appStateData.length);
+ }
+ throw new StoreLimitException("Application " + appId
+ + " exceeds the maximum allowed size for application data. "
+ + "See yarn.resourcemanager.zk-max-znode-size.bytes.");
+ }
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java
index 6496402072..0c6139e94e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java
@@ -25,6 +25,7 @@ public class RMAppEvent extends AbstractEvent{
private final ApplicationId appId;
private final String diagnosticMsg;
+ private boolean storeAppInfo;
public RMAppEvent(ApplicationId appId, RMAppEventType type) {
this(appId, type, "");
@@ -35,6 +36,21 @@ public RMAppEvent(ApplicationId appId, RMAppEventType type,
super(type);
this.appId = appId;
this.diagnosticMsg = diagnostic;
+ this.storeAppInfo = true;
+ }
+
+ /**
+ * Creates an RM application event with an explicit store-application flag.
+ *
+ * @param appId id of the application the event refers to
+ * @param type type of the RM application event
+ * @param diagnostic diagnostic message attached to the event
+ * @param storeApp value later returned by {@link #doStoreAppInfo()};
+ * the other constructors default it to {@code true}
+ */
+ public RMAppEvent(ApplicationId appId, RMAppEventType type, String diagnostic,
+ boolean storeApp) {
+ // The delegated constructor sets storeAppInfo to true; override it here.
+ this(appId, type, diagnostic);
+ this.storeAppInfo = storeApp;
}
public ApplicationId getApplicationId() {
@@ -44,4 +60,13 @@ public ApplicationId getApplicationId() {
public String getDiagnosticMsg() {
return this.diagnosticMsg;
}
+
+ /**
+ * Tells whether the application should be saved to the state store.
+ *
+ * @return {@code true} if the application should be saved to the store,
+ * {@code false} otherwise
+ */
+ public boolean doStoreAppInfo() {
+ return storeAppInfo;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index dda947494d..bf8fa4f3a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1307,8 +1307,10 @@ public FinalSavingTransition(Object transitionToDo,
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
- targetedFinalState, stateToBeStored);
+ // Only remember the target transition and store the state when the event
+ // asks for it; for events whose doStoreAppInfo() is false this method
+ // does nothing.
+ if (event.doStoreAppInfo()) {
+ app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+ targetedFinalState, stateToBeStored);
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 7c40ddfa98..fcd8647638 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
@@ -53,6 +55,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -70,6 +74,7 @@
import org.junit.Test;
import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -249,6 +254,38 @@ public void testZKRMStateStoreRealZK() throws Exception {
zkTester.getRMStateStore()).testRetryingCreateRootDir();
}
+ /**
+ * Verifies that storing an application while the znode size limit is set
+ * to a single byte results in an APP_REJECTED event being dispatched.
+ */
+ @Test (timeout = 60000)
+ public void testZKNodeLimit() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ long submitTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis() + 1234;
+ Configuration conf = new YarnConfiguration();
+ // A 1-byte limit, so the stored application state is expected to exceed it.
+ conf.setInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, 1);
+ RMStateStore store = zkTester.getRMStateStore(conf);
+ // final: captured by the anonymous Supplier below.
+ final TestAppRejDispatcher dispatcher = new TestAppRejDispatcher();
+ store.setRMDispatcher(dispatcher);
+ ApplicationId appId1 =
+ ApplicationId.fromString("application_1352994193343_0001");
+ storeApp(store, appId1, submitTime, startTime);
+ // Poll every 100 ms, for at most 5 s, until the rejection is observed.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return dispatcher.apprejectedEvnt;
+ }
+ }, 100, 5000);
+ }
+
+ /**
+ * Dispatcher that records whether an APP_REJECTED application event has
+ * been handled.
+ */
+ static class TestAppRejDispatcher extends TestDispatcher {
+ // Polled by the test thread while handle() may run on another thread,
+ // hence volatile so that the update is visible to the poller.
+ private volatile boolean apprejectedEvnt;
+
+ /**
+ * Sets the rejection flag when an RMAppEvent of type APP_REJECTED arrives;
+ * every other event is ignored.
+ *
+ * @param event the dispatched event
+ */
+ @Override
+ public void handle(Event event) {
+ if (event instanceof RMAppEvent
+ && event.getType().equals(RMAppEventType.APP_REJECTED)) {
+ apprejectedEvnt = true;
+ }
+ }
+ }
+
@Test (timeout = 60000)
public void testCheckMajorVersionChange() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {