Made a number of miscellaneous fixes for javac, javadoc, and checkstyle warnings.
parent c5dbde0cc4
commit 6cf6ab7b78
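The patch is mechanical throughout: long lines are wrapped at the 80-column limit, missing javadoc @param/@return/@throws tags are filled in, lowercase long literals such as 500l become 500L, single-statement if bodies gain braces, and package-private PBImpl fields become private. As a rough sketch of the target style (this class is invented here for illustration and is not part of the patch):

// Illustration only: a made-up class showing the conventions this commit
// applies, i.e. wrapped lines, complete javadoc tags, uppercase long
// literals, and braces around single-statement if bodies.
public final class CheckstyleStyleExample {

  private static final long RETRY_INTERVAL_MS = 500L;

  private CheckstyleStyleExample() {
  }

  /**
   * Waits one retry interval when the supplied value is negative.
   *
   * @param value the value to check
   * @return true if the value is non-negative
   * @throws InterruptedException if the wait is interrupted
   */
  public static boolean waitForNonNegative(long value)
      throws InterruptedException {
    if (value < 0) {
      Thread.sleep(RETRY_INTERVAL_MS);
      return false;
    }
    return true;
  }
}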
@@ -46,7 +46,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;

@@ -76,11 +75,8 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;

import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;

@@ -786,7 +786,8 @@ public TestParams() {
public TestParams(boolean isLastAMRetry) {
this(AppContext.class, isLastAMRetry);
}
public TestParams(Class<? extends AppContext> contextClass, boolean isLastAMRetry) {
public TestParams(Class<? extends AppContext> contextClass,
boolean isLastAMRetry) {
this.isLastAMRetry = isLastAMRetry;
mockAppContext = mockAppContext(contextClass, appId, this.isLastAMRetry);
}

@@ -42,9 +42,17 @@ public interface HistoryEvent {
/** Set the Avro datum wrapped by this. */
void setDatum(Object datum);

/** Map HistoryEvent to TimelineEvent */
/**
* Map HistoryEvent to TimelineEvent.
*
* @return the timeline event
*/
TimelineEvent toTimelineEvent();

/** Counters or Metrics if any else return null. */
/**
* Counters or Metrics if any else return null.
*
* @return the set of timeline metrics
*/
Set<TimelineMetric> getTimelineMetrics();
}
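The two interface methods above now carry full javadoc. A hedged sketch of the objects they produce follows (the ids and values are placeholders; only the ATSv2 record classes and the setters already exercised elsewhere in this patch are assumed):

import java.util.Collections;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;

// Illustration only: builds the kind of objects a HistoryEvent
// implementation would return from toTimelineEvent() and
// getTimelineMetrics().
public final class HistoryEventSketch {

  private HistoryEventSketch() {
  }

  /** A timeline event such as an event implementation might wrap itself in. */
  public static TimelineEvent sampleTimelineEvent() {
    TimelineEvent event = new TimelineEvent();
    event.setId("JOB_SUBMITTED");
    event.setTimestamp(System.currentTimeMillis());
    return event;
  }

  /** A single-value metric set; events without counters simply return null. */
  public static Set<TimelineMetric> sampleTimelineMetrics() {
    TimelineMetric metric =
        new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
    metric.setId("MAP_INPUT_RECORDS");
    metric.addValue(System.currentTimeMillis(), 100L);
    return Collections.singleton(metric);
  }
}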
@@ -169,6 +169,7 @@ public void testMRTimelineEventHandling() throws Exception {
}
}

@SuppressWarnings("deprecation")
@Test
public void testMRNewTimelineServiceEventHandling() throws Exception {
LOG.info("testMRNewTimelineServiceEventHandling start.");

@@ -245,7 +246,8 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
}
// Cleanup test file
String testRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
FileSystemTimelineWriterImpl.
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
File testRootFolder = new File(testRoot);
if(testRootFolder.isDirectory()) {
FileUtils.deleteDirectory(testRootFolder);

@@ -320,8 +322,10 @@ private void checkNewTimelineEvent(ApplicationId appId,
" does not exist.",
taskFolder.isDirectory());

String taskEventFileName = appId.toString().replaceAll("application", "task")
+ "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String taskEventFileName =
appId.toString().replaceAll("application", "task") +
"_m_000000" +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;

String taskEventFilePath = outputDirTask + taskEventFileName;
File taskEventFile = new File(taskEventFilePath);

@@ -372,10 +376,12 @@ private void verifyEntity(File entityFile, String eventId,
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
FileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity.class);
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other

@@ -621,8 +621,8 @@ static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
return job;
}

public static void waitForAppFinished(RunningJob job, MiniMRYarnCluster cluster)
throws IOException {
public static void waitForAppFinished(RunningJob job,
MiniMRYarnCluster cluster) throws IOException {
ApplicationId appId = ApplicationId.newInstance(
Long.parseLong(job.getID().getJtIdentifier()), job.getID().getId());
ConcurrentMap<ApplicationId, RMApp> rmApps =

@@ -33,7 +33,8 @@
* to the timeline service.
*/
abstract class EntityWriterV2
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
extends org.apache.hadoop.mapreduce.Mapper
<IntWritable, IntWritable, Writable, Writable> {
@Override
public void map(IntWritable key, IntWritable val, Context context)
throws IOException {

@@ -89,8 +89,8 @@ protected void writeEntities(Configuration tlConf,
parser.parseHistoryFile(job.getJobHistoryFilePath());
Configuration jobConf =
parser.parseConfiguration(job.getJobConfFilePath());
LOG.info("parsed the job history file and the configuration file for job"
+ jobIdStr);
LOG.info("parsed the job history file and the configuration file " +
"for job " + jobIdStr);

// set the context
// flow id: job name, flow run id: timestamp, user id

@@ -32,7 +32,7 @@ interface SimpleEntityWriterConstants {

/**
* To ensure that the compression really gets exercised, generate a
* random alphanumeric fixed length payload
* random alphanumeric fixed length payload.
*/
char[] ALPHA_NUMS = new char[] {'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',

@@ -39,11 +39,13 @@
* configuration.
*/
class SimpleEntityWriterV1
extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable>
extends org.apache.hadoop.mapreduce.Mapper
<IntWritable, IntWritable, Writable, Writable>
implements SimpleEntityWriterConstants {
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);

public void map(IntWritable key, IntWritable val, Context context) throws IOException {
public void map(IntWritable key, IntWritable val, Context context)
throws IOException {
TimelineClient tlc = new TimelineClientImpl();
Configuration conf = context.getConfiguration();

@@ -90,7 +90,8 @@ private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
return job;
}

private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
private Set<TimelineEntity>
createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
Set<TimelineEntity> entities = new HashSet<>();
Map<TaskID, TaskInfo> taskInfoMap = jobInfo.getAllTasks();
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +

@@ -135,7 +136,8 @@ private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
return taskAttempts;
}

private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
private TimelineEntity
createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
TimelineEntity taskAttempt = new TimelineEntity();
taskAttempt.setEntityType(TASK_ATTEMPT);
taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString());

@@ -27,11 +27,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

@@ -137,7 +137,8 @@ public static boolean parseArgs(String[] args, Job job) throws IOException {
default:
// use the current timestamp as the "run id" of the test: this will
// be used as simulating the cluster timestamp for apps
conf.setLong(SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
conf.setLong(
SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
System.currentTimeMillis());
switch (timeline_service_version) {
case TIMELINE_SERVICE_VERSION_2:

@@ -181,7 +181,8 @@ public void serviceInit(Configuration conf) throws Exception {
}
if (enableTimelineAuxService) {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, TIMELINE_AUX_SERVICE_NAME });
new String[] {ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
TIMELINE_AUX_SERVICE_NAME});
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] {ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID});

@@ -24,4 +24,3 @@
package org.apache.hadoop.yarn.api.records.timelineservice;

import org.apache.hadoop.classification.InterfaceAudience;

@@ -500,7 +500,8 @@ public static boolean isAclEnabled(Configuration conf) {

/**
* The setting that controls whether yarn system metrics is published on the
* timeline server or not by RM and NM. This configuration setting is for ATS V2
* timeline server or not by RM and NM. This configuration setting is for
* ATS v2.
*/
public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+ "system-metrics-publisher.enabled";

@@ -36,6 +36,8 @@ private TimelineServiceHelper() {
/**
* Cast map to HashMap for generic type.
* @param originalMap the map need to be casted
* @param <E> key type
* @param <V> value type
* @return casted HashMap object
*/
public static <E, V> HashMap<E, V> mapCastToHashMap(
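For reference, a minimal usage sketch of the helper whose javadoc is completed above (the import path is assumed from the class name, and the keys and values are placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.yarn.util.timeline.TimelineServiceHelper;

// Illustration only: casting an arbitrary Map to a HashMap via the helper.
public final class MapCastSketch {

  private MapCastSketch() {
  }

  public static void main(String[] args) {
    Map<String, String> info = new TreeMap<String, String>();
    info.put("queue", "default");

    // A TreeMap is not a HashMap; per the tests in this patch the helper
    // yields a HashMap carrying the same entries.
    HashMap<String, String> casted =
        TimelineServiceHelper.mapCastToHashMap(info);
    System.out.println(casted.get("queue"));
  }
}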
@@ -102,7 +102,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;

@@ -1348,8 +1347,10 @@ Thread createLaunchContainerThread(Container allocatedContainer,

private void publishContainerStartEventOnTimelineServiceV2(
Container container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
long ts = System.currentTimeMillis();

@@ -1381,8 +1382,10 @@ public TimelinePutResponse run() throws Exception {

private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);

@@ -1412,8 +1415,10 @@ public TimelinePutResponse run() throws Exception {

private void publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent appEvent) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(appAttemptID.toString());
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
long ts = System.currentTimeMillis();

@@ -153,7 +153,8 @@ private void setupInternal(int numNodeManager, float timelineVersion)
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
conf.setBoolean(
YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
true);

@@ -180,11 +181,13 @@ private void setupInternal(int numNodeManager, float timelineVersion)
// disable v1 timeline server since we no longer have a server here
// enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." +
TIMELINE_AUX_SERVICE_NAME + ".class",
PerNodeTimelineCollectorsAuxService.class.getName());
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class,
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter.class);
org.apache.hadoop.yarn.server.timelineservice.storage.
TimelineWriter.class);
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}

@@ -395,7 +398,8 @@ public void run() {
}

if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
&& appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
&& appReport.getFinalApplicationStatus() !=
FinalApplicationStatus.UNDEFINED) {
break;
}
}

@@ -545,7 +549,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
if (numOfContainerFinishedOccurences > 0) {
break;
} else {
Thread.sleep(500l);
Thread.sleep(500L);
}
}
Assert.assertEquals(

@@ -577,7 +581,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
if (numOfStringOccurences > 0) {
break;
} else {
Thread.sleep(500l);
Thread.sleep(500L);
}
}
Assert.assertEquals(

@@ -631,9 +635,10 @@ private long getNumOfStringOccurences(File entityFile, String searchString)
try {
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().contains(searchString))
if (strLine.trim().contains(searchString)) {
actualCount++;
}
}
} finally {
reader.close();
}
@ -30,9 +30,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
|
@ -463,15 +463,15 @@ public NMTokenCache getNMTokenCache() {
|
||||
|
||||
/**
|
||||
* Register TimelineClient to AMRMClient.
|
||||
* @param timelineClient
|
||||
* @param client the timeline client to register
|
||||
*/
|
||||
public void registerTimelineClient(TimelineClient timelineClient) {
|
||||
this.timelineClient = timelineClient;
|
||||
public void registerTimelineClient(TimelineClient client) {
|
||||
this.timelineClient = client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get registered timeline client.
|
||||
* @return
|
||||
* @return the registered timeline client
|
||||
*/
|
||||
public TimelineClient getRegisteredTimeineClient() {
|
||||
return this.timelineClient;
|
||||
@ -481,7 +481,7 @@ public TimelineClient getRegisteredTimeineClient() {
|
||||
* Wait for <code>check</code> to return true for each 1000 ms.
|
||||
* See also {@link #waitFor(com.google.common.base.Supplier, int)}
|
||||
* and {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||
* @param check
|
||||
* @param check the condition for which it should wait
|
||||
*/
|
||||
public void waitFor(Supplier<Boolean> check) throws InterruptedException {
|
||||
waitFor(check, 1000);
|
||||
|
@ -304,7 +304,7 @@ public void registerTimelineClient(TimelineClient timelineClient) {
|
||||
|
||||
/**
|
||||
* Get registered timeline client.
|
||||
* @return
|
||||
* @return the registered timeline client
|
||||
*/
|
||||
public TimelineClient getRegisteredTimeineClient() {
|
||||
return client.getRegisteredTimeineClient();
|
||||
@ -325,7 +325,7 @@ public abstract void updateBlacklist(List<String> blacklistAdditions,
|
||||
* Wait for <code>check</code> to return true for each 1000 ms.
|
||||
* See also {@link #waitFor(com.google.common.base.Supplier, int)}
|
||||
* and {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||
* @param check
|
||||
* @param check the condition for which it should wait
|
||||
*/
|
||||
public void waitFor(Supplier<Boolean> check) throws InterruptedException {
|
||||
waitFor(check, 1000);
|
||||
|
@ -30,8 +30,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
@ -55,10 +53,12 @@ public abstract class TimelineClient extends AbstractService implements
|
||||
* construct and initialize a timeline client if the following operations are
|
||||
* supposed to be conducted by that user.
|
||||
*/
|
||||
protected ApplicationId contextAppId;
|
||||
private ApplicationId contextAppId;
|
||||
|
||||
/**
|
||||
* Creates an instance of the timeline v.1.x client.
|
||||
*
|
||||
* @return the created timeline client instance
|
||||
*/
|
||||
@Public
|
||||
public static TimelineClient createTimelineClient() {
|
||||
@ -68,6 +68,10 @@ public static TimelineClient createTimelineClient() {
|
||||
|
||||
/**
|
||||
* Creates an instance of the timeline v.2 client.
|
||||
*
|
||||
* @param appId the application id with which the timeline client is
|
||||
* associated
|
||||
* @return the created timeline client instance
|
||||
*/
|
||||
@Public
|
||||
public static TimelineClient createTimelineClient(ApplicationId appId) {
|
||||
@ -91,8 +95,8 @@ protected TimelineClient(String name, ApplicationId appId) {
|
||||
* @param entities
|
||||
* the collection of {@link TimelineEntity}
|
||||
* @return the error information if the sent entities are not correctly stored
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
* @throws IOException if there are I/O errors
|
||||
* @throws YarnException if entities are incomplete/invalid
|
||||
*/
|
||||
@Public
|
||||
public abstract TimelinePutResponse putEntities(
|
||||
@ -112,8 +116,8 @@ public abstract TimelinePutResponse putEntities(
|
||||
* @param entities
|
||||
* the collection of {@link TimelineEntity}
|
||||
* @return the error information if the sent entities are not correctly stored
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
* @throws IOException if there are I/O errors
|
||||
* @throws YarnException if entities are incomplete/invalid
|
||||
*/
|
||||
@Public
|
||||
public abstract TimelinePutResponse putEntities(
|
||||
@ -212,15 +216,15 @@ public abstract void cancelDelegationToken(
|
||||
* for a non-v.2 timeline client instance, a YarnException is thrown.
|
||||
* </p>
|
||||
*
|
||||
* @param entities
|
||||
* the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @param entities the collection of {@link
|
||||
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntities(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
|
||||
throws IOException, YarnException;
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@ -230,15 +234,15 @@ public abstract void putEntities(
|
||||
* non-v.2 timeline client instance, a YarnException is thrown.
|
||||
* </p>
|
||||
*
|
||||
* @param entities
|
||||
* the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @param entities the collection of {@link
|
||||
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntitiesAsync(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
|
||||
throws IOException, YarnException;
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -391,8 +391,8 @@ public TimelinePutResponse putEntities(
|
||||
|
||||
@Override
|
||||
public void putEntities(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException {
|
||||
if (!timelineServiceV2) {
|
||||
throw new YarnException("v.2 method is invoked on a v.1.x client");
|
||||
}
|
||||
@ -401,8 +401,8 @@ public void putEntities(
|
||||
|
||||
@Override
|
||||
public void putEntitiesAsync(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException {
|
||||
if (!timelineServiceV2) {
|
||||
throw new YarnException("v.2 method is invoked on a v.1.x client");
|
||||
}
|
||||
@ -494,7 +494,8 @@ protected void putObjects(
|
||||
throw new IOException(re);
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
resp.getStatusInfo().getStatusCode() !=
|
||||
ClientResponse.Status.OK.getStatusCode()) {
|
||||
String msg = "Response from the timeline server is " +
|
||||
((resp == null) ? "null":
|
||||
"not successful," + " HTTP error code: " + resp.getStatus()
|
||||
@ -530,7 +531,8 @@ public Token<TimelineDelegationTokenIdentifier> run()
|
||||
// TODO we should add retry logic here if timelineServiceAddress is
|
||||
// not available immediately.
|
||||
return (Token) authUrl.getDelegationToken(
|
||||
constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(),
|
||||
constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), false).toURL(),
|
||||
token, renewer, doAsUser);
|
||||
}
|
||||
};
|
||||
@ -911,17 +913,21 @@ public boolean shouldRetryOn(Exception e) {
|
||||
}
|
||||
|
||||
private final class EntitiesHolder extends FutureTask<Void> {
|
||||
private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities;
|
||||
private final
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities;
|
||||
private final boolean isSync;
|
||||
|
||||
EntitiesHolder(
|
||||
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities,
|
||||
final
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities,
|
||||
final boolean isSync) {
|
||||
super(new Callable<Void>() {
|
||||
// publishEntities()
|
||||
public Void call() throws Exception {
|
||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||
params.add("appid", contextAppId.toString());
|
||||
params.add("appid", getContextAppId().toString());
|
||||
params.add("async", Boolean.toString(!isSync));
|
||||
putObjects("entities", params, entities);
|
||||
return null;
|
||||
@ -935,7 +941,8 @@ public boolean isSync() {
|
||||
return isSync;
|
||||
}
|
||||
|
||||
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() {
|
||||
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
getEntities() {
|
||||
return entities;
|
||||
}
|
||||
}
|
||||
@ -947,7 +954,7 @@ public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEn
|
||||
private class TimelineEntityDispatcher {
|
||||
/**
|
||||
* Time period for which the timelineclient will wait for draining after
|
||||
* stop
|
||||
* stop.
|
||||
*/
|
||||
private static final long DRAIN_TIME_PERIOD = 2000L;
|
||||
|
||||
@ -1063,17 +1070,20 @@ private void publishWithoutBlockingOnQueue(
|
||||
}
|
||||
|
||||
public void dispatchEntities(boolean sync,
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished)
|
||||
throws YarnException {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
|
||||
entitiesTobePublished) throws YarnException {
|
||||
if (executor.isShutdown()) {
|
||||
throw new YarnException("Timeline client is in the process of stopping,"
|
||||
+ " not accepting any more TimelineEntities");
|
||||
}
|
||||
|
||||
// wrap all TimelineEntity into TimelineEntities object
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities =
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
|
||||
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities =
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice.
|
||||
TimelineEntities();
|
||||
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
entity : entitiesTobePublished) {
|
||||
entities.addEntity(entity);
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,8 @@ public void testTimelineEntities() throws Exception {
|
||||
}
|
||||
entity.addMetric(metric2);
|
||||
|
||||
TimelineMetric metric3 = new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
|
||||
TimelineMetric metric3 =
|
||||
new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
|
||||
metric3.setId("test metric id 1");
|
||||
metric3.addValue(4L, (short) 4);
|
||||
Assert.assertEquals("metric3 should equal to metric2! ", metric3, metric2);
|
||||
@ -212,18 +213,22 @@ public void testFirstClassCitizenEntities() throws Exception {
|
||||
ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(0, 1), 1), 1).toString());
|
||||
|
||||
cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
|
||||
cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(),
|
||||
flow1.getId());
|
||||
flow1
|
||||
.setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
|
||||
flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
|
||||
flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
|
||||
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
|
||||
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId());
|
||||
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||
app1.getId());
|
||||
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||
app2.getId());
|
||||
app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
|
||||
app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
|
||||
appAttempt.getId());
|
||||
appAttempt
|
||||
.setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
|
||||
.setParent(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||
app1.getId());
|
||||
app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
|
||||
appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
|
||||
container.getId());
|
||||
|
@ -43,7 +43,7 @@ public class TestTimelineClientV2Impl {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestTimelineClientV2Impl.class);
|
||||
private TestV2TimelineClient client;
|
||||
private static long TIME_TO_SLEEP = 150;
|
||||
private static final long TIME_TO_SLEEP = 150L;
|
||||
private static final String EXCEPTION_MSG = "Exception in the content";
|
||||
|
||||
@Before
|
||||
@ -62,12 +62,12 @@ public void setup() {
|
||||
public TestName currTestName = new TestName();
|
||||
private YarnConfiguration conf;
|
||||
|
||||
private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
|
||||
private TestV2TimelineClient createTimelineClient(YarnConfiguration config) {
|
||||
ApplicationId id = ApplicationId.newInstance(0, 0);
|
||||
TestV2TimelineClient client = new TestV2TimelineClient(id);
|
||||
client.init(conf);
|
||||
client.start();
|
||||
return client;
|
||||
TestV2TimelineClient tc = new TestV2TimelineClient(id);
|
||||
tc.init(config);
|
||||
tc.start();
|
||||
return tc;
|
||||
}
|
||||
|
||||
private class TestV2TimelineClientForExceptionHandling
|
||||
@ -76,12 +76,16 @@ public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
protected boolean throwYarnException;
|
||||
private boolean throwYarnException;
|
||||
|
||||
public void setThrowYarnException(boolean throwYarnException) {
|
||||
this.throwYarnException = throwYarnException;
|
||||
}
|
||||
|
||||
public boolean isThrowYarnException() {
|
||||
return throwYarnException;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void putObjects(URI base, String path,
|
||||
MultivaluedMap<String, String> params, Object obj)
|
||||
@ -123,7 +127,7 @@ public TestV2TimelineClient(ApplicationId id) {
|
||||
protected void putObjects(String path,
|
||||
MultivaluedMap<String, String> params, Object obj)
|
||||
throws IOException, YarnException {
|
||||
if (throwYarnException) {
|
||||
if (isThrowYarnException()) {
|
||||
throw new YarnException("ActualException");
|
||||
}
|
||||
publishedEntities.add((TimelineEntities) obj);
|
||||
@ -139,17 +143,17 @@ protected void putObjects(String path,
|
||||
|
||||
@Test
|
||||
public void testExceptionMultipleRetry() {
|
||||
TestV2TimelineClientForExceptionHandling client =
|
||||
TestV2TimelineClientForExceptionHandling c =
|
||||
new TestV2TimelineClientForExceptionHandling(
|
||||
ApplicationId.newInstance(0, 0));
|
||||
int maxRetries = 2;
|
||||
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
maxRetries);
|
||||
client.init(conf);
|
||||
client.start();
|
||||
client.setTimelineServiceAddress("localhost:12345");
|
||||
c.init(conf);
|
||||
c.start();
|
||||
c.setTimelineServiceAddress("localhost:12345");
|
||||
try {
|
||||
client.putEntities(new TimelineEntity());
|
||||
c.putEntities(new TimelineEntity());
|
||||
} catch (IOException e) {
|
||||
Assert.fail("YARN exception is expected");
|
||||
} catch (YarnException e) {
|
||||
@ -161,9 +165,9 @@ public void testExceptionMultipleRetry() {
|
||||
"TimelineClient has reached to max retry times : " + maxRetries));
|
||||
}
|
||||
|
||||
client.setThrowYarnException(true);
|
||||
c.setThrowYarnException(true);
|
||||
try {
|
||||
client.putEntities(new TimelineEntity());
|
||||
c.putEntities(new TimelineEntity());
|
||||
} catch (IOException e) {
|
||||
Assert.fail("YARN exception is expected");
|
||||
} catch (YarnException e) {
|
||||
@ -173,7 +177,7 @@ public void testExceptionMultipleRetry() {
|
||||
Assert.assertTrue("YARN exception is expected",
|
||||
cause.getMessage().contains(EXCEPTION_MSG));
|
||||
}
|
||||
client.stop();
|
||||
c.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -348,7 +352,6 @@ private void printReceivedEntities() {
|
||||
for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
|
||||
TimelineEntities publishedEntities = client.getPublishedEntities(i);
|
||||
StringBuilder entitiesPerPublish = new StringBuilder();
|
||||
;
|
||||
for (TimelineEntity entity : publishedEntities.getEntities()) {
|
||||
entitiesPerPublish.append(entity.getId());
|
||||
entitiesPerPublish.append(",");
|
||||
|
@ -37,18 +37,21 @@ public void testMapCastToHashMap() {
|
||||
|
||||
// Test empty hashmap be casted to a empty hashmap
|
||||
Map<String, String> emptyHashMap = new HashMap<String, String>();
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(emptyHashMap).size(), 0);
|
||||
Assert.assertEquals(
|
||||
TimelineServiceHelper.mapCastToHashMap(emptyHashMap).size(), 0);
|
||||
|
||||
// Test empty non-hashmap be casted to a empty hashmap
|
||||
Map<String, String> emptyTreeMap = new TreeMap<String, String>();
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(emptyTreeMap).size(), 0);
|
||||
Assert.assertEquals(
|
||||
TimelineServiceHelper.mapCastToHashMap(emptyTreeMap).size(), 0);
|
||||
|
||||
// Test non-empty hashmap be casted to hashmap correctly
|
||||
Map<String, String> firstHashMap = new HashMap<String, String>();
|
||||
String key = "KEY";
|
||||
String value = "VALUE";
|
||||
firstHashMap.put(key, value);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(firstHashMap), firstHashMap);
|
||||
Assert.assertEquals(
|
||||
TimelineServiceHelper.mapCastToHashMap(firstHashMap), firstHashMap);
|
||||
|
||||
// Test non-empty non-hashmap is casted correctly.
|
||||
Map<String, String> firstTreeMap = new TreeMap<String, String>();
|
||||
@ -59,17 +62,21 @@ public void testMapCastToHashMap() {
|
||||
Assert.assertEquals(alternateHashMap.get(key), value);
|
||||
|
||||
// Test complicated hashmap be casted correctly
|
||||
Map<String, Set<String>> complicatedHashMap = new HashMap<String, Set<String>>();
|
||||
Map<String, Set<String>> complicatedHashMap =
|
||||
new HashMap<String, Set<String>>();
|
||||
Set<String> hashSet = new HashSet<String>();
|
||||
hashSet.add(value);
|
||||
complicatedHashMap.put(key, hashSet);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedHashMap),
|
||||
Assert.assertEquals(
|
||||
TimelineServiceHelper.mapCastToHashMap(complicatedHashMap),
|
||||
complicatedHashMap);
|
||||
|
||||
// Test complicated non-hashmap get casted correctly
|
||||
Map<String, Set<String>> complicatedTreeMap = new TreeMap<String, Set<String>>();
|
||||
Map<String, Set<String>> complicatedTreeMap =
|
||||
new TreeMap<String, Set<String>>();
|
||||
complicatedTreeMap.put(key, hashSet);
|
||||
Assert.assertEquals(TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key),
|
||||
Assert.assertEquals(
|
||||
TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key),
|
||||
hashSet);
|
||||
}
|
||||
|
||||
|
@ -48,9 +48,9 @@ public interface CollectorNodemanagerProtocol {
|
||||
*
|
||||
* @param request the request of registering a new collector or a list of
|
||||
* collectors
|
||||
* @return
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @return the response for registering the new collector
|
||||
* @throws YarnException if the request is invalid
|
||||
* @throws IOException if there are I/O errors
|
||||
*/
|
||||
ReportNewCollectorInfoResponse reportNewCollectorInfo(
|
||||
ReportNewCollectorInfoRequest request)
|
||||
@ -63,9 +63,9 @@ ReportNewCollectorInfoResponse reportNewCollectorInfo(
|
||||
* </p>
|
||||
* @param request the request of getting the aggregator context information of
|
||||
* the given application
|
||||
* @return
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @return the response for registering the new collector
|
||||
* @throws YarnException if the request is invalid
|
||||
* @throws IOException if there are I/O errors
|
||||
*/
|
||||
GetTimelineCollectorContextResponse getTimelineCollectorContext(
|
||||
GetTimelineCollectorContextRequest request)
|
||||
|
@ -25,7 +25,8 @@
|
||||
@Private
|
||||
@Unstable
|
||||
@ProtocolInfo(
|
||||
protocolName = "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB",
|
||||
protocolName =
|
||||
"org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB",
|
||||
protocolVersion = 1)
|
||||
public interface CollectorNodemanagerProtocolPB extends
|
||||
CollectorNodemanagerProtocolService.BlockingInterface {
|
||||
|
@ -53,7 +53,7 @@ public class CollectorNodemanagerProtocolPBClientImpl implements
|
||||
+ "rpc.nm-command-timeout";
|
||||
|
||||
/**
|
||||
* Maximum of 1 minute timeout for a Node to react to the command
|
||||
* Maximum of 1 minute timeout for a Node to react to the command.
|
||||
*/
|
||||
static final int DEFAULT_COMMAND_TIMEOUT = 60000;
|
||||
|
||||
|
@ -41,7 +41,8 @@ public class CollectorNodemanagerProtocolPBServiceImpl implements
|
||||
|
||||
private CollectorNodemanagerProtocol real;
|
||||
|
||||
public CollectorNodemanagerProtocolPBServiceImpl(CollectorNodemanagerProtocol impl) {
|
||||
public CollectorNodemanagerProtocolPBServiceImpl(
|
||||
CollectorNodemanagerProtocol impl) {
|
||||
this.real = impl;
|
||||
}
|
||||
|
||||
@ -52,7 +53,8 @@ public ReportNewCollectorInfoResponseProto reportNewCollectorInfo(
|
||||
ReportNewCollectorInfoRequestPBImpl request =
|
||||
new ReportNewCollectorInfoRequestPBImpl(proto);
|
||||
try {
|
||||
ReportNewCollectorInfoResponse response = real.reportNewCollectorInfo(request);
|
||||
ReportNewCollectorInfoResponse response =
|
||||
real.reportNewCollectorInfo(request);
|
||||
return ((ReportNewCollectorInfoResponsePBImpl)response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
|
@ -29,10 +29,10 @@
|
||||
public class GetTimelineCollectorContextRequestPBImpl extends
|
||||
GetTimelineCollectorContextRequest {
|
||||
|
||||
GetTimelineCollectorContextRequestProto
|
||||
private GetTimelineCollectorContextRequestProto
|
||||
proto = GetTimelineCollectorContextRequestProto.getDefaultInstance();
|
||||
GetTimelineCollectorContextRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private GetTimelineCollectorContextRequestProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
private ApplicationId appId = null;
|
||||
|
||||
@ -60,8 +60,9 @@ public int hashCode() {
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
@ -80,8 +81,9 @@ private void mergeLocalToBuilder() {
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
@ -100,7 +102,8 @@ public ApplicationId getApplicationId() {
|
||||
return this.appId;
|
||||
}
|
||||
|
||||
GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
GetTimelineCollectorContextRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (!p.hasAppId()) {
|
||||
return null;
|
||||
}
|
||||
@ -110,14 +113,16 @@ public ApplicationId getApplicationId() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId appId) {
|
||||
public void setApplicationId(ApplicationId id) {
|
||||
maybeInitBuilder();
|
||||
if (appId == null)
|
||||
if (id == null) {
|
||||
builder.clearAppId();
|
||||
this.appId = appId;
|
||||
}
|
||||
this.appId = id;
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(YarnProtos.ApplicationIdProto p) {
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(
|
||||
YarnProtos.ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
||||
|
@ -26,10 +26,10 @@
|
||||
public class GetTimelineCollectorContextResponsePBImpl extends
|
||||
GetTimelineCollectorContextResponse {
|
||||
|
||||
GetTimelineCollectorContextResponseProto proto =
|
||||
private GetTimelineCollectorContextResponseProto proto =
|
||||
GetTimelineCollectorContextResponseProto.getDefaultInstance();
|
||||
GetTimelineCollectorContextResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private GetTimelineCollectorContextResponseProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public GetTimelineCollectorContextResponsePBImpl() {
|
||||
builder = GetTimelineCollectorContextResponseProto.newBuilder();
|
||||
@ -55,8 +55,9 @@ public int hashCode() {
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
@ -69,8 +70,9 @@ public String toString() {
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
@ -84,7 +86,8 @@ private void maybeInitBuilder() {
|
||||
|
||||
@Override
|
||||
public String getUserId() {
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (!p.hasUserId()) {
|
||||
return null;
|
||||
}
|
||||
@ -103,7 +106,8 @@ public void setUserId(String userId) {
|
||||
|
||||
@Override
|
||||
public String getFlowName() {
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (!p.hasFlowName()) {
|
||||
return null;
|
||||
}
|
||||
@ -122,7 +126,8 @@ public void setFlowName(String flowName) {
|
||||
|
||||
@Override
|
||||
public String getFlowVersion() {
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (!p.hasFlowVersion()) {
|
||||
return null;
|
||||
}
|
||||
@ -141,7 +146,8 @@ public void setFlowVersion(String flowVersion) {
|
||||
|
||||
@Override
|
||||
public long getFlowRunId() {
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
GetTimelineCollectorContextResponseProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getFlowRunId();
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||
private Set<NodeLabel> labels = null;
|
||||
private List<LogAggregationReport> logAggregationReportsForApps = null;
|
||||
|
||||
Map<ApplicationId, String> registeredCollectors = null;
|
||||
private Map<ApplicationId, String> registeredCollectors = null;
|
||||
|
||||
public NodeHeartbeatRequestPBImpl() {
|
||||
builder = NodeHeartbeatRequestProto.newBuilder();
|
||||
@ -161,7 +161,8 @@ private LogAggregationReportProto convertToProtoFormat(
|
||||
private void addRegisteredCollectorsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearRegisteredCollectors();
|
||||
for (Map.Entry<ApplicationId, String> entry : registeredCollectors.entrySet()) {
|
||||
for (Map.Entry<ApplicationId, String> entry :
|
||||
registeredCollectors.entrySet()) {
|
||||
builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder()
|
||||
.setAppId(convertToProtoFormat(entry.getKey()))
|
||||
.setAppCollectorAddr(entry.getValue()));
|
||||
|
@ -69,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
|
||||
private Resource resource = null;
|
||||
Map<ApplicationId, String> appCollectorsMap = null;
|
||||
private Map<ApplicationId, String> appCollectorsMap = null;
|
||||
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
@ -30,11 +30,11 @@
|
||||
public class ReportNewCollectorInfoRequestPBImpl extends
|
||||
ReportNewCollectorInfoRequest {
|
||||
|
||||
ReportNewCollectorInfoRequestProto proto =
|
||||
private ReportNewCollectorInfoRequestProto proto =
|
||||
ReportNewCollectorInfoRequestProto.getDefaultInstance();
|
||||
|
||||
ReportNewCollectorInfoRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private ReportNewCollectorInfoRequestProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
private List<AppCollectorsMap> collectorsList = null;
|
||||
|
||||
@ -62,8 +62,9 @@ public int hashCode() {
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
@ -71,8 +72,9 @@ public boolean equals(Object other) {
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
@ -104,10 +106,10 @@ private void addLocalCollectorsToProto() {
|
||||
|
||||
private void initLocalCollectorsList() {
|
||||
ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<AppCollectorsMapProto> collectorsList =
|
||||
List<AppCollectorsMapProto> list =
|
||||
p.getAppCollectorsList();
|
||||
this.collectorsList = new ArrayList<AppCollectorsMap>();
|
||||
for (AppCollectorsMapProto m : collectorsList) {
|
||||
for (AppCollectorsMapProto m : list) {
|
||||
this.collectorsList.add(convertFromProtoFormat(m));
|
||||
}
|
||||
}
|
||||
|
@ -29,18 +29,19 @@
|
||||
public class ReportNewCollectorInfoResponsePBImpl extends
|
||||
ReportNewCollectorInfoResponse {
|
||||
|
||||
ReportNewCollectorInfoResponseProto proto =
|
||||
private ReportNewCollectorInfoResponseProto proto =
|
||||
ReportNewCollectorInfoResponseProto.getDefaultInstance();
|
||||
|
||||
ReportNewCollectorInfoResponseProto.Builder builder = null;
|
||||
private ReportNewCollectorInfoResponseProto.Builder builder = null;
|
||||
|
||||
boolean viaProto = false;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public ReportNewCollectorInfoResponsePBImpl() {
|
||||
builder = ReportNewCollectorInfoResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public ReportNewCollectorInfoResponsePBImpl(ReportNewCollectorInfoResponseProto proto) {
|
||||
public ReportNewCollectorInfoResponsePBImpl(
|
||||
ReportNewCollectorInfoResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
@ -58,8 +59,9 @@ public int hashCode() {
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
|
@ -33,11 +33,11 @@
|
||||
@Unstable
|
||||
public class AppCollectorsMapPBImpl extends AppCollectorsMap {
|
||||
|
||||
AppCollectorsMapProto proto =
|
||||
private AppCollectorsMapProto proto =
|
||||
AppCollectorsMapProto.getDefaultInstance();
|
||||
|
||||
AppCollectorsMapProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private AppCollectorsMapProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
private ApplicationId appId = null;
|
||||
private String collectorAddr = null;
|
||||
@ -65,8 +65,9 @@ public int hashCode() {
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
@ -98,12 +99,12 @@ public String getCollectorAddr() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId appId) {
|
||||
public void setApplicationId(ApplicationId id) {
|
||||
maybeInitBuilder();
|
||||
if (appId == null) {
|
||||
if (id == null) {
|
||||
builder.clearAppId();
|
||||
}
|
||||
this.appId = appId;
|
||||
this.appId = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +77,8 @@ public class TestRPC {
|
||||
|
||||
private static final String EXCEPTION_MSG = "test error";
|
||||
private static final String EXCEPTION_CAUSE = "exception cause";
|
||||
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
private static final RecordFactory RECORD_FACTORY =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
public static final String ILLEGAL_NUMBER_MESSAGE =
|
||||
"collectors' number in ReportNewCollectorInfoRequest is not ONE.";
|
||||
@ -101,7 +102,8 @@ public void testUnknownCall() {
|
||||
|
||||
// Any unrelated protocol would do
|
||||
ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
|
||||
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
|
||||
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server),
|
||||
conf);
|
||||
|
||||
try {
|
||||
proxy.getNewApplication(Records
|
||||
@ -111,7 +113,8 @@ public void testUnknownCall() {
|
||||
Assert.assertTrue(e.getMessage().matches(
|
||||
"Unknown method getNewApplication called on.*"
|
||||
+ "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
|
||||
+ "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
|
||||
+ "\\$ApplicationClientProtocolService\\$BlockingInterface "
|
||||
+ "protocol."));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
@ -132,8 +135,10 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
|
||||
server.start();
|
||||
|
||||
// Test unrelated protocol wouldn't get response
|
||||
ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy(
|
||||
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
|
||||
ApplicationClientProtocol unknownProxy =
|
||||
(ApplicationClientProtocol) rpc.getProxy(
|
||||
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server),
|
||||
conf);
|
||||
|
||||
try {
|
||||
unknownProxy.getNewApplication(Records
|
||||
@ -143,14 +148,17 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
|
||||
Assert.assertTrue(e.getMessage().matches(
|
||||
"Unknown method getNewApplication called on.*"
|
||||
+ "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
|
||||
+ "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
|
||||
+ "\\$ApplicationClientProtocolService\\$BlockingInterface "
|
||||
+ "protocol."));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// Test CollectorNodemanagerProtocol get proper response
|
||||
CollectorNodemanagerProtocol proxy = (CollectorNodemanagerProtocol)rpc.getProxy(
|
||||
CollectorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
CollectorNodemanagerProtocol proxy =
(CollectorNodemanagerProtocol)rpc.getProxy(
CollectorNodemanagerProtocol.class, NetUtils.getConnectAddress(server),
conf);
// Verify request with DEFAULT_APP_ID and DEFAULT_COLLECTOR_ADDR get
// normally response.
try {
@ -196,7 +204,8 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
Assert.fail("RPC call failured is expected here.");
} catch (YarnException | IOException e) {
Assert.assertTrue(e instanceof YarnException);
Assert.assertTrue(e.getMessage().contains("The application is not found."));
Assert.assertTrue(e.getMessage().contains(
"The application is not found."));
}
server.stop();
}
@ -215,12 +224,13 @@ private void test(String rpcClass) throws Exception {
Server server = rpc.getServer(ContainerManagementProtocol.class,
new DummyContainerManager(), addr, conf, null, 1);
server.start();
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class,
ProtobufRpcEngine.class);
ContainerManagementProtocol proxy = (ContainerManagementProtocol)
rpc.getProxy(ContainerManagementProtocol.class,
NetUtils.getConnectAddress(server), conf);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
RECORD_FACTORY.newRecordInstance(ContainerLaunchContext.class);

ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId =
@ -257,7 +267,7 @@ private void test(String rpcClass) throws Exception {
boolean exception = false;
try {
StopContainersRequest stopRequest =
recordFactory.newRecordInstance(StopContainersRequest.class);
RECORD_FACTORY.newRecordInstance(StopContainersRequest.class);
stopRequest.setContainerIds(containerIds);
proxy.stopContainers(stopRequest);
} catch (YarnException e) {
@ -284,7 +294,7 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request)
throws YarnException {
GetContainerStatusesResponse response =
recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
RECORD_FACTORY.newRecordInstance(GetContainerStatusesResponse.class);
response.setContainerStatuses(statuses);
return response;
}
@ -293,8 +303,9 @@ public GetContainerStatusesResponse getContainerStatuses(
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException {
StartContainersResponse response =
recordFactory.newRecordInstance(StartContainersResponse.class);
for (StartContainerRequest request : requests.getStartContainerRequests()) {
RECORD_FACTORY.newRecordInstance(StartContainersResponse.class);
for (StartContainerRequest request :
requests.getStartContainerRequests()) {
Token containerToken = request.getContainerToken();
ContainerTokenIdentifier tokenId = null;

@ -304,7 +315,7 @@ public StartContainersResponse startContainers(
throw RPCUtil.getRemoteException(e);
}
ContainerStatus status =
recordFactory.newRecordInstance(ContainerStatus.class);
RECORD_FACTORY.newRecordInstance(ContainerStatus.class);
status.setState(ContainerState.RUNNING);
status.setContainerId(tokenId.getContainerID());
status.setExitStatus(0);
@ -324,7 +335,8 @@ public StopContainersResponse stopContainers(StopContainersRequest request)

@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
IncreaseContainersResourceRequest request)
throws YarnException, IOException {
return null;
}

@ -383,7 +395,8 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
}

ReportNewCollectorInfoResponse response =
recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
RECORD_FACTORY.newRecordInstance(
ReportNewCollectorInfoResponse.class);
return response;
}

@ -187,8 +187,8 @@ protected ContainerManagerImpl createContainerManager(Context context,
}
}

protected NMCollectorService createNMCollectorService(Context context) {
return new NMCollectorService(context);
protected NMCollectorService createNMCollectorService(Context ctxt) {
return new NMCollectorService(ctxt);
}

protected WebServer createWebServer(Context nmContext,

@ -816,7 +816,8 @@ public void run() {
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context.getRegisteredCollectors());
NodeStatusUpdaterImpl.this.context
.getRegisteredCollectors());

if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
@ -939,23 +940,6 @@ public void run() {
}
}

/**
* Caller should take care of sending non null nodelabels for both
* arguments
*
* @param nodeLabelsNew
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
*/
private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
Set<NodeLabel> nodeLabelsOld) {
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
return true;
}
return false;
}

private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
Map<ApplicationId, String> knownCollectorsMap =

@ -230,8 +230,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
}
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
this.containersMonitor = createContainersMonitor(exec);
addService(this.containersMonitor);

dispatcher.register(ContainerEventType.class,
@ -447,8 +446,9 @@ protected SharedCacheUploadService createSharedCacheUploaderService() {
}

@VisibleForTesting
protected NMTimelinePublisher createNMTimelinePublisher(Context context) {
NMTimelinePublisher nmTimelinePublisherLocal = new NMTimelinePublisher(context);
protected NMTimelinePublisher createNMTimelinePublisher(Context ctxt) {
NMTimelinePublisher nmTimelinePublisherLocal =
new NMTimelinePublisher(ctxt);
addIfService(nmTimelinePublisherLocal);
return nmTimelinePublisherLocal;
}

@ -25,7 +25,8 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent {
private ContainerStatus containerStatus;

public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
super(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(),
super(containerStatus.getContainerId().getApplicationAttemptId().
getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
this.containerStatus = containerStatus;
}

@ -74,7 +74,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
@ -87,7 +86,6 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;

@ -435,7 +435,8 @@ public void run() {
+ " for the first time");

ResourceCalculatorProcessTree pt =
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
ResourceCalculatorProcessTree.
getResourceCalculatorProcessTree(
pId, processTreeClass, conf);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);

@ -33,12 +33,12 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

import com.google.common.annotations.VisibleForTesting;

/**
* Metrics publisher service that publishes data to the timeline service v.2. It
* is used only if the timeline service v.2 is enabled and the system publishing
@ -73,7 +75,7 @@ public class NMTimelinePublisher extends CompositeService {

private String httpAddress;

protected final Map<ApplicationId, TimelineClient> appToClientMap;
private final Map<ApplicationId, TimelineClient> appToClientMap;

public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
@ -99,6 +101,11 @@ protected void serviceStart() throws Exception {
this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort();
}

@VisibleForTesting
Map<ApplicationId, TimelineClient> getAppToClientMap() {
return appToClientMap;
}

protected void handleNMTimelineEvent(NMTimelineEvent event) {
switch (event.getType()) {
case TIMELINE_ENTITY_PUBLISH:

@ -85,7 +85,6 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -1707,9 +1706,9 @@ protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService store, boolean isDistributedSchedulingEnabled,
Configuration conf) {
Configuration config) {
return new MyNMContext(containerTokenSecretManager,
nmTokenSecretManager, conf);
nmTokenSecretManager, config);
}
};

@ -645,7 +645,8 @@ private Container createMockedContainer(ApplicationId appId, int containerId) {
when(c.getLaunchContext()).thenReturn(launchContext);
when(launchContext.getApplicationACLs()).thenReturn(
new HashMap<ApplicationAccessType, String>());
when(c.cloneAndGetContainerStatus()).thenReturn(BuilderUtils.newContainerStatus(cId,
when(c.cloneAndGetContainerStatus()).thenReturn(
BuilderUtils.newContainerStatus(cId,
ContainerState.NEW, "", 0, Resource.newInstance(1024, 1)));
return c;
}

@ -99,7 +99,6 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.hamcrest.CoreMatchers;
@ -111,10 +110,11 @@
public class TestContainerLaunch extends BaseContainerManagerTest {

private static final String INVALID_JAVA_HOME = "/no/jvm/here";
protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf) {
private Context distContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false, conf) {
public int getHttpPort() {
return HTTP_PORT;
};

@ -55,8 +55,8 @@ public void testContainerResourceUsage() {
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
appToClientMap.put(appId, timelineClient);
if (!getAppToClientMap().containsKey(appId)) {
getAppToClientMap().put(appId, timelineClient);
}
}
};

@ -24,7 +24,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -40,10 +39,9 @@ public class MockApp implements Application {
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
ApplicationState appState;
Application app;
String flowName;
String flowVersion;
long flowRunId;
TimelineClient timelineClient = null;
private String flowName;
private String flowVersion;
private long flowRunId;

public MockApp(int uniqId) {
this("mockUser", 1234, uniqId);

@ -373,7 +373,8 @@ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
@Override
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager);
activeServiceContext.setRMTimelineCollectorManager(
timelineCollectorManager);
}

@Override

@ -76,9 +76,9 @@
public class TestSystemMetricsPublisherForV2 {

/**
* is the folder where the FileSystemTimelineWriterImpl writes the entities
* The folder where the FileSystemTimelineWriterImpl writes the entities.
*/
protected static File testRootDir = new File("target",
private static File testRootDir = new File("target",
TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
.getAbsoluteFile();

@ -151,7 +151,8 @@ private static Configuration getTimelineV2Conf() {
} catch (IOException e) {
e.printStackTrace();
Assert
.fail("Exception while setting the TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
.fail("Exception while setting the " +
"TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
}
return conf;
}
@ -159,30 +160,30 @@ private static Configuration getTimelineV2Conf() {
@Test
public void testSystemMetricPublisherInitialization() {
@SuppressWarnings("resource")
TimelineServiceV2Publisher metricsPublisher =
TimelineServiceV2Publisher publisher =
new TimelineServiceV2Publisher(mock(RMContext.class));
try {
Configuration conf = getTimelineV2Conf();
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED);
metricsPublisher.init(conf);
publisher.init(conf);
assertFalse(
"Default configuration should not publish container events from RM",
metricsPublisher.isPublishContainerEvents());
publisher.isPublishContainerEvents());

metricsPublisher.stop();
publisher.stop();

metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class));
publisher = new TimelineServiceV2Publisher(mock(RMContext.class));
conf = getTimelineV2Conf();
metricsPublisher.init(conf);
publisher.init(conf);
assertTrue("Expected to have registered event handlers and set ready to "
+ "publish events after init",
metricsPublisher.isPublishContainerEvents());
metricsPublisher.start();
publisher.isPublishContainerEvents());
publisher.start();
assertTrue("Expected to publish container events from RM",
metricsPublisher.isPublishContainerEvents());
publisher.isPublishContainerEvents());
} finally {
metricsPublisher.stop();
publisher.stop();
}
}

@ -134,7 +134,8 @@ public void testPutExtendedEntities() throws Exception {
ApplicationEntity app = new ApplicationEntity();
app.setId(appId.toString());
flow.addChild(app.getType(), app.getId());
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity();
appAttempt.setId(attemptId.toString());
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
@ -144,8 +145,10 @@ public void testPutExtendedEntities() throws Exception {
user.setId(UserGroupInformation.getCurrentUser().getShortUserName());
QueueEntity queue = new QueueEntity();
queue.setId("default_queue");
client.putEntities(cluster, flow, app, appAttempt, container, user, queue);
client.putEntitiesAsync(cluster, flow, app, appAttempt, container, user, queue);
client.putEntities(cluster, flow, app, appAttempt, container, user,
queue);
client.putEntitiesAsync(cluster, flow, app, appAttempt, container, user,
queue);
} finally {
client.stop();
}

@ -391,10 +391,11 @@ private static ClientResponse getResponse(Client client, URI uri)
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
resp.getStatusInfo().getStatusCode() !=
ClientResponse.Status.OK.getStatusCode()) {
String msg = "";
if (resp != null) {
msg = resp.getClientResponseStatus().toString();
msg = String.valueOf(resp.getStatusInfo().getStatusCode());
}
throw new IOException("Incorrect response from timeline reader. " +
"Status=" + msg);
@ -406,7 +407,8 @@ private static class DummyURLConnectionFactory
implements HttpURLConnectionFactory {

@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
public HttpURLConnection getHttpURLConnection(final URL url)
throws IOException {
try {
return (HttpURLConnection)url.openConnection();
} catch (UndeclaredThrowableException e) {
@ -422,10 +424,10 @@ private static TimelineEntity newEntity(String type, String id) {
}

private static TimelineMetric newMetric(TimelineMetric.Type type,
String id, long ts, Number value) {
String id, long t, Number value) {
TimelineMetric metric = new TimelineMetric(type);
metric.setId(id);
metric.addValue(ts, value);
metric.addValue(t, value);
return metric;
}

@ -463,7 +465,7 @@ private static void verifyHttpResponse(Client client, URI uri,
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertNotNull(resp);
assertTrue("Response from server should have been " + status,
resp.getClientResponseStatus().equals(status));
resp.getStatusInfo().getStatusCode() == status.getStatusCode());
System.out.println("Response is: " + resp.getEntity(String.class));
}

@ -866,7 +868,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"app-uid/" + appUIDWithoutFlowInfo);
resp = getResponse(client, uri);;
resp = getResponse(client, uri);
TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class);
assertNotNull(appEntity2);
assertEquals(
@ -893,7 +895,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithFlowInfo);
resp = getResponse(client, uri);;
resp = getResponse(client, uri);
TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class);
assertNotNull(singleEntity1);
assertEquals("type1", singleEntity1.getType());
@ -903,7 +905,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
appUIDWithoutFlowInfo + "!type1!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithoutFlowInfo);
resp = getResponse(client, uri);;
resp = getResponse(client, uri);
TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class);
assertNotNull(singleEntity2);
assertEquals("type1", singleEntity2.getType());
@ -1425,7 +1427,8 @@ public void testGetEntitiesInfoFilters() throws Exception {
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" +
"eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0)");
"eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0" +
")");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);

@ -2436,9 +2436,9 @@ public void testReadEntitiesMetricFilterPrefix() throws Exception {
assertEquals(2, metricCnt);

entities = reader.getEntities(new TimelineReaderContext("cluster1", "user1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null), new TimelineEntityFilters(null, null, null, null, null, null,
null, metricFilterList1, null), new TimelineDataToRetrieve(null,
"some_flow_name", 1002345678919L, "application_1231111111_1111",
"world", null), new TimelineEntityFilters(null, null, null, null, null,
null, null, metricFilterList1, null), new TimelineDataToRetrieve(null,
metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE));
assertEquals(2, entities.size());
metricCnt = 0;
@ -2736,8 +2736,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt1, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt1, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(1, entities.size());
isRelatedToCnt = 0;
@ -2760,8 +2760,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt2, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt2, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(2, entities.size());
isRelatedToCnt = 0;
@ -2783,8 +2783,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt3, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt3, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(1, entities.size());
isRelatedToCnt = 0;
@ -2807,8 +2807,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt4, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt4, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(0, entities.size());

@ -2820,8 +2820,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt5, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt5, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(0, entities.size());

@ -2841,8 +2841,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null),
new TimelineEntityFilters(null, null, null, null, irt6, null, null, null,
null),
new TimelineEntityFilters(null, null, null, null, irt6, null, null,
null, null),
new TimelineDataToRetrieve());
assertEquals(1, entities.size());
isRelatedToCnt = 0;

@ -31,12 +31,14 @@
import org.apache.hadoop.conf.Configuration;

/**
* Generates the data/entities for the FlowRun and FlowActivity Tables
* Generates the data/entities for the FlowRun and FlowActivity Tables.
*/
class TestFlowDataGenerator {
final class TestFlowDataGenerator {
private TestFlowDataGenerator() {
}

private static final String metric1 = "MAP_SLOT_MILLIS";
private static final String metric2 = "HDFS_BYTES_READ";
private static final String METRIC_1 = "MAP_SLOT_MILLIS";
private static final String METRIC_2 = "HDFS_BYTES_READ";
public static final long END_TS_INCR = 10000L;

static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
@ -51,7 +53,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;

@ -64,7 +66,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
metrics.add(m1);

TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
m2.setId(METRIC_2);
metricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis();
for (int k = 1; k < 100; k++) {
@ -81,7 +83,8 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
}


static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
Configuration c1) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
@ -93,7 +96,7 @@ static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;

@ -103,7 +106,7 @@ static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration
metrics.add(m1);

TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
m2.setId(METRIC_2);
metricValues = new HashMap<Long, Number>();
ts = insertTs;
metricValues.put(ts - 80000, 57L);
@ -134,7 +137,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs) {
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;
metricValues.put(ts - 100000, 2L);
@ -144,7 +147,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs) {
metrics.add(m1);

TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
m2.setId(METRIC_2);
metricValues = new HashMap<Long, Number>();
ts = insertTs;
metricValues.put(ts - 100000, 31L);
@ -177,7 +180,7 @@ static TimelineEntity getEntityMetricsApp2(long insertTs) {
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;
metricValues.put(ts - 100000, 5L);
@ -208,7 +211,7 @@ static TimelineEntity getEntity1() {
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 120000, 100000000L);
@ -250,7 +253,7 @@ static TimelineEntity getAFullEntity(long ts, long endTs) {
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
m1.setId(METRIC_1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 120000, 100000000L);
metricValues.put(ts - 100000, 200000000L);
@ -262,7 +265,7 @@ static TimelineEntity getAFullEntity(long ts, long endTs) {
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
m2.setId(METRIC_2);
metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 900000, 31L);
metricValues.put(ts - 30000, 57L);

@ -58,7 +58,7 @@
import org.junit.Test;

/**
* Tests the FlowRun and FlowActivity Tables
* Tests the FlowRun and FlowActivity Tables.
*/
public class TestHBaseStorageFlowActivity {

@ -114,7 +114,7 @@ public void testWriteFlowRunMinMax() throws Exception {
String appName = "application_100000000000_1111";
long minStartTs = 1424995200300L;
long greaterStartTs = 1424995200300L + 864000L;
long endTs = 1424995200300L + 86000000L;;
long endTs = 1424995200300L + 86000000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(minStartTs);

@ -209,7 +209,7 @@ public void testWriteFlowRunMinMax() throws Exception {

/**
* Write 1 application entity and checks the record for today in the flow
* activity table
* activity table.
*/
@Test
public void testWriteFlowActivityOneFlow() throws Exception {
@ -313,10 +313,10 @@ private void checkFlowActivityTable(String cluster, String user, String flow,

/**
* Writes 3 applications each with a different run id and version for the same
* {cluster, user, flow}
* {cluster, user, flow}.
*
* They should be getting inserted into one record in the flow activity table
* with 3 columns, one per run id
* with 3 columns, one per run id.
*/
@Test
public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
@ -425,7 +425,8 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow =
new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey();
new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
.getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName

@ -69,7 +69,7 @@
import org.junit.Test;

/**
* Tests the FlowRun and FlowActivity Tables
* Tests the FlowRun and FlowActivity Tables.
*/
public class TestHBaseStorageFlowRun {

@ -361,13 +361,15 @@ void checkFlowRunTableBatchLimit(String cluster, String user,

Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
byte[] startRow =
new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
// set a batch limit
int batchLimit = 2;
s.setBatch(batchLimit);
String clusterStop = cluster + "1";
byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
byte[] stopRow =
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn

@ -63,14 +63,14 @@
import org.junit.Test;

/**
* Tests the FlowRun and FlowActivity Tables
* Tests the FlowRun and FlowActivity Tables.
*/
public class TestHBaseStorageFlowRunCompaction {

private static HBaseTestingUtility util;

private static final String metric1 = "MAP_SLOT_MILLIS";
private static final String metric2 = "HDFS_BYTES_READ";
private static final String METRIC_1 = "MAP_SLOT_MILLIS";
private static final String METRIC_2 = "HDFS_BYTES_READ";

private final byte[] aRowKey = Bytes.toBytes("a");
private final byte[] aFamily = Bytes.toBytes("family");
@ -89,8 +89,8 @@ private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}

/** writes non numeric data into flow run table
* reads it back
/** Writes non numeric data into flow run table
* reads it back.
*
* @throws Exception
*/
@ -325,7 +325,8 @@ public void testWriteFlowRunCompaction() throws Exception {
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
List<Region> regions = server.getOnlineRegions(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
// flush and compact all the regions of the primary table
for (Region region : regions) {
region.flush(true);
@ -363,13 +364,13 @@ private void checkFlowRunTable(String cluster, String user, String flow,
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1);
assertTrue(values.containsKey(q));
assertEquals(141, Bytes.toLong(values.get(q)));

// check metric2
q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2);
assertTrue(values.containsKey(q));
assertEquals(57, Bytes.toLong(values.get(q)));
}
@ -385,7 +386,7 @@ private FlowScanner getFlowScannerForTestingCompaction() {
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
(request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
(request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION));
assertNotNull(fs);
return fs;
@ -571,7 +572,8 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// of type SUM and SUM_FINAL
// NOT cells of SUM_FINAL will expire
@Test
public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException {
public void checkProcessSummationMoreCellsSumFinalVariedTags()
throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
int countFinal = 20100;
int countNotFinal = 1000;
@ -585,7 +587,8 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOExceptio
long cellTsFinalStart = 10001120L;
long cellTsFinal = cellTsFinalStart;

long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp(
long cellTsFinalStartNotExpire =
TimestampGenerator.getSupplementedTimestamp(
System.currentTimeMillis(), "application_10266666661166_118821");
long cellTsFinalNotExpire = cellTsFinalStartNotExpire;

@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;

@ -180,7 +179,7 @@ public Response putEntities(
private static ApplicationId parseApplicationId(String appId) {
try {
if (appId != null) {
return ConverterUtils.toApplicationId(appId.trim());
return ApplicationId.fromString(appId.trim());
} else {
return null;
}

@ -26,4 +26,3 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@ -20,7 +20,6 @@

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
* Encodes and decodes {@link ApplicationId} for row keys.
@ -50,7 +49,7 @@ public AppIdKeyConverter() {
*/
@Override
public byte[] encode(String appIdStr) {
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
ApplicationId appId = ApplicationId.fromString(appIdStr);
byte[] appIdBytes = new byte[getKeySize()];
byte[] clusterTs = Bytes.toBytes(
LongConverter.invertLong(appId.getClusterTimestamp()));

@ -21,7 +21,6 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
* Utility class that allows HBase coprocessors to interact with unique
@ -99,7 +98,7 @@ private static long getAppIdSuffix(String appIdStr) {
if (appIdStr == null) {
return 0L;
}
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
ApplicationId appId = ApplicationId.fromString(appIdStr);
long id = appId.getId() % TS_MULTIPLIER;
return id;
}

@ -26,4 +26,3 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@ -88,9 +88,9 @@ public void testStartWebApp() throws Exception {

@Test(timeout=60000)
public void testMultithreadedAdd() throws Exception {
final int NUM_APPS = 5;
final int numApps = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
for (int i = 0; i < numApps; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
@ -101,7 +101,7 @@ public Boolean call() {
};
tasks.add(task);
}
ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
ExecutorService executor = Executors.newFixedThreadPool(numApps);
try {
List<Future<Boolean>> futures = executor.invokeAll(tasks);
for (Future<Boolean> future: futures) {
@ -111,7 +111,7 @@ public Boolean call() {
executor.shutdownNow();
}
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
for (int i = 0; i < numApps; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
assertTrue(collectorManager.containsTimelineCollector(appId));
}
@ -119,9 +119,9 @@ public Boolean call() {

@Test
public void testMultithreadedAddAndRemove() throws Exception {
final int NUM_APPS = 5;
final int numApps = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
for (int i = 0; i < numApps; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
@ -134,7 +134,7 @@ public Boolean call() {
};
tasks.add(task);
}
ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
ExecutorService executor = Executors.newFixedThreadPool(numApps);
try {
List<Future<Boolean>> futures = executor.invokeAll(tasks);
for (Future<Boolean> future: futures) {
@ -144,16 +144,16 @@ public Boolean call() {
executor.shutdownNow();
}
// check the keys
for (int i = 0; i < NUM_APPS; i++) {
for (int i = 0; i < numApps; i++) {
final ApplicationId appId = ApplicationId.newInstance(0L, i);
assertFalse(collectorManager.containsTimelineCollector(appId));
}
}

private NodeTimelineCollectorManager createCollectorManager() {
final NodeTimelineCollectorManager collectorManager =
final NodeTimelineCollectorManager cm =
spy(new NodeTimelineCollectorManager());
doReturn(new Configuration()).when(collectorManager).getConfig();
doReturn(new Configuration()).when(cm).getConfig();
CollectorNodemanagerProtocol nmCollectorService =
mock(CollectorNodemanagerProtocol.class);
GetTimelineCollectorContextResponse response =
@ -164,7 +164,7 @@ private NodeTimelineCollectorManager createCollectorManager() {
} catch (YarnException | IOException e) {
fail();
}
doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
return collectorManager;
doReturn(nmCollectorService).when(cm).getNMCollectorService();
return cm;
}
}

@ -111,7 +111,7 @@ public void testRemoveApplication() throws Exception {
// a configured period
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
for (int i = 0; i < 4; i++) {
Thread.sleep(500l);
Thread.sleep(500L);
if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
break;
}
@ -154,7 +154,7 @@ public void testLaunch() throws Exception {

private PerNodeTimelineCollectorsAuxService
createCollectorAndAddApplication() {
PerNodeTimelineCollectorsAuxService auxService = createCollector();
PerNodeTimelineCollectorsAuxService service = createCollector();
// create an AM container
ContainerId containerId = getAMContainerId();
ContainerInitializationContext context =
@ -162,17 +162,17 @@ public void testLaunch() throws Exception {
when(context.getContainerId()).thenReturn(containerId);
when(context.getContainerType()).thenReturn(
ContainerType.APPLICATION_MASTER);
auxService.initializeContainer(context);
return auxService;
service.initializeContainer(context);
return service;
}

private PerNodeTimelineCollectorsAuxService createCollector() {
NodeTimelineCollectorManager collectorManager = createCollectorManager();
PerNodeTimelineCollectorsAuxService auxService =
PerNodeTimelineCollectorsAuxService service =
spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
auxService.init(conf);
auxService.start();
return auxService;
service.init(conf);
service.start();
return service;
}

private NodeTimelineCollectorManager createCollectorManager() {

@ -110,7 +110,8 @@ private static void verifyHttpResponse(Client client, URI uri,
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertNotNull(resp);
assertEquals(resp.getClientResponseStatus(), expectedStatus);
assertEquals(resp.getStatusInfo().getStatusCode(),
expectedStatus.getStatusCode());
}

private static Client createClient() {
@ -126,10 +127,11 @@ private static ClientResponse getResponse(Client client, URI uri)
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
resp.getStatusInfo().getStatusCode() !=
ClientResponse.Status.OK.getStatusCode()) {
String msg = new String();
if (resp != null) {
msg = resp.getClientResponseStatus().toString();
msg = String.valueOf(resp.getStatusInfo().getStatusCode());
}
throw new IOException("Incorrect response from timeline reader. " +
"Status=" + msg);
@ -141,7 +143,8 @@ private static class DummyURLConnectionFactory
implements HttpURLConnectionFactory {

@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
public HttpURLConnection getHttpURLConnection(final URL url)
throws IOException {
try {
return (HttpURLConnection)url.openConnection();
} catch (UndeclaredThrowableException e) {

@ -58,9 +58,9 @@

public class TestFileSystemTimelineReaderImpl {

private static final String rootDir =
private static final String ROOT_DIR =
FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
FileSystemTimelineReaderImpl reader;
private FileSystemTimelineReaderImpl reader;

@BeforeClass
public static void setup() throws Exception {
@ -68,7 +68,7 @@ public static void setup() throws Exception {
// Create app flow mapping file.
CSVFormat format =
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
String appFlowMappingFile = rootDir + "/entities/cluster1/" +
String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
try (PrintWriter out =
new PrintWriter(new BufferedWriter(
@ -78,12 +78,12 @@ public static void setup() throws Exception {
printer.printRecord("app2", "user1", "flow1,flow", 1);
printer.close();
}
(new File(rootDir)).deleteOnExit();
(new File(ROOT_DIR)).deleteOnExit();
}

@AfterClass
public static void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(rootDir));
FileUtils.deleteDirectory(new File(ROOT_DIR));
}

@Before
@ -91,7 +91,7 @@ public void init() throws Exception {
reader = new FileSystemTimelineReaderImpl();
Configuration conf = new YarnConfiguration();
conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
rootDir);
ROOT_DIR);
reader.init(conf);
}

@ -112,7 +112,7 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
}

private static void loadEntityData() throws Exception {
File appDir = new File(rootDir +
File appDir = new File(ROOT_DIR +
"/entities/cluster1/user1/flow1/1/app1/app/");
TimelineEntity entity11 = new TimelineEntity();
entity11.setId("id_1");
@ -254,7 +254,7 @@ private static void loadEntityData() throws Exception {
entity4.addEvent(event44);
writeEntityFile(entity4, appDir);

File appDir2 = new File(rootDir +
File appDir2 = new File(ROOT_DIR +
"/entities/cluster1/user1/flow1,flow/1/app2/app/");
TimelineEntity entity5 = new TimelineEntity();
entity5.setId("id_5");
@ -298,7 +298,7 @@ public void testGetEntityByClusterAndApp() throws Exception {
Assert.assertEquals(0, result.getMetrics().size());
}

/** This test checks whether we can handle commas in app flow mapping csv */
/** This test checks whether we can handle commas in app flow mapping csv. */
@Test
public void testAppFlowMappingCsv() throws Exception {
// Test getting an entity by cluster and app where flow entry

@ -41,7 +41,8 @@
public class TestFileSystemTimelineWriterImpl {

/**
* Unit test for PoC YARN 3264
* Unit test for PoC YARN 3264.
*
* @throws Exception
*/
@Test
@ -84,8 +85,8 @@ public void testWriteEntityToFile() throws Exception {
"app_id", te);

String fileName = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
type + "/" + id +
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
"app_id/" + type + "/" + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName);
File f = new File(fileName);

@ -60,7 +60,8 @@ public class TestRowKeys {
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[byteArr.length - sepByteLen + i] =
(byte) (longMaxByteArr[byteArr.length - sepByteLen + i] - QUALIFIER_SEP_BYTES[i]);
(byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
QUALIFIER_SEP_BYTES[i]);
}
}
clusterTs = Bytes.toLong(byteArr);
@ -73,7 +74,8 @@ private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
for (int i = 0; i < sepLen; i++) {
assertTrue(
"Row key prefix not encoded properly.",
byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == QUALIFIER_SEP_BYTES[i]);
byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
QUALIFIER_SEP_BYTES[i]);
}
}

@ -1197,4 +1197,3 @@ container ID. Similarly, application attempt can be queried by specifying entity
1. If any problem occurs in parsing request, HTTP 400 (Bad Request) is returned.
1. If flow context information cannot be retrieved or entity for the given entity id cannot be found, HTTP 404 (Not Found) is returned.
1. For non-recoverable errors while retrieving data, HTTP 500 (Internal Server Error) is returned.
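
As an illustrative sketch only, not part of the patch above: one way a client could map the three documented status codes when querying the Timeline Reader REST API. The host, port, and entity path below are assumptions modeled on the /ws/v2/timeline/ URIs used in the tests in this change; a real deployment would use its configured reader address.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Minimal sketch of handling the documented Timeline Reader status codes. */
public final class TimelineReaderStatusSketch {
  private TimelineReaderStatusSketch() {
  }

  public static void main(String[] args) throws IOException {
    // Assumed reader address; the tests above build URIs as
    // "http://localhost:" + serverPort + "/ws/v2/timeline/...".
    URL url = new URL("http://localhost:8188/ws/v2/timeline/clusters/cluster1/"
        + "apps/application_1111111111_1111/entities/type1");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    int code = conn.getResponseCode();
    if (code == HttpURLConnection.HTTP_BAD_REQUEST) {
      System.err.println("400: the request could not be parsed.");
    } else if (code == HttpURLConnection.HTTP_NOT_FOUND) {
      System.err.println("404: flow context or entity id not found.");
    } else if (code == HttpURLConnection.HTTP_INTERNAL_ERROR) {
      System.err.println("500: non-recoverable error while retrieving data.");
    } else {
      System.out.println("HTTP " + code + " returned.");
    }
    conn.disconnect();
  }
}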