MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else but just before ClientService to avoid race conditions during RM restart. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521699 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b5c6ff164a
commit
1c1ebc1553
@ -186,6 +186,10 @@ Release 2.1.1-beta - UNRELEASED
|
||||
MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
|
||||
subclass (Sandy Ryza)
|
||||
|
||||
MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else
|
||||
but just before ClientService to avoid race conditions during RM restart.
|
||||
(Jian He via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
|
||||
|
@ -362,7 +362,10 @@ protected void serviceInit(final Configuration conf) throws Exception {
|
||||
|
||||
//service to handle requests from JobClient
|
||||
clientService = createClientService(context);
|
||||
addIfService(clientService);
|
||||
// Init ClientService separately so that we stop it separately, since this
|
||||
// service needs to wait some time before it stops so clients can know the
|
||||
// final states
|
||||
clientService.init(conf);
|
||||
|
||||
containerAllocator = createContainerAllocator(clientService, context);
|
||||
|
||||
@ -425,7 +428,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
|
||||
// queued inside the JobHistoryEventHandler
|
||||
addIfService(historyService);
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
} // end of init()
|
||||
|
||||
@ -534,14 +536,6 @@ public void shutDownJob() {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:currently just wait for some time so clients can know the
|
||||
// final states. Will be removed once RM come on.
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
try {
|
||||
//if isLastAMRetry comes as true, should never set it to false
|
||||
if ( !isLastAMRetry){
|
||||
@ -556,6 +550,14 @@ public void shutDownJob() {
|
||||
LOG.info("Calling stop for all the services");
|
||||
MRAppMaster.this.stop();
|
||||
|
||||
// TODO: Stop ClientService last, since only ClientService should wait for
|
||||
// some time so clients can know the final states. Will be removed once RM come on.
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
clientService.stop();
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Graceful stop failed ", t);
|
||||
}
|
||||
@ -1019,8 +1021,10 @@ protected void serviceStart() throws Exception {
|
||||
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
|
||||
+ "job " + job.getID() + ".");
|
||||
}
|
||||
// Start ClientService here, since it's not initialized if
|
||||
// errorHappenedShutDown is true
|
||||
clientService.start();
|
||||
}
|
||||
|
||||
//start all the components
|
||||
super.serviceStart();
|
||||
|
||||
|
@ -1,28 +1,30 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.client;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
public interface ClientService {
|
||||
import org.apache.hadoop.service.Service;
|
||||
|
||||
InetSocketAddress getBindAddress();
|
||||
public interface ClientService extends Service {
|
||||
|
||||
int getHttpPort();
|
||||
public abstract InetSocketAddress getBindAddress();
|
||||
|
||||
public abstract int getHttpPort();
|
||||
}
|
||||
|
@ -94,8 +94,7 @@
|
||||
* jobclient (user facing).
|
||||
*
|
||||
*/
|
||||
public class MRClientService extends AbstractService
|
||||
implements ClientService {
|
||||
public class MRClientService extends AbstractService implements ClientService {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(MRClientService.class);
|
||||
|
||||
@ -106,7 +105,7 @@ public class MRClientService extends AbstractService
|
||||
private AppContext appContext;
|
||||
|
||||
public MRClientService(AppContext appContext) {
|
||||
super("MRClientService");
|
||||
super(MRClientService.class.getName());
|
||||
this.appContext = appContext;
|
||||
this.protocolHandler = new MRClientProtocolHandler();
|
||||
}
|
||||
|
@ -55,6 +55,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
@ -603,7 +604,7 @@ public void recoverTask(TaskAttemptContext taskContext)
|
||||
|
||||
@Override
|
||||
protected ClientService createClientService(AppContext context) {
|
||||
return new ClientService(){
|
||||
return new MRClientService(context) {
|
||||
@Override
|
||||
public InetSocketAddress getBindAddress() {
|
||||
return NetUtils.createSocketAddr("localhost:9876");
|
||||
|
@ -0,0 +1,121 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestMRAppComponentDependencies {
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testComponentStopOrder() throws Exception {
|
||||
@SuppressWarnings("resource")
|
||||
TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true);
|
||||
JobImpl job = (JobImpl) app.submit(new Configuration());
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
app.verifyCompleted();
|
||||
|
||||
int waitTime = 20 * 1000;
|
||||
while (waitTime > 0 && app.numStops < 2) {
|
||||
Thread.sleep(100);
|
||||
waitTime -= 100;
|
||||
}
|
||||
|
||||
// assert JobHistoryEventHandlerStopped and then clientServiceStopped
|
||||
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
|
||||
Assert.assertEquals(2, app.clientServiceStopped);
|
||||
}
|
||||
|
||||
private final class TestMRApp extends MRApp {
|
||||
int JobHistoryEventHandlerStopped;
|
||||
int clientServiceStopped;
|
||||
int numStops;
|
||||
|
||||
public TestMRApp(int maps, int reduces, boolean autoComplete,
|
||||
String testName, boolean cleanOnStart) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||
JobHistoryEventHandlerStopped = 0;
|
||||
clientServiceStopped = 0;
|
||||
numStops = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Job createJob(Configuration conf, JobStateInternal forcedState,
|
||||
String diagnostic) {
|
||||
UserGroupInformation currentUser = null;
|
||||
try {
|
||||
currentUser = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
Job newJob =
|
||||
new TestJob(getJobId(), getAttemptID(), conf, getDispatcher()
|
||||
.getEventHandler(), getTaskAttemptListener(), getContext()
|
||||
.getClock(), getCommitter(), isNewApiCommitter(),
|
||||
currentUser.getUserName(), getContext(), forcedState, diagnostic);
|
||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||
|
||||
getDispatcher().register(JobFinishEvent.Type.class,
|
||||
createJobFinishEventHandler());
|
||||
|
||||
return newJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClientService createClientService(AppContext context) {
|
||||
return new MRClientService(context) {
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
numStops++;
|
||||
clientServiceStopped = numStops;
|
||||
super.serviceStop();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||
AppContext context) {
|
||||
return new JobHistoryEventHandler(context, getStartCount()) {
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
numStops++;
|
||||
JobHistoryEventHandlerStopped = numStops;
|
||||
super.serviceStop();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
@ -41,6 +41,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
@ -284,14 +285,12 @@ public boolean getTestIsLastAMRetry(){
|
||||
private final class MRAppTestCleanup extends MRApp {
|
||||
int stagingDirCleanedup;
|
||||
int ContainerAllocatorStopped;
|
||||
int JobHistoryEventHandlerStopped;
|
||||
int numStops;
|
||||
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
|
||||
String testName, boolean cleanOnStart) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||
stagingDirCleanedup = 0;
|
||||
ContainerAllocatorStopped = 0;
|
||||
JobHistoryEventHandlerStopped = 0;
|
||||
numStops = 0;
|
||||
}
|
||||
|
||||
@ -318,26 +317,6 @@ protected Job createJob(Configuration conf, JobStateInternal forcedState,
|
||||
return newJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||
AppContext context) {
|
||||
return new TestJobHistoryEventHandler(context, getStartCount());
|
||||
}
|
||||
|
||||
private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
|
||||
|
||||
public TestJobHistoryEventHandler(AppContext context, int startCount) {
|
||||
super(context, startCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
numStops++;
|
||||
JobHistoryEventHandlerStopped = numStops;
|
||||
super.serviceStop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context) {
|
||||
@ -405,15 +384,13 @@ public void testStagingCleanupOrder() throws Exception {
|
||||
app.verifyCompleted();
|
||||
|
||||
int waitTime = 20 * 1000;
|
||||
while (waitTime > 0 && app.numStops < 3 ) {
|
||||
while (waitTime > 0 && app.numStops < 2) {
|
||||
Thread.sleep(100);
|
||||
waitTime -= 100;
|
||||
}
|
||||
|
||||
// assert JobHistoryEventHandlerStopped first, then
|
||||
// ContainerAllocatorStopped, and then stagingDirCleanedup
|
||||
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
|
||||
Assert.assertEquals(2, app.ContainerAllocatorStopped);
|
||||
Assert.assertEquals(3, app.stagingDirCleanedup);
|
||||
// assert ContainerAllocatorStopped and then tagingDirCleanedup
|
||||
Assert.assertEquals(1, app.ContainerAllocatorStopped);
|
||||
Assert.assertEquals(2, app.stagingDirCleanedup);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user