MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to individual record factory methods. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1486271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-25 01:46:14 +00:00
parent 259edf8dca
commit 643155cbee
25 changed files with 123 additions and 119 deletions

View File

@ -260,6 +260,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5230. Bring back NLineInputFormat.createFileSplit for binary
compatibility with mapred in 1.x (Mayank Bansal via vinodkv)
MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to
individual record factory methods. (Jian He via vinodkv)
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -132,7 +132,6 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@ -598,8 +597,8 @@ private static LocalResource createLocalResource(FileSystem fc, Path file,
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return BuilderUtils.newLocalResource(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
}
/**
@ -762,10 +761,9 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container = BuilderUtils
.newContainerLaunchContext(localResources,
environment, null, serviceData, taskCredentialsBuffer,
applicationACLs);
ContainerLaunchContext container =
ContainerLaunchContext.newInstance(localResources, environment, null,
serviceData, taskCredentialsBuffer, applicationACLs);
return container;
}
@ -806,7 +804,7 @@ static ContainerLaunchContext createContainerLaunchContext(
}
// Construct the actual Container
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
ContainerLaunchContext container = ContainerLaunchContext.newInstance(
commonContainerSpec.getLocalResources(), myEnv, commands,
myServiceData, commonContainerSpec.getTokens().duplicate(),
applicationACLs);
@ -1096,7 +1094,7 @@ public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
// launching the container on an NM, these are already completed tasks, so
// setting them to null and RMIdentifier as 0
container =
BuilderUtils.newContainer(containerId, containerNodeId,
Container.newInstance(containerId, containerNodeId,
nodeHttpAddress, null, null, null, 0);
computeRackAndLocality();
launchTime = taInfo.getStartTime();

View File

@ -239,7 +239,11 @@ protected void containerFailedOnHost(String hostName) {
// if ask already sent to RM, we can try and overwrite it if possible.
// send a new ask to RM with numContainers
// specified for the blacklisted host to be 0.
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
ResourceRequest zeroedRequest =
ResourceRequest.newInstance(req.getPriority(),
req.getHostName(), req.getCapability(),
req.getNumContainers());
zeroedRequest.setNumContainers(0);
// to be sent to RM on next heartbeat
addResourceRequestToAsk(zeroedRequest);

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.BuilderUtils;
@XmlRootElement(name = "jobAttempt")
@XmlAccessorType(XmlAccessType.FIELD)
@ -53,7 +52,7 @@ public AMAttemptInfo(AMInfo amInfo, String jobId, String user) {
int nmPort = amInfo.getNodeManagerPort();
if (nmHost != null) {
this.nodeHttpAddress = nmHost + ":" + nmHttpPort;
NodeId nodeId = BuilderUtils.newNodeId(nmHost, nmPort);
NodeId nodeId = NodeId.newInstance(nmHost, nmPort);
this.nodeId = nodeId.toString();
}

View File

@ -272,8 +272,8 @@ private class TestParams {
String workDir = setupTestWorkDir();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(appId);

View File

@ -91,13 +91,13 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
@ -164,7 +164,7 @@ private static ContainerId getContainerId(ApplicationId applicationId,
ApplicationAttemptId appAttemptId =
getApplicationAttemptId(applicationId, startCount);
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, startCount);
ContainerId.newInstance(appAttemptId, startCount);
return containerId;
}
@ -231,9 +231,9 @@ public void init(Configuration conf) {
this.clusterInfo.getMaxContainerCapability());
} else {
getContext().getClusterInfo().setMinContainerCapability(
BuilderUtils.newResource(1024, 1));
Resource.newInstance(1024, 1));
getContext().getClusterInfo().setMaxContainerCapability(
BuilderUtils.newResource(10240, 1));
Resource.newInstance(10240, 1));
}
}
@ -517,8 +517,8 @@ public void handle(ContainerAllocatorEvent event) {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++);
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
Container container = BuilderUtils.newContainer(cId, nodeId,
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
Container container = Container.newInstance(cId, nodeId,
NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -205,10 +206,10 @@ protected AMRMProtocol createSchedulerProxy() {
throws IOException {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils
.newResource(1024, 1));
response.setMaximumResourceCapability(BuilderUtils
.newResource(10240, 1));
response.setMinimumResourceCapability(Resource.newInstance(
1024, 1));
response.setMaximumResourceCapability(Resource.newInstance(
10240, 1));
return response;
}
@ -236,14 +237,13 @@ public AllocateResponse allocate(AllocateRequest request)
int numContainers = req.getNumContainers();
for (int i = 0; i < numContainers; i++) {
ContainerId containerId =
BuilderUtils.newContainerId(
ContainerId.newInstance(
request.getApplicationAttemptId(),
request.getResponseId() + i);
containers.add(BuilderUtils
.newContainer(containerId, BuilderUtils.newNodeId("host"
+ containerId.getId(), 2345),
"host" + containerId.getId() + ":5678", req
.getCapability(), req.getPriority(), null, 0));
containers.add(Container.newInstance(containerId,
NodeId.newInstance("host" + containerId.getId(), 2345),
"host" + containerId.getId() + ":5678",
req.getCapability(), req.getPriority(), null, 0));
}
}

View File

@ -622,9 +622,9 @@ public Configuration loadConfFile() throws IOException {
}
private static AMInfo createAMInfo(int attempt) {
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
BuilderUtils.newApplicationId(100, 1), attempt);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
}

View File

@ -361,9 +361,9 @@ public void testMRAppMasterCredentials() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
ApplicationAttemptId applicationAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId =
BuilderUtils.newContainerId(applicationAttemptId, 546);
ContainerId.newInstance(applicationAttemptId, 546);
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
// Create staging dir, so MRAppMaster doesn't barf.

View File

@ -496,7 +496,7 @@ public void testReportedAppProgress() throws Exception {
rm.sendAMLaunched(appAttemptId);
rmDispatcher.await();
MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MRApp mrApp = new MRApp(appAttemptId, ContainerId.newInstance(
appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
@Override
protected Dispatcher createDispatcher() {
@ -612,7 +612,7 @@ private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
MRApp mrApp, Task task) throws Exception {
TaskAttempt attempt = task.getAttempts().values().iterator().next();
List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(),
ContainerState.COMPLETE, "", 0));
Map<ApplicationId,List<ContainerStatus>> statusUpdate =
new HashMap<ApplicationId,List<ContainerStatus>>(1);
@ -648,7 +648,7 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception {
rm.sendAMLaunched(appAttemptId);
rmDispatcher.await();
MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MRApp mrApp = new MRApp(appAttemptId, ContainerId.newInstance(
appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
@Override
protected Dispatcher createDispatcher() {
@ -1229,7 +1229,7 @@ public synchronized Allocation allocate(
List<ContainerId> release) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req
ResourceRequest reqCopy = ResourceRequest.newInstance(req
.getPriority(), req.getHostName(), req.getCapability(), req
.getNumContainers());
askCopy.add(reqCopy);
@ -1255,7 +1255,7 @@ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
Resource containerNeed = BuilderUtils.newResource(memory, 1);
Resource containerNeed = Resource.newInstance(memory, 1);
if (earlierFailedAttempt) {
return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId,
@ -1338,8 +1338,8 @@ private static AppContext createAppContext(
when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
when(context.getJob(isA(JobId.class))).thenReturn(job);
when(context.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
.newResource(10240, 1)));
new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
10240, 1)));
when(context.getEventHandler()).thenReturn(new EventHandler() {
@Override
public void handle(Event event) {
@ -1412,12 +1412,12 @@ protected void unregister() {
@Override
protected Resource getMinContainerCapability() {
return BuilderUtils.newResource(1024, 1);
return Resource.newInstance(1024, 1);
}
@Override
protected Resource getMaxContainerCapability() {
return BuilderUtils.newResource(10240, 1);
return Resource.newInstance(10240, 1);
}
public void sendRequest(ContainerRequestEvent req) {
@ -1665,11 +1665,14 @@ public void testCompletedContainerEvent() {
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
MRBuilderUtils.newTaskId(
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
ContainerStatus status = BuilderUtils.newContainerStatus(
ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "", 0);
ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
ContainerStatus abortedStatus = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "",
ContainerExitStatus.ABORTED);

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@ -219,7 +220,7 @@ private class TestMRApp extends MRAppMaster {
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
super(applicationAttemptId, BuilderUtils.newContainerId(
super(applicationAttemptId, ContainerId.newInstance(
applicationAttemptId, 1), "testhost", 2222, 3333,
System.currentTimeMillis(), maxAppAttempts);
this.allocator = allocator;

View File

@ -198,8 +198,8 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
app.setClusterInfo(new ClusterInfo(BuilderUtils
.newResource(minContainerSize, 1), BuilderUtils.newResource(10240, 1)));
app.setClusterInfo(new ClusterInfo(Resource
.newInstance(minContainerSize, 1), Resource.newInstance(10240, 1)));
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@ -320,7 +320,7 @@ public void handle(JobHistoryEvent event) {
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -345,8 +345,8 @@ public void testLaunchFailedWhileKilling() throws Exception {
mock(Token.class), new Credentials(),
new SystemClock(), null);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@ -370,7 +370,7 @@ public void testLaunchFailedWhileKilling() throws Exception {
public void testContainerCleanedWhileRunning() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -402,8 +402,8 @@ public void testContainerCleanedWhileRunning() throws Exception {
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@ -428,7 +428,7 @@ public void testContainerCleanedWhileRunning() throws Exception {
public void testContainerCleanedWhileCommitting() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -460,8 +460,8 @@ public void testContainerCleanedWhileCommitting() throws Exception {
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@ -489,7 +489,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
public void testDoubleTooManyFetchFailure() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -521,8 +521,8 @@ public void testDoubleTooManyFetchFailure() throws Exception {
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@ -555,7 +555,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
@Test
public void testAppDiognosticEventOnUnassignedTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
@ -587,8 +587,8 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@ -605,7 +605,7 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
@Test
public void testAppDiognosticEventOnNewTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
@ -637,8 +637,8 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);

View File

@ -83,7 +83,7 @@ public class TestContainerLauncher {
public void testPoolSize() throws InterruptedException {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
@ -104,7 +104,7 @@ public void testPoolSize() throws InterruptedException {
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
for (int i = 0; i < 10; i++) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
@ -126,7 +126,7 @@ public void testPoolSize() throws InterruptedException {
Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
ContainerId containerId = ContainerId.newInstance(appAttemptId,
i + 10);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId,
i + 10);
@ -143,7 +143,7 @@ public void testPoolSize() throws InterruptedException {
// Core pool size should be 21 but the live pool size should be only 11.
containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = false;
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host11:1234", null,
@ -158,12 +158,12 @@ public void testPoolSize() throws InterruptedException {
@Test
public void testPoolLimits() throws InterruptedException {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 10);
AppContext context = mock(AppContext.class);
CustomContainerLauncher containerLauncher = new CustomContainerLauncher(

View File

@ -19,12 +19,11 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.atLeast;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -53,9 +52,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -66,6 +68,7 @@
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestContainerLauncherImpl {
static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
@ -122,8 +125,8 @@ public void waitForPoolToIdle() throws InterruptedException {
public static ContainerId makeContainerId(long ts, int appId, int attemptId,
int id) {
return BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
return ContainerId.newInstance(
ApplicationAttemptId.newInstance(
BuilderUtils.newApplicationId(ts, appId), attemptId), id);
}
@ -406,10 +409,10 @@ public void testContainerCleaned() throws Exception {
private ContainerToken createNewContainerToken(ContainerId contId,
String containerManagerAddr) {
return BuilderUtils.newContainerToken(BuilderUtils.newNodeId("127.0.0.1",
return BuilderUtils.newContainerToken(NodeId.newInstance("127.0.0.1",
1234), "password".getBytes(), new ContainerTokenIdentifier(
contId, containerManagerAddr, "user",
BuilderUtils.newResource(1024, 1),
Resource.newInstance(1024, 1),
System.currentTimeMillis() + 10000L, 123));
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -108,7 +109,7 @@ protected AMRMProtocol createSchedulerProxy() {
private static AppContext createAppContext() {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId attemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ApplicationAttemptId.newInstance(appId, 1);
Job job = mock(Job.class);
@SuppressWarnings("rawtypes")
EventHandler eventHandler = mock(EventHandler.class);
@ -117,8 +118,8 @@ private static AppContext createAppContext() {
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
when(ctx.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
.newResource(10240, 1)));
new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
10240, 1)));
when(ctx.getEventHandler()).thenReturn(eventHandler);
return ctx;
}

View File

@ -46,8 +46,8 @@
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
@ -66,9 +66,9 @@
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.JerseyTest;
import com.sun.jersey.test.framework.WebAppDescriptor;
@ -967,7 +967,7 @@ public void verifyJobAttemptsGeneric(Job job, String nodeHttpAddress,
WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":"
+ nmHttpPort, nodeHttpAddress);
WebServicesTestUtils.checkStringMatch("nodeId",
BuilderUtils.newNodeId(nmHost, nmPort).toString(), nodeId);
NodeId.newInstance(nmHost, nmPort).toString(), nodeId);
assertTrue("startime not greater than 0", startTime > 0);
WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId);

View File

@ -58,11 +58,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.base.Charsets;
/**
* Helper class for MR applications
*/
@ -423,15 +420,10 @@ private static void parseDistributedCacheArtifacts(
getResourceDescription(orig.getType()) + orig.getResource() +
" conflicts with " + getResourceDescription(type) + u);
}
localResources.put(
linkName,
BuilderUtils.newLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
localResources.put(linkName, LocalResource.newInstance(ConverterUtils
.getYarnUrlFromURI(p.toUri()), type, visibilities[i]
? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i]));
}
}
}

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.BuilderUtils;
@XmlRootElement(name = "jobAttempt")
@XmlAccessorType(XmlAccessType.FIELD)
@ -56,7 +55,7 @@ public AMAttemptInfo(AMInfo amInfo, String jobId, String user, String host,
int nmPort = amInfo.getNodeManagerPort();
if (nmHost != null) {
this.nodeHttpAddress = nmHost + ":" + nmHttpPort;
NodeId nodeId = BuilderUtils.newNodeId(nmHost, nmPort);
NodeId nodeId = NodeId.newInstance(nmHost, nmPort);
this.nodeId = nodeId.toString();
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -243,7 +244,7 @@ public void testLogsView2() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
@ -271,7 +272,7 @@ public void testLogsViewSingle() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
@ -302,7 +303,7 @@ public void testLogsViewBadStartEnd() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");

View File

@ -53,8 +53,8 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
@ -913,7 +913,7 @@ public void verifyHsJobAttemptsGeneric(Job job, String nodeHttpAddress,
WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":"
+ nmHttpPort, nodeHttpAddress);
WebServicesTestUtils.checkStringMatch("nodeId",
BuilderUtils.newNodeId(nmHost, nmPort).toString(), nodeId);
NodeId.newInstance(nmHost, nmPort).toString(), nodeId);
assertTrue("startime not greater than 0", startTime > 0);
WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId);

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -478,7 +479,7 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(),
BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
NodeId.newInstance(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser());
} else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
@ -489,7 +490,7 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
return new LogParams(
amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(),
BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
NodeId.newInstance(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser());
}
} else {

View File

@ -88,7 +88,7 @@ private ApplicationReport getUnknownApplicationReport() {
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never
// used for a non running job
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
YarnConfiguration.DEFAULT_APPLICATION_TYPE);

View File

@ -84,7 +84,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMTokenSelector;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
@ -475,9 +474,10 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer = BuilderUtils
.newContainerLaunchContext(localResources,
environment, vargsFinal, null, securityTokens, acls);
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);
// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext =

View File

@ -426,9 +426,9 @@ private GetCountersRequest getCountersRequest() {
private ApplicationReport getFinishedApplicationReport() {
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
"N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
@ -436,9 +436,9 @@ private ApplicationReport getFinishedApplicationReport() {
private ApplicationReport getRunningApplicationReport(String host, int port) {
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
"url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
YarnConfiguration.DEFAULT_APPLICATION_TYPE);

View File

@ -48,10 +48,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestMRJobsWithHistoryService {
@ -169,8 +168,8 @@ private void verifyJobReport(JobReport jobReport, JobId jobId) {
List<AMInfo> amInfos = jobReport.getAMInfos();
Assert.assertEquals(1, amInfos.size());
AMInfo amInfo = amInfos.get(0);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1);
ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(jobId.getAppId(), 1);
ContainerId amContainerId = ContainerId.newInstance(appAttemptId, 1);
Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
Assert.assertEquals(amContainerId, amInfo.getContainerId());
Assert.assertTrue(jobReport.getSubmitTime() > 0);