YARN-6185. Apply SLIDER-1199 to yarn native services for blacklisting nodes. Contributed by Billie Rinaldi
This commit is contained in:
parent
8967a1b812
commit
500695d726
@ -410,6 +410,12 @@ public abstract class AbstractProviderService
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(List<AbstractRMOperation> operations) {
|
||||
for (AbstractRMOperation operation : operations) {
|
||||
|
@ -1755,7 +1755,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
|
||||
*/
|
||||
private void scheduleFailureWindowResets(ConfTree resources) throws
|
||||
BadConfigException {
|
||||
ResetFailureWindow reset = new ResetFailureWindow();
|
||||
ResetFailureWindow reset = new ResetFailureWindow(rmOperationHandler);
|
||||
ConfTreeOperations ops = new ConfTreeOperations(resources);
|
||||
MapOperations globals = ops.getGlobalOptions();
|
||||
long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW,
|
||||
@ -1988,6 +1988,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
|
||||
rmOperationHandler.cancelSingleRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
rmOperationHandler.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
|
||||
/* =================================================================== */
|
||||
/* END */
|
||||
/* =================================================================== */
|
||||
|
@ -19,21 +19,31 @@
|
||||
package org.apache.slider.server.appmaster.actions;
|
||||
|
||||
import org.apache.slider.server.appmaster.SliderAppMaster;
|
||||
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
|
||||
import org.apache.slider.server.appmaster.operations.RMOperationHandlerActions;
|
||||
import org.apache.slider.server.appmaster.state.AppState;
|
||||
|
||||
/**
|
||||
* Requests the AM to reset the failure window
|
||||
*/
|
||||
public class ResetFailureWindow extends AsyncAction {
|
||||
private final RMOperationHandlerActions operationHandler;
|
||||
|
||||
public ResetFailureWindow() {
|
||||
public ResetFailureWindow(RMOperationHandlerActions operationHandler) {
|
||||
super("ResetFailureWindow");
|
||||
this.operationHandler = operationHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(SliderAppMaster appMaster,
|
||||
QueueAccess queueService,
|
||||
AppState appState) throws Exception {
|
||||
appState.resetFailureCounts();
|
||||
synchronized (appMaster) {
|
||||
appState.resetFailureCounts();
|
||||
AbstractRMOperation blacklistOperation = appState.updateBlacklist();
|
||||
if (blacklistOperation != null) {
|
||||
blacklistOperation.execute(operationHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -107,4 +107,10 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
|
||||
public void addContainerRequest(AMRMClient.ContainerRequest req) {
|
||||
client.addContainerRequest(req);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
client.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.slider.providers.ProviderService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ProviderNotifyingOperationHandler extends RMOperationHandler {
|
||||
|
||||
private final ProviderService providerService;
|
||||
@ -52,4 +54,10 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler {
|
||||
public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
|
||||
providerService.cancelSingleRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
providerService.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
@ -27,25 +27,25 @@ import java.util.List;
|
||||
public interface RMOperationHandlerActions {
|
||||
|
||||
/**
|
||||
* Release an assigned container
|
||||
* Release an assigned container.
|
||||
* @param containerId container
|
||||
*/
|
||||
void releaseAssignedContainer(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* Issue a container request
|
||||
* Issue a container request.
|
||||
* @param request
|
||||
*/
|
||||
void addContainerRequest(AMRMClient.ContainerRequest request);
|
||||
|
||||
/**
|
||||
* Cancel a specific request
|
||||
* Cancel a specific request.
|
||||
* @param request request to cancel
|
||||
*/
|
||||
void cancelSingleRequest(AMRMClient.ContainerRequest request);
|
||||
|
||||
/**
|
||||
* Remove a container request
|
||||
* Remove a container request.
|
||||
* @param priority1 priority to remove at
|
||||
* @param priority2 second priority to target
|
||||
* @param count number to remove
|
||||
@ -53,7 +53,15 @@ public interface RMOperationHandlerActions {
|
||||
int cancelContainerRequests(Priority priority1, Priority priority2, int count);
|
||||
|
||||
/**
|
||||
* Execute an entire list of operations
|
||||
* Blacklist resources.
|
||||
* @param blacklistAdditions resources to add to the blacklist
|
||||
* @param blacklistRemovals resources to remove from the blacklist
|
||||
*/
|
||||
void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals);
|
||||
|
||||
/**
|
||||
* Execute an entire list of operations.
|
||||
* @param operations ops
|
||||
*/
|
||||
void execute(List<AbstractRMOperation> operations);
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.slider.server.appmaster.operations;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Update blacklisted resources for the application.
|
||||
*/
|
||||
public class UpdateBlacklistOperation extends AbstractRMOperation {
|
||||
private final List<String> blacklistAdditions;
|
||||
private final List<String> blacklistRemovals;
|
||||
|
||||
public UpdateBlacklistOperation(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
this.blacklistAdditions = blacklistAdditions;
|
||||
this.blacklistRemovals = blacklistRemovals;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(RMOperationHandlerActions handler) {
|
||||
handler.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "blacklist additions: " + blacklistAdditions
|
||||
+ ", blacklist removals: " + blacklistRemovals;
|
||||
}
|
||||
}
|
@ -70,6 +70,7 @@ import org.apache.slider.server.appmaster.management.MetricsConstants;
|
||||
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
|
||||
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
|
||||
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
|
||||
import org.apache.slider.server.appmaster.operations.UpdateBlacklistOperation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -1934,6 +1935,15 @@ public class AppState {
|
||||
return results;
|
||||
}
|
||||
|
||||
public synchronized AbstractRMOperation updateBlacklist() {
|
||||
UpdateBlacklistOperation blacklistOperation =
|
||||
roleHistory.updateBlacklist(getRoleStatusMap().values());
|
||||
if (blacklistOperation != null) {
|
||||
log.info("Updating {}", blacklistOperation);
|
||||
}
|
||||
return blacklistOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look at where the current node state is -and whether it should be changed
|
||||
*/
|
||||
@ -1941,6 +1951,10 @@ public class AppState {
|
||||
throws SliderInternalStateException, TriggerClusterTeardownException {
|
||||
log.debug("in reviewRequestAndReleaseNodes()");
|
||||
List<AbstractRMOperation> allOperations = new ArrayList<>();
|
||||
AbstractRMOperation blacklistOperation = updateBlacklist();
|
||||
if (blacklistOperation != null) {
|
||||
allOperations.add(blacklistOperation);
|
||||
}
|
||||
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
|
||||
if (!roleStatus.isExcludeFromFlexing()) {
|
||||
List<AbstractRMOperation> operations = reviewOneRole(roleStatus);
|
||||
|
@ -42,6 +42,8 @@ public class NodeInstance {
|
||||
|
||||
public final String hostname;
|
||||
|
||||
private boolean blacklisted = false;
|
||||
|
||||
/**
|
||||
* last state of node. Starts off as {@link NodeState#RUNNING},
|
||||
* on the assumption that it is live.
|
||||
@ -81,6 +83,14 @@ public class NodeInstance {
|
||||
nodeEntries = new ArrayList<>(roles);
|
||||
}
|
||||
|
||||
public synchronized void setBlacklisted(boolean blacklisted) {
|
||||
this.blacklisted = blacklisted;
|
||||
}
|
||||
|
||||
public boolean isBlacklisted() {
|
||||
return blacklisted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the node status.
|
||||
* The return code is true if the node state changed enough to
|
||||
|
@ -34,6 +34,7 @@ import org.apache.slider.server.appmaster.management.BoolMetric;
|
||||
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
|
||||
import org.apache.slider.server.appmaster.management.Timestamp;
|
||||
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
|
||||
import org.apache.slider.server.appmaster.operations.UpdateBlacklistOperation;
|
||||
import org.apache.slider.server.avro.LoadedRoleHistory;
|
||||
import org.apache.slider.server.avro.NodeEntryRecord;
|
||||
import org.apache.slider.server.avro.RoleHistoryHeader;
|
||||
@ -49,6 +50,7 @@ import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@ -546,6 +548,38 @@ public class RoleHistory {
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized UpdateBlacklistOperation updateBlacklist(
|
||||
Collection<RoleStatus> roleStatuses) {
|
||||
List<String> blacklistAdditions = new ArrayList<>();
|
||||
List<String> blacklistRemovals = new ArrayList<>();
|
||||
for (Entry<String, NodeInstance> nodeInstanceEntry : nodemap.entrySet()) {
|
||||
boolean shouldBeBlacklisted = false;
|
||||
String nodeHost = nodeInstanceEntry.getKey();
|
||||
NodeInstance nodeInstance = nodeInstanceEntry.getValue();
|
||||
for (RoleStatus roleStatus : roleStatuses) {
|
||||
if (nodeInstance.exceedsFailureThreshold(roleStatus)) {
|
||||
shouldBeBlacklisted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (shouldBeBlacklisted) {
|
||||
if (!nodeInstance.isBlacklisted()) {
|
||||
blacklistAdditions.add(nodeHost);
|
||||
nodeInstance.setBlacklisted(true);
|
||||
}
|
||||
} else {
|
||||
if (nodeInstance.isBlacklisted()) {
|
||||
blacklistRemovals.add(nodeHost);
|
||||
nodeInstance.setBlacklisted(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (blacklistAdditions.isEmpty() && blacklistRemovals.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return new UpdateBlacklistOperation(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a node for use
|
||||
* @param role role
|
||||
|
Loading…
x
Reference in New Issue
Block a user