MAPREDUCE-2693. Fix NPE in job-blacklisting. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1186529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-19 22:02:21 +00:00
parent ba66ca6856
commit 74748ec625
4 changed files with 319 additions and 48 deletions

View File

@ -1695,6 +1695,8 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2788. Normalize resource requests in FifoScheduler MAPREDUCE-2788. Normalize resource requests in FifoScheduler
appropriately. (Ahmed Radwan via acmurthy) appropriately. (Ahmed Radwan via acmurthy)
MAPREDUCE-2693. Fix NPE in job-blacklisting. (Hitesh Shah via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -509,18 +509,6 @@ void addMap(ContainerRequestEvent event) {
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP); request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
} else { } else {
for (String host : event.getHosts()) { for (String host : event.getHosts()) {
//host comes from data splitLocations which are hostnames. Containers
// use IP addresses.
//TODO Temporary fix for locality. Use resolvers from h-common.
// Cache to make this more efficient ?
InetAddress addr = null;
try {
addr = InetAddress.getByName(host);
} catch (UnknownHostException e) {
LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
}
if (addr != null) //Fallback to host if resolve fails.
host = addr.getHostAddress();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list == null) { if (list == null) {
list = new LinkedList<TaskAttemptId>(); list = new LinkedList<TaskAttemptId>();
@ -557,26 +545,101 @@ private void assign(List<Container> allocatedContainers) {
while (it.hasNext()) { while (it.hasNext()) {
Container allocated = it.next(); Container allocated = it.next();
LOG.info("Assigning container " + allocated); LOG.info("Assigning container " + allocated);
ContainerRequest assigned = assign(allocated);
if (assigned != null) { // check if allocated container meets memory requirements
// Update resource requests // and whether we have any scheduled tasks that need
decContainerReq(assigned); // a container to be assigned
boolean isAssignable = true;
Priority priority = allocated.getPriority();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
if (allocated.getResource().getMemory() < mapResourceReqt
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
+ " container memory less than required " + mapResourceReqt
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
if (allocated.getResource().getMemory() < reduceResourceReqt
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceReqt
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
isAssignable = false;
}
}
// send the container-assigned event to task attempt boolean blackListed = false;
eventHandler.handle(new TaskAttemptContainerAssignedEvent( ContainerRequest assigned = null;
assigned.attemptID, allocated));
assignedRequests.add(allocated.getId(), assigned.attemptID); if (isAssignable) {
// do not assign if allocated container is on a
// blacklisted host
blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
if (blackListed) {
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
+ " host. Releasing container " + allocated);
LOG.info("Assigned container (" + allocated + ") " + // find the request matching this allocated container
" to task " + assigned.attemptID + // and replace it with a new one
" on node " + allocated.getNodeId().toString()); ContainerRequest toBeReplacedReq =
} else { getContainerReqToReplace(allocated);
//not assigned to any request, release the container if (toBeReplacedReq != null) {
LOG.info("Releasing unassigned and invalid container " + allocated LOG.info("Placing a new container request for task attempt "
+ ". RM has gone crazy, someone go look!" + toBeReplacedReq.attemptID);
+ " Hey RM, if you are so rich, go donate to non-profits!"); ContainerRequest newReq =
getFilteredContainerRequest(toBeReplacedReq);
decContainerReq(toBeReplacedReq);
if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
TaskType.MAP) {
maps.put(newReq.attemptID, newReq);
}
else {
reduces.put(newReq.attemptID, newReq);
}
addContainerReq(newReq);
}
else {
LOG.info("Could not map allocated container to a valid request."
+ " Releasing allocated container " + allocated);
}
}
else {
assigned = assign(allocated);
if (assigned != null) {
// Update resource requests
decContainerReq(assigned);
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
assigned.attemptID, allocated));
assignedRequests.add(allocated.getId(), assigned.attemptID);
LOG.info("Assigned container (" + allocated + ") " +
" to task " + assigned.attemptID +
" on node " + allocated.getNodeId().toString());
}
else {
//not assigned to any request, release the container
LOG.info("Releasing unassigned and invalid container "
+ allocated + ". RM has gone crazy, someone go look!"
+ " Hey RM, if you are so rich, go donate to non-profits!");
}
}
}
// release container if it was blacklisted
// or if we could not assign it
if (blackListed || assigned == null) {
containersReleased++; containersReleased++;
release(allocated.getId()); release(allocated.getId());
} }
@ -604,12 +667,37 @@ private ContainerRequest assign(Container allocated) {
return assigned; return assigned;
} }
private ContainerRequest getContainerReqToReplace(Container allocated) {
Priority priority = allocated.getPriority();
ContainerRequest toBeReplaced = null;
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
// allocated container was for a map
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list != null && list.size() > 0) {
TaskAttemptId tId = list.removeLast();
if (maps.containsKey(tId)) {
toBeReplaced = maps.remove(tId);
}
}
else {
TaskAttemptId tId = maps.keySet().iterator().next();
toBeReplaced = maps.remove(tId);
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
TaskAttemptId tId = reduces.keySet().iterator().next();
toBeReplaced = reduces.remove(tId);
}
return toBeReplaced;
}
private ContainerRequest assignToFailedMap(Container allocated) { private ContainerRequest assignToFailedMap(Container allocated) {
//try to assign to earlierFailedMaps if present //try to assign to earlierFailedMaps if present
ContainerRequest assigned = null; ContainerRequest assigned = null;
while (assigned == null && earlierFailedMaps.size() > 0 && while (assigned == null && earlierFailedMaps.size() > 0) {
allocated.getResource().getMemory() >= mapResourceReqt) {
TaskAttemptId tId = earlierFailedMaps.removeFirst(); TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) { if (maps.containsKey(tId)) {
assigned = maps.remove(tId); assigned = maps.remove(tId);
@ -627,8 +715,7 @@ private ContainerRequest assignToFailedMap(Container allocated) {
private ContainerRequest assignToReduce(Container allocated) { private ContainerRequest assignToReduce(Container allocated) {
ContainerRequest assigned = null; ContainerRequest assigned = null;
//try to assign to reduces if present //try to assign to reduces if present
if (assigned == null && reduces.size() > 0 if (assigned == null && reduces.size() > 0) {
&& allocated.getResource().getMemory() >= reduceResourceReqt) {
TaskAttemptId tId = reduces.keySet().iterator().next(); TaskAttemptId tId = reduces.keySet().iterator().next();
assigned = reduces.remove(tId); assigned = reduces.remove(tId);
LOG.info("Assigned to reduce"); LOG.info("Assigned to reduce");
@ -640,9 +727,8 @@ private ContainerRequest assignToMap(Container allocated) {
//try to assign to maps if present //try to assign to maps if present
//first by host, then by rack, followed by * //first by host, then by rack, followed by *
ContainerRequest assigned = null; ContainerRequest assigned = null;
while (assigned == null && maps.size() > 0 while (assigned == null && maps.size() > 0) {
&& allocated.getResource().getMemory() >= mapResourceReqt) { String host = allocated.getNodeId().getHost();
String host = getHost(allocated.getNodeId().toString());
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) { while (list != null && list.size() > 0) {
LOG.info("Host matched to the request list " + host); LOG.info("Host matched to the request list " + host);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.rm; package org.apache.hadoop.mapreduce.v2.app.rm;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -63,7 +65,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
//Key->ResourceName (e.g., hostname, rackname, *) //Key->ResourceName (e.g., hostname, rackname, *)
//Value->Map //Value->Map
//Key->Resource Capability //Key->Resource Capability
//Value->ResourceReqeust //Value->ResourceRequest
private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>> private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
remoteRequestsTable = remoteRequestsTable =
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>(); new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
@ -87,14 +89,22 @@ static class ContainerRequest {
final String[] racks; final String[] racks;
//final boolean earlierAttemptFailed; //final boolean earlierAttemptFailed;
final Priority priority; final Priority priority;
public ContainerRequest(ContainerRequestEvent event, Priority priority) { public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this.attemptID = event.getAttemptID(); this(event.getAttemptID(), event.getCapability(), event.getHosts(),
this.capability = event.getCapability(); event.getRacks(), priority);
this.hosts = event.getHosts(); }
this.racks = event.getRacks();
//this.earlierAttemptFailed = event.getEarlierAttemptFailed(); public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority; this.priority = priority;
} }
} }
@Override @Override
@ -149,14 +159,37 @@ protected void containerFailedOnHost(String hostName) {
//remove all the requests corresponding to this hostname //remove all the requests corresponding to this hostname
for (Map<String, Map<Resource, ResourceRequest>> remoteRequests for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
: remoteRequestsTable.values()){ : remoteRequestsTable.values()){
//remove from host //remove from host if no pending allocations
Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName); boolean foundAll = true;
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
if (reqMap != null) { if (reqMap != null) {
for (ResourceRequest req : reqMap.values()) { for (ResourceRequest req : reqMap.values()) {
ask.remove(req); if (!ask.remove(req)) {
foundAll = false;
}
else {
// if ask already sent to RM, we can try and overwrite it if possible.
// send a new ask to RM with numContainers
// specified for the blacklisted host to be 0.
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
zeroedRequest.setNumContainers(0);
// to be sent to RM on next heartbeat
ask.add(zeroedRequest);
}
}
// if all requests were still in ask queue
// we can remove this request
if (foundAll) {
remoteRequests.remove(hostName);
} }
} }
//TODO: remove from rack // TODO handling of rack blacklisting
// Removing from rack should be dependent on no. of failures within the rack
// Blacklisting a rack on the basis of a single node's blacklisting
// may be overly aggressive.
// Node failures could be co-related with other failures on the same rack
// but we probably need a better approach at trying to decide how and when
// to blacklist a rack
} }
} else { } else {
nodeFailures.put(hostName, failures); nodeFailures.put(hostName, failures);
@ -171,7 +204,9 @@ protected void addContainerReq(ContainerRequest req) {
// Create resource requests // Create resource requests
for (String host : req.hosts) { for (String host : req.hosts) {
// Data-local // Data-local
addResourceRequest(req.priority, host, req.capability); if (!isNodeBlacklisted(host)) {
addResourceRequest(req.priority, host, req.capability);
}
} }
// Nothing Rack-local for now // Nothing Rack-local for now
@ -234,6 +269,14 @@ private void decResourceRequest(Priority priority, String resourceName,
Map<String, Map<Resource, ResourceRequest>> remoteRequests = Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName); Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) {
// as we modify the resource requests by filtering out blacklisted hosts
// when they are added, this value may be null when being
// decremented
LOG.debug("Not decrementing resource as " + resourceName
+ " is not present in request table");
return;
}
ResourceRequest remoteRequest = reqMap.get(capability); ResourceRequest remoteRequest = reqMap.get(capability);
LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId() LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
@ -267,4 +310,23 @@ protected void release(ContainerId containerId) {
release.add(containerId); release.add(containerId);
} }
protected boolean isNodeBlacklisted(String hostname) {
if (!nodeBlacklistingEnabled) {
return false;
}
return blacklistedNodes.contains(hostname);
}
protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
ArrayList<String> newHosts = new ArrayList<String>();
for (String host : orig.hosts) {
if (!isNodeBlacklisted(host)) {
newHosts.add(host);
}
}
String[] hosts = newHosts.toArray(new String[newHosts.size()]);
ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
hosts, orig.racks, orig.priority);
return newReq;
}
} }

View File

@ -34,6 +34,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -44,6 +45,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -478,6 +480,105 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception {
Assert.assertEquals(100.0f, app.getProgress(), 0.0); Assert.assertEquals(100.0f, app.getProgress(), 0.0);
} }
@Test
public void testBlackListedNodes() throws Exception {
LOG.info("Running testBlackListedNodes");
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0, "jobfile"));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
dispatcher.await();
// create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
new String[] { "h1" });
allocator.sendRequest(event1);
// send 1 more request with different resource req
ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
new String[] { "h2" });
allocator.sendRequest(event2);
// send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
new String[] { "h3" });
allocator.sendRequest(event3);
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Send events to blacklist nodes h1 and h2
ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
allocator.sendFailure(f1);
ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
allocator.sendFailure(f2);
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
dispatcher.await();
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// mark h1/h2 as bad nodes
nodeManager1.nodeHeartbeat(false);
nodeManager2.nodeHeartbeat(false);
dispatcher.await();
assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
nodeManager3.nodeHeartbeat(true); // Node heartbeat
assigned = allocator.schedule();
dispatcher.await();
Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
// validate that all containers are assigned to h3
for (TaskAttemptContainerAssignedEvent assig : assigned) {
Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
.getContainer().getNodeId().getHost()));
}
}
private static class MyFifoScheduler extends FifoScheduler { private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) { public MyFifoScheduler(RMContext rmContext) {
@ -534,6 +635,19 @@ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
new String[] { NetworkTopology.DEFAULT_RACK }); new String[] { NetworkTopology.DEFAULT_RACK });
} }
private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
String host, boolean reduce) {
TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
} else {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
return new ContainerFailedEvent(attemptId, host);
}
private void checkAssignments(ContainerRequestEvent[] requests, private void checkAssignments(ContainerRequestEvent[] requests,
List<TaskAttemptContainerAssignedEvent> assignments, List<TaskAttemptContainerAssignedEvent> assignments,
boolean checkHostMatch) { boolean checkHostMatch) {
@ -653,6 +767,10 @@ public void sendRequests(List<ContainerRequestEvent> reqs) {
} }
} }
public void sendFailure(ContainerFailedEvent f) {
super.handle(f);
}
// API to be used by tests // API to be used by tests
public List<TaskAttemptContainerAssignedEvent> schedule() { public List<TaskAttemptContainerAssignedEvent> schedule() {
// run the scheduler // run the scheduler
@ -672,6 +790,7 @@ public List<TaskAttemptContainerAssignedEvent> schedule() {
protected void startAllocatorThread() { protected void startAllocatorThread() {
// override to NOT start thread // override to NOT start thread
} }
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -681,5 +800,7 @@ public static void main(String[] args) throws Exception {
t.testMapReduceScheduling(); t.testMapReduceScheduling();
t.testReportedAppProgress(); t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps(); t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
} }
} }