YARN-716. Making ApplicationID immutable. Contributed by Siddharth Seth.

MAPREDUCE-5282. Updating MR App to use immutable ApplicationID after YARN-716. Contributed by Siddharth Seth.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-30 20:18:39 +00:00
parent 9f551fa542
commit 982753dc8e
38 changed files with 136 additions and 227 deletions

View File

@ -443,6 +443,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5261. Fix issues in TestRMContainerAllocator after YARN-617.
(Omkar Vinit Joshi via vinodkv)
MAPREDUCE-5282. Updating MR App to use immutable ApplicationID after
YARN-716. (Siddharth Seth via vinodkv)
BREAKDOWN OF HADOOP-8562 SUBTASKS
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.

View File

@ -128,9 +128,7 @@ public class MRApp extends MRAppMaster {
static ApplicationId applicationId;
static {
applicationId = recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
applicationId = ApplicationId.newInstance(0, 0);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,

View File

@ -789,9 +789,7 @@ class MyAppContext implements AppContext {
private final Map<JobId, Job> allJobs;
MyAppContext(int numberMaps, int numberReduces) {
myApplicationID = recordFactory.newRecordInstance(ApplicationId.class);
myApplicationID.setClusterTimestamp(clock.getTime());
myApplicationID.setId(1);
myApplicationID = ApplicationId.newInstance(clock.getTime(), 1);
myAppAttemptID = recordFactory
.newRecordInstance(ApplicationAttemptId.class);

View File

@ -84,9 +84,8 @@ public void testDeletionofStaging() throws IOException {
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
@ -113,9 +112,8 @@ public void testNoDeletionofStagingOnReboot() throws IOException {
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
@ -141,9 +139,8 @@ public void testDeletionofStagingOnReboot() throws IOException {
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
@ -169,9 +166,8 @@ public void testDeletionofStagingOnKill() throws IOException {
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
@ -197,9 +193,8 @@ public void testDeletionofStagingOnKillLastTry() throws IOException {
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);

View File

@ -224,9 +224,7 @@ public void setup() {
metrics = mock(MRAppMetrics.class);
dataLocations = new String[1];
appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(1);
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
jobId = Records.newRecord(JobId.class);
jobId.setId(1);

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
import org.junit.Before;
import org.junit.Test;
@ -41,7 +40,7 @@ public class TestAppController {
public void setUp() {
AppContext context = mock(AppContext.class);
when(context.getApplicationID()).thenReturn(
Records.newRecord(ApplicationId.class));
ApplicationId.newInstance(0, 0));
App app = new App(context);
Configuration conf = new Configuration();
ctx = mock(RequestContext.class);

View File

@ -76,9 +76,8 @@ public static JobId toYarn(org.apache.hadoop.mapreduce.JobID id) {
JobId jobId = recordFactory.newRecordInstance(JobId.class);
jobId.setId(id.getId()); //currently there is 1-1 mapping between appid and jobid
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setId(id.getId());
appId.setClusterTimestamp(toClusterTimeStamp(id.getJtIdentifier()));
ApplicationId appId = ApplicationId.newInstance(
toClusterTimeStamp(id.getJtIdentifier()), id.getId());
jobId.setAppId(appId);
return jobId;
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapreduce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
@ -28,18 +31,13 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.QueueState;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import org.mockito.Mockito;
@ -74,14 +72,16 @@ public void testEnums() throws Exception {
public void testFromYarn() throws Exception {
int appStartTime = 612354;
YarnApplicationState state = YarnApplicationState.RUNNING;
ApplicationId applicationId = new ApplicationIdPBImpl();
ApplicationReportPBImpl applicationReport = new ApplicationReportPBImpl();
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationReport applicationReport = Records
.newRecord(ApplicationReport.class);
applicationReport.setApplicationId(applicationId);
applicationReport.setYarnApplicationState(state);
applicationReport.setStartTime(appStartTime);
applicationReport.setUser("TestTypeConverter-user");
ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
ResourcePBImpl r = new ResourcePBImpl();
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
r.setMemory(2048);
appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1);
@ -107,8 +107,9 @@ public void testFromYarnApplicationReport() {
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
String jobFile = "dummy-path/job.xml";
ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
ResourcePBImpl r = new ResourcePBImpl();
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
r.setMemory(2048);
appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1);
@ -134,7 +135,8 @@ public void testFromYarnApplicationReport() {
@Test
public void testFromYarnQueueInfo() {
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = new QueueInfoPBImpl();
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = Records
.newRecord(org.apache.hadoop.yarn.api.records.QueueInfo.class);
queueInfo.setQueueState(org.apache.hadoop.yarn.api.records.QueueState.STOPPED);
org.apache.hadoop.mapreduce.QueueInfo returned =
TypeConverter.fromYarn(queueInfo, new Configuration());

View File

@ -81,7 +81,7 @@ private static void delete(File dir) throws IOException {
@Test (timeout = 120000)
public void testJobIDtoString() {
JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
jid.setAppId(ApplicationId.newInstance(0, 0));
assertEquals("job_0_0000", MRApps.toString(jid));
}
@ -103,7 +103,7 @@ public void testJobIDShort() {
public void testTaskIDtoString() {
TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
tid.getJobId().setAppId(ApplicationId.newInstance(0, 0));
tid.setTaskType(TaskType.MAP);
TaskType type = tid.getTaskType();
System.err.println(type);
@ -145,7 +145,7 @@ public void testTaskAttemptIDtoString() {
taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
taid.getTaskId().setTaskType(TaskType.MAP);
taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
taid.getTaskId().getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
taid.getTaskId().getJobId().setAppId(ApplicationId.newInstance(0, 0));
assertEquals("attempt_0_0000_m_000000_0", MRApps.toString(taid));
}

View File

@ -21,7 +21,6 @@
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
@ -30,7 +29,6 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.JobConf;
@ -40,7 +38,6 @@
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
/**
* A JUnit for testing availability and accessibility of shuffle related API.
@ -181,10 +178,6 @@ public void testConsumerApi() {
* AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins)
*/
public void testProviderApi() {
ApplicationId mockApplicationId = mock(ApplicationId.class);
mockApplicationId.setClusterTimestamp(new Long(10));
mockApplicationId.setId(mock(JobID.class).getId());
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
JobConf mockJobConf = mock(JobConf.class);
try {

View File

@ -74,8 +74,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
public void init(Configuration conf) throws YarnException {
LOG.info("JobHistory Init");
this.conf = conf;
this.appID = RecordFactoryProvider.getRecordFactory(conf)
.newRecordInstance(ApplicationId.class);
this.appID = ApplicationId.newInstance(0, 0);
this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
.newRecordInstance(ApplicationAttemptId.class);

View File

@ -129,7 +129,7 @@ private ApplicationReport getApplicationReport(
ApplicationResourceUsageReport appResources = Mockito
.mock(ApplicationResourceUsageReport.class);
Mockito.when(appReport.getApplicationId()).thenReturn(
Records.newRecord(ApplicationId.class));
ApplicationId.newInstance(0, 0));
Mockito.when(appResources.getNeededResources()).thenReturn(
Records.newRecord(Resource.class));
Mockito.when(appResources.getReservedResources()).thenReturn(

View File

@ -140,9 +140,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation)
).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
any(String.class), any(Credentials.class));
appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(1);
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
jobId = TypeConverter.fromYarn(appId);
if (testWorkDir.exists()) {
FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);

View File

@ -79,7 +79,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@ -549,9 +548,8 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
// $x/$user/appcache/$appId/output/$mapId
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
JobID jobID = JobID.forName(jobId);
ApplicationId appID = Records.newRecord(ApplicationId.class);
appID.setClusterTimestamp(Long.parseLong(jobID.getJtIdentifier()));
appID.setId(jobID.getId());
ApplicationId appID = ApplicationId.newInstance(
Long.parseLong(jobID.getJtIdentifier()), jobID.getId());
final String base =
ContainerLocalizer.USERCACHE + "/" + user + "/"
+ ContainerLocalizer.APPCACHE + "/"

View File

@ -72,6 +72,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-571. Remove user from ContainerLaunchContext. (Omkar Vinit Joshi via
vinodkv)
YARN-716. Making ApplicationID immutable. (Siddharth Seth via vinodkv)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p><code>ApplicationId</code> represents the <em>globally unique</em>
@ -40,6 +40,14 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
public static final String appIdStrPrefix = "application_";
public static ApplicationId newInstance(long clusterTimestamp, int id) {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(clusterTimestamp);
appId.setId(id);
appId.build();
return appId;
}
/**
* Get the short integer identifier of the <code>ApplicationId</code>
* which is unique for all applications started by a particular instance
@ -51,8 +59,7 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
public abstract int getId();
@Private
@Unstable
public abstract void setId(int id);
protected abstract void setId(int id);
/**
* Get the <em>start time</em> of the <code>ResourceManager</code> which is
@ -62,10 +69,9 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
public abstract long getClusterTimestamp();
@Private
@Unstable
public abstract void setClusterTimestamp(long clusterTimestamp);
protected abstract void setClusterTimestamp(long clusterTimestamp);
protected abstract void build();
static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {

View File

@ -21,58 +21,49 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProtoOrBuilder;
import com.google.common.base.Preconditions;
public class ApplicationIdPBImpl extends ApplicationId {
ApplicationIdProto proto = ApplicationIdProto.getDefaultInstance();
ApplicationIdProto proto = null;
ApplicationIdProto.Builder builder = null;
boolean viaProto = false;
public ApplicationIdPBImpl() {
builder = ApplicationIdProto.newBuilder();
}
public ApplicationIdPBImpl(ApplicationIdProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized ApplicationIdProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
public ApplicationIdProto getProto() {
return proto;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ApplicationIdProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized int getId() {
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getId());
public int getId() {
Preconditions.checkNotNull(proto);
return proto.getId();
}
@Override
public synchronized void setId(int id) {
maybeInitBuilder();
builder.setId((id));
protected void setId(int id) {
builder.setId(id);
}
@Override
public synchronized long getClusterTimestamp() {
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getClusterTimestamp());
public long getClusterTimestamp() {
Preconditions.checkNotNull(proto);
return proto.getClusterTimestamp();
}
@Override
public synchronized void setClusterTimestamp(long clusterTimestamp) {
maybeInitBuilder();
protected void setClusterTimestamp(long clusterTimestamp) {
builder.setClusterTimestamp((clusterTimestamp));
}
@Override
protected void build() {
proto = builder.build();
}
}

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -90,9 +89,8 @@ public void testSubmitApplication() {
for (int i = 0; i < exitStates.length; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
ApplicationId applicationId = Records.newRecord(ApplicationId.class);
applicationId.setClusterTimestamp(System.currentTimeMillis());
applicationId.setId(i);
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), i);
when(context.getApplicationId()).thenReturn(applicationId);
((MockYarnClient) client).setYarnApplicationState(exitStates[i]);
try {

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import static org.apache.hadoop.yarn.util.StringHelper.*;
@ -45,10 +44,8 @@ public static ApplicationId toAppID(String prefix, String s, Iterator<String> it
throwParseException(sjoin(prefix, ID), s);
}
shouldHaveNext(prefix, s, it);
ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(Long.parseLong(it.next()));
shouldHaveNext(prefix, s, it);
appId.setId(Integer.parseInt(it.next()));
ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
return appId;
}

View File

@ -132,28 +132,17 @@ public static LocalResource newLocalResource(URI uri,
public static ApplicationId newApplicationId(RecordFactory recordFactory,
long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setId(Integer.valueOf(id.toString()));
applicationId.setClusterTimestamp(clustertimestamp);
return applicationId;
return ApplicationId.newInstance(clustertimestamp,
Integer.valueOf(id.toString()));
}
public static ApplicationId newApplicationId(RecordFactory recordFactory,
long clusterTimeStamp, int id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setId(id);
applicationId.setClusterTimestamp(clusterTimeStamp);
return applicationId;
return ApplicationId.newInstance(clusterTimeStamp, id);
}
public static ApplicationId newApplicationId(long clusterTimeStamp, int id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setId(id);
applicationId.setClusterTimestamp(clusterTimeStamp);
return applicationId;
return ApplicationId.newInstance(clusterTimeStamp, id);
}
public static ApplicationAttemptId newApplicationAttemptId(
@ -166,11 +155,8 @@ public static ApplicationAttemptId newApplicationAttemptId(
}
public static ApplicationId convert(long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setId(Integer.valueOf(id.toString()));
applicationId.setClusterTimestamp(clustertimestamp);
return applicationId;
return ApplicationId.newInstance(clustertimestamp,
Integer.valueOf(id.toString()));
}
public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,

View File

@ -114,18 +114,15 @@ public static ApplicationId toApplicationId(RecordFactory recordFactory,
private static ApplicationId toApplicationId(RecordFactory recordFactory,
Iterator<String> it) {
ApplicationId appId =
recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(Long.parseLong(it.next()));
appId.setId(Integer.parseInt(it.next()));
ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
return appId;
}
private static ApplicationAttemptId toApplicationAttemptId(
Iterator<String> it) throws NumberFormatException {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(Long.parseLong(it.next()));
appId.setId(Integer.parseInt(it.next()));
ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
@ -135,9 +132,8 @@ private static ApplicationAttemptId toApplicationAttemptId(
private static ApplicationId toApplicationId(
Iterator<String> it) throws NumberFormatException {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(Long.parseLong(it.next()));
appId.setId(Integer.parseInt(it.next()));
ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
return appId;
}

View File

@ -62,10 +62,7 @@ public static String newQueue() {
}
public static ApplicationId newAppID(int i) {
ApplicationId id = Records.newRecord(ApplicationId.class);
id.setClusterTimestamp(TS);
id.setId(i);
return id;
return ApplicationId.newInstance(TS, i);
}
public static ApplicationAttemptId newAppAttemptID(ApplicationId appId, int i) {

View File

@ -91,12 +91,9 @@ private void testRPCTimeout(String rpcClass) throws Exception {
.newRecordInstance(ContainerLaunchContext.class);
ContainerId containerId = recordFactory
.newRecordInstance(ContainerId.class);
ApplicationId applicationId = recordFactory
.newRecordInstance(ApplicationId.class);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(0);
containerId.setApplicationAttemptId(applicationAttemptId);

View File

@ -113,12 +113,9 @@ private void test(String rpcClass) throws Exception {
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerId containerId =
recordFactory.newRecordInstance(ContainerId.class);
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(0);
containerId.setApplicationAttemptId(applicationAttemptId);

View File

@ -126,10 +126,7 @@ public long getRMIdentifier() {
ContainerLaunchContext launchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationAttemptId.setApplicationId(applicationId);

View File

@ -249,9 +249,7 @@ private void createFiles(String dir, String subDir, int numOfFiles) {
}
private ContainerId createContainerId() {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
Records.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);

View File

@ -220,9 +220,8 @@ public ContainerManager run() {
}
public static ContainerId createContainerId() {
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);

View File

@ -151,8 +151,6 @@ public RegisterNodeManagerResponse registerNodeManager(
return response;
}
ApplicationId applicationID = recordFactory
.newRecordInstance(ApplicationId.class);
ApplicationAttemptId appAttemptID = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
ContainerId firstContainerID = recordFactory
@ -191,12 +189,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
org.apache.hadoop.yarn.api.records.Container mockContainer =
mock(org.apache.hadoop.yarn.api.records.Container.class);
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM.
applicationID.setId(heartBeatID);
appAttemptID.setApplicationId(applicationID);
appAttemptID.setApplicationId(appId1);
firstContainerID.setApplicationAttemptId(appAttemptID);
firstContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory
@ -213,7 +214,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals("Number of applications should only be one!", 1,
nodeStatus.getContainersStatuses().size());
Assert.assertEquals("Number of container for the app should be one!",
1, appToContainers.get(applicationID).size());
1, appToContainers.get(appId1).size());
// Checks on the NM end
ConcurrentMap<ContainerId, Container> activeContainers =
@ -221,8 +222,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals(1, activeContainers.size());
// Give another container to the NM.
applicationID.setId(heartBeatID);
appAttemptID.setApplicationId(applicationID);
appAttemptID.setApplicationId(appId2);
secondContainerID.setApplicationAttemptId(appAttemptID);
secondContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory
@ -239,7 +239,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals("Number of applications should only be one!", 1,
appToContainers.size());
Assert.assertEquals("Number of container for the app should be two!",
2, appToContainers.get(applicationID).size());
2, appToContainers.get(appId2).size());
// Checks on the NM end
ConcurrentMap<ContainerId, Container> activeContainers =

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.apache.hadoop.yarn.service.Service.STATE.INITED;
import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
import static org.apache.hadoop.yarn.service.Service.STATE.STOPPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -30,17 +34,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
import static org.apache.hadoop.yarn.service.Service.STATE.*;
import org.junit.Test;
public class TestAuxServices {
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
@ -123,18 +120,17 @@ public void testAuxEventDispatch() {
aux.init(conf);
aux.start();
ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
appId.setId(65);
ApplicationId appId1 = ApplicationId.newInstance(0, 65);
ByteBuffer buf = ByteBuffer.allocate(6);
buf.putChar('A');
buf.putInt(65);
buf.flip();
AuxServicesEvent event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_INIT, "user0", appId, "Asrv", buf);
AuxServicesEventType.APPLICATION_INIT, "user0", appId1, "Asrv", buf);
aux.handle(event);
appId.setId(66);
ApplicationId appId2 = ApplicationId.newInstance(0, 66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
AuxServicesEventType.APPLICATION_STOP, "user0", appId2, "Bsrv", null);
// verify all services got the stop event
aux.handle(event);
Collection<AuxServices.AuxiliaryService> servs = aux.getServices();

View File

@ -78,9 +78,7 @@ public TestContainerManager() throws UnsupportedFileSystemException {
}
private ContainerId createContainerId() {
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);

View File

@ -165,9 +165,7 @@ public void testContainerEnvVariables() throws Exception {
Container mockContainer = mock(Container.class);
// ////// Construct the Container-id
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
@ -339,9 +337,7 @@ public void testDelayedKill() throws Exception {
Container mockContainer = mock(Container.class);
// ////// Construct the Container-id
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(1);
appId.setId(1);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);

View File

@ -709,10 +709,7 @@ public void testLogAggregationForRealContainerLaunch() throws IOException,
recordFactory.newRecordInstance(ContainerLaunchContext.class);
Container mockContainer = mock(Container.class);
// ////// Construct the Container-id
ApplicationId appId =
recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);

View File

@ -204,10 +204,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
recordFactory.newRecordInstance(ContainerLaunchContext.class);
Container mockContainer = mock(Container.class);
// ////// Construct the Container-id
ApplicationId appId =
recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);

View File

@ -157,8 +157,7 @@ public void testGetApplicationReport() throws YarnRemoteException {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(recordFactory
.newRecordInstance(ApplicationId.class));
request.setApplicationId(ApplicationId.newInstance(0, 0));
GetApplicationReportResponse applicationReport = rmService
.getApplicationReport(request);
Assert.assertNull("It should return null as application report for absent application.",
@ -436,11 +435,7 @@ private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
}
private ApplicationId getApplicationId(int id) {
ApplicationId applicationId = recordFactory
.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(123456);
applicationId.setId(id);
return applicationId;
return ApplicationId.newInstance(123456, id);
}
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Comparator;
@ -30,6 +31,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -51,13 +53,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.*;
public class TestCapacityScheduler {
@ -468,15 +467,9 @@ public void testApplicationComparator()
{
CapacityScheduler cs = new CapacityScheduler();
Comparator<FiCaSchedulerApp> appComparator= cs.getApplicationComparator();
ApplicationId id1 = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
id1.setClusterTimestamp(1);
id1.setId(1);
ApplicationId id2 = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
id2.setClusterTimestamp(1);
id2.setId(2);
ApplicationId id3 = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
id3.setClusterTimestamp(2);
id3.setId(1);
ApplicationId id1 = ApplicationId.newInstance(1, 1);
ApplicationId id2 = ApplicationId.newInstance(1, 2);
ApplicationId id3 = ApplicationId.newInstance(2, 1);
//same clusterId
FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class);
when(app1.getApplicationId()).thenReturn(id1);

View File

@ -36,8 +36,7 @@ public class TestFSSchedulerApp {
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;

View File

@ -143,8 +143,7 @@ private Configuration createConfiguration() {
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;

View File

@ -97,9 +97,7 @@ public void tearDown() throws Exception {
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory
.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;