YARN-6695. Fixed NPE in publishing appFinished events to ATSv2.
Contributed by Prabhu Joseph
This commit is contained in:
parent
b979fdde99
commit
df76cdc895
@ -473,10 +473,15 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
|
||||
}
|
||||
TimelineCollector timelineCollector =
|
||||
rmTimelineCollectorManager.get(appId);
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(entity);
|
||||
timelineCollector.putEntities(entities,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
if (timelineCollector != null) {
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(entity);
|
||||
timelineCollector.putEntities(entities,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
} else {
|
||||
LOG.debug("Cannot find active collector while publishing entity "
|
||||
+ entity);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error when publishing entity " + entity);
|
||||
LOG.debug("Error when publishing entity {}", entity, e);
|
||||
|
@ -28,12 +28,17 @@
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -293,6 +298,48 @@ public void testPublishContainerMetrics() throws Exception {
|
||||
TimelineServiceHelper.invertLong(containerId.getContainerId()));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testPutEntityWhenNoCollector() throws Exception {
|
||||
// Validating the logs as DrainDispatcher won't throw exception
|
||||
class TestAppender extends AppenderSkeleton {
|
||||
private final List<LoggingEvent> log = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public boolean requiresLayout() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void append(final LoggingEvent loggingEvent) {
|
||||
log.add(loggingEvent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public List<LoggingEvent> getLog() {
|
||||
return new ArrayList<>(log);
|
||||
}
|
||||
}
|
||||
|
||||
TestAppender appender = new TestAppender();
|
||||
final Logger logger = Logger.getRootLogger();
|
||||
logger.addAppender(appender);
|
||||
|
||||
try {
|
||||
RMApp app = createRMApp(ApplicationId.newInstance(0, 1));
|
||||
metricsPublisher.appCreated(app, app.getStartTime());
|
||||
dispatcher.await();
|
||||
for (LoggingEvent event : appender.getLog()) {
|
||||
assertFalse("Dispatcher Crashed",
|
||||
event.getRenderedMessage().contains("Error in dispatcher thread"));
|
||||
}
|
||||
} finally {
|
||||
logger.removeAppender(appender);
|
||||
}
|
||||
}
|
||||
|
||||
private RMApp createAppAndRegister(ApplicationId appId) {
|
||||
RMApp app = createRMApp(appId);
|
||||
|
||||
|
@ -330,14 +330,6 @@ Following are the basic configurations to start Timeline service v.2:
|
||||
<name>yarn.system-metrics-publisher.enabled</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The setting that controls whether yarn container events are
|
||||
published to the timeline service or not by RM. This configuration setting
|
||||
is for ATS V2.</description>
|
||||
<name>yarn.rm.system-metrics-publisher.emit-container-events</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
If using an aux services manifest instead of setting aux services through the Configuration, ensure that the manifest services array includes the timeline\_collector service as follows:
|
||||
|
Loading…
Reference in New Issue
Block a user