YARN-5243. fix several rebase and other miscellaneous issues before merge. (Sangjin Lee via Varun Saxena)
This commit is contained in:
parent
32b033d57c
commit
1ff6833bba
@ -137,7 +137,7 @@ public class JobHistoryEventHandler extends AbstractService
|
||||
protected volatile boolean forceJobCompletion = false;
|
||||
|
||||
protected TimelineClient timelineClient;
|
||||
|
||||
|
||||
private boolean timelineServiceV2Enabled = false;
|
||||
|
||||
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
|
||||
@ -263,16 +263,16 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
|
||||
|
||||
// TODO replace MR specific configurations on timeline service with getting
|
||||
// configuration from RM through registerApplicationMaster() in
|
||||
// ApplicationMasterProtocol with return value for timeline service
|
||||
// TODO replace MR specific configurations on timeline service with getting
|
||||
// configuration from RM through registerApplicationMaster() in
|
||||
// ApplicationMasterProtocol with return value for timeline service
|
||||
// configuration status: off, on_with_v1 or on_with_v2.
|
||||
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
|
||||
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
|
||||
LOG.info("Emitting job history data to the timeline service is enabled");
|
||||
if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
||||
|
||||
timelineClient =
|
||||
timelineClient =
|
||||
((MRAppMaster.RunningAppContext)context).getTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
timelineServiceV2Enabled =
|
||||
@ -1062,11 +1062,11 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
|
||||
+ "Server", ex);
|
||||
}
|
||||
}
|
||||
|
||||
// create JobEntity from HistoryEvent with adding other info, like:
|
||||
|
||||
// create JobEntity from HistoryEvent with adding other info, like:
|
||||
// jobId, timestamp and entityType.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
|
||||
String entityType, boolean setCreatedTime) {
|
||||
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
@ -1093,16 +1093,16 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
|
||||
return entity;
|
||||
}
|
||||
|
||||
// create BaseEntity from HistoryEvent with adding other info, like:
|
||||
// create BaseEntity from HistoryEvent with adding other info, like:
|
||||
// timestamp and entityType.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createBaseEntity(HistoryEvent event, long timestamp, String entityType,
|
||||
boolean setCreatedTime) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
|
||||
event.toTimelineEvent();
|
||||
tEvent.setTimestamp(timestamp);
|
||||
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
|
||||
entity.addEvent(tEvent);
|
||||
entity.setType(entityType);
|
||||
@ -1115,10 +1115,10 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
|
||||
// create TaskEntity from HistoryEvent with adding other info, like:
|
||||
|
||||
// create TaskEntity from HistoryEvent with adding other info, like:
|
||||
// taskId, jobId, timestamp, entityType and relatedJobEntity.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
|
||||
String entityType, String relatedJobEntity, JobId jobId,
|
||||
boolean setCreatedTime) {
|
||||
@ -1128,12 +1128,12 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
|
||||
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
|
||||
return entity;
|
||||
}
|
||||
|
||||
// create TaskAttemptEntity from HistoryEvent with adding other info, like:
|
||||
|
||||
// create TaskAttemptEntity from HistoryEvent with adding other info, like:
|
||||
// timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createTaskAttemptEntity(HistoryEvent event, long timestamp,
|
||||
String taskAttemptId, String entityType, String relatedTaskEntity,
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createTaskAttemptEntity(HistoryEvent event, long timestamp,
|
||||
String taskAttemptId, String entityType, String relatedTaskEntity,
|
||||
String taskId, boolean setCreatedTime) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
createBaseEntity(event, timestamp, entityType, setCreatedTime);
|
||||
|
@ -822,8 +822,9 @@ private List<Container> getResources() throws Exception {
|
||||
|
||||
handleUpdatedNodes(response);
|
||||
handleJobPriorityChange(response);
|
||||
// handle receiving the timeline collector address for this app
|
||||
String collectorAddr = response.getCollectorAddr();
|
||||
MRAppMaster.RunningAppContext appContext =
|
||||
MRAppMaster.RunningAppContext appContext =
|
||||
(MRAppMaster.RunningAppContext)this.getContext();
|
||||
if (collectorAddr != null && !collectorAddr.isEmpty()
|
||||
&& appContext.getTimelineClient() != null) {
|
||||
|
@ -418,7 +418,7 @@ public TimelineEvent toTimelineEvent() {
|
||||
public Set<TimelineMetric> getTimelineMetrics() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public long getSubmitTime() {
|
||||
public EventType getEventType() {
|
||||
return EventType.AM_STARTED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -41,7 +41,7 @@ public interface HistoryEvent {
|
||||
|
||||
/** Set the Avro datum wrapped by this. */
|
||||
void setDatum(Object datum);
|
||||
|
||||
|
||||
/** Map HistoryEvent to TimelineEvent */
|
||||
TimelineEvent toTimelineEvent();
|
||||
|
||||
|
@ -139,7 +139,7 @@ public Counters getMapCounters() {
|
||||
public Counters getReduceCounters() {
|
||||
return reduceCounters;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -66,7 +66,7 @@ public void setDatum(Object datum) {
|
||||
public EventType getEventType() {
|
||||
return EventType.JOB_INFO_CHANGED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -77,7 +77,7 @@ public EventType getEventType() {
|
||||
}
|
||||
/** Get whether the job's map and reduce stages were combined */
|
||||
public boolean getUberized() { return datum.getUberized(); }
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -66,7 +66,7 @@ public JobPriority getPriority() {
|
||||
public EventType getEventType() {
|
||||
return EventType.JOB_PRIORITY_CHANGED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -64,7 +64,7 @@ public String getJobQueueName() {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -62,7 +62,7 @@ public void setDatum(Object datum) {
|
||||
public EventType getEventType() {
|
||||
return EventType.JOB_STATUS_CHANGED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -259,7 +259,7 @@ public TimelineEvent toTimelineEvent() {
|
||||
tEvent.addInfo("WORKFLOW_ADJACENCIES",
|
||||
getWorkflowAdjacencies());
|
||||
tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags());
|
||||
|
||||
|
||||
return tEvent;
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ public String getDiagnostics() {
|
||||
final CharSequence diagnostics = datum.getDiagnostics();
|
||||
return diagnostics == null ? NODIAGS : diagnostics.toString();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -247,5 +247,5 @@ public Set<TimelineMetric> getTimelineMetrics() {
|
||||
.countersToTimelineMetric(getCounters(), finishTime);
|
||||
return metrics;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ public Object getDatum() {
|
||||
public void setDatum(Object datum) {
|
||||
throw new UnsupportedOperationException("Not a seriable object");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -228,7 +228,7 @@ public int[] getVMemKbytes() {
|
||||
public int[] getPhysMemKbytes() {
|
||||
return physMemKbytes;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -141,7 +141,7 @@ public EventType getEventType() {
|
||||
? EventType.MAP_ATTEMPT_FINISHED
|
||||
: EventType.REDUCE_ATTEMPT_FINISHED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -136,7 +136,7 @@ public String getAvataar() {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -251,7 +251,7 @@ public int[] getVMemKbytes() {
|
||||
public int[] getPhysMemKbytes() {
|
||||
return physMemKbytes;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -142,7 +142,7 @@ public TaskAttemptID getFailedAttemptID() {
|
||||
public EventType getEventType() {
|
||||
return EventType.TASK_FAILED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -130,7 +130,7 @@ public TimelineEvent toTimelineEvent() {
|
||||
tEvent.addInfo("FINISH_TIME", getFinishTime());
|
||||
tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
|
||||
tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
|
||||
getSuccessfulTaskAttemptId() == null ? "" :
|
||||
getSuccessfulTaskAttemptId() == null ? "" :
|
||||
getSuccessfulTaskAttemptId().toString());
|
||||
return tEvent;
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ public TaskType getTaskType() {
|
||||
public EventType getEventType() {
|
||||
return EventType.TASK_STARTED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -62,7 +62,7 @@ public TaskID getTaskId() {
|
||||
public EventType getEventType() {
|
||||
return EventType.TASK_UPDATED;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TimelineEvent toTimelineEvent() {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
|
@ -65,7 +65,7 @@ public class TestMRTimelineEventHandling {
|
||||
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestMRTimelineEventHandling.class);
|
||||
|
||||
|
||||
@Test
|
||||
public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
@ -168,7 +168,7 @@ public void testMRTimelineEventHandling() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||
LOG.info("testMRNewTimelineServiceEventHandling start.");
|
||||
@ -184,7 +184,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
|
||||
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
|
||||
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
|
||||
|
||||
|
||||
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
|
||||
|
||||
MiniMRYarnCluster cluster = null;
|
||||
@ -215,9 +215,9 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||
YarnClient yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(new Configuration(cluster.getConfig()));
|
||||
yarnClient.start();
|
||||
EnumSet<YarnApplicationState> appStates =
|
||||
EnumSet<YarnApplicationState> appStates =
|
||||
EnumSet.allOf(YarnApplicationState.class);
|
||||
|
||||
|
||||
ApplicationId firstAppId = null;
|
||||
List<ApplicationReport> apps = yarnClient.getApplications(appStates);
|
||||
Assert.assertEquals(apps.size(), 1);
|
||||
@ -230,7 +230,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
|
||||
Assert.assertEquals(JobStatus.FAILED,
|
||||
job.getJobStatus().getState().getValue());
|
||||
|
||||
|
||||
apps = yarnClient.getApplications(appStates);
|
||||
Assert.assertEquals(apps.size(), 2);
|
||||
|
||||
@ -250,10 +250,10 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||
if(testRootFolder.isDirectory()) {
|
||||
FileUtils.deleteDirectory(testRootFolder);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void checkNewTimelineEvent(ApplicationId appId,
|
||||
ApplicationReport appReport) throws IOException {
|
||||
String tmpRoot =
|
||||
@ -261,7 +261,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
||||
+ "/entities/";
|
||||
|
||||
File tmpRootFolder = new File(tmpRoot);
|
||||
|
||||
|
||||
Assert.assertTrue(tmpRootFolder.isDirectory());
|
||||
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
|
||||
"/" + UserGroupInformation.getCurrentUser().getShortUserName() +
|
||||
@ -319,7 +319,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
||||
Assert.assertTrue("Task output directory: " + outputDirTask +
|
||||
" does not exist.",
|
||||
taskFolder.isDirectory());
|
||||
|
||||
|
||||
String taskEventFileName = appId.toString().replaceAll("application", "task")
|
||||
+ "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
|
||||
@ -330,15 +330,15 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
||||
taskEventFile.exists());
|
||||
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
|
||||
true, false, null);
|
||||
|
||||
|
||||
// check for task attempt event file
|
||||
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
|
||||
File taskAttemptFolder = new File(outputDirTaskAttempt);
|
||||
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
|
||||
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
|
||||
" does not exist.", taskAttemptFolder.isDirectory());
|
||||
|
||||
|
||||
String taskAttemptEventFileName = appId.toString().replaceAll(
|
||||
"application", "attempt") + "_m_000000_0" +
|
||||
"application", "attempt") + "_m_000000_0" +
|
||||
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
|
||||
String taskAttemptEventFilePath = outputDirTaskAttempt +
|
||||
|
@ -93,7 +93,7 @@ public MapredTestDriver(ProgramDriver pgd) {
|
||||
pgd.addClass("timelineperformance", TimelineServicePerformance.class,
|
||||
"A job that launches mappers to test timline service " +
|
||||
"performance.");
|
||||
pgd.addClass("nnbench", NNBench.class,
|
||||
pgd.addClass("nnbench", NNBench.class,
|
||||
"A benchmark that stresses the namenode w/ MR.");
|
||||
pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
|
||||
"A benchmark that stresses the namenode w/o MR.");
|
||||
|
@ -552,9 +552,4 @@
|
||||
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
|
||||
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
|
||||
</Match>
|
||||
<!-- Object cast is based on the event type -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
|
||||
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
@ -631,7 +631,7 @@ public void run() throws YarnException, IOException, InterruptedException {
|
||||
DSEvent.DS_APP_ATTEMPT_START);
|
||||
} else {
|
||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
||||
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ private void setupInternal(int numNodeManager, float timelineVersion)
|
||||
// Enable ContainersMonitorImpl
|
||||
conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
|
||||
LinuxResourceCalculatorPlugin.class.getName());
|
||||
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
|
||||
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
|
||||
ProcfsBasedProcessTree.class.getName());
|
||||
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
|
||||
|
@ -495,8 +495,8 @@ protected void putObjects(
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
String msg = "Response from the timeline server is " +
|
||||
((resp == null) ? "null":
|
||||
String msg = "Response from the timeline server is " +
|
||||
((resp == null) ? "null":
|
||||
"not successful," + " HTTP error code: " + resp.getStatus()
|
||||
+ ", Server response:\n" + resp.getEntity(String.class));
|
||||
LOG.error(msg);
|
||||
|
@ -784,7 +784,7 @@
|
||||
|
||||
<property>
|
||||
<description>The setting that controls whether yarn system metrics is
|
||||
published to the Timeline server (version one) or not, by RM.
|
||||
published to the Timeline server (version one) or not, by RM.
|
||||
This configuration is now deprecated in favor of
|
||||
yarn.system-metrics-publisher.enabled.</description>
|
||||
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
|
||||
|
@ -27,49 +27,49 @@
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestTimelineServiceHelper {
|
||||
|
||||
|
||||
@Test
|
||||
public void testMapCastToHashMap() {
|
||||
|
||||
// Test null map be casted to null
|
||||
Map<String, String> nullMap = null;
|
||||
Assert.assertNull(TimelineServiceHelper.mapCastToHashMap(nullMap));
|
||||
|
||||
|
||||
// Test empty hashmap be casted to a empty hashmap
|
||||
Map<String, String> emptyHashMap = new HashMap<String, String>();
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(emptyHashMap).size(), 0);
|
||||
|
||||
|
||||
// Test empty non-hashmap be casted to a empty hashmap
|
||||
Map<String, String> emptyTreeMap = new TreeMap<String, String>();
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(emptyTreeMap).size(), 0);
|
||||
|
||||
|
||||
// Test non-empty hashmap be casted to hashmap correctly
|
||||
Map<String, String> firstHashMap = new HashMap<String, String>();
|
||||
String key = "KEY";
|
||||
String value = "VALUE";
|
||||
firstHashMap.put(key, value);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(firstHashMap), firstHashMap);
|
||||
|
||||
|
||||
// Test non-empty non-hashmap is casted correctly.
|
||||
Map<String, String> firstTreeMap = new TreeMap<String, String>();
|
||||
firstTreeMap.put(key, value);
|
||||
HashMap<String, String> alternateHashMap =
|
||||
HashMap<String, String> alternateHashMap =
|
||||
TimelineServiceHelper.mapCastToHashMap(firstTreeMap);
|
||||
Assert.assertEquals(firstTreeMap.size(), alternateHashMap.size());
|
||||
Assert.assertEquals(alternateHashMap.get(key), value);
|
||||
|
||||
|
||||
// Test complicated hashmap be casted correctly
|
||||
Map<String, Set<String>> complicatedHashMap = new HashMap<String, Set<String>>();
|
||||
Set<String> hashSet = new HashSet<String>();
|
||||
hashSet.add(value);
|
||||
complicatedHashMap.put(key, hashSet);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedHashMap),
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedHashMap),
|
||||
complicatedHashMap);
|
||||
|
||||
|
||||
// Test complicated non-hashmap get casted correctly
|
||||
Map<String, Set<String>> complicatedTreeMap = new TreeMap<String, Set<String>>();
|
||||
complicatedTreeMap.put(key, hashSet);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key),
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key),
|
||||
hashSet);
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||
private List<LogAggregationReport> logAggregationReportsForApps = null;
|
||||
|
||||
Map<ApplicationId, String> registeredCollectors = null;
|
||||
|
||||
|
||||
public NodeHeartbeatRequestPBImpl() {
|
||||
builder = NodeHeartbeatRequestProto.newBuilder();
|
||||
}
|
||||
@ -157,7 +157,7 @@ private LogAggregationReportProto convertToProtoFormat(
|
||||
LogAggregationReport value) {
|
||||
return ((LogAggregationReportPBImpl) value).getProto();
|
||||
}
|
||||
|
||||
|
||||
private void addRegisteredCollectorsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearRegisteredCollectors();
|
||||
|
@ -102,7 +102,7 @@ interface QueuingContext {
|
||||
NMStateStoreService getNMStateStore();
|
||||
|
||||
boolean getDecommissioned();
|
||||
|
||||
|
||||
Configuration getConf();
|
||||
|
||||
void setDecommissioned(boolean isDecommissioned);
|
||||
|
@ -471,9 +471,9 @@ public void run() {
|
||||
public static class NMContext implements Context {
|
||||
|
||||
private NodeId nodeId = null;
|
||||
|
||||
|
||||
private Configuration conf = null;
|
||||
|
||||
|
||||
protected final ConcurrentMap<ApplicationId, Application> applications =
|
||||
new ConcurrentHashMap<ApplicationId, Application>();
|
||||
|
||||
@ -551,7 +551,7 @@ public int getHttpPort() {
|
||||
public ConcurrentMap<ApplicationId, Application> getApplications() {
|
||||
return this.applications;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return this.conf;
|
||||
@ -684,7 +684,12 @@ public Map<ApplicationId, String> getRegisteredCollectors() {
|
||||
|
||||
public void addRegisteredCollectors(
|
||||
Map<ApplicationId, String> newRegisteredCollectors) {
|
||||
this.registeredCollectors.putAll(newRegisteredCollectors);
|
||||
if (registeredCollectors != null) {
|
||||
this.registeredCollectors.putAll(newRegisteredCollectors);
|
||||
} else {
|
||||
LOG.warn("collectors are added when the registered collectors are " +
|
||||
"initialized");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -184,7 +184,7 @@ public String getUser() {
|
||||
public ApplicationId getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ApplicationState getApplicationState() {
|
||||
this.readLock.lock();
|
||||
|
@ -243,7 +243,6 @@ protected void serviceStop() throws Exception {
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@ -422,7 +421,6 @@ public void run() {
|
||||
.entrySet()) {
|
||||
ContainerId containerId = entry.getKey();
|
||||
ProcessTreeInfo ptInfo = entry.getValue();
|
||||
|
||||
try {
|
||||
String pId = ptInfo.getPID();
|
||||
|
||||
@ -478,7 +476,7 @@ public void run() {
|
||||
|
||||
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
|
||||
resourceCalculatorPlugin.getNumProcessors();
|
||||
|
||||
|
||||
// Multiply by 1000 to avoid losing data when converting to int
|
||||
int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
|
||||
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
|
||||
@ -514,7 +512,7 @@ public void run() {
|
||||
containerMetricsUnregisterDelayMs).recordCpuUsage
|
||||
((int)cpuUsagePercentPerCore, milliVcoresUsed);
|
||||
}
|
||||
|
||||
|
||||
boolean isMemoryOverLimit = false;
|
||||
String msg = "";
|
||||
int containerExitStatus = ContainerExitStatus.INVALID;
|
||||
|
@ -111,7 +111,6 @@ public boolean isPmemCheckEnabled() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
@ -175,7 +174,6 @@ public boolean isPmemCheckEnabled() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
|
@ -177,7 +177,7 @@
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
@ -30,10 +30,8 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
@ -95,8 +93,6 @@ public class RMActiveServiceContext {
|
||||
private NodesListManager nodesListManager;
|
||||
private ResourceTrackerService resourceTrackerService;
|
||||
private ApplicationMasterService applicationMasterService;
|
||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||
private SystemMetricsPublisher systemMetricsPublisher;
|
||||
private RMTimelineCollectorManager timelineCollectorManager;
|
||||
|
||||
private RMNodeLabelsManager nodeLabelManager;
|
||||
@ -373,12 +369,6 @@ public boolean isWorkPreservingRecoveryEnabled() {
|
||||
return this.isWorkPreservingRecoveryEnabled;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
|
||||
return rmApplicationHistoryWriter;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
|
||||
@ -392,26 +382,6 @@ public void setRMTimelineCollectorManager(
|
||||
this.timelineCollectorManager = collectorManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setSystemMetricsPublisher(
|
||||
SystemMetricsPublisher metricsPublisher) {
|
||||
this.systemMetricsPublisher = metricsPublisher;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public SystemMetricsPublisher getSystemMetricsPublisher() {
|
||||
return systemMetricsPublisher;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setRMApplicationHistoryWriter(
|
||||
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
|
||||
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public long getEpoch() {
|
||||
|
@ -113,10 +113,10 @@ void setRMApplicationHistoryWriter(
|
||||
void setSystemMetricsPublisher(SystemMetricsPublisher systemMetricsPublisher);
|
||||
|
||||
SystemMetricsPublisher getSystemMetricsPublisher();
|
||||
|
||||
|
||||
void setRMTimelineCollectorManager(
|
||||
RMTimelineCollectorManager timelineCollectorManager);
|
||||
|
||||
|
||||
RMTimelineCollectorManager getRMTimelineCollectorManager();
|
||||
|
||||
ConfigurationProvider getConfigurationProvider();
|
||||
|
@ -380,7 +380,7 @@ public void setRMTimelineCollectorManager(
|
||||
public RMTimelineCollectorManager getRMTimelineCollectorManager() {
|
||||
return activeServiceContext.getRMTimelineCollectorManager();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void setSystemMetricsPublisher(
|
||||
SystemMetricsPublisher metricsPublisher) {
|
||||
|
@ -22,11 +22,11 @@
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -123,15 +123,6 @@
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
@ -312,6 +303,15 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
addService(rmApplicationHistoryWriter);
|
||||
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
|
||||
|
||||
// initialize the RM timeline collector first so that the system metrics
|
||||
// publisher can bind to it
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
|
||||
RMTimelineCollectorManager timelineCollectorManager =
|
||||
createRMTimelineCollectorManager();
|
||||
addService(timelineCollectorManager);
|
||||
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
|
||||
}
|
||||
|
||||
SystemMetricsPublisher systemMetricsPublisher =
|
||||
createSystemMetricsPublisher();
|
||||
addIfService(systemMetricsPublisher);
|
||||
@ -602,18 +602,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
||||
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
|
||||
}
|
||||
|
||||
RMApplicationHistoryWriter rmApplicationHistoryWriter =
|
||||
createRMApplicationHistoryWriter();
|
||||
addService(rmApplicationHistoryWriter);
|
||||
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
|
||||
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
|
||||
RMTimelineCollectorManager timelineCollectorManager =
|
||||
createRMTimelineCollectorManager();
|
||||
addService(timelineCollectorManager);
|
||||
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
|
||||
}
|
||||
|
||||
// Register event handler for NodesListManager
|
||||
nodesListManager = new NodesListManager(rmContext);
|
||||
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
|
||||
|
@ -177,19 +177,25 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
String getTrackingUrl();
|
||||
|
||||
/**
|
||||
* The collector address for the application.
|
||||
* @return the address for the application's collector.
|
||||
* The collector address for the application. It should be used only if the
|
||||
* timeline service v.2 is enabled.
|
||||
*
|
||||
* @return the address for the application's collector, or null if the
|
||||
* timeline service v.2 is not enabled.
|
||||
*/
|
||||
String getCollectorAddr();
|
||||
|
||||
/**
|
||||
* Set collector address for the application
|
||||
* Set collector address for the application. It should be used only if the
|
||||
* timeline service v.2 is enabled.
|
||||
*
|
||||
* @param collectorAddr the address of collector
|
||||
*/
|
||||
void setCollectorAddr(String collectorAddr);
|
||||
|
||||
/**
|
||||
* Remove collector address when application is finished or killed.
|
||||
* Remove collector address when application is finished or killed. It should
|
||||
* be used only if the timeline service v.2 is enabled.
|
||||
*/
|
||||
void removeCollectorAddr();
|
||||
|
||||
|
@ -526,6 +526,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the application level timeline collector for this app. This should
|
||||
* be used only if the timeline service v.2 is enabled.
|
||||
*/
|
||||
public void startTimelineCollector() {
|
||||
AppLevelTimelineCollector collector =
|
||||
new AppLevelTimelineCollector(applicationId);
|
||||
@ -533,6 +537,10 @@ public void startTimelineCollector() {
|
||||
applicationId, collector);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the application level timeline collector for this app. This should be
|
||||
* used only if the timeline service v.2 is enabled.
|
||||
*/
|
||||
public void stopTimelineCollector() {
|
||||
rmContext.getRMTimelineCollectorManager().remove(applicationId);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user