Merge trunk into HA branch.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1214944 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
d937abd434
@ -145,6 +145,8 @@ Trunk (unreleased changes)
|
|||||||
|
|
||||||
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
||||||
|
|
||||||
|
HADOOP_7917. compilation of protobuf files fails in windows/cygwin. (tucu)
|
||||||
|
|
||||||
Release 0.23.1 - Unreleased
|
Release 0.23.1 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -296,17 +296,29 @@
|
|||||||
</goals>
|
</goals>
|
||||||
<configuration>
|
<configuration>
|
||||||
<target>
|
<target>
|
||||||
<echo file="${project.build.directory}/compile-proto.sh">
|
<echo file="target/compile-proto.sh">
|
||||||
PROTO_DIR=${basedir}/src/main/proto
|
PROTO_DIR=src/main/proto
|
||||||
ls $PROTO_DIR &> /dev/null
|
JAVA_DIR=target/generated-sources/java
|
||||||
if [ $? = 0 ]; then
|
which cygpath 2> /dev/null
|
||||||
JAVA_DIR=${project.build.directory}/generated-sources/java
|
if [ $? = 1 ]; then
|
||||||
mkdir -p $JAVA_DIR
|
IS_WIN=false
|
||||||
ls $PROTO_DIR/*.proto | xargs -n 1 protoc -I$PROTO_DIR --java_out=$JAVA_DIR
|
else
|
||||||
fi
|
IS_WIN=true
|
||||||
|
WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
|
||||||
|
WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
|
||||||
|
fi
|
||||||
|
mkdir -p $JAVA_DIR 2> /dev/null
|
||||||
|
for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null`
|
||||||
|
do
|
||||||
|
if [ "$IS_WIN" = "true" ]; then
|
||||||
|
protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
|
||||||
|
else
|
||||||
|
protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
|
||||||
|
fi
|
||||||
|
done
|
||||||
</echo>
|
</echo>
|
||||||
<exec executable="sh" dir="${project.build.directory}" failonerror="true">
|
<exec executable="sh" dir="${basedir}" failonerror="true">
|
||||||
<arg line="./compile-proto.sh"/>
|
<arg line="target/compile-proto.sh"/>
|
||||||
</exec>
|
</exec>
|
||||||
</target>
|
</target>
|
||||||
</configuration>
|
</configuration>
|
||||||
@ -319,17 +331,29 @@
|
|||||||
</goals>
|
</goals>
|
||||||
<configuration>
|
<configuration>
|
||||||
<target>
|
<target>
|
||||||
<echo file="${project.build.directory}/compile-test-proto.sh">
|
<echo file="target/compile-test-proto.sh">
|
||||||
PROTO_DIR=${basedir}/src/test/proto
|
PROTO_DIR=src/test/proto
|
||||||
ls $PROTO_DIR &> /dev/null
|
JAVA_DIR=target/generated-test-sources/java
|
||||||
if [ $? = 0 ]; then
|
which cygpath 2> /dev/null
|
||||||
JAVA_DIR=${project.build.directory}/generated-test-sources/java
|
if [ $? = 1 ]; then
|
||||||
mkdir -p $JAVA_DIR
|
IS_WIN=false
|
||||||
ls $PROTO_DIR/*.proto | xargs -n 1 protoc -I$PROTO_DIR --java_out=$JAVA_DIR
|
else
|
||||||
fi
|
IS_WIN=true
|
||||||
|
WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
|
||||||
|
WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
|
||||||
|
fi
|
||||||
|
mkdir -p $JAVA_DIR 2> /dev/null
|
||||||
|
for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null`
|
||||||
|
do
|
||||||
|
if [ "$IS_WIN" = "true" ]; then
|
||||||
|
protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
|
||||||
|
else
|
||||||
|
protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
|
||||||
|
fi
|
||||||
|
done
|
||||||
</echo>
|
</echo>
|
||||||
<exec executable="sh" dir="${project.build.directory}" failonerror="true">
|
<exec executable="sh" dir="${basedir}" failonerror="true">
|
||||||
<arg line="./compile-test-proto.sh"/>
|
<arg line="target/compile-test-proto.sh"/>
|
||||||
</exec>
|
</exec>
|
||||||
</target>
|
</target>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -228,17 +228,29 @@
|
|||||||
</goals>
|
</goals>
|
||||||
<configuration>
|
<configuration>
|
||||||
<target>
|
<target>
|
||||||
<echo file="${project.build.directory}/compile-proto.sh">
|
<echo file="target/compile-proto.sh">
|
||||||
PROTO_DIR=${basedir}/src/main/proto
|
PROTO_DIR=src/main/proto
|
||||||
ls $PROTO_DIR &> /dev/null
|
JAVA_DIR=target/generated-sources/java
|
||||||
if [ $? = 0 ]; then
|
which cygpath 2> /dev/null
|
||||||
JAVA_DIR=${project.build.directory}/generated-sources/java
|
if [ $? = 1 ]; then
|
||||||
mkdir -p $JAVA_DIR
|
IS_WIN=false
|
||||||
ls $PROTO_DIR/*.proto | xargs -n 1 protoc -I$PROTO_DIR --java_out=$JAVA_DIR
|
else
|
||||||
fi
|
IS_WIN=true
|
||||||
|
WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
|
||||||
|
WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
|
||||||
|
fi
|
||||||
|
mkdir -p $JAVA_DIR 2> /dev/null
|
||||||
|
for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null`
|
||||||
|
do
|
||||||
|
if [ "$IS_WIN" = "true" ]; then
|
||||||
|
protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
|
||||||
|
else
|
||||||
|
protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
|
||||||
|
fi
|
||||||
|
done
|
||||||
</echo>
|
</echo>
|
||||||
<exec executable="sh" dir="${project.build.directory}" failonerror="true">
|
<exec executable="sh" dir="${basedir}" failonerror="true">
|
||||||
<arg line="./compile-proto.sh"/>
|
<arg line="target/compile-proto.sh"/>
|
||||||
</exec>
|
</exec>
|
||||||
</target>
|
</target>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -95,6 +95,9 @@ Release 0.23.1 - Unreleased
|
|||||||
MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
|
MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
|
||||||
Graves via vinodkv)
|
Graves via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster
|
||||||
|
(Anupam Seth via mahadev)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
MAPREDUCE-3297. Moved log related components into yarn-common so that
|
MAPREDUCE-3297. Moved log related components into yarn-common so that
|
||||||
@ -302,6 +305,15 @@ Release 0.23.1 - Unreleased
|
|||||||
MAPREDUCE-3530. Fixed an NPE occuring during scheduling in the
|
MAPREDUCE-3530. Fixed an NPE occuring during scheduling in the
|
||||||
ResourceManager. (Arun C Murthy via vinodkv)
|
ResourceManager. (Arun C Murthy via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3484. Fixed JobEndNotifier to not get interrupted before completing
|
||||||
|
all its retries. (Ravi Prakash via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3531. Fixed a race in ContainerTokenSecretManager. (Robert Joseph
|
||||||
|
Evans via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-3560. TestRMNodeTransitions is failing on trunk.
|
||||||
|
(Siddharth Seth via mahadev)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -375,6 +375,16 @@ public void handle(JobFinishEvent event) {
|
|||||||
// this is the only job, so shut down the Appmaster
|
// this is the only job, so shut down the Appmaster
|
||||||
// note in a workflow scenario, this may lead to creation of a new
|
// note in a workflow scenario, this may lead to creation of a new
|
||||||
// job (FIXME?)
|
// job (FIXME?)
|
||||||
|
try {
|
||||||
|
LOG.info("Job end notification started for jobID : "
|
||||||
|
+ job.getReport().getJobId());
|
||||||
|
JobEndNotifier notifier = new JobEndNotifier();
|
||||||
|
notifier.setConf(getConfig());
|
||||||
|
notifier.notify(job.getReport());
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.warn("Job end notification interrupted for jobID : "
|
||||||
|
+ job.getReport().getJobId(), ie );
|
||||||
|
}
|
||||||
|
|
||||||
// TODO:currently just wait for some time so clients can know the
|
// TODO:currently just wait for some time so clients can know the
|
||||||
// final states. Will be removed once RM come on.
|
// final states. Will be removed once RM come on.
|
||||||
@ -390,16 +400,6 @@ public void handle(JobFinishEvent event) {
|
|||||||
stop();
|
stop();
|
||||||
|
|
||||||
// Send job-end notification
|
// Send job-end notification
|
||||||
try {
|
|
||||||
LOG.info("Job end notification started for jobID : "
|
|
||||||
+ job.getReport().getJobId());
|
|
||||||
JobEndNotifier notifier = new JobEndNotifier();
|
|
||||||
notifier.setConf(getConfig());
|
|
||||||
notifier.notify(job.getReport());
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
LOG.warn("Job end notification interrupted for jobID : "
|
|
||||||
+ job.getReport().getJobId(), ie );
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Graceful stop failed ", t);
|
LOG.warn("Graceful stop failed ", t);
|
||||||
}
|
}
|
||||||
|
@ -96,13 +96,20 @@ public void testNotifyRetries() throws InterruptedException {
|
|||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
|
||||||
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
|
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3");
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3");
|
||||||
JobReport jobReport = Mockito.mock(JobReport.class);
|
JobReport jobReport = Mockito.mock(JobReport.class);
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
this.notificationCount = 0;
|
this.notificationCount = 0;
|
||||||
this.setConf(conf);
|
this.setConf(conf);
|
||||||
this.notify(jobReport);
|
this.notify(jobReport);
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
Assert.assertEquals("Only 3 retries were expected but was : "
|
Assert.assertEquals("Only 3 retries were expected but was : "
|
||||||
+ this.notificationCount, this.notificationCount, 3);
|
+ this.notificationCount, this.notificationCount, 3);
|
||||||
|
Assert.assertTrue("Should have taken more than 9 seconds it took "
|
||||||
|
+ (endTime - startTime), endTime - startTime > 9000);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -68,6 +68,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
@ -156,30 +157,37 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
|
|||||||
application = rm.getApplicationReport(appId);
|
application = rm.getApplicationReport(appId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
|
if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) {
|
||||||
UserGroupInformation.getCurrentUser().getUserName());
|
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
|
||||||
serviceAddr = application.getHost() + ":" + application.getRpcPort();
|
UserGroupInformation.getCurrentUser().getUserName());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
serviceAddr = application.getHost() + ":" + application.getRpcPort();
|
||||||
String clientTokenEncoded = application.getClientToken();
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
Token<ApplicationTokenIdentifier> clientToken =
|
String clientTokenEncoded = application.getClientToken();
|
||||||
new Token<ApplicationTokenIdentifier>();
|
Token<ApplicationTokenIdentifier> clientToken =
|
||||||
clientToken.decodeFromUrlString(clientTokenEncoded);
|
new Token<ApplicationTokenIdentifier>();
|
||||||
// RPC layer client expects ip:port as service for tokens
|
clientToken.decodeFromUrlString(clientTokenEncoded);
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(application
|
// RPC layer client expects ip:port as service for tokens
|
||||||
.getHost(), application.getRpcPort());
|
InetSocketAddress addr = NetUtils.createSocketAddr(application
|
||||||
clientToken.setService(new Text(addr.getAddress().getHostAddress()
|
.getHost(), application.getRpcPort());
|
||||||
+ ":" + addr.getPort()));
|
clientToken.setService(new Text(addr.getAddress().getHostAddress()
|
||||||
newUgi.addToken(clientToken);
|
+ ":" + addr.getPort()));
|
||||||
}
|
newUgi.addToken(clientToken);
|
||||||
LOG.info("The url to track the job: " + application.getTrackingUrl());
|
|
||||||
LOG.debug("Connecting to " + serviceAddr);
|
|
||||||
final String tempStr = serviceAddr;
|
|
||||||
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
|
|
||||||
@Override
|
|
||||||
public MRClientProtocol run() throws IOException {
|
|
||||||
return instantiateAMProxy(tempStr);
|
|
||||||
}
|
}
|
||||||
});
|
LOG.info("The url to track the job: " + application.getTrackingUrl());
|
||||||
|
LOG.debug("Connecting to " + serviceAddr);
|
||||||
|
final String tempStr = serviceAddr;
|
||||||
|
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
|
||||||
|
@Override
|
||||||
|
public MRClientProtocol run() throws IOException {
|
||||||
|
return instantiateAMProxy(tempStr);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
logApplicationReportInfo(application);
|
||||||
|
LOG.info("Network ACL closed to AM for job " + jobId
|
||||||
|
+ ". Redirecting to job history server.");
|
||||||
|
return checkAndGetHSProxy(null, JobState.RUNNING);
|
||||||
|
}
|
||||||
return realProxy;
|
return realProxy;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
//possibly the AM has crashed
|
//possibly the AM has crashed
|
||||||
@ -240,10 +248,55 @@ public MRClientProtocol run() throws IOException {
|
|||||||
return realProxy;
|
return realProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logApplicationReportInfo(ApplicationReport application) {
|
||||||
|
if(application == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("AppId: " + application.getApplicationId()
|
||||||
|
+ " # reserved containers: "
|
||||||
|
+ application.getApplicationResourceUsageReport().getNumReservedContainers()
|
||||||
|
+ " # used containers: "
|
||||||
|
+ application.getApplicationResourceUsageReport().getNumUsedContainers()
|
||||||
|
+ " Needed resources (memory): "
|
||||||
|
+ application.getApplicationResourceUsageReport().getNeededResources().getMemory()
|
||||||
|
+ " Reserved resources (memory): "
|
||||||
|
+ application.getApplicationResourceUsageReport().getReservedResources().getMemory()
|
||||||
|
+ " Used resources (memory): "
|
||||||
|
+ application.getApplicationResourceUsageReport().getUsedResources().getMemory()
|
||||||
|
+ " Diagnostics: "
|
||||||
|
+ application.getDiagnostics()
|
||||||
|
+ " Start time: "
|
||||||
|
+ application.getStartTime()
|
||||||
|
+ " Finish time: "
|
||||||
|
+ application.getFinishTime()
|
||||||
|
+ " Host: "
|
||||||
|
+ application.getHost()
|
||||||
|
+ " Name: "
|
||||||
|
+ application.getName()
|
||||||
|
+ " Orig. tracking url: "
|
||||||
|
+ application.getOriginalTrackingUrl()
|
||||||
|
+ " Queue: "
|
||||||
|
+ application.getQueue()
|
||||||
|
+ " RPC port: "
|
||||||
|
+ application.getRpcPort()
|
||||||
|
+ " Tracking url: "
|
||||||
|
+ application.getTrackingUrl()
|
||||||
|
+ " User: "
|
||||||
|
+ application.getUser()
|
||||||
|
+ " Client token: "
|
||||||
|
+ application.getClientToken()
|
||||||
|
+ " Final appl. status: "
|
||||||
|
+ application.getFinalApplicationStatus()
|
||||||
|
+ " Yarn appl. state: "
|
||||||
|
+ application.getYarnApplicationState()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private MRClientProtocol checkAndGetHSProxy(
|
private MRClientProtocol checkAndGetHSProxy(
|
||||||
ApplicationReport applicationReport, JobState state) {
|
ApplicationReport applicationReport, JobState state) {
|
||||||
if (null == historyServerProxy) {
|
if (null == historyServerProxy) {
|
||||||
LOG.warn("Job History Server is not configured.");
|
LOG.warn("Job History Server is not configured or " +
|
||||||
|
"job information not yet available on History Server.");
|
||||||
return getNotRunningJob(applicationReport, state);
|
return getNotRunningJob(applicationReport, state);
|
||||||
}
|
}
|
||||||
return historyServerProxy;
|
return historyServerProxy;
|
||||||
@ -452,4 +505,4 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
|
|||||||
throw new IOException("Cannot get log path for a in-progress job");
|
throw new IOException("Cannot get log path for a in-progress job");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
@ -31,8 +33,13 @@
|
|||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
@ -45,15 +52,30 @@
|
|||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for ClientServiceDelegate.java
|
* Tests for ClientServiceDelegate.java
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
public class TestClientServiceDelegate {
|
public class TestClientServiceDelegate {
|
||||||
private JobID oldJobId = JobID.forName("job_1315895242400_2");
|
private JobID oldJobId = JobID.forName("job_1315895242400_2");
|
||||||
private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
|
private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
|
||||||
.toYarn(oldJobId);
|
.toYarn(oldJobId);
|
||||||
|
private boolean isAMReachableFromClient;
|
||||||
|
|
||||||
|
public TestClientServiceDelegate(boolean isAMReachableFromClient) {
|
||||||
|
this.isAMReachableFromClient = isAMReachableFromClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
Object[][] data = new Object[][] { { true }, { false } };
|
||||||
|
return Arrays.asList(data);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnknownAppInRM() throws Exception {
|
public void testUnknownAppInRM() throws Exception {
|
||||||
@ -150,9 +172,30 @@ public void testJobReportFromHistoryServer() throws Exception {
|
|||||||
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
|
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
|
||||||
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
|
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCountersFromHistoryServer() throws Exception {
|
||||||
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
||||||
|
when(historyServerProxy.getCounters(getCountersRequest())).thenReturn(
|
||||||
|
getCountersResponseFromHistoryServer());
|
||||||
|
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
||||||
|
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
|
||||||
|
.thenReturn(null);
|
||||||
|
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
|
||||||
|
historyServerProxy, rm);
|
||||||
|
|
||||||
|
Counters counters = TypeConverter.toYarn(clientServiceDelegate.getJobCounters(oldJobId));
|
||||||
|
Assert.assertNotNull(counters);
|
||||||
|
Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters").getCounter("dummyCounter").getValue());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReconnectOnAMRestart() throws IOException {
|
public void testReconnectOnAMRestart() throws IOException {
|
||||||
|
//test not applicable when AM not reachable
|
||||||
|
//as instantiateAMProxy is not called at all
|
||||||
|
if(!isAMReachableFromClient) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
||||||
|
|
||||||
@ -186,7 +229,7 @@ public void testReconnectOnAMRestart() throws IOException {
|
|||||||
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
|
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
|
||||||
when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
|
when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
|
||||||
.thenReturn(jobReportResponse2);
|
.thenReturn(jobReportResponse2);
|
||||||
|
|
||||||
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
|
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
|
||||||
historyServerProxy, rmDelegate));
|
historyServerProxy, rmDelegate));
|
||||||
// First time, connection should be to AM1, then to AM2. Further requests
|
// First time, connection should be to AM1, then to AM2. Further requests
|
||||||
@ -210,13 +253,13 @@ public void testReconnectOnAMRestart() throws IOException {
|
|||||||
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
|
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
|
||||||
any(String.class));
|
any(String.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private GetJobReportRequest getJobReportRequest() {
|
private GetJobReportRequest getJobReportRequest() {
|
||||||
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
|
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
|
||||||
request.setJobId(jobId);
|
request.setJobId(jobId);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
private GetJobReportResponse getJobReportResponse() {
|
private GetJobReportResponse getJobReportResponse() {
|
||||||
GetJobReportResponse jobReportResponse = Records
|
GetJobReportResponse jobReportResponse = Records
|
||||||
.newRecord(GetJobReportResponse.class);
|
.newRecord(GetJobReportResponse.class);
|
||||||
@ -227,6 +270,12 @@ private GetJobReportResponse getJobReportResponse() {
|
|||||||
return jobReportResponse;
|
return jobReportResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private GetCountersRequest getCountersRequest() {
|
||||||
|
GetCountersRequest request = Records.newRecord(GetCountersRequest.class);
|
||||||
|
request.setJobId(jobId);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
private ApplicationReport getFinishedApplicationReport() {
|
private ApplicationReport getFinishedApplicationReport() {
|
||||||
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
|
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
|
||||||
1234, 5), "user", "queue", "appname", "host", 124, null,
|
1234, 5), "user", "queue", "appname", "host", 124, null,
|
||||||
@ -251,6 +300,7 @@ private ClientServiceDelegate getClientServiceDelegate(
|
|||||||
MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
|
MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
|
||||||
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
||||||
conf, rm, oldJobId, historyServerProxy);
|
conf, rm, oldJobId, historyServerProxy);
|
||||||
return clientServiceDelegate;
|
return clientServiceDelegate;
|
||||||
@ -269,4 +319,21 @@ private GetJobReportResponse getJobReportResponseFromHistoryServer() {
|
|||||||
jobReportResponse.setJobReport(jobReport);
|
jobReportResponse.setJobReport(jobReport);
|
||||||
return jobReportResponse;
|
return jobReportResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private GetCountersResponse getCountersResponseFromHistoryServer() {
|
||||||
|
GetCountersResponse countersResponse = Records
|
||||||
|
.newRecord(GetCountersResponse.class);
|
||||||
|
Counter counter = Records.newRecord(Counter.class);
|
||||||
|
CounterGroup counterGroup = Records.newRecord(CounterGroup.class);
|
||||||
|
Counters counters = Records.newRecord(Counters.class);
|
||||||
|
counter.setDisplayName("dummyCounter");
|
||||||
|
counter.setName("dummyCounter");
|
||||||
|
counter.setValue(1001);
|
||||||
|
counterGroup.setName("dummyCounters");
|
||||||
|
counterGroup.setDisplayName("dummyCounters");
|
||||||
|
counterGroup.setCounter("dummyCounter", counter);
|
||||||
|
counters.setCounterGroup("dummyCounters", counterGroup);
|
||||||
|
countersResponse.setCounters(counters);
|
||||||
|
return countersResponse;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -145,6 +145,11 @@ public class YarnConfiguration extends Configuration {
|
|||||||
/** ACL used in case none is found. Allows nothing. */
|
/** ACL used in case none is found. Allows nothing. */
|
||||||
public static final String DEFAULT_YARN_APP_ACL = " ";
|
public static final String DEFAULT_YARN_APP_ACL = " ";
|
||||||
|
|
||||||
|
/** RM-AM ACL disabled. **/
|
||||||
|
public static final String RM_AM_NETWORK_ACL_CLOSED =
|
||||||
|
RM_PREFIX + "am.acl.disabled";
|
||||||
|
public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false;
|
||||||
|
|
||||||
/** The address of the RM admin interface.*/
|
/** The address of the RM admin interface.*/
|
||||||
public static final String RM_ADMIN_ADDRESS =
|
public static final String RM_ADMIN_ADDRESS =
|
||||||
RM_PREFIX + "admin.address";
|
RM_PREFIX + "admin.address";
|
||||||
|
@ -18,8 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.security;
|
package org.apache.hadoop.yarn.server.security;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
@ -34,9 +34,9 @@ public class ContainerTokenSecretManager extends
|
|||||||
private static Log LOG = LogFactory
|
private static Log LOG = LogFactory
|
||||||
.getLog(ContainerTokenSecretManager.class);
|
.getLog(ContainerTokenSecretManager.class);
|
||||||
|
|
||||||
private Map<String, SecretKey> secretkeys =
|
Map<String, SecretKey> secretkeys =
|
||||||
new HashMap<String, SecretKey>();
|
new ConcurrentHashMap<String, SecretKey>();
|
||||||
|
|
||||||
// Used by master for generation of secretyKey per host
|
// Used by master for generation of secretyKey per host
|
||||||
public SecretKey createAndGetSecretKey(CharSequence hostName) {
|
public SecretKey createAndGetSecretKey(CharSequence hostName) {
|
||||||
String hostNameStr = hostName.toString();
|
String hostNameStr = hostName.toString();
|
||||||
|
@ -117,6 +117,12 @@
|
|||||||
<value>*</value>
|
<value>*</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Network ACL to AM closed.</description>
|
||||||
|
<name>yarn.resourcemanager.am.acl.disabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>The address of the RM admin interface.</description>
|
<description>The address of the RM admin interface.</description>
|
||||||
<name>yarn.resourcemanager.admin.address</name>
|
<name>yarn.resourcemanager.admin.address</name>
|
||||||
|
@ -45,6 +45,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -74,8 +75,9 @@ public void handle(SchedulerEvent event) {
|
|||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
||||||
|
|
||||||
rmContext =
|
rmContext =
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null);
|
new RMContextImpl(new MemStore(), rmDispatcher, null, null,
|
||||||
|
mock(DelegationTokenRenewer.class));
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
doAnswer(
|
doAnswer(
|
||||||
new Answer<Void>() {
|
new Answer<Void>() {
|
||||||
|
Loading…
Reference in New Issue
Block a user