YARN-5602. Utils for Federation State and Policy Store. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit 326a2e6bde1cf266ecc7d3b513cdaac6abcebbe4)
This commit is contained in:
Subru Krishnan 2017-04-05 15:02:00 -07:00 committed by Carlo Curino
parent 1c64e1709b
commit e1da8f0667
17 changed files with 685 additions and 68 deletions

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.exception;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>
* Logical error codes from <code>FederationStateStore</code>.
* </p>
*/
@Public
@Unstable
public enum FederationStateStoreErrorCode {

  // Membership table failures (11xx).
  MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."),

  MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."),

  MEMBERSHIP_SINGLE_SELECT_FAIL(1103,
      "Fail to select a tuple from Membership table."),

  MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104,
      "Fail to select multiple tuples from Membership table."),

  MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105,
      "Fail to update/deregister a tuple in Membership table."),

  MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106,
      "Fail to update/heartbeat a tuple in Membership table."),

  // ApplicationsHomeSubCluster table failures (12xx).
  APPLICATIONS_INSERT_FAIL(1201,
      "Fail to insert a tuple into ApplicationsHomeSubCluster table."),

  APPLICATIONS_DELETE_FAIL(1202,
      "Fail to delete a tuple from ApplicationsHomeSubCluster table"),

  APPLICATIONS_SINGLE_SELECT_FAIL(1203,
      "Fail to select a tuple from ApplicationsHomeSubCluster table."),

  APPLICATIONS_MULTIPLE_SELECT_FAIL(1204,
      "Fail to select multiple tuple from ApplicationsHomeSubCluster table."),

  APPLICATIONS_UPDATE_FAIL(1205,
      "Fail to update a tuple in ApplicationsHomeSubCluster table."),

  // Policy table failures (13xx).
  POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."),

  // The message previously named the Membership table (copy-paste slip);
  // this code reports a failed delete on the Policy table.
  POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Policy table."),

  POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."),

  POLICY_MULTIPLE_SELECT_FAIL(1304,
      "Fail to select multiple tuples from Policy table."),

  POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table.");

  /** Numeric identifier of the error code. */
  private final int id;

  /** Human-readable description of the failure. */
  private final String msg;

  /**
   * Binds a numeric identifier and a description to the constant.
   *
   * @param id the numeric identifier of the error
   * @param msg the human-readable description of the error
   */
  FederationStateStoreErrorCode(int id, String msg) {
    this.id = id;
    this.msg = msg;
  }

  /**
   * Get the error code related to the FederationStateStore failure.
   *
   * @return the error code related to the FederationStateStore failure.
   */
  public int getId() {
    return this.id;
  }

  /**
   * Get the error message related to the FederationStateStore failure.
   *
   * @return the error message related to the FederationStateStore failure.
   */
  public String getMsg() {
    return this.msg;
  }

  /**
   * Renders the constant as two lines, each preceded by a line break: the
   * numeric code followed by the message.
   *
   * @return the multi-line textual form of this error code
   */
  @Override
  public String toString() {
    return "\nError Code: " + this.id + "\nError Message: " + this.msg;
  }
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.exception;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Exception thrown by the <code>FederationStateStore</code>.
*
*/
public class FederationStateStoreException extends YarnException {

  /**
   * IDE auto-generated.
   */
  private static final long serialVersionUID = -6453353714832159296L;

  /** Logical error code identifying which store operation failed. */
  private final FederationStateStoreErrorCode code;

  /**
   * Creates an exception carrying the given logical error code.
   *
   * The description of the code is forwarded to the superclass so that
   * {@code getMessage()} and stack traces are self-explanatory instead of
   * showing a {@code null} message.
   *
   * @param code the error code describing the failure; a {@code null} code is
   *          tolerated and yields an exception without message
   */
  public FederationStateStoreException(FederationStateStoreErrorCode code) {
    super(code == null ? null : code.getMsg());
    this.code = code;
  }

  /**
   * Get the logical error code attached to this exception.
   *
   * @return the error code passed at construction time
   */
  public FederationStateStoreErrorCode getCode() {
    return code;
  }
}

View File

@ -16,14 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.utils;
package org.apache.hadoop.yarn.server.federation.store.exception;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
* {@link FederationApplicationHomeSubClusterStoreInputValidator},
* {@link FederationPolicyStoreInputValidator} if the input is invalid.
* Exception thrown by the {@code FederationMembershipStateStoreInputValidator},
* {@code FederationApplicationHomeSubClusterStoreInputValidator},
* {@code FederationPolicyStoreInputValidator} if the input is invalid.
*
*/
public class FederationStateStoreInvalidInputException extends YarnException {

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.exception;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Exception thrown by the {@code FederationStateStore}, if it is a retriable
* exception.
*
*/
public class FederationStateStoreRetriableException extends YarnException {

  private static final long serialVersionUID = 1L;

  /**
   * Builds a retriable exception from a description only.
   *
   * @param message the description of the failure
   */
  public FederationStateStoreRetriableException(String message) {
    super(message);
  }

  /**
   * Builds a retriable exception from an underlying cause only.
   *
   * @param cause the throwable that triggered this exception
   */
  public FederationStateStoreRetriableException(Throwable cause) {
    super(cause);
  }

  /**
   * Builds a retriable exception from a description and an underlying cause.
   *
   * @param message the description of the failure
   * @param cause the throwable that triggered this exception
   */
  public FederationStateStoreRetriableException(String message,
      Throwable cause) {
    super(message, cause);
  }
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.exception;

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@ -60,8 +61,11 @@
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In-memory implementation of {@link FederationStateStore}.
@ -74,6 +78,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
private final MonotonicClock clock = new MonotonicClock();
public static final Logger LOG =
LoggerFactory.getLogger(MemoryFederationStateStore.class);
@Override
public void init(Configuration conf) {
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
@ -94,7 +101,17 @@ public SubClusterRegisterResponse registerSubCluster(
FederationMembershipStateStoreInputValidator
.validateSubClusterRegisterRequest(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
SubClusterInfo subClusterInfoToSave =
SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
subClusterInfo.getAMRMServiceAddress(),
subClusterInfo.getClientRMServiceAddress(),
subClusterInfo.getRMAdminServiceAddress(),
subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
subClusterInfo.getCapability());
membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
return SubClusterRegisterResponse.newInstance();
}
@ -105,8 +122,11 @@ public SubClusterDeregisterResponse deregisterSubCluster(
.validateSubClusterDeregisterRequest(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
throw new YarnException(
"SubCluster " + request.getSubClusterId().toString() + " not found");
String errMsg =
"SubCluster " + request.getSubClusterId().toString() + " not found";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
errMsg);
} else {
subClusterInfo.setState(request.getState());
}
@ -124,8 +144,11 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
throw new YarnException("Subcluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat");
String errMsg = "Subcluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
errMsg);
}
subClusterInfo.setLastHeartBeat(clock.getTime());
@ -143,8 +166,10 @@ public GetSubClusterInfoResponse getSubCluster(
.validateGetSubClusterInfoRequest(request);
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
throw new YarnException(
"Subcluster " + subClusterId.toString() + " does not exist");
String errMsg =
"Subcluster " + subClusterId.toString() + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg);
}
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@ -193,7 +218,9 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg);
}
applications.put(appId,
@ -209,7 +236,10 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
.validateGetApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
errMsg);
}
return GetApplicationHomeSubClusterResponse.newInstance(
@ -238,7 +268,9 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
.validateDeleteApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
throw new YarnException("Application " + appId + " does not exist");
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg);
}
applications.remove(appId);
@ -253,7 +285,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
.validateGetSubClusterPolicyConfigurationRequest(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
throw new YarnException("Policy for queue " + queue + " does not exist");
String errMsg = "Policy for queue " + queue + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG,
FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg);
}
return GetSubClusterPolicyConfigurationResponse

View File

@ -260,4 +260,66 @@ public String toString() {
+ ", getState() = " + getState() + ", getLastStartTime() = "
+ getLastStartTime() + ", getCapability() = " + getCapability() + "]";
}
/**
 * Compares this sub-cluster descriptor with another object.
 *
 * Two instances are equal when they have the same runtime class and the same
 * sub-cluster id, service addresses, state and last start time. The
 * comparison is null-safe, in line with {@code hashCode()}, which already
 * tolerates unset fields.
 *
 * @param obj the object to compare with
 * @return {@code true} if both objects describe the same sub-cluster
 */
@Override
public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  if (obj == null || getClass() != obj.getClass()) {
    return false;
  }
  SubClusterInfo other = (SubClusterInfo) obj;
  // Capability and HeartBeat fields are not included as they are temporal
  // (i.e. timestamps), so they change during the lifetime of the same
  // sub-cluster.
  return java.util.Objects.equals(getSubClusterId(), other.getSubClusterId())
      && java.util.Objects.equals(getAMRMServiceAddress(),
          other.getAMRMServiceAddress())
      && java.util.Objects.equals(getClientRMServiceAddress(),
          other.getClientRMServiceAddress())
      && java.util.Objects.equals(getRMAdminServiceAddress(),
          other.getRMAdminServiceAddress())
      && java.util.Objects.equals(getRMWebServiceAddress(),
          other.getRMWebServiceAddress())
      && java.util.Objects.equals(getState(), other.getState())
      && getLastStartTime() == other.getLastStartTime();
}
/**
 * Computes a hash over the sub-cluster id, the service addresses, the state
 * and the last start time; unset (null) fields contribute zero.
 *
 * @return the hash code of this sub-cluster descriptor
 */
@Override
public int hashCode() {
  // Capability and HeartBeat fields are not included as they are temporal
  // (i.e. timestamps), so they change during the lifetime of the same
  // sub-cluster.
  int hash = 1;
  hash = 31 * hash + java.util.Objects.hashCode(getSubClusterId());
  hash = 31 * hash + java.util.Objects.hashCode(getAMRMServiceAddress());
  hash = 31 * hash + java.util.Objects.hashCode(getClientRMServiceAddress());
  hash = 31 * hash + java.util.Objects.hashCode(getRMAdminServiceAddress());
  hash = 31 * hash + java.util.Objects.hashCode(getRMWebServiceAddress());
  hash = 31 * hash + java.util.Objects.hashCode(getState());
  // Fold the 64-bit start time into 32 bits.
  hash = 31 * hash
      + (int) (getLastStartTime() ^ (getLastStartTime() >>> 32));
  return hash;
}
}

View File

@ -81,22 +81,6 @@ private void mergeLocalToBuilder() {
}
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;

View File

@ -20,6 +20,7 @@
import java.net.URI;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.utils;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Common utility methods used by the store implementations.
*
*/
public final class FederationStateStoreUtils {

  public static final Logger LOG =
      LoggerFactory.getLogger(FederationStateStoreUtils.class);

  /** Utility class: not meant to be instantiated. */
  private FederationStateStoreUtils() {
  }

  /**
   * Returns the SQL <code>FederationStateStore</code> connection to the pool.
   *
   * The connection is closed even when closing the statement fails, so a
   * faulty statement cannot leak the connection. If both close operations
   * fail, both failures are logged and the one raised for the connection is
   * propagated.
   *
   * @param log the logger interface
   * @param cstmt the interface used to execute SQL stored procedures
   * @param conn the SQL connection
   * @throws YarnException if closing the statement or the connection fails
   */
  public static void returnToPool(Logger log, CallableStatement cstmt,
      Connection conn) throws YarnException {
    try {
      if (cstmt != null) {
        try {
          cstmt.close();
        } catch (SQLException e) {
          logAndThrowException(log,
              "Exception while trying to close Statement", e);
        }
      }
    } finally {
      // Runs even if the statement could not be closed.
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException e) {
          logAndThrowException(log,
              "Exception while trying to close Connection", e);
        }
      }
    }
  }

  /**
   * Logs the error and throws a <code>YarnException</code> due to an error in
   * <code>FederationStateStore</code>. This method never returns normally.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @param t the throwable raised in the called class; may be null
   * @throws YarnException always, wrapping <code>t</code> when it is not null
   */
  public static void logAndThrowException(Logger log, String errMsg,
      Throwable t) throws YarnException {
    if (t != null) {
      log.error(errMsg, t);
      throw new YarnException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new YarnException(errMsg);
    }
  }

  /**
   * Logs the error and throws a <code>FederationStateStoreException</code>
   * due to an error in <code>FederationStateStore</code>. This method never
   * returns normally.
   *
   * @param log the logger interface
   * @param code FederationStateStoreErrorCode of the error
   * @param errMsg the error message, logged in front of the error code
   * @throws YarnException always, as a FederationStateStoreException
   *           carrying <code>code</code>
   */
  public static void logAndThrowStoreException(Logger log,
      FederationStateStoreErrorCode code, String errMsg) throws YarnException {
    log.error("{} {}", errMsg, code);
    throw new FederationStateStoreException(code);
  }

  /**
   * Logs the error and throws a <code>FederationStateStoreException</code>
   * due to an error in <code>FederationStateStore</code>. This method never
   * returns normally.
   *
   * @param log the logger interface
   * @param code FederationStateStoreErrorCode of the error
   * @throws YarnException always, as a FederationStateStoreException
   *           carrying <code>code</code>
   */
  public static void logAndThrowStoreException(Logger log,
      FederationStateStoreErrorCode code) throws YarnException {
    log.error(code.toString());
    throw new FederationStateStoreException(code);
  }

  /**
   * Logs the error and throws a
   * <code>FederationStateStoreInvalidInputException</code> due to an error in
   * <code>FederationStateStore</code>. This method never returns normally.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @throws YarnException always, as a
   *           FederationStateStoreInvalidInputException
   */
  public static void logAndThrowInvalidInputException(Logger log, String errMsg)
      throws YarnException {
    // Use the caller's logger, like the other helpers of this class, so the
    // message is attributed to the component that detected the bad input.
    log.error(errMsg);
    throw new FederationStateStoreInvalidInputException(errMsg);
  }

  /**
   * Logs the error and throws a
   * <code>FederationStateStoreRetriableException</code> due to an error in
   * <code>FederationStateStore</code>. This method never returns normally.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @param t the throwable raised in the called class; may be null
   * @throws YarnException always, as a FederationStateStoreRetriableException
   *           wrapping <code>t</code> when it is not null
   */
  public static void logAndThrowRetriableException(Logger log, String errMsg,
      Throwable t) throws YarnException {
    // Use the caller's logger rather than the class-level one.
    if (t != null) {
      log.error(errMsg, t);
      throw new FederationStateStoreRetriableException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new FederationStateStoreRetriableException(errMsg);
    }
  }
}

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -137,14 +138,32 @@ public synchronized void reinitialize(FederationStateStore store,
initCache();
}
/**
* Create a RetryPolicy for {@code FederationStateStoreFacade}. In case of
* failure, it retries for:
* <ul>
* <li>{@code FederationStateStoreRetriableException}</li>
* <li>{@code CacheLoaderException}</li>
* </ul>
*
* @param conf the updated configuration
* @return the RetryPolicy for FederationStateStoreFacade
*/
public static RetryPolicy createRetryPolicy(Configuration conf) {
// Retry settings for StateStore
RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(
RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE),
conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(FederationStateStoreRetriableException.class,
basePolicy);
exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy);
RetryPolicy retryPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
return retryPolicy;
}

View File

@ -19,11 +19,14 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -67,9 +70,11 @@ public abstract class FederationStateStoreBaseTest {
protected abstract FederationStateStore createStateStore();
private Configuration conf;
@Before
public void before() throws IOException, YarnException {
stateStore.init(new Configuration());
stateStore.init(conf);
}
@After
@ -114,8 +119,10 @@ public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
try {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
e.getCode());
}
}
@ -141,9 +148,10 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
try {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Subcluster SC does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL,
e.getCode());
}
}
@ -166,19 +174,25 @@ public void testGetAllSubClustersInfo() throws Exception {
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
Assert.assertTrue(
List<SubClusterInfo> subClustersActive =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
.getSubClusters().contains(subClusterInfo1));
Assert.assertFalse(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
.getSubClusters().contains(subClusterInfo2));
.getSubClusters();
List<SubClusterInfo> subClustersAll =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
.getSubClusters();
Assert.assertTrue(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
.getSubClusters().contains(subClusterInfo1));
Assert.assertTrue(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
.getSubClusters().contains(subClusterInfo2));
// SC1 is the only active
Assert.assertEquals(1, subClustersActive.size());
SubClusterInfo sc1 = subClustersActive.get(0);
Assert.assertEquals(subClusterId1, sc1.getSubClusterId());
// SC1 and SC2 are the SubCluster present into the StateStore
Assert.assertEquals(2, subClustersAll.size());
Assert.assertTrue(subClustersAll.contains(sc1));
subClustersAll.remove(sc1);
SubClusterInfo sc2 = subClustersAll.get(0);
Assert.assertEquals(subClusterId2, sc2.getSubClusterId());
}
@Test
@ -204,9 +218,10 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Subcluster SC does not exist; cannot heartbeat"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
e.getCode());
}
}
@ -265,9 +280,10 @@ public void testDeleteApplicationHomeSubCluster() throws Exception {
try {
queryApplicationHomeSC(appId);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId + " does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
e.getCode());
}
}
@ -281,9 +297,9 @@ public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode());
}
}
@ -314,9 +330,10 @@ public void testGetApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
e.getCode());
}
}
@ -379,9 +396,9 @@ public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Application " + appId.toString() + " does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode());
}
}
@ -440,9 +457,9 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception {
try {
stateStore.getPolicyConfiguration(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Policy for queue Queue does not exist"));
} catch (FederationStateStoreException e) {
Assert.assertEquals(
FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode());
}
}
@ -537,4 +554,8 @@ private SubClusterPolicyConfiguration queryPolicy(String queue)
return result.getPolicyConfiguration();
}
/**
 * Sets the configuration held by this base test.
 *
 * NOTE(review): the visible setup code passes this field to
 * stateStore.init(...), so subclasses presumably have to call this before
 * the store is initialized - confirm against the full class.
 *
 * @param conf the configuration to store
 */
protected void setConf(Configuration conf) {
this.conf = conf;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
@ -27,6 +28,7 @@ public class TestMemoryFederationStateStore
/**
 * Registers a fresh default {@code Configuration} with the base test and
 * creates the in-memory store under test.
 *
 * @return a new MemoryFederationStateStore instance
 */
@Override
protected FederationStateStore createStateStore() {
// Hand the configuration to the base class before returning the store.
super.setConf(new Configuration());
return new MemoryFederationStateStore();
}
}
}

View File

@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import javax.cache.integration.CacheLoaderException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.junit.Assert;
import org.junit.Test;
/**
* Test class to validate FederationStateStoreFacade retry policy.
*/
public class TestFederationStateStoreFacadeRetry {

  /**
   * Retry limit written to {@code CLIENT_FAILOVER_RETRIES} before each
   * policy is created.
   */
  private static final int MAX_RETRIES = 4;

  /** Configuration used to create the policy of the running test. */
  private Configuration conf;

  /**
   * FederationStateStoreRetriableException is a retriable exception: it is
   * retried on the first attempt and fails once the retry limit is reached.
   */
  @Test
  public void testFacadeRetriableException() throws Exception {
    RetryPolicy retryPolicy = newRetryPolicy();
    // Only the decision is compared, since the delay and the reason are
    // random values during this test.
    assertDecision(RetryAction.RETRY, retryPolicy,
        new FederationStateStoreRetriableException(""), 0);
    // After MAX_RETRIES attempts the policy stops retrying.
    assertDecision(RetryAction.FAIL, retryPolicy,
        new FederationStateStoreRetriableException(""), MAX_RETRIES);
  }

  /**
   * YarnException is not a retriable exception.
   */
  @Test
  public void testFacadeYarnException() throws Exception {
    RetryPolicy retryPolicy = newRetryPolicy();
    assertDecision(RetryAction.FAIL, retryPolicy, new YarnException(), 0);
  }

  /**
   * FederationStateStoreException is not a retriable exception.
   */
  @Test
  public void testFacadeStateStoreException() throws Exception {
    RetryPolicy retryPolicy = newRetryPolicy();
    FederationStateStoreException failure =
        new FederationStateStoreException(
            FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL);
    assertDecision(RetryAction.FAIL, retryPolicy, failure, 0);
  }

  /**
   * FederationStateStoreInvalidInputException is not a retriable exception.
   */
  @Test
  public void testFacadeInvalidInputException() throws Exception {
    RetryPolicy retryPolicy = newRetryPolicy();
    assertDecision(RetryAction.FAIL, retryPolicy,
        new FederationStateStoreInvalidInputException(""), 0);
  }

  /**
   * CacheLoaderException is a retriable exception: it is retried on the
   * first attempt and fails once the retry limit is reached.
   */
  @Test
  public void testFacadeCacheRetriableException() throws Exception {
    RetryPolicy retryPolicy = newRetryPolicy();
    // Only the decision is compared, since the delay and the reason are
    // random values during this test.
    assertDecision(RetryAction.RETRY, retryPolicy,
        new CacheLoaderException(""), 0);
    // After MAX_RETRIES attempts the policy stops retrying.
    assertDecision(RetryAction.FAIL, retryPolicy,
        new CacheLoaderException(""), MAX_RETRIES);
  }

  /**
   * Creates a fresh configuration limited to MAX_RETRIES failover retries
   * and asks the facade for the retry policy built from it.
   *
   * @return the retry policy created by FederationStateStoreFacade
   */
  private RetryPolicy newRetryPolicy() {
    conf = new Configuration();
    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, MAX_RETRIES);
    return FederationStateStoreFacade.createRetryPolicy(conf);
  }

  /**
   * Queries the policy for the given failure and checks the decision it
   * returns.
   *
   * @param expected action whose decision the policy is expected to return
   * @param retryPolicy policy under test
   * @param failure exception handed to the policy
   * @param attempts number of retries reported to the policy
   * @throws Exception if the policy itself throws
   */
  private static void assertDecision(RetryAction expected,
      RetryPolicy retryPolicy, Exception failure, int attempts)
      throws Exception {
    RetryAction actual = retryPolicy.shouldRetry(failure, attempts, 0, false);
    Assert.assertEquals(expected.action, actual.action);
  }
}