MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena

Authored by Jian He, 2017-08-21 22:08:07 -07:00; committed by Varun Saxena
parent bea3e4df76
commit 08f40bcc7f
10 changed files with 301 additions and 36 deletions
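At its core, the change converts the yarn-level token carried in the CollectorInfo of an allocate response into a security-layer token, resolves its service from the collector address, and adds it to the application's UGI. Below is a minimal illustrative sketch of that flow; the class and method names are assumptions, and the real logic lives in TimelineV2ClientImpl#setTimelineDelegationToken in the diff below.

import java.net.InetSocketAddress;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;

// Illustrative sketch only; these names are not part of the patch.
public class CollectorTokenSketch {
  static void addCollectorTokenToUgi(CollectorInfo info, UserGroupInformation ugi) {
    org.apache.hadoop.yarn.api.records.Token yarnToken = info.getCollectorToken();
    // Convert the protocol-level token into a security-layer token.
    String service = yarnToken.getService();
    org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> token =
        new org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>(
            yarnToken.getIdentifier().array(), yarnToken.getPassword().array(),
            new Text(yarnToken.getKind()),
            service == null ? new Text() : new Text(service));
    // Prefer the collector address when setting the token service, as the patch does.
    InetSocketAddress serviceAddr = NetUtils.createSocketAddr(info.getCollectorAddr());
    SecurityUtil.setTokenService(token, serviceAddr);
    // Make the token visible to RPC/HTTP clients running as this user.
    ugi.addToken(token);
  }
}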

View File

@@ -848,7 +848,8 @@ private List<Container> getResources() throws Exception {
updateAMRMToken(response.getAMRMToken());
}
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
List<ContainerStatus> finishedContainers =
response.getCompletedContainersStatuses();
// propagate preemption requests
final PreemptionMessage preemptReq = response.getPreemptionMessage();
@@ -877,19 +878,13 @@ private List<Container> getResources() throws Exception {
handleUpdatedNodes(response);
handleJobPriorityChange(response);
// handle receiving the timeline collector address for this app
String collectorAddr = null;
if (response.getCollectorInfo() != null) {
collectorAddr = response.getCollectorInfo().getCollectorAddr();
}
// Handle receiving the timeline collector address and token for this app.
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
&& appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
if (appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().
setTimelineCollectorInfo(response.getCollectorInfo());
}
for (ContainerStatus cont : finishedContainers) {
processFinishedContainer(cont);
}

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@@ -27,6 +28,7 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -99,6 +101,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -110,6 +113,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -121,6 +125,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -137,9 +142,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService.TimelineV2DelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
@@ -748,6 +755,96 @@ private void validateLabelsRequests(ResourceRequest resourceRequest,
}
}
@Test
public void testUpdateCollectorInfo() throws Exception {
LOG.info("Running testUpdateCollectorInfo");
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
String localAddr = "localhost:1234";
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
// Generate a timeline delegation token.
TimelineDelegationTokenIdentifier ident =
new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
new Text("renewer"), null);
ident.setSequenceNumber(1);
Token<TimelineDelegationTokenIdentifier> collectorToken =
new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
new Text(localAddr));
org.apache.hadoop.yarn.api.records.Token token =
org.apache.hadoop.yarn.api.records.Token.newInstance(
collectorToken.getIdentifier(), collectorToken.getKind().toString(),
collectorToken.getPassword(),
collectorToken.getService().toString());
CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token);
// Mock scheduler to serve the Allocate request.
final MockSchedulerForTimelineCollector mockScheduler =
new MockSchedulerForTimelineCollector(collectorInfo);
MyContainerAllocator allocator =
new MyContainerAllocator(null, conf, attemptId, mockJob,
SystemClock.getInstance()) {
@Override
protected void register() {
}
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return mockScheduler;
}
};
// Initially UGI should have no tokens.
ArrayList<Token<? extends TokenIdentifier>> tokens =
new ArrayList<>(ugi.getTokens());
assertEquals(0, tokens.size());
TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId));
client.init(conf);
when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()).
thenReturn(client);
// Send allocate request to RM and fetch collector address and token.
allocator.schedule();
verify(client).setTimelineCollectorInfo(collectorInfo);
// Verify if token has been updated in UGI.
tokens = new ArrayList<>(ugi.getTokens());
assertEquals(1, tokens.size());
assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
tokens.get(0).getKind());
assertEquals(collectorToken.decodeIdentifier(),
tokens.get(0).decodeIdentifier());
// Generate new collector token, send allocate request to RM and fetch the
// new token.
ident.setSequenceNumber(100);
Token<TimelineDelegationTokenIdentifier> collectorToken1 =
new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
new Text(localAddr));
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
collectorToken1.getIdentifier(), collectorToken1.getKind().toString(),
collectorToken1.getPassword(), collectorToken1.getService().toString());
collectorInfo = CollectorInfo.newInstance(localAddr, token);
mockScheduler.updateCollectorInfo(collectorInfo);
allocator.schedule();
verify(client).setTimelineCollectorInfo(collectorInfo);
// Verify if new token has been updated in UGI.
tokens = new ArrayList<>(ugi.getTokens());
assertEquals(1, tokens.size());
assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
tokens.get(0).getKind());
assertEquals(collectorToken1.decodeIdentifier(),
tokens.get(0).decodeIdentifier());
allocator.close();
}
@Test
public void testMapReduceScheduling() throws Exception {
@@ -3488,6 +3585,46 @@ public void completeContainer(ContainerId containerId) {
}
}
private static class MockSchedulerForTimelineCollector
implements ApplicationMasterProtocol {
CollectorInfo collectorInfo;
public MockSchedulerForTimelineCollector(CollectorInfo info) {
this.collectorInfo = info;
}
void updateCollectorInfo(CollectorInfo info) {
collectorInfo = info;
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
return Records.newRecord(RegisterApplicationMasterResponse.class);
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
return FinishApplicationMasterResponse.newInstance(false);
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
AllocateResponse response = AllocateResponse.newInstance(
request.getResponseId(), Collections.<ContainerStatus>emptyList(),
Collections.<Container>emptyList(),
Collections.<NodeReport>emptyList(),
Resource.newInstance(512000, 1024), null, 10, null,
Collections.<NMToken>emptyList());
response.setCollectorInfo(collectorInfo);
return response;
}
}
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();

View File

@@ -32,6 +32,10 @@ public abstract class CollectorInfo {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
public static CollectorInfo newInstance(String collectorAddr) {
return newInstance(collectorAddr, null);
}
public static CollectorInfo newInstance(String collectorAddr, Token token) {
CollectorInfo amCollectorInfo =
Records.newRecord(CollectorInfo.class);

View File

@@ -69,8 +69,6 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
private volatile boolean keepRunning;
private volatile float progress;
private volatile String collectorAddr;
/**
*
* @param intervalMs heartbeat interval in milliseconds between AM and RM
@@ -332,14 +330,9 @@ public void run() {
TimelineV2Client timelineClient =
client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null
|| !collectorAddr.equals(collectorAddress)) {
collectorAddr = collectorAddress;
timelineClient.setTimelineServiceAddress(collectorAddress);
LOG.info("collectorAddress " + collectorAddress);
}
if (timelineClient != null && response.getCollectorInfo() != null) {
timelineClient.
setTimelineCollectorInfo(response.getCollectorInfo());
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();

View File

@@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -83,10 +85,13 @@ public abstract void putEntitiesAsync(TimelineEntity... entities)
/**
* <p>
* Update the timeline service address where the request will be sent to.
* Update collector info received in AllocateResponse which contains the
* timeline service address where the request will be sent to and the timeline
* delegation token which will be used to send the request.
* </p>
*
* @param address the timeline service address
* @param collectorInfo Collector info which contains the timeline service
* address and timeline delegation token.
*/
public abstract void setTimelineServiceAddress(String address);
public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo);
}
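As a usage sketch (the caller-side names here are assumed, not part of the patch), an application master that receives an AllocateResponse simply forwards the CollectorInfo to its registered timeline client, which is what the RMContainerAllocator and AMRMClientAsyncImpl hunks in this commit now do.

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;

// Illustrative sketch only; these names are not part of the patch.
public class CollectorInfoForwardingSketch {
  static void onAllocateResponse(AllocateResponse response,
      TimelineV2Client timelineClient) {
    if (timelineClient != null && response.getCollectorInfo() != null) {
      // Updates the timeline service address and, when a token is present,
      // adds the timeline delegation token to the current user's UGI.
      timelineClient.setTimelineCollectorInfo(response.getCollectorInfo());
    }
  }
}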

View File

@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
@@ -39,15 +40,22 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@@ -62,6 +70,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
private TimelineEntityDispatcher entityDispatcher;
private volatile String timelineServiceAddress;
@VisibleForTesting
volatile Token currentTimelineToken = null;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
@@ -100,7 +110,6 @@ protected void serviceInit(Configuration conf) throws Exception {
authUgi = ugi;
doAsUser = null;
}
// TODO need to add/cleanup filter retry later for ATSV2. similar to V1
DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
@@ -144,8 +153,73 @@ public void putEntitiesAsync(TimelineEntity... entities)
}
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
public void setTimelineCollectorInfo(CollectorInfo collectorInfo) {
if (collectorInfo == null) {
LOG.warn("Not setting collector info as it is null.");
return;
}
// First update the token so that it is available when collector address is
// used.
if (collectorInfo.getCollectorToken() != null) {
// Pass the collector address so it can be used as the token service if
// the token's own service is not set.
setTimelineDelegationToken(
collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr());
}
// Update timeline service address.
if (collectorInfo.getCollectorAddr() != null &&
!collectorInfo.getCollectorAddr().isEmpty() &&
!collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) {
this.timelineServiceAddress = collectorInfo.getCollectorAddr();
LOG.info("Updated timeline service address to " + timelineServiceAddress);
}
}
private void setTimelineDelegationToken(Token delegationToken,
String collectorAddr) {
// The checks below ensure that an invalid token is not added to the UGI.
// This is required because the timeline token is set via a public API.
if (!delegationToken.getKind().equals(
TimelineDelegationTokenIdentifier.KIND_NAME.toString())) {
LOG.warn("Timeline token to be updated should be of kind " +
TimelineDelegationTokenIdentifier.KIND_NAME);
return;
}
if (collectorAddr == null || collectorAddr.isEmpty()) {
collectorAddr = timelineServiceAddress;
}
// The token cannot be applied if neither its service nor the collector
// address is available.
String service = delegationToken.getService();
if ((service == null || service.isEmpty()) &&
(collectorAddr == null || collectorAddr.isEmpty())) {
LOG.warn("Timeline token does not have service and timeline service " +
"address is not yet set. Not updating the token");
return;
}
// No need to update a duplicate token.
if (currentTimelineToken != null &&
currentTimelineToken.equals(delegationToken)) {
return;
}
currentTimelineToken = delegationToken;
// Convert the token, sanitize the token service and add it to UGI.
org.apache.hadoop.security.token.
Token<TimelineDelegationTokenIdentifier> timelineToken =
new org.apache.hadoop.security.token.
Token<TimelineDelegationTokenIdentifier>(
delegationToken.getIdentifier().array(),
delegationToken.getPassword().array(),
new Text(delegationToken.getKind()),
service == null ? new Text() : new Text(service));
// Prefer timeline service address over service coming in the token for
// updating the token service.
InetSocketAddress serviceAddr =
(collectorAddr != null && !collectorAddr.isEmpty()) ?
NetUtils.createSocketAddr(collectorAddr) :
SecurityUtil.getTokenServiceAddr(timelineToken);
SecurityUtil.setTokenService(timelineToken, serviceAddr);
authUgi.addToken(timelineToken);
LOG.info("Updated timeline delegation token " + timelineToken);
}
@Private

View File

@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -27,11 +32,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -151,7 +161,7 @@ public void testExceptionMultipleRetry() {
maxRetries);
c.init(conf);
c.start();
c.setTimelineServiceAddress("localhost:12345");
c.setTimelineCollectorInfo(CollectorInfo.newInstance("localhost:12345"));
try {
c.putEntities(new TimelineEntity());
} catch (IOException e) {
@@ -310,6 +320,50 @@ public void testConfigurableNumberOfMerges() throws Exception {
}
}
@Test
public void testSetTimelineToken() throws Exception {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
assertEquals(0, ugi.getTokens().size());
assertNull("Timeline token in v2 client should not be set",
client.currentTimelineToken);
Token token = Token.newInstance(
new byte[0], "kind", new byte[0], "service");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertNull("Timeline token in v2 client should not be set as token kind " +
"is unexepcted.", client.currentTimelineToken);
assertEquals(0, ugi.getTokens().size());
token = Token.newInstance(new byte[0], TimelineDelegationTokenIdentifier.
KIND_NAME.toString(), new byte[0], null);
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertNull("Timeline token in v2 client should not be set as serice is " +
"not set.", client.currentTimelineToken);
assertEquals(0, ugi.getTokens().size());
TimelineDelegationTokenIdentifier ident =
new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
new Text("renewer"), null);
ident.setSequenceNumber(1);
token = Token.newInstance(ident.getBytes(),
TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
"localhost:1234");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token));
assertEquals(1, ugi.getTokens().size());
assertNotNull("Timeline token should be set in v2 client.",
client.currentTimelineToken);
assertEquals(token, client.currentTimelineToken);
ident.setSequenceNumber(20);
Token newToken = Token.newInstance(ident.getBytes(),
TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
"localhost:1234");
client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, newToken));
assertEquals(1, ugi.getTokens().size());
assertNotEquals(token, client.currentTimelineToken);
assertEquals(newToken, client.currentTimelineToken);
}
@Test
public void testAfterStop() throws Exception {
client.setSleepBeforeReturn(true);

View File

@@ -30,6 +30,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -437,7 +438,7 @@ public void setTimelineServiceAddress(ApplicationId appId,
String collectorAddr) {
TimelineV2Client client = appToClientMap.get(appId);
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
client.setTimelineCollectorInfo(CollectorInfo.newInstance(collectorAddr));
}
}

View File

@@ -33,6 +33,7 @@
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
@@ -99,9 +100,9 @@ public void testPutEntities() throws Exception {
TimelineV2Client client =
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
collectorManager.getRestServerBindAddress());
// Set the timeline service address manually.
client.setTimelineCollectorInfo(CollectorInfo.newInstance(
collectorManager.getRestServerBindAddress()));
client.init(conf);
client.start();
TimelineEntity entity = new TimelineEntity();
@@ -126,9 +127,9 @@ public void testPutExtendedEntities() throws Exception {
TimelineV2Client client =
TimelineV2Client.createTimelineClient(appId);
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
collectorManager.getRestServerBindAddress());
// Set the timeline service address manually.
client.setTimelineCollectorInfo(CollectorInfo.newInstance(
collectorManager.getRestServerBindAddress()));
client.init(conf);
client.start();
ClusterEntity cluster = new ClusterEntity();

View File

@@ -56,6 +56,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -228,7 +229,7 @@ private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
String restBindAddr = collectorManager.getRestServerBindAddress();
String addr =
"localhost" + restBindAddr.substring(restBindAddr.indexOf(":"));
client.setTimelineServiceAddress(addr);
client.setTimelineCollectorInfo(CollectorInfo.newInstance(addr));
client.init(conf);
client.start();
return client;