YARN-8073 TimelineClientImpl doesn't honor yarn.timeline-service.versions configuration. Contributed by Rohith Sharma K S

This commit is contained in:
Vrushali C 2018-04-04 15:08:03 -07:00
parent 3087e89135
commit 345e7624d5
6 changed files with 61 additions and 16 deletions

View File

@ -269,7 +269,7 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info("Emitting job history data to the timeline service is enabled"); LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) { if (YarnConfiguration.timelineServiceEnabled(conf)) {
boolean timelineServiceV2Enabled = boolean timelineServiceV2Enabled =
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); YarnConfiguration.timelineServiceV2Enabled(conf);
if(timelineServiceV2Enabled) { if(timelineServiceV2Enabled) {
timelineV2Client = timelineV2Client =
((MRAppMaster.RunningAppContext)context).getTimelineV2Client(); ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();

View File

@ -3796,6 +3796,27 @@ public static boolean timelineServiceV1Enabled(Configuration conf) {
return enabled; return enabled;
} }
/**
* Returns whether the timeline service v.1,5 is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a
* version equal to 1.5.
*/
public static boolean timelineServiceV15Enabled(Configuration conf) {
boolean enabled = false;
if (timelineServiceEnabled(conf)) {
Collection<Float> versions = getTimelineServiceVersions(conf);
for (Float version : versions) {
if (Float.compare(version, 1.5f) == 0) {
enabled = true;
break;
}
}
}
return enabled;
}
/** /**
* Returns all the active timeline service versions. It does not check * Returns all the active timeline service versions. It does not check
* whether the timeline service itself is enabled. * whether the timeline service itself is enabled.

View File

@ -82,7 +82,7 @@ public class TimelineClientImpl extends TimelineClient {
@VisibleForTesting @VisibleForTesting
protected String doAsUser; protected String doAsUser;
private float timelineServiceVersion; private boolean timelineServiceV15Enabled;
private TimelineWriter timelineWriter; private TimelineWriter timelineWriter;
private String timelineServiceAddress; private String timelineServiceAddress;
@ -96,15 +96,15 @@ public TimelineClientImpl() {
} }
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
if (!YarnConfiguration.timelineServiceV1Enabled(conf)) { if (!YarnConfiguration.timelineServiceV1Enabled(conf)) {
throw new IOException("Timeline V1 client is not properly configured. " throw new IOException("Timeline V1 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to" + "Either timeline service is not enabled or version is not set to"
+ " 1.x"); + " 1.x");
} }
LOG.info("Timeline service address: " + getTimelineServiceAddress());
timelineServiceV15Enabled =
YarnConfiguration.timelineServiceV15Enabled(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser(); UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) { if (realUgi != null) {
@ -126,6 +126,7 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
} }
LOG.info("Timeline service address: " + getTimelineServiceAddress());
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -147,7 +148,7 @@ protected void serviceStart() throws Exception {
protected TimelineWriter createTimelineWriter(Configuration conf, protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation ugi, Client webClient, URI uri) UserGroupInformation ugi, Client webClient, URI uri)
throws IOException { throws IOException {
if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) { if (timelineServiceV15Enabled) {
return new FileSystemTimelineWriter( return new FileSystemTimelineWriter(
conf, ugi, webClient, uri); conf, ugi, webClient, uri);
} else { } else {
@ -406,10 +407,9 @@ public UserGroupInformation getUgi() {
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
TimelineEntityGroupId groupId, TimelineEntity... entities) TimelineEntityGroupId groupId, TimelineEntity... entities)
throws IOException, YarnException { throws IOException, YarnException {
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) { if (!timelineServiceV15Enabled) {
throw new YarnException( throw new YarnException(
"This API is not supported under current Timeline Service Version: " "This API is not supported under current Timeline Service Version:");
+ timelineServiceVersion);
} }
return timelineWriter.putEntities(appAttemptId, groupId, entities); return timelineWriter.putEntities(appAttemptId, groupId, entities);
@ -418,10 +418,9 @@ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
@Override @Override
public void putDomain(ApplicationAttemptId appAttemptId, public void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException { TimelineDomain domain) throws IOException, YarnException {
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) { if (!timelineServiceV15Enabled) {
throw new YarnException( throw new YarnException(
"This API is not supported under current Timeline Service Version: " "This API is not supported under current Timeline Service Version:");
+ timelineServiceVersion);
} }
timelineWriter.putDomain(appAttemptId, domain); timelineWriter.putDomain(appAttemptId, domain);
} }

View File

@ -126,8 +126,7 @@ public static float getTimelineServiceVersion(Configuration conf) {
* version equal to 1.5. * version equal to 1.5.
*/ */
public static boolean timelineServiceV1_5Enabled(Configuration conf) { public static boolean timelineServiceV1_5Enabled(Configuration conf) {
return timelineServiceEnabled(conf) && return YarnConfiguration.timelineServiceV15Enabled(conf);
Math.abs(getTimelineServiceVersion(conf) - 1.5) < 0.00001;
} }
public static TimelineAbout createTimelineAbout(String about) { public static TimelineAbout createTimelineAbout(String about) {

View File

@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.*;
@ -266,6 +267,31 @@ public void testTimelineServiceEventPublishingNoService() throws Exception {
runTest(false, false); runTest(false, false);
} }
@Test(timeout = 10000)
public void testTimelineServiceConfiguration()
throws Exception {
Configuration config = new Configuration(false);
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0,1.5");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV1Enabled(config));
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0,1");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.5");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV1Enabled(config));
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.5");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV1Enabled(config));
}
private void publishEvents(boolean v1Enabled, boolean v2Enabled) { private void publishEvents(boolean v1Enabled, boolean v2Enabled) {
long timestamp = (v1Enabled) ? 1 : 2; long timestamp = (v1Enabled) ? 1 : 2;
int id = (v2Enabled) ? 3 : 4; int id = (v2Enabled) ? 3 : 4;

View File

@ -235,7 +235,7 @@ static TimelineReaderServer startTimelineReaderServer(String[] args,
public static void main(String[] args) { public static void main(String[] args) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, 2.0f);
TimelineReaderServer server = startTimelineReaderServer(args, conf); TimelineReaderServer server = startTimelineReaderServer(args, conf);
server.join(); server.join();
} }