YARN-5006. ResourceManager quit due to ApplicationStateData exceed the limit size of znode in zk. Contributed by Bibin A Chundatt.

This commit is contained in:
Naganarasimha 2017-06-23 07:52:41 +05:30
parent 092ebdf885
commit 740204b292
8 changed files with 135 additions and 8 deletions

View File

@ -561,6 +561,11 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries"; public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries";
public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000; public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000;
/** Zookeeper znode limit */
public static final String RM_ZK_ZNODE_SIZE_LIMIT_BYTES =
RM_ZK_PREFIX + "max-znode-size.bytes";
public static final int DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES = 1024 * 1024;
public static final String RM_ZK_RETRY_INTERVAL_MS = public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms"; RM_ZK_PREFIX + "retry-interval-ms";
public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000; public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;

View File

@ -628,6 +628,14 @@
<value>0</value> <value>0</value>
</property> </property>
<property>
<description>Specifies the maximum size of the data that can be stored
in a znode.Value should be same or less than jute.maxbuffer configured
in zookeeper.Default value configured is 1MB.</description>
<name>yarn.resourcemanager.zk-max-znode-size.bytes</name>
<value>1048576</value>
</property>
<property> <property>
<description>Name of the cluster. In a HA setting, <description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader this is used to ensure the RM participates in leader

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -217,14 +216,20 @@ public RMStateStoreState transition(RMStateStore store,
LOG.info("Storing info for app: " + appId); LOG.info("Storing info for app: " + appId);
try { try {
store.storeApplicationStateInternal(appId, appState); store.storeApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId, store.notifyApplication(
RMAppEventType.APP_NEW_SAVED)); new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error storing app: " + appId, e); LOG.error("Error storing app: " + appId, e);
isFenced = store.notifyStoreOperationFailedInternal(e); if (e instanceof StoreLimitException) {
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_REJECTED, e.getMessage(), false));
} else {
isFenced = store.notifyStoreOperationFailedInternal(e);
}
} }
return finalState(isFenced); return finalState(isFenced);
}; };
} }
private static class UpdateAppTransition implements private static class UpdateAppTransition implements

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This exception is thrown when Application Data size exceeds limit RM state
* store.
*
*/
public class StoreLimitException extends YarnException {
private static final long serialVersionUID = 1L;
public StoreLimitException(String message) {
super(message);
}
}

View File

@ -188,6 +188,7 @@ public class ZKRMStateStore extends RMStateStore {
private String fencingNodePath; private String fencingNodePath;
private Thread verifyActiveStatusThread; private Thread verifyActiveStatusThread;
private int zkSessionTimeout; private int zkSessionTimeout;
private int zknodeLimit;
/* ACL and auth info */ /* ACL and auth info */
private List<ACL> zkAcl; private List<ACL> zkAcl;
@ -283,6 +284,8 @@ public synchronized void initInternal(Configuration conf)
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
appIdNodeSplitIndex = appIdNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
@ -746,8 +749,17 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
safeCreate(nodeCreatePath, appStateData, zkAcl, if (appStateData.length <= zknodeLimit) {
CreateMode.PERSISTENT); safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Application state data size for " + appId + " is "
+ appStateData.length);
}
throw new StoreLimitException("Application " + appId
+ " exceeds the maximum allowed size for application data. "
+ "See yarn.resourcemanager.zk-max-znode-size.bytes.");
}
} }
@Override @Override

View File

@ -25,6 +25,7 @@ public class RMAppEvent extends AbstractEvent<RMAppEventType>{
private final ApplicationId appId; private final ApplicationId appId;
private final String diagnosticMsg; private final String diagnosticMsg;
private boolean storeAppInfo;
public RMAppEvent(ApplicationId appId, RMAppEventType type) { public RMAppEvent(ApplicationId appId, RMAppEventType type) {
this(appId, type, ""); this(appId, type, "");
@ -35,6 +36,21 @@ public RMAppEvent(ApplicationId appId, RMAppEventType type,
super(type); super(type);
this.appId = appId; this.appId = appId;
this.diagnosticMsg = diagnostic; this.diagnosticMsg = diagnostic;
this.storeAppInfo = true;
}
/**
* Constructor to create RM Application Event type.
*
* @param appId application Id
* @param type RM Event type
* @param diagnostic Diagnostic message for event
* @param storeApp Application should be saved or not
*/
public RMAppEvent(ApplicationId appId, RMAppEventType type, String diagnostic,
boolean storeApp) {
this(appId, type, diagnostic);
this.storeAppInfo = storeApp;
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
@ -44,4 +60,13 @@ public ApplicationId getApplicationId() {
public String getDiagnosticMsg() { public String getDiagnosticMsg() {
return this.diagnosticMsg; return this.diagnosticMsg;
} }
/**
* Store application to state store or not.
*
* @return boolean application should be saved to store.
*/
public boolean doStoreAppInfo() {
return storeAppInfo;
}
} }

View File

@ -1307,8 +1307,10 @@ public FinalSavingTransition(Object transitionToDo,
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.rememberTargetTransitionsAndStoreState(event, transitionToDo, if (event.doStoreAppInfo()) {
targetedFinalState, stateToBeStored); app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
targetedFinalState, stateToBeStored);
}
} }
} }

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
@ -53,6 +55,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -70,6 +74,7 @@
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -249,6 +254,38 @@ public void testZKRMStateStoreRealZK() throws Exception {
zkTester.getRMStateStore()).testRetryingCreateRootDir(); zkTester.getRMStateStore()).testRetryingCreateRootDir();
} }
@Test
public void testZKNodeLimit() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis() + 1234;
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, 1);
RMStateStore store = zkTester.getRMStateStore(conf);
TestAppRejDispatcher dispatcher = new TestAppRejDispatcher();
store.setRMDispatcher(dispatcher);
ApplicationId appId1 =
ApplicationId.fromString("application_1352994193343_0001");
storeApp(store, appId1, submitTime, startTime);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dispatcher.apprejectedEvnt;
}
}, 100, 5000);
}
static class TestAppRejDispatcher extends TestDispatcher {
private boolean apprejectedEvnt;
public void handle(Event event) {
if (event instanceof RMAppEvent
&& event.getType().equals(RMAppEventType.APP_REJECTED)) {
apprejectedEvnt = true;
}
};
}
@Test (timeout = 60000) @Test (timeout = 60000)
public void testCheckMajorVersionChange() throws Exception { public void testCheckMajorVersionChange() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {