diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 18fb1eafa5..921aa6385b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -447,6 +447,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2989. Modified JobHistory to link to task and AM logs from the JobHistoryServer. (Siddharth Seth via vinodkv) + MAPREDUCE-3146. Added a MR specific command line to dump logs for a + given TaskAttemptID. (Siddharth Seth via vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index c6f1200d8a..7a6b86a0f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -48,9 +48,9 @@ import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; @@ -100,8 +100,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; @@ -130,9 +128,6 @@ public class MRAppMaster extends CompositeService { private static final Log LOG = LogFactory.getLog(MRAppMaster.class); - private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - private Clock clock; private final long startTime; private final long appSubmitTime; @@ -758,8 +753,8 @@ public void start() { amInfos = new LinkedList(); } AMInfo amInfo = - new AMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, - nmHttpPort); + MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, + nmPort, nmHttpPort); amInfos.add(amInfo); // /////////////////// Create the job itself. 
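Editorial note on the MRAppMaster change above: it replaces the history-parser AMInfo class with the new record interface in org.apache.hadoop.mapreduce.v2.api.records, constructed through MRBuilderUtils instead of a constructor (which also lets the now-unused RecordFactory field go away). A minimal sketch of the new construction idiom follows; it is illustrative only, and the "localhost"/1234/9999 NodeManager coordinates are placeholder values borrowed from the MRApp test constants introduced later in this patch.

// Illustrative sketch only (not part of the patch): building the new
// record-based AMInfo via MRBuilderUtils, as MRAppMaster.start() now does.
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;

public class AMInfoSketch {
  static AMInfo newAttemptInfo(ApplicationAttemptId appAttemptId,
      ContainerId containerId) {
    // One AMInfo is recorded per AM attempt: its start time plus the
    // NodeManager host/port/http-port, which the log-dumping command line
    // added by MAPREDUCE-3146 needs to locate the AM container's logs.
    return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
        containerId, "localhost", 1234, 9999);
  }
}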
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index cf586743ec..fdba78d9b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index b9496123d2..ea8bbdf5d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; @@ -61,6 +60,7 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.v2.api.records.Counters; @@ -580,13 +580,14 @@ public JobReport getReport() { if (getState() == JobState.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, username, state, - startTime, finishTime, setupProgress, 0.0f, - 0.0f, cleanupProgress, remoteJobConfFile.toString()); + appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, + cleanupProgress, remoteJobConfFile.toString(), amInfos); } return MRBuilderUtils.newJobReport(jobId, jobName, username, state, - startTime, finishTime, setupProgress, computeProgress(mapTasks), - computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString()); + appSubmitTime, startTime, finishTime, setupProgress, + computeProgress(mapTasks), computeProgress(reduceTasks), + cleanupProgress, remoteJobConfFile.toString(), amInfos); } finally { readLock.unlock(); } diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index d354bda1fa..78f823f07f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -158,6 +159,8 @@ public abstract class TaskAttemptImpl implements private long finishTime; private WrappedProgressSplitsBlock progressSplitBlock; private int shufflePort = -1; + private String trackerName; + private int httpPort; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -423,7 +426,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) stateMachine; private ContainerId containerID; - private String nodeHostName; + private NodeId containerNodeId; private String containerMgrAddress; private String nodeHttpAddress; private WrappedJvmID jvmID; @@ -763,6 +766,12 @@ public TaskAttemptReport getReport() { result.setPhase(reportedStatus.phase); result.setStateString(reportedStatus.stateString); result.setCounters(getCounters()); + result.setContainerId(this.getAssignedContainerID()); + result.setNodeManagerHost(trackerName); + result.setNodeManagerHttpPort(httpPort); + if (this.containerNodeId != null) { + result.setNodeManagerPort(this.containerNodeId.getPort()); + } return result; } finally { readLock.unlock(); @@ -1001,8 +1010,8 @@ public void transition(final TaskAttemptImpl taskAttempt, final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; taskAttempt.containerID = cEvent.getContainer().getId(); - taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost(); - taskAttempt.containerMgrAddress = cEvent.getContainer().getNodeId() + taskAttempt.containerNodeId = cEvent.getContainer().getNodeId(); + taskAttempt.containerMgrAddress = taskAttempt.containerNodeId .toString(); taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress(); taskAttempt.containerToken = cEvent.getContainer().getContainerToken(); @@ -1113,6 +1122,8 @@ public void transition(TaskAttemptImpl taskAttempt, InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: // Costly? 
+ taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); + taskAttempt.httpPort = nodeHttpInetAddr.getPort(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId() .getJobId()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java index 18aa0cbda2..95c4919d22 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Set; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.Dispatcher; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index a23519afe2..843e666c87 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -36,11 +36,11 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -158,13 +159,24 @@ public Clock getClock() { public Set getCompletedTasks() { return completedTasks.keySet(); } - + @Override public List getAMInfos() { if (jobInfo == null || jobInfo.getAMInfos() == null) { return new LinkedList(); } - return new LinkedList(jobInfo.getAMInfos()); + List amInfos = new LinkedList(); + for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo + .getAMInfos()) { + 
AMInfo amInfo = + MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), + jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), + jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), + jhAmInfo.getNodeManagerHttpPort()); + + amInfos.add(amInfo); + } + return amInfos; } private void parse() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java index e3bb1f9d96..56a0a2f4c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java @@ -24,7 +24,7 @@ import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index a80a2a760c..02b8065722 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -96,6 +96,10 @@ public class MRApp extends MRAppMaster { private File testWorkDir; private Path testAbsPath; + + public static String NM_HOST = "localhost"; + public static int NM_PORT = 1234; + public static int NM_HTTP_PORT = 9999; private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -136,7 +140,8 @@ private static ContainerId getContainerId(ApplicationId applicationId, public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { super(getApplicationAttemptId(applicationId, startCount), getContainerId( - applicationId, startCount), "testhost", 2222, 3333, System.currentTimeMillis()); + applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System + .currentTimeMillis()); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); LOG.info("PathUsed: " + testAbsPath); @@ -363,9 +368,9 @@ public void handle(ContainerAllocatorEvent event) { ContainerId cId = recordFactory.newRecordInstance(ContainerId.class); cId.setApplicationAttemptId(getContext().getApplicationAttemptId()); cId.setId(containerCount++); - NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234); + NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT); Container container = BuilderUtils.newContainer(cId, nodeId, - "localhost:9999", null, null, null); + NM_HOST + ":" + NM_HTTP_PORT, null, null, null); JobID id = TypeConverter.fromYarn(applicationId); JobId jobId = TypeConverter.toYarn(id); getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 8d4b08d1e8..76f71009f2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.MockApps; @@ -88,6 +89,10 @@ public class MockJobs extends MockApps { static final Iterator DIAGS = Iterators.cycle( "Error: java.lang.OutOfMemoryError: Java heap space", "Lost task tracker: tasktracker.domain/127.0.0.1:40879"); + + public static final String NM_HOST = "localhost"; + public static final int NM_PORT = 1234; + public static final int NM_HTTP_PORT = 9999; static final int DT = 1000000; // ms @@ -507,8 +512,7 @@ private static AMInfo createAMInfo(int attempt) { BuilderUtils.newApplicationAttemptId( BuilderUtils.newApplicationId(100, 1), attempt); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); - return new AMInfo(appAttemptId, System.currentTimeMillis(), containerId, - "testhost", 2222, 3333); - + return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(), + containerId, NM_HOST, NM_PORT, NM_HTTP_PORT); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index f079c9254a..9c59269ec6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -32,8 +32,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.Phase; +import 
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; @@ -103,8 +106,9 @@ public void test() throws Exception { GetJobReportRequest gjrRequest = recordFactory.newRecordInstance(GetJobReportRequest.class); gjrRequest.setJobId(job.getID()); - Assert.assertNotNull("JobReport is null", - proxy.getJobReport(gjrRequest).getJobReport()); + JobReport jr = proxy.getJobReport(gjrRequest).getJobReport(); + verifyJobReport(jr); + GetTaskAttemptCompletionEventsRequest gtaceRequest = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class); @@ -123,8 +127,10 @@ public void test() throws Exception { GetTaskAttemptReportRequest gtarRequest = recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class); gtarRequest.setTaskAttemptId(attempt.getID()); - Assert.assertNotNull("TaskAttemptReport is null", - proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport()); + TaskAttemptReport tar = + proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport(); + verifyTaskAttemptReport(tar); + GetTaskReportRequest gtrRequest = recordFactory.newRecordInstance(GetTaskReportRequest.class); @@ -164,6 +170,31 @@ public void test() throws Exception { app.waitForState(job, JobState.SUCCEEDED); } + private void verifyJobReport(JobReport jr) { + Assert.assertNotNull("JobReport is null", jr); + List<AMInfo> amInfos = jr.getAMInfos(); + Assert.assertEquals(1, amInfos.size()); + Assert.assertEquals(JobState.RUNNING, jr.getJobState()); + AMInfo amInfo = amInfos.get(0); + Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost()); + Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort()); + Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId()); + Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId() + .getAttemptId()); + Assert.assertTrue(amInfo.getStartTime() > 0); + } + + private void verifyTaskAttemptReport(TaskAttemptReport tar) { + Assert.assertNotNull("TaskAttemptReport is null", tar); + Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState()); + Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost()); + Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort()); + Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId() + .getAttemptId()); + } + class MRAppWithClientService extends MRApp { MRClientService clientService = null; MRAppWithClientService(int maps, int reduces, boolean autoComplete) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 25a6f87647..9dd877b330 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -117,8 +117,8 @@ public void testSimple() throws Exception { JobId jobId =
MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, - 0, 0, 0, 0, 0, 0, "jobfile")); + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -194,8 +194,8 @@ public void testResource() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, - 0, 0, 0, 0, 0, 0, "jobfile")); + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -260,8 +260,8 @@ public void testMapReduceScheduling() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, - 0, 0, 0, 0, 0, 0, "jobfile")); + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -374,8 +374,8 @@ void setProgress(float setupProgress, float mapProgress, @Override public JobReport getReport() { return MRBuilderUtils.newJobReport(this.jobId, "job", "user", - JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress, - this.reduceProgress, this.cleanupProgress, "jobfile"); + JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress, + this.reduceProgress, this.cleanupProgress, "jobfile", null); } } @@ -510,8 +510,8 @@ public void testBlackListedNodes() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, - 0, 0, 0, 0, 0, 0, "jobfile")); + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null)); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 9de0b49f0c..277b097da4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -39,10 +39,10 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; @@ -221,9 +221,9 @@ public void testCrashed() throws Exception { .getAttemptId()); Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId() .getApplicationAttemptId()); - Assert.assertEquals("testhost", amInfo.getNodeManagerHost()); - Assert.assertEquals(2222, amInfo.getNodeManagerPort()); - Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort()); + Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost()); + Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort()); } long am1StartTimeReal = job.getAMInfos().get(0).getStartTime(); long am2StartTimeReal = job.getAMInfos().get(1).getStartTime(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 2081e4d860..6698b3d94d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/AMInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/AMInfo.java new file mode 100644 index 0000000000..1cd14ff312 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/AMInfo.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.api.records; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; + +public interface AMInfo { + public ApplicationAttemptId getAppAttemptId(); + public long getStartTime(); + public ContainerId getContainerId(); + public String getNodeManagerHost(); + public int getNodeManagerPort(); + public int getNodeManagerHttpPort(); + + public void setAppAttemptId(ApplicationAttemptId appAttemptId); + public void setStartTime(long startTime); + public void setContainerId(ContainerId containerId); + public void setNodeManagerHost(String nmHost); + public void setNodeManagerPort(int nmPort); + public void setNodeManagerHttpPort(int nmHttpPort); +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java index 87b77b7f80..469c425feb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.api.records; +import java.util.List; + public interface JobReport { public abstract JobId getJobId(); public abstract JobState getJobState(); @@ -25,6 +27,7 @@ public interface JobReport { public abstract float getReduceProgress(); public abstract float getCleanupProgress(); public abstract float getSetupProgress(); + public abstract long getSubmitTime(); public abstract long getStartTime(); public abstract long getFinishTime(); public abstract String getUser(); @@ -32,6 +35,7 @@ public interface JobReport { public abstract String getTrackingUrl(); public abstract String getDiagnostics(); public abstract String getJobFile(); + public abstract List<AMInfo> getAMInfos(); public abstract void setJobId(JobId jobId); public abstract void setJobState(JobState jobState); @@ -39,6 +43,7 @@ public interface JobReport { public abstract void setReduceProgress(float progress); public abstract void setCleanupProgress(float progress); public abstract void setSetupProgress(float progress); + public abstract void setSubmitTime(long submitTime); public abstract void setStartTime(long startTime); public abstract void setFinishTime(long finishTime); public abstract void setUser(String user); @@ -46,4 +51,5 @@ public interface JobReport { public abstract void setTrackingUrl(String trackingUrl); public abstract void setDiagnostics(String diagnostics); public abstract void setJobFile(String jobFile); + public abstract void setAMInfos(List<AMInfo> amInfos); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java index 4617258f32..bc0a4c6b4f 100644 ---
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.api.records; +import org.apache.hadoop.yarn.api.records.ContainerId; + public interface TaskAttemptReport { public abstract TaskAttemptId getTaskAttemptId(); public abstract TaskAttemptState getTaskAttemptState(); @@ -32,6 +34,10 @@ public interface TaskAttemptReport { public abstract String getDiagnosticInfo(); public abstract String getStateString(); public abstract Phase getPhase(); + public abstract String getNodeManagerHost(); + public abstract int getNodeManagerPort(); + public abstract int getNodeManagerHttpPort(); + public abstract ContainerId getContainerId(); public abstract void setTaskAttemptId(TaskAttemptId taskAttemptId); public abstract void setTaskAttemptState(TaskAttemptState taskAttemptState); @@ -42,6 +48,10 @@ public interface TaskAttemptReport { public abstract void setDiagnosticInfo(String diagnosticInfo); public abstract void setStateString(String stateString); public abstract void setPhase(Phase phase); + public abstract void setNodeManagerHost(String nmHost); + public abstract void setNodeManagerPort(int nmPort); + public abstract void setNodeManagerHttpPort(int nmHttpPort); + public abstract void setContainerId(ContainerId containerId); /** * Set the shuffle finish time. Applicable only for reduce attempts diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/AMInfoPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/AMInfoPBImpl.java new file mode 100644 index 0000000000..325d9a8861 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/AMInfoPBImpl.java @@ -0,0 +1,201 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.api.records.impl.pb; + +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto; +import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProtoOrBuilder; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; + +public class AMInfoPBImpl extends ProtoBase implements AMInfo { + + AMInfoProto proto = AMInfoProto.getDefaultInstance(); + AMInfoProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationAttemptId appAttemptId; + private ContainerId containerId; + + public AMInfoPBImpl() { + builder = AMInfoProto.newBuilder(); + } + + public AMInfoPBImpl(AMInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized AMInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToBuilder() { + if (this.appAttemptId != null + && !((ApplicationAttemptIdPBImpl) this.appAttemptId).getProto().equals( + builder.getApplicationAttemptId())) { + builder.setApplicationAttemptId(convertToProtoFormat(this.appAttemptId)); + } + if (this.getContainerId() != null + && !((ContainerIdPBImpl) this.containerId).getProto().equals( + builder.getContainerId())) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + } + + private synchronized void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AMInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized ApplicationAttemptId getAppAttemptId() { + AMInfoProtoOrBuilder p = viaProto ? proto : builder; + if (appAttemptId != null) { + return appAttemptId; + } // Else via proto + if (!p.hasApplicationAttemptId()) { + return null; + } + appAttemptId = convertFromProtoFormat(p.getApplicationAttemptId()); + return appAttemptId; + } + + @Override + public synchronized void setAppAttemptId(ApplicationAttemptId appAttemptId) { + maybeInitBuilder(); + if (appAttemptId == null) { + builder.clearApplicationAttemptId(); + } + this.appAttemptId = appAttemptId; + } + + @Override + public synchronized long getStartTime() { + AMInfoProtoOrBuilder p = viaProto ? proto : builder; + return (p.getStartTime()); + } + + @Override + public synchronized void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public synchronized ContainerId getContainerId() { + AMInfoProtoOrBuilder p = viaProto ? 
proto : builder; + if (containerId != null) { + return containerId; + } // Else via proto + if (!p.hasContainerId()) { + return null; + } + containerId = convertFromProtoFormat(p.getContainerId()); + return containerId; + } + + @Override + public synchronized void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + @Override + public synchronized String getNodeManagerHost() { + AMInfoProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeManagerHost()) { + return null; + } + return p.getNodeManagerHost(); + } + + @Override + public synchronized void setNodeManagerHost(String nmHost) { + maybeInitBuilder(); + if (nmHost == null) { + builder.clearNodeManagerHost(); + return; + } + builder.setNodeManagerHost(nmHost); + } + + @Override + public synchronized int getNodeManagerPort() { + AMInfoProtoOrBuilder p = viaProto ? proto : builder; + return (p.getNodeManagerPort()); + } + + @Override + public synchronized void setNodeManagerPort(int nmPort) { + maybeInitBuilder(); + builder.setNodeManagerPort(nmPort); + } + + @Override + public synchronized int getNodeManagerHttpPort() { + AMInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getNodeManagerHttpPort(); + } + + @Override + public synchronized void setNodeManagerHttpPort(int httpPort) { + maybeInitBuilder(); + builder.setNodeManagerHttpPort(httpPort); + } + + private ApplicationAttemptIdPBImpl convertFromProtoFormat( + ApplicationAttemptIdProto p) { + return new ApplicationAttemptIdPBImpl(p); + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private + ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) { + return ((ApplicationAttemptIdPBImpl) t).getProto(); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java index 2af50b6820..41e46c3391 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java @@ -19,9 +19,14 @@ package org.apache.hadoop.mapreduce.v2.api.records.impl.pb; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder; @@ -31,12 +36,14 @@ -public class JobReportPBImpl extends ProtoBase implements JobReport { +public class JobReportPBImpl extends ProtoBase implements + JobReport { JobReportProto proto = 
JobReportProto.getDefaultInstance(); JobReportProto.Builder builder = null; boolean viaProto = false; private JobId jobId = null; + private List amInfos = null; public JobReportPBImpl() { @@ -48,20 +55,23 @@ public JobReportPBImpl(JobReportProto proto) { viaProto = true; } - public JobReportProto getProto() { + public synchronized JobReportProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } - private void mergeLocalToBuilder() { + private synchronized void mergeLocalToBuilder() { if (this.jobId != null) { builder.setJobId(convertToProtoFormat(this.jobId)); } + if (this.amInfos != null) { + addAMInfosToProto(); + } } - private void mergeLocalToProto() { + private synchronized void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); mergeLocalToBuilder(); @@ -69,7 +79,7 @@ private void mergeLocalToProto() { viaProto = true; } - private void maybeInitBuilder() { + private synchronized void maybeInitBuilder() { if (viaProto || builder == null) { builder = JobReportProto.newBuilder(proto); } @@ -78,7 +88,7 @@ private void maybeInitBuilder() { @Override - public JobId getJobId() { + public synchronized JobId getJobId() { JobReportProtoOrBuilder p = viaProto ? proto : builder; if (this.jobId != null) { return this.jobId; @@ -91,14 +101,14 @@ public JobId getJobId() { } @Override - public void setJobId(JobId jobId) { + public synchronized void setJobId(JobId jobId) { maybeInitBuilder(); if (jobId == null) builder.clearJobId(); this.jobId = jobId; } @Override - public JobState getJobState() { + public synchronized JobState getJobState() { JobReportProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasJobState()) { return null; @@ -107,7 +117,7 @@ public JobState getJobState() { } @Override - public void setJobState(JobState jobState) { + public synchronized void setJobState(JobState jobState) { maybeInitBuilder(); if (jobState == null) { builder.clearJobState(); @@ -116,132 +126,197 @@ public void setJobState(JobState jobState) { builder.setJobState(convertToProtoFormat(jobState)); } @Override - public float getMapProgress() { + public synchronized float getMapProgress() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getMapProgress()); } @Override - public void setMapProgress(float mapProgress) { + public synchronized void setMapProgress(float mapProgress) { maybeInitBuilder(); builder.setMapProgress((mapProgress)); } @Override - public float getReduceProgress() { + public synchronized float getReduceProgress() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getReduceProgress()); } @Override - public void setReduceProgress(float reduceProgress) { + public synchronized void setReduceProgress(float reduceProgress) { maybeInitBuilder(); builder.setReduceProgress((reduceProgress)); } @Override - public float getCleanupProgress() { + public synchronized float getCleanupProgress() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getCleanupProgress()); } @Override - public void setCleanupProgress(float cleanupProgress) { + public synchronized void setCleanupProgress(float cleanupProgress) { maybeInitBuilder(); builder.setCleanupProgress((cleanupProgress)); } @Override - public float getSetupProgress() { + public synchronized float getSetupProgress() { JobReportProtoOrBuilder p = viaProto ? 
proto : builder; return (p.getSetupProgress()); } @Override - public void setSetupProgress(float setupProgress) { + public synchronized void setSetupProgress(float setupProgress) { maybeInitBuilder(); builder.setSetupProgress((setupProgress)); } + @Override - public long getStartTime() { + public synchronized long getSubmitTime() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.getSubmitTime()); + } + + @Override + public synchronized void setSubmitTime(long submitTime) { + maybeInitBuilder(); + builder.setSubmitTime((submitTime)); + } + + @Override + public synchronized long getStartTime() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getStartTime()); } @Override - public void setStartTime(long startTime) { + public synchronized void setStartTime(long startTime) { maybeInitBuilder(); builder.setStartTime((startTime)); } @Override - public long getFinishTime() { + public synchronized long getFinishTime() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getFinishTime()); } @Override - public void setFinishTime(long finishTime) { + public synchronized void setFinishTime(long finishTime) { maybeInitBuilder(); builder.setFinishTime((finishTime)); } @Override - public String getUser() { + public synchronized String getUser() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getUser()); } @Override - public void setUser(String user) { + public synchronized void setUser(String user) { maybeInitBuilder(); builder.setUser((user)); } @Override - public String getJobName() { + public synchronized String getJobName() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getJobName()); } @Override - public void setJobName(String jobName) { + public synchronized void setJobName(String jobName) { maybeInitBuilder(); builder.setJobName((jobName)); } @Override - public String getTrackingUrl() { + public synchronized String getTrackingUrl() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return (p.getTrackingUrl()); } @Override - public void setTrackingUrl(String trackingUrl) { + public synchronized void setTrackingUrl(String trackingUrl) { maybeInitBuilder(); builder.setTrackingUrl(trackingUrl); } @Override - public String getDiagnostics() { + public synchronized String getDiagnostics() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return p.getDiagnostics(); } @Override - public void setDiagnostics(String diagnostics) { + public synchronized void setDiagnostics(String diagnostics) { maybeInitBuilder(); builder.setDiagnostics(diagnostics); } @Override - public String getJobFile() { + public synchronized String getJobFile() { JobReportProtoOrBuilder p = viaProto ? proto : builder; return p.getJobFile(); } @Override - public void setJobFile(String jobFile) { + public synchronized void setJobFile(String jobFile) { maybeInitBuilder(); builder.setJobFile(jobFile); } + @Override + public synchronized List getAMInfos() { + initAMInfos(); + return this.amInfos; + } + + @Override + public synchronized void setAMInfos(List amInfos) { + maybeInitBuilder(); + if (amInfos == null) { + this.builder.clearAmInfos(); + this.amInfos = null; + return; + } + initAMInfos(); + this.amInfos.clear(); + this.amInfos.addAll(amInfos); + } + + + private synchronized void initAMInfos() { + if (this.amInfos != null) { + return; + } + JobReportProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getAmInfosList(); + + this.amInfos = new ArrayList(); + + for (AMInfoProto amInfoProto : list) { + this.amInfos.add(convertFromProtoFormat(amInfoProto)); + } + } + + private synchronized void addAMInfosToProto() { + maybeInitBuilder(); + builder.clearAmInfos(); + if (this.amInfos == null) + return; + for (AMInfo amInfo : this.amInfos) { + builder.addAmInfos(convertToProtoFormat(amInfo)); + } + } + + private AMInfoPBImpl convertFromProtoFormat(AMInfoProto p) { + return new AMInfoPBImpl(p); + } + + private AMInfoProto convertToProtoFormat(AMInfo t) { + return ((AMInfoPBImpl)t).getProto(); + } + private JobIdPBImpl convertFromProtoFormat(JobIdProto p) { return new JobIdPBImpl(p); } @@ -257,7 +332,4 @@ private JobStateProto convertToProtoFormat(JobState e) { private JobState convertFromProtoFormat(JobStateProto e) { return MRProtoUtils.convertFromProtoFormat(e); } - - - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java index c52bf5a3c2..999d770292 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java @@ -31,7 +31,10 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProtoOrBuilder; import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto; import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -42,6 +45,7 @@ public class TaskAttemptReportPBImpl extends ProtoBase i private TaskAttemptId taskAttemptId = null; private Counters counters = null; + private ContainerId containerId = null; public TaskAttemptReportPBImpl() { @@ -67,6 +71,9 @@ private void mergeLocalToBuilder() { if (this.counters != null) { builder.setCounters(convertToProtoFormat(this.counters)); } + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } } private void mergeLocalToProto() { @@ -255,7 +262,80 @@ public void setPhase(Phase phase) { } builder.setPhase(convertToProtoFormat(phase)); } + + @Override + public String getNodeManagerHost() { + TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeManagerHost()) { + return null; + } + return p.getNodeManagerHost(); + } + + @Override + public void setNodeManagerHost(String nmHost) { + maybeInitBuilder(); + if (nmHost == null) { + builder.clearNodeManagerHost(); + return; + } + builder.setNodeManagerHost(nmHost); + } + + @Override + public int getNodeManagerPort() { + TaskAttemptReportProtoOrBuilder p = viaProto ? 
proto : builder; + return (p.getNodeManagerPort()); + } + + @Override + public void setNodeManagerPort(int nmPort) { + maybeInitBuilder(); + builder.setNodeManagerPort(nmPort); + } + + @Override + public int getNodeManagerHttpPort() { + TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.getNodeManagerHttpPort()); + } + + @Override + public void setNodeManagerHttpPort(int nmHttpPort) { + maybeInitBuilder(); + builder.setNodeManagerHttpPort(nmHttpPort); + } + + @Override + public ContainerId getContainerId() { + TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder; + if (containerId != null) { + return containerId; + } // Else via proto + if (!p.hasContainerId()) { + return null; + } + containerId = convertFromProtoFormat(p.getContainerId()); + return containerId; + } + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + private CountersPBImpl convertFromProtoFormat(CountersProto p) { return new CountersPBImpl(p); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java index 543454c15a..109028205d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java @@ -18,13 +18,18 @@ package org.apache.hadoop.mapreduce.v2.util; +import java.util.List; + +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.Records; public class MRBuilderUtils { @@ -53,14 +58,15 @@ public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) { } public static JobReport newJobReport(JobId jobId, String jobName, - String userName, JobState state, long startTime, long finishTime, + String userName, JobState state, long submitTime, long startTime, long finishTime, float setupProgress, float mapProgress, float reduceProgress, - float cleanupProgress, String jobFile) { + float cleanupProgress, String jobFile, List amInfos) { JobReport report = Records.newRecord(JobReport.class); report.setJobId(jobId); report.setJobName(jobName); report.setUser(userName); report.setJobState(state); + report.setSubmitTime(submitTime); report.setStartTime(startTime); report.setFinishTime(finishTime); report.setSetupProgress(setupProgress); @@ -68,6 +74,20 
@@ public static JobReport newJobReport(JobId jobId, String jobName, report.setMapProgress(mapProgress); report.setReduceProgress(reduceProgress); report.setJobFile(jobFile); + report.setAMInfos(amInfos); return report; } + + public static AMInfo newAMInfo(ApplicationAttemptId appAttemptId, + long startTime, ContainerId containerId, String nmHost, int nmPort, + int nmHttpPort) { + AMInfo amInfo = Records.newRecord(AMInfo.class); + amInfo.setAppAttemptId(appAttemptId); + amInfo.setStartTime(startTime); + amInfo.setContainerId(containerId); + amInfo.setNodeManagerHost(nmHost); + amInfo.setNodeManagerPort(nmPort); + amInfo.setNodeManagerHttpPort(nmHttpPort); + return amInfo; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index a4375c9e67..3390b7ad84 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -119,6 +119,10 @@ message TaskAttemptReportProto { optional PhaseProto phase = 9; optional int64 shuffle_finish_time = 10; optional int64 sort_finish_time=11; + optional string node_manager_host = 12; + optional int32 node_manager_port = 13; + optional int32 node_manager_http_port = 14; + optional ContainerIdProto container_id = 15; } enum JobStateProto { @@ -146,6 +150,17 @@ message JobReportProto { optional string trackingUrl = 11; optional string diagnostics = 12; optional string jobFile = 13; + repeated AMInfoProto am_infos = 14; + optional int64 submit_time = 15; +} + +message AMInfoProto { + optional ApplicationAttemptIdProto application_attempt_id = 1; + optional int64 start_time = 2; + optional ContainerIdProto container_id = 3; + optional string node_manager_host = 4; + optional int32 node_manager_port = 5; + optional int32 node_manager_http_port = 6; } enum TaskAttemptCompletionEventStatusProto { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index 2f083f5275..138e4332aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -34,6 +34,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-server-nodemanager + org.apache.hadoop hadoop-hdfs diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index 2a198b4955..460202167d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.util.ConfigUtil; +import 
org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -212,7 +213,20 @@ public QueueInfo getQueue(String name) throws IOException, InterruptedException { return client.getQueue(name); } - + + /** + * Get log parameters for the specified jobID or taskAttemptID + * @param jobID the job id. + * @param taskAttemptID the task attempt id. Optional. + * @return the LogParams + * @throws IOException + * @throws InterruptedException + */ + public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + return client.getLogFileParams(jobID, taskAttemptID); + } + /** * Get current cluster status. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java index 72a194ab0b..ad58807e1b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.authorize.AccessControlList; @@ -115,6 +116,8 @@ public interface ClientProtocol extends VersionedProtocol { * MAPREDUCE-2337. * Version 37: More efficient serialization format for framework counters * (MAPREDUCE-901) + * Version 38: Added getLogFileParams(JobID, TaskAttemptID) as part of + * MAPREDUCE-3146 */ - public static final long versionID = 37L; + public static final long versionID = 38L; @@ -351,4 +354,16 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token public void cancelDelegationToken(Token<DelegationTokenIdentifier> token ) throws IOException, InterruptedException; + + /** + * Gets the location of the log file for a job if no taskAttemptId is + * specified, otherwise gets the log location for the taskAttemptId. + * @param jobID the jobId. + * @param taskAttemptID the taskAttemptId. + * @return log params.
+ * @throws IOException + * @throws InterruptedException + */ + public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException, InterruptedException; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index 882a4deab6..7b255289b0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -42,9 +42,11 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper; /** * Interprets the map reduce cli options @@ -95,6 +97,7 @@ public int run(String[] argv) throws Exception { boolean killTask = false; boolean failTask = false; boolean setJobPriority = false; + boolean logs = false; if ("-submit".equals(cmd)) { if (argv.length != 2) { @@ -205,6 +208,19 @@ public int run(String[] argv) throws Exception { taskType = argv[2]; taskState = argv[3]; displayTasks = true; + } else if ("-logs".equals(cmd)) { + if (argv.length == 2 || argv.length == 3) { + logs = true; + jobid = argv[1]; + if (argv.length == 3) { + taskid = argv[2]; + } else { + taskid = null; + } + } else { + displayUsage(cmd); + return exitCode; + } } else { displayUsage(cmd); return exitCode; @@ -313,6 +329,22 @@ public int run(String[] argv) throws Exception { System.out.println("Could not fail task " + taskid); exitCode = -1; } + } else if (logs) { + try { + JobID jobID = JobID.forName(jobid); + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); + LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); + LogDumper logDumper = new LogDumper(); + logDumper.setConf(getConf()); + logDumper.dumpAContainersLogs(logParams.getApplicationId(), + logParams.getContainerId(), logParams.getNodeId(), + logParams.getOwner()); + } catch (IOException e) { + if (e instanceof RemoteException) { + throw e; + } + System.out.println(e.getMessage()); + } } } catch (RemoteException re) { IOException unwrappedException = re.unwrapRemoteException(); @@ -380,6 +412,10 @@ private void displayUsage(String cmd) { " <job-id> <task-type> <task-state>]. " + "Valid values for <task-type> are " + taskTypes + ". " + "Valid values for <task-state> are " + taskStates); + } else if ("-logs".equals(cmd)) { + System.err.println(prefix + "[" + cmd + + " <job-id> <task-attempt-id>]. " + + " <task-attempt-id> is optional to get task attempt logs."); } else { System.err.printf(prefix + "<command> <args>\n"); System.err.printf("\t[-submit <job-file>]\n"); @@ -398,7 +434,8 @@ "Valid values for <task-type> are " + taskTypes + ". " + "Valid values for <task-state> are " + taskStates); System.err.printf("\t[-kill-task <task-attempt-id>]\n"); - System.err.printf("\t[-fail-task <task-attempt-id>]\n\n"); + System.err.printf("\t[-fail-task <task-attempt-id>]\n"); + System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n"); ToolRunner.printGenericCommandUsage(System.out); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/v2/LogParams.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/v2/LogParams.java new file mode 100644 index 0000000000..6d5bcc0197 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/v2/LogParams.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2; + +public class LogParams { + + private String containerId; + private String applicationId; + private String nodeId; + private String owner; + + public LogParams(String containerIdStr, String applicationIdStr, + String nodeIdStr, String owner) { + this.containerId = containerIdStr; + this.applicationId = applicationIdStr; + this.nodeId = nodeIdStr; + this.owner = owner; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getOwner() { + return this.owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } +}
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.YarnException; @@ -94,6 +95,7 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile, JobReport.class); report.setJobId(jobId); report.setJobState(JobState.valueOf(jobInfo.getJobStatus())); + report.setSubmitTime(jobInfo.getSubmitTime()); report.setStartTime(jobInfo.getLaunchTime()); report.setFinishTime(jobInfo.getFinishTime()); report.setJobName(jobInfo.getJobname()); @@ -103,6 +105,7 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile, report.setJobFile(confFile.toString()); report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter .toYarn(TypeConverter.fromYarn(jobId)).getAppId())); + report.setAMInfos(getAMInfos()); } @Override @@ -341,6 +344,17 @@ public Path getConfFile() { @Override public List<AMInfo> getAMInfos() { - return jobInfo.getAMInfos(); + List<AMInfo> amInfos = new LinkedList<AMInfo>(); + for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo + .getAMInfos()) { + AMInfo amInfo = + MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), + jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), + jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), + jhAmInfo.getNodeManagerHttpPort()); + + amInfos.add(amInfo); + } + return amInfos; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index 5481ed35ad..9f694849c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -79,6 +79,15 @@ public class CompletedTaskAttempt implements TaskAttempt { // report.setPhase(attemptInfo.get); //TODO report.setStateString(attemptInfo.getState()); report.setCounters(getCounters()); + report.setContainerId(attemptInfo.getContainerId()); + String[] hostSplits = attemptInfo.getHostname().split(":"); + if (hostSplits.length != 2) { + report.setNodeManagerHost("UNKNOWN"); + } else { + report.setNodeManagerHost(hostSplits[0]); + report.setNodeManagerPort(Integer.parseInt(hostSplits[1])); + } + report.setNodeManagerHttpPort(attemptInfo.getHttpPort()); } @Override
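With CompletedTaskAttempt now populating the container id and NodeManager coordinates on TaskAttemptReport, a consumer can reconstruct the "host:port" node address that the aggregated-log layout is keyed by. A small sketch of that derivation, mirroring what ClientServiceDelegate.getLogFilePath does later in this patch (helper name is hypothetical):

    // NodeId.toString() yields "host:port", the node key for aggregated logs.
    static String nodeAddressOf(TaskAttemptReport taReport) {
      return BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
          taReport.getNodeManagerPort()).toString();
    }

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index efc0c2bab9..fc808e5a70 100644 ---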
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java index 05ec3bab62..c6e3b64f3e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 36f0e8dc11..0cf14abf56 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -112,11 +112,11 @@ public void testHistoryParsing() throws Exception { // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); - Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0) + Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0) .getNodeManagerHost()); AMInfo amInfo = jobInfo.getAMInfos().get(0); - Assert.assertEquals(2222, amInfo.getNodeManagerPort()); - Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort()); + Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort()); Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId()); Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId() .getApplicationAttemptId()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java index 2568ec8316..aceb562906 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java @@ -218,7 +218,8 @@ public void testLogsView2() throws IOException { params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) .toString()); - params.put(NM_NODENAME, BuilderUtils.newNodeId("testhost", 2222).toString()); + params.put(NM_NODENAME, + BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(APP_OWNER, "owner"); @@ -229,7 +230,8 @@ public void testLogsView2() throws IOException { verify(spyPw).write( "Logs not available for container_10_0001_01_000001. Aggregation " + "may not be complete," - + " Check back later or try the nodemanager on testhost:2222"); + + " Check back later or try the nodemanager on " + + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 694ce81ee2..0bed43d71c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -23,6 +23,7 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; @@ -47,13 +49,17 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.util.MRApps; import 
org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -68,6 +74,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.util.BuilderUtils; public class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); @@ -398,5 +405,52 @@ public boolean killJob(JobID oldJobID) return true; } + public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID) + throws YarnRemoteException, IOException { + org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = + TypeConverter.toYarn(oldJobID); + GetJobReportRequest request = + recordFactory.newRecordInstance(GetJobReportRequest.class); + request.setJobId(jobId); -} + JobReport report = + ((GetJobReportResponse) invoke("getJobReport", + GetJobReportRequest.class, request)).getJobReport(); + if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED, + JobState.ERROR).contains(report.getJobState())) { + if (oldTaskAttemptID != null) { + GetTaskAttemptReportRequest taRequest = + recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class); + taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID)); + TaskAttemptReport taReport = + ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport", + GetTaskAttemptReportRequest.class, taRequest)) + .getTaskAttemptReport(); + if (taReport.getContainerId() == null + || taReport.getNodeManagerHost() == null) { + throw new IOException("Unable to get log information for task: " + + oldTaskAttemptID); + } + return new LogParams( + taReport.getContainerId().toString(), + taReport.getContainerId().getApplicationAttemptId() + .getApplicationId().toString(), + BuilderUtils.newNodeId(taReport.getNodeManagerHost(), + taReport.getNodeManagerPort()).toString(), report.getUser()); + } else { + if (report.getAMInfos() == null || report.getAMInfos().size() == 0) { + throw new IOException("Unable to get log information for job: " + + oldJobID); + } + AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1); + return new LogParams( + amInfo.getContainerId().toString(), + amInfo.getAppAttemptId().getApplicationId().toString(), + BuilderUtils.newNodeId(amInfo.getNodeManagerHost(), + amInfo.getNodeManagerPort()).toString(), report.getUser()); + } + } else { + throw new IOException("Cannot get log path for an in-progress job"); + } + } +} \ No newline at end of file
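Note that getLogFilePath deliberately fails for anything but a terminal job state, since container logs are only aggregated once the containers finish. A hedged sketch of what a caller sees (ids and variable names hypothetical):

    try {
      // A null task attempt id selects the last AM attempt's container logs.
      LogParams params = cluster.getLogParams(jobID, null);
      System.out.println("AM logs in container " + params.getContainerId()
          + " on node " + params.getNodeId());
    } catch (IOException e) {
      // Raised while the job is still running, or when no AM/NM info exists.
      System.out.println(e.getMessage());
    }

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 0864363820..03fc8836d8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import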
org.apache.hadoop.security.Credentials; @@ -504,4 +505,10 @@ public ProtocolSignature getProtocolSignature(String protocol, return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } + + @Override + public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException { + return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index eb300cd37b..c388759a4b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -168,7 +168,7 @@ public void testReconnectOnAMRestart() throws IOException { GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class); when(jobReportResponse1.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything")); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null)); // First AM returns a report with jobName firstGen and simulates AM shutdown // on second invocation. @@ -180,7 +180,7 @@ public void testReconnectOnAMRestart() throws IOException { GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class); when(jobReportResponse2.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything")); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null)); // Second AM generation returns a report with jobName secondGen MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java index 29c41404d5..d65a198c20 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java @@ -20,13 +20,13 @@ import java.io.File; import java.io.IOException; +import java.util.List; import junit.framework.Assert; import org.apache.avro.AvroRemoteException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.FailingMapper; import org.apache.hadoop.SleepJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,8 +35,20 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; +import 
org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.Before; import org.junit.After; import org.junit.Test; @@ -105,6 +117,8 @@ public void testJobHistoryData() throws IOException, InterruptedException, return; } + + SleepJob sleepJob = new SleepJob(); sleepJob.setConf(mrCluster.getConfig()); // Job with 3 maps and 2 reduces @@ -113,7 +127,8 @@ public void testJobHistoryData() throws IOException, InterruptedException, job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.waitForCompletion(true); Counters counterMR = job.getCounters(); - ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId(); + JobId jobId = TypeConverter.toYarn(job.getJobID()); + ApplicationId appID = jobId.getAppId(); while (true) { Thread.sleep(1000); if (mrCluster.getResourceManager().getRMContext().getRMApps() @@ -126,6 +141,36 @@ public void testJobHistoryData() throws IOException, InterruptedException, LOG.info("CounterHS " + counterHS); LOG.info("CounterMR " + counterMR); Assert.assertEquals(counterHS, counterMR); + + MRClientProtocol historyClient = instantiateHistoryProxy(); + GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class); + gjReq.setJobId(jobId); + JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport(); + verifyJobReport(jobReport, jobId); } + private void verifyJobReport(JobReport jobReport, JobId jobId) { + List<AMInfo> amInfos = jobReport.getAMInfos(); + Assert.assertEquals(1, amInfos.size()); + AMInfo amInfo = amInfos.get(0); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1); + ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1); + Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId()); + Assert.assertEquals(amContainerId, amInfo.getContainerId()); + Assert.assertTrue(jobReport.getSubmitTime() > 0); + Assert.assertTrue(jobReport.getStartTime() > 0 + && jobReport.getStartTime() >= jobReport.getSubmitTime()); + Assert.assertTrue(jobReport.getFinishTime() > 0 + && jobReport.getFinishTime() >= jobReport.getStartTime()); + } + + private MRClientProtocol instantiateHistoryProxy() { + final String serviceAddr = + mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS); + final YarnRPC rpc = YarnRPC.create(conf); + MRClientProtocol historyClient = + (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, + NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig()); + return historyClient; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index 3af36a468e..e7bfe72f4c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -126,6 +126,14 @@ private static ApplicationAttemptId toApplicationAttemptId( return appAttemptId; } + private static ApplicationId toApplicationId( + Iterator<String> it) throws NumberFormatException { + ApplicationId appId = Records.newRecord(ApplicationId.class); + appId.setClusterTimestamp(Long.parseLong(it.next())); + appId.setId(Integer.parseInt(it.next())); + return appId; + } + public static String toString(ContainerId cId) { return cId.toString(); } @@ -178,4 +186,18 @@ public static ApplicationAttemptId toApplicationAttemptId( } } + public static ApplicationId toApplicationId( + String appIdStr) { + Iterator<String> it = _split(appIdStr).iterator(); + if (!it.next().equals(APPLICATION_PREFIX)) { + throw new IllegalArgumentException("Invalid ApplicationId prefix: " + + appIdStr); + } + try { + return toApplicationId(it); + } catch (NumberFormatException n) { + throw new IllegalArgumentException("Invalid ApplicationId: " + + appIdStr, n); + } + } }
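The new ConverterUtils.toApplicationId is the inverse of ApplicationId.toString(), parsing the canonical "application_<clusterTimestamp>_<sequenceNumber>" form; LogDumper below relies on it to resolve its -applicationId argument. A quick sketch (the id value is hypothetical):

    // Round-trip: parse the canonical string form back into an ApplicationId.
    ApplicationId appId =
        ConverterUtils.toApplicationId("application_1318474278533_0001");
    // appId.getClusterTimestamp() == 1318474278533L and appId.getId() == 1

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 028b8c96bc..528cae63d1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -140,7 +140,7 @@ public synchronized void stop() { } super.stop(); } - + /** * Constructs the full filename for an application's log file per node.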
* @param remoteRootLogDir diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java index f906b95902..f5e5640c52 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java @@ -50,6 +50,7 @@ public class LogDumper extends Configured implements Tool { private static final String CONTAINER_ID_OPTION = "containerId"; private static final String APPLICATION_ID_OPTION = "applicationId"; private static final String NODE_ADDRESS_OPTION = "nodeAddress"; + private static final String APP_OWNER_OPTION = "appOwner"; @Override public int run(String[] args) throws Exception { @@ -58,6 +59,7 @@ public int run(String[] args) throws Exception { opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId"); opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId"); opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress"); + opts.addOption(APP_OWNER_OPTION, true, "AppOwner"); if (args.length < 1) { HelpFormatter formatter = new HelpFormatter(); @@ -69,11 +71,13 @@ public int run(String[] args) throws Exception { String appIdStr = null; String containerIdStr = null; String nodeAddress = null; + String appOwner = null; try { CommandLine commandLine = parser.parse(opts, args, true); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); containerIdStr = commandLine.getOptionValue(CONTAINER_ID_OPTION); nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION); + appOwner = commandLine.getOptionValue(APP_OWNER_OPTION); } catch (ParseException e) { System.out.println("options parsing failed: " + e.getMessage()); @@ -96,8 +100,11 @@ public int run(String[] args) throws Exception { DataOutputStream out = new DataOutputStream(System.out); + if (appOwner == null || appOwner.isEmpty()) { + appOwner = UserGroupInformation.getCurrentUser().getShortUserName(); + } if (containerIdStr == null && nodeAddress == null) { - dumpAllContainersLogs(appId, out); + dumpAllContainersLogs(appId, appOwner, out); } else if ((containerIdStr == null && nodeAddress != null) || (containerIdStr != null && nodeAddress == null)) { System.out.println("ContainerId or NodeAddress cannot be null!"); @@ -113,7 +120,7 @@ public int run(String[] args) throws Exception { LogAggregationService.getRemoteNodeLogFileForApp( remoteRootLogDir, appId, - UserGroupInformation.getCurrentUser().getShortUserName(), + appOwner, ConverterUtils.toNodeId(nodeAddress), getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX))); @@ -123,6 +130,21 @@ public int run(String[] args) throws Exception { return 0; } + public void dumpAContainersLogs(String appId, String containerId, + String nodeId, String jobOwner) throws IOException { + Path remoteRootLogDir = + new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + String suffix = 
LogAggregationService.getRemoteNodeLogDirSuffix(getConf()); + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(getConf(), + LogAggregationService.getRemoteNodeLogFileForApp(remoteRootLogDir, + ConverterUtils.toApplicationId(appId), jobOwner, + ConverterUtils.toNodeId(nodeId), suffix)); + DataOutputStream out = new DataOutputStream(System.out); + dumpAContainerLogs(containerId, reader, out); + } + private int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, DataOutputStream out) throws IOException { @@ -152,13 +174,12 @@ private int dumpAContainerLogs(String containerIdStr, return 0; } - private void - dumpAllContainersLogs(ApplicationId appId, DataOutputStream out) - throws IOException { + private void dumpAllContainersLogs(ApplicationId appId, String appOwner, + DataOutputStream out) throws IOException { Path remoteRootLogDir = new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - String user = UserGroupInformation.getCurrentUser().getShortUserName(); + String user = appOwner; String logDirSuffix = getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java index 870d02f5f8..091bf35a0b 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java @@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; @@ -4829,6 +4830,13 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token return secretManager.renewToken(token, user); } + @Override + public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID, + org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Not supported by JobTracker"); + } + JobACLsManager getJobACLsManager() { return aclsManager.getJobACLsManager(); } diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java index 4d05e40617..6d75228d05 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.server.jobtracker.State; @@ -812,4 +813,11 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token ) throws IOException,InterruptedException{ return 0; } + + @Override + public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID, +
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Not supported"); + } }
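With the new -appOwner option, LogDumper can read logs that were aggregated under another user's remote log directory. A minimal sketch of driving the dumper directly; the argument values are hypothetical, and omitting both -containerId and -nodeAddress dumps every container of the application:

    // LogDumper implements Tool, so ToolRunner handles the generic options.
    String[] dumperArgs = {
        "-applicationId", "application_1318474278533_0001",
        "-containerId", "container_1318474278533_0001_01_000001",
        "-nodeAddress", "nm.example.com:45454",
        "-appOwner", "alice" };
    int rc = ToolRunner.run(new LogDumper(), dumperArgs);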