YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.

This commit is contained in:
Rohith Sharma K S 2017-07-31 17:26:34 +05:30 committed by Varun Saxena
parent 9f6540535d
commit 7594d1de7b
39 changed files with 830 additions and 151 deletions

View File

@ -878,13 +878,16 @@ private List<Container> getResources() throws Exception {
handleUpdatedNodes(response);
handleJobPriorityChange(response);
// handle receiving the timeline collector address for this app
String collectorAddr = response.getCollectorAddr();
String collectorAddr = null;
if (response.getCollectorInfo() != null) {
collectorAddr = response.getCollectorInfo().getCollectorAddr();
}
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
&& appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().setTimelineServiceAddress(
response.getCollectorAddr());
appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
}
for (ContainerStatus cont : finishedContainers) {

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -92,21 +93,21 @@ public static AllocateResponse newInstance(int responseId,
.preemptionMessage(preempt).nmTokens(nmTokens).build();
}
@Public
@Private
@Unstable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
List<UpdatedContainer> updatedContainers) {
CollectorInfo collectorInfo) {
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).build();
.collectorInfo(collectorInfo).build();
}
@Private
@ -133,7 +134,7 @@ public static AllocateResponse newInstance(int responseId,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<UpdatedContainer> updatedContainers, String collectorAddr) {
List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
@ -141,7 +142,7 @@ public static AllocateResponse newInstance(int responseId,
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).amRmToken(amRMToken)
.collectorAddr(collectorAddr).build();
.collectorInfo(collectorInfo).build();
}
/**
@ -333,17 +334,18 @@ public abstract void setUpdatedContainers(
public abstract void setApplicationPriority(Priority priority);
/**
* The address of collector that belong to this app
* The data associated with the collector that belongs to this app. Contains
* address and token alongwith identification information.
*
* @return The address of collector that belong to this attempt
* @return The data of collector that belong to this attempt
*/
@Public
@Unstable
public abstract String getCollectorAddr();
public abstract CollectorInfo getCollectorInfo();
@Private
@Unstable
public abstract void setCollectorAddr(String collectorAddr);
public abstract void setCollectorInfo(CollectorInfo info);
/**
* Get the list of container update errors to inform the
@ -559,15 +561,17 @@ public AllocateResponseBuilder applicationPriority(
}
/**
* Set the <code>collectorAddr</code> of the response.
* @see AllocateResponse#setCollectorAddr(String)
* @param collectorAddr <code>collectorAddr</code> of the response
* Set the <code>collectorInfo</code> of the response.
* @see AllocateResponse#setCollectorInfo(CollectorInfo)
* @param collectorInfo <code>collectorInfo</code> of the response which
* contains collector address, RM id, version and collector token.
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder collectorAddr(String collectorAddr) {
allocateResponse.setCollectorAddr(collectorAddr);
public AllocateResponseBuilder collectorInfo(
CollectorInfo collectorInfo) {
allocateResponse.setCollectorInfo(collectorInfo);
return this;
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.util.Records;
/**
* Collector info containing collector address and collector token passed from
* RM to AM in Allocate Response.
*/
@Private
@InterfaceStability.Unstable
public abstract class CollectorInfo {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
public static CollectorInfo newInstance(String collectorAddr, Token token) {
CollectorInfo amCollectorInfo =
Records.newRecord(CollectorInfo.class);
amCollectorInfo.setCollectorAddr(collectorAddr);
amCollectorInfo.setCollectorToken(token);
return amCollectorInfo;
}
public abstract String getCollectorAddr();
public abstract void setCollectorAddr(String addr);
/**
* Get delegation token for app collector which AM will use to publish
* entities.
* @return the delegation token for app collector.
*/
public abstract Token getCollectorToken();
public abstract void setCollectorToken(Token token);
}

View File

@ -613,3 +613,8 @@ message StringBytesMapProto {
optional string key = 1;
optional bytes value = 2;
}
message CollectorInfoProto {
optional string collector_addr = 1;
optional hadoop.common.TokenProto collector_token = 2;
}

View File

@ -112,7 +112,7 @@ message AllocateResponseProto {
repeated NMTokenProto nm_tokens = 9;
optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
optional string collector_addr = 14;
optional CollectorInfoProto collector_info = 14;
repeated UpdateContainerErrorProto update_errors = 15;
repeated UpdatedContainerProto updated_containers = 16;
}

View File

@ -325,7 +325,11 @@ public void run() {
}
AllocateResponse response = (AllocateResponse) object;
String collectorAddress = response.getCollectorAddr();
String collectorAddress = null;
if (response.getCollectorInfo() != null) {
collectorAddress = response.getCollectorInfo().getCollectorAddr();
}
TimelineV2Client timelineClient =
client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.junit.After;
/**
* Test Base for Application Master Service Protocol.
*/
public abstract class ApplicationMasterServiceProtoTestBase
extends ProtocolHATestBase {
private ApplicationMasterProtocol amClient;
private ApplicationAttemptId attemptId;
protected void startupHAAndSetupClient() throws Exception {
attemptId = this.cluster.createFakeApplicationAttemptId();
Token<AMRMTokenIdentifier> appToken =
this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appToken);
syncToken(appToken);
amClient = ClientRMProxy
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
}
@After
public void shutDown() {
if(this.amClient != null) {
RPC.stopProxy(this.amClient);
}
}
protected ApplicationMasterProtocol getAMClient() {
return amClient;
}
private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
this.cluster.getResourceManager(i).getRMContext()
.getAMRMTokenSecretManager().addPersistedPassword(token);
}
}
}

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -804,6 +805,14 @@ public FinishApplicationMasterResponse finishApplicationMaster(
}
public AllocateResponse createFakeAllocateResponse() {
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
return AllocateResponse.newInstance(-1,
new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
null, new ArrayList<NMToken>(), CollectorInfo.newInstance(
"host:port", Token.newInstance(new byte[] {0}, "TIMELINE",
new byte[] {0}, "rm")));
} else {
return AllocateResponse.newInstance(-1,
new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(),
@ -811,5 +820,6 @@ public AllocateResponse createFakeAllocateResponse() {
null, new ArrayList<NMToken>());
}
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests Application Master Protocol with timeline service v2 enabled.
*/
public class TestApplicationMasterServiceProtocolForTimelineV2
extends ApplicationMasterServiceProtoTestBase {
@Before
public void initialize() throws Exception {
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf);
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
startHACluster(0, false, false, true);
super.startupHAAndSetupClient();
}
@Test(timeout = 15000)
public void testAllocateForTimelineV2OnHA()
throws YarnException, IOException {
AllocateRequest request = AllocateRequest.newInstance(0, 50f,
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(),
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
new ArrayList<String>()));
AllocateResponse response = getAMClient().allocate(request);
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
Assert.assertNotNull(response.getCollectorInfo());
Assert.assertEquals("host:port",
response.getCollectorInfo().getCollectorAddr());
Assert.assertNotNull(response.getCollectorInfo().getCollectorToken());
}
}

View File

@ -23,10 +23,6 @@
import org.junit.Assert;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -34,45 +30,20 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestApplicationMasterServiceProtocolOnHA
extends ProtocolHATestBase {
private ApplicationMasterProtocol amClient;
private ApplicationAttemptId attemptId ;
extends ApplicationMasterServiceProtoTestBase {
@Before
public void initialize() throws Exception {
startHACluster(0, false, false, true);
attemptId = this.cluster.createFakeApplicationAttemptId();
Token<AMRMTokenIdentifier> appToken =
this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appToken);
syncToken(appToken);
amClient = ClientRMProxy
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
}
@After
public void shutDown() {
if(this.amClient != null) {
RPC.stopProxy(this.amClient);
}
super.startupHAAndSetupClient();
}
@Test(timeout = 15000)
@ -81,7 +52,7 @@ public void testRegisterApplicationMasterOnHA() throws YarnException,
RegisterApplicationMasterRequest request =
RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
RegisterApplicationMasterResponse response =
amClient.registerApplicationMaster(request);
getAMClient().registerApplicationMaster(request);
Assert.assertEquals(response,
this.cluster.createFakeRegisterApplicationMasterResponse());
}
@ -93,7 +64,7 @@ public void testFinishApplicationMasterOnHA() throws YarnException,
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
FinishApplicationMasterResponse response =
amClient.finishApplicationMaster(request);
getAMClient().finishApplicationMaster(request);
Assert.assertEquals(response,
this.cluster.createFakeFinishApplicationMasterResponse());
}
@ -105,14 +76,7 @@ public void testAllocateOnHA() throws YarnException, IOException {
new ArrayList<ContainerId>(),
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
new ArrayList<String>()));
AllocateResponse response = amClient.allocate(request);
AllocateResponse response = getAMClient().allocate(request);
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
}
private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
this.cluster.getResourceManager(i).getRMContext()
.getAMRMTokenSecretManager().addPersistedPassword(token);
}
}
}

View File

@ -426,7 +426,7 @@ private AllocateResponse createAllocateResponse(
}
AllocateResponse response =
AllocateResponse.newInstance(0, completed, allocated,
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null,
updatedContainers);
return response;
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@ -48,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@ -80,6 +83,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private PreemptionMessage preempt;
private Token amrmToken = null;
private Priority appPriority = null;
private CollectorInfo collectorInfo = null;
public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder();
@ -162,6 +166,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.amrmToken != null) {
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
}
if (this.collectorInfo != null) {
builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo));
}
if (this.appPriority != null) {
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
}
@ -398,19 +405,25 @@ public synchronized void setAMRMToken(Token amRMToken) {
@Override
public synchronized String getCollectorAddr() {
public synchronized CollectorInfo getCollectorInfo() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getCollectorAddr();
if (this.collectorInfo != null) {
return this.collectorInfo;
}
if (!p.hasCollectorInfo()) {
return null;
}
this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo());
return this.collectorInfo;
}
@Override
public synchronized void setCollectorAddr(String collectorAddr) {
public synchronized void setCollectorInfo(CollectorInfo info) {
maybeInitBuilder();
if (collectorAddr == null) {
builder.clearCollectorAddr();
return;
if (info == null) {
builder.clearCollectorInfo();
}
builder.setCollectorAddr(collectorAddr);
this.collectorInfo = info;
}
@Override
@ -718,6 +731,16 @@ private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized CollectorInfoPBImpl convertFromProtoFormat(
CollectorInfoProto p) {
return new CollectorInfoPBImpl(p);
}
private synchronized CollectorInfoProto convertToProtoFormat(
CollectorInfo t) {
return ((CollectorInfoPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder;
import com.google.protobuf.TextFormat;
/**
* Protocol record implementation of {@link CollectorInfo}.
*/
public class CollectorInfoPBImpl extends CollectorInfo {
private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance();
private CollectorInfoProto.Builder builder = null;
private boolean viaProto = false;
private String collectorAddr = null;
private Token collectorToken = null;
public CollectorInfoPBImpl() {
builder = CollectorInfoProto.newBuilder();
}
public CollectorInfoPBImpl(CollectorInfoProto proto) {
this.proto = proto;
viaProto = true;
}
public CollectorInfoProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = CollectorInfoProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getCollectorAddr() {
CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
if (this.collectorAddr == null && p.hasCollectorAddr()) {
this.collectorAddr = p.getCollectorAddr();
}
return this.collectorAddr;
}
@Override
public void setCollectorAddr(String addr) {
maybeInitBuilder();
if (collectorAddr == null) {
builder.clearCollectorAddr();
}
this.collectorAddr = addr;
}
@Override
public Token getCollectorToken() {
CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
if (this.collectorToken == null && p.hasCollectorToken()) {
this.collectorToken = convertFromProtoFormat(p.getCollectorToken());
}
return this.collectorToken;
}
@Override
public void setCollectorToken(Token token) {
maybeInitBuilder();
if (token == null) {
builder.clearCollectorToken();
}
this.collectorToken = token;
}
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
return new TokenPBImpl(p);
}
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl) t).getProto();
}
private void mergeLocalToBuilder() {
if (this.collectorAddr != null) {
builder.setCollectorAddr(this.collectorAddr);
}
if (this.collectorToken != null) {
builder.setCollectorToken(convertToProtoFormat(this.collectorToken));
}
}
}

View File

@ -101,6 +101,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -406,6 +407,7 @@ public static void setup() throws Exception {
generateByNewInstance(CommitResponse.class);
generateByNewInstance(ApplicationTimeout.class);
generateByNewInstance(QueueConfigurations.class);
generateByNewInstance(CollectorInfo.class);
}
@Test

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.util.Records;
@ -37,11 +38,11 @@ public static ReportNewCollectorInfoRequest newInstance(
}
public static ReportNewCollectorInfoRequest newInstance(
ApplicationId id, String collectorAddr) {
ApplicationId id, String collectorAddr, Token token) {
ReportNewCollectorInfoRequest request =
Records.newRecord(ReportNewCollectorInfoRequest.class);
request.setAppCollectorsList(
Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token)));
return request;
}

View File

@ -26,23 +26,26 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -164,9 +167,13 @@ private void addRegisteringCollectorsToProto() {
builder.clearRegisteringCollectors();
for (Map.Entry<ApplicationId, AppCollectorData> entry :
registeringCollectors.entrySet()) {
AppCollectorData data = entry.getValue();
builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setAppCollectorAddr(entry.getValue().getCollectorAddr()));
.setAppCollectorAddr(data.getCollectorAddr())
.setAppCollectorToken(convertToProtoFormat(data.getCollectorToken()))
.setRmIdentifier(data.getRMIdentifier())
.setVersion(data.getVersion()));
}
}
@ -267,8 +274,10 @@ private void initRegisteredCollectors() {
this.registeringCollectors = new HashMap<>();
for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
AppCollectorData data = AppCollectorData.newInstance(appId,
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
collectorToken);
this.registeringCollectors.put(appId, data);
}
}
@ -309,6 +318,14 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
return new TokenPBImpl(p);
}
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl) t).getProto();
}
@Override
public Set<NodeLabel> getNodeLabels() {
initNodeLabels();

View File

@ -26,31 +26,34 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -154,6 +157,8 @@ private void addAppCollectorsMapToProto() {
builder.addAppCollectors(AppCollectorDataProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setAppCollectorAddr(data.getCollectorAddr())
.setAppCollectorToken(
convertToProtoFormat(entry.getValue().getCollectorToken()))
.setRmIdentifier(data.getRMIdentifier())
.setVersion(data.getVersion()));
}
@ -599,8 +604,10 @@ private void initAppCollectorsMap() {
this.appCollectorsMap = new HashMap<>();
for (AppCollectorDataProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
AppCollectorData data = AppCollectorData.newInstance(appId,
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
collectorToken);
this.appCollectorsMap.put(appId, data);
}
}
@ -780,5 +787,13 @@ private SignalContainerRequestProto convertToProtoFormat(
SignalContainerRequest t) {
return ((SignalContainerRequestPBImpl)t).getProto();
}
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl) t).getProto();
}
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
return new TokenPBImpl(p);
}
}

View File

@ -20,12 +20,12 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
public class ReportNewCollectorInfoRequestPBImpl extends
ReportNewCollectorInfoRequest {

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
@ -31,20 +32,32 @@ public abstract class AppCollectorData {
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
public static AppCollectorData newInstance(
ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
ApplicationId id, String collectorAddr, long rmIdentifier, long version,
Token token) {
AppCollectorData appCollectorData =
Records.newRecord(AppCollectorData.class);
appCollectorData.setApplicationId(id);
appCollectorData.setCollectorAddr(collectorAddr);
appCollectorData.setRMIdentifier(rmIdentifier);
appCollectorData.setVersion(version);
appCollectorData.setCollectorToken(token);
return appCollectorData;
}
public static AppCollectorData newInstance(
ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
return newInstance(id, collectorAddr, rmIdentifier, version, null);
}
public static AppCollectorData newInstance(ApplicationId id,
String collectorAddr, Token token) {
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
DEFAULT_TIMESTAMP_VALUE, token);
}
public static AppCollectorData newInstance(ApplicationId id,
String collectorAddr) {
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
DEFAULT_TIMESTAMP_VALUE);
return newInstance(id, collectorAddr, null);
}
/**
@ -101,4 +114,12 @@ public boolean isStamped() {
public abstract void setVersion(long version);
/**
* Get delegation token for app collector which AM will use to publish
* entities.
* @return the delegation token for app collector.
*/
public abstract Token getCollectorToken();
public abstract void setCollectorToken(Token token);
}

View File

@ -19,10 +19,11 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
private String collectorAddr = null;
private Long rmIdentifier = null;
private Long version = null;
private Token collectorToken = null;
public AppCollectorDataPBImpl() {
builder = AppCollectorDataProto.newBuilder();
@ -158,6 +160,24 @@ public void setVersion(long version) {
builder.setVersion(version);
}
@Override
public Token getCollectorToken() {
AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
if (this.collectorToken == null && p.hasAppCollectorToken()) {
this.collectorToken = new TokenPBImpl(p.getAppCollectorToken());
}
return this.collectorToken;
}
@Override
public void setCollectorToken(Token token) {
maybeInitBuilder();
if (token == null) {
builder.clearAppCollectorToken();
}
this.collectorToken = token;
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
@ -195,6 +215,9 @@ private void mergeLocalToBuilder() {
if (this.version != null) {
builder.setVersion(this.version);
}
if (this.collectorToken != null) {
builder.setAppCollectorToken(
((TokenPBImpl)this.collectorToken).getProto());
}
}
}

View File

@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "Security.proto";
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
@ -139,6 +140,7 @@ message AppCollectorDataProto {
optional string app_collector_addr = 2;
optional int64 rm_identifier = 3 [default = -1];
optional int64 version = 4 [default = -1];
optional hadoop.common.TokenProto app_collector_token = 5;
}
//////////////////////////////////////////////////////

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
@ -72,6 +73,7 @@
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@ -93,6 +95,21 @@ public class TestRPC {
"collectors' number in ReportNewCollectorInfoRequest is not ONE.";
public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
private static final Token DEFAULT_COLLECTOR_TOKEN;
static {
TimelineDelegationTokenIdentifier identifier =
new TimelineDelegationTokenIdentifier();
identifier.setOwner(new Text("user"));
identifier.setRenewer(new Text("user"));
identifier.setRealUser(new Text("user"));
long now = Time.now();
identifier.setIssueDate(now);
identifier.setMaxDate(now + 1000L);
identifier.setMasterKeyId(500);
identifier.setSequenceNumber(5);
DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(),
identifier.getKind().toString(), identifier.getBytes(), "localhost:0");
}
public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, 0);
@ -173,7 +190,16 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
try {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null);
proxy.reportNewCollectorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
}
try {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN);
proxy.reportNewCollectorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
@ -437,6 +463,8 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
DEFAULT_APP_ID);
Assert.assertEquals(appCollector.getCollectorAddr(),
DEFAULT_COLLECTOR_ADDR);
Assert.assertTrue(appCollector.getCollectorToken() == null ||
appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN));
} else {
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@ -351,7 +352,8 @@ private HashSet<NodeLabel> getValidNodeLabels() {
private Map<ApplicationId, AppCollectorData> getCollectors() {
ApplicationId appID = ApplicationId.newInstance(1L, 1);
String collectorAddr = "localhost:0";
AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr,
Token.newInstance(new byte[0], "kind", new byte[0], "s"));
Map<ApplicationId, AppCollectorData> collectorMap =
new HashMap<>();
collectorMap.put(appID, data);

View File

@ -71,7 +71,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;

View File

@ -45,8 +45,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
@ -54,7 +55,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
@ -294,9 +294,9 @@ public void allocate(ApplicationAttemptId appAttemptId,
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(
getRmContext().getYarnConfiguration())) {
AppCollectorData data = app.getCollectorData();
if (data != null) {
response.setCollectorAddr(data.getCollectorAddr());
CollectorInfo collectorInfo = app.getCollectorInfo();
if (collectorInfo != null) {
response.setCollectorInfo(collectorInfo);
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -187,13 +188,23 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* only if the timeline service v.2 is enabled.
*
* @return the data for the application's collector, including collector
* address, collector ID. Return null if the timeline service v.2 is not
* enabled.
* address, RM ID, version and collector token. Return null if the timeline
* service v.2 is not enabled.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
AppCollectorData getCollectorData();
/**
* The timeline collector information to be sent to AM. It should be used
* only if the timeline service v.2 is enabled.
*
* @return collector info, including collector address and collector token.
* Return null if the timeline service v.2 is not enabled.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
CollectorInfo getCollectorInfo();
/**
* The original tracking url for the application master.
* @return the original tracking url for the application master.

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1;
private AppCollectorData collectorData;
private CollectorInfo collectorInfo;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@ -530,7 +532,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
*/
public void startTimelineCollector() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(applicationId);
new AppLevelTimelineCollector(applicationId, user);
rmContext.getRMTimelineCollectorManager().putIfAbsent(
applicationId, collector);
}
@ -618,6 +620,12 @@ public AppCollectorData getCollectorData() {
public void setCollectorData(AppCollectorData incomingData) {
this.collectorData = incomingData;
this.collectorInfo = CollectorInfo.newInstance(
incomingData.getCollectorAddr(), incomingData.getCollectorToken());
}
public CollectorInfo getCollectorInfo() {
return this.collectorInfo;
}
public void removeCollectorData() {

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -239,6 +240,11 @@ public Priority getApplicationPriority() {
public boolean isAppInCompletedStates() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
public static RMApp newApplication(int i) {

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -325,4 +326,9 @@ public Priority getApplicationPriority() {
public boolean isAppInCompletedStates() {
return false;
}
@Override
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -80,7 +80,7 @@ public static void setupClass() throws Exception {
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager, conf);
auxService.addApplication(ApplicationId.newInstance(0, 1));
auxService.addApplication(ApplicationId.newInstance(0, 1), "user");
} catch (ExitUtil.ExitException e) {
fail();
}

View File

@ -23,7 +23,10 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
@ -40,6 +43,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -51,10 +55,12 @@
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
@ -76,7 +82,6 @@ public class TestTimelineAuthFilterForV2 {
private static final String FOO_USER = "foo";
private static final String HTTP_USER = "HTTP";
private static final File TEST_ROOT_DIR = new File(
System.getProperty("test.build.dir", "target" + File.separator +
"test-dir"), UUID.randomUUID().toString());
@ -88,21 +93,35 @@ public class TestTimelineAuthFilterForV2 {
private static String httpSpnegoPrincipal = KerberosTestUtils.
getServerPrincipal();
// First param indicates whether HTTPS access or HTTP access and second param
// indicates whether it is kerberos access or token based access.
@Parameterized.Parameters
public static Collection<Object[]> withSsl() {
return Arrays.asList(new Object[][] {{false}, {true}});
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][] {{false, true}, {false, false},
{true, false}, {true, true}});
}
private static MiniKdc testMiniKDC;
private static String keystoresDir;
private static String sslConfDir;
private static Configuration conf;
private static UserGroupInformation nonKerberosUser;
static {
try {
nonKerberosUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {}
}
// Indicates whether HTTPS or HTTP access.
private boolean withSsl;
// Indicates whether Kerberos based login is used or token based access is
// done.
private boolean withKerberosLogin;
private NodeTimelineCollectorManager collectorManager;
private PerNodeTimelineCollectorsAuxService auxService;
public TestTimelineAuthFilterForV2(boolean withSsl) {
public TestTimelineAuthFilterForV2(boolean withSsl,
boolean withKerberosLogin) {
this.withSsl = withSsl;
this.withKerberosLogin = withKerberosLogin;
}
@BeforeClass
@ -143,8 +162,6 @@ public static void setup() {
conf.set("hadoop.proxyuser.HTTP.hosts", "*");
conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
} catch (Exception e) {
fail("Couldn't setup TimelineServer V2.");
}
@ -166,9 +183,27 @@ public void initialize() throws Exception {
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_ONLY.name());
}
UserGroupInformation.setConfiguration(conf);
collectorManager = new DummyNodeTimelineCollectorManager();
auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager, conf);
auxService = PerNodeTimelineCollectorsAuxService.launchServer(
new String[0], collectorManager, conf);
if (withKerberosLogin) {
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
}
ApplicationId appId = ApplicationId.newInstance(0, 1);
auxService.addApplication(
appId, UserGroupInformation.getCurrentUser().getUserName());
if (!withKerberosLogin) {
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId);
org.apache.hadoop.security.token.Token
<TimelineDelegationTokenIdentifier> token =
collector.getDelegationTokenForApp();
token.setService(new Text("localhost" + token.getService().toString().
substring(token.getService().toString().indexOf(":"))));
UserGroupInformation.getCurrentUser().addToken(token);
}
}
private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
@ -199,9 +234,14 @@ public void destroy() throws Exception {
}
if (withSsl) {
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
FileUtil.fullyDelete(new File(BASEDIR));
}
if (withKerberosLogin) {
UserGroupInformation.getCurrentUser().logoutUserFromKeytab();
}
// Reset the user for next run.
UserGroupInformation.setLoginUser(
UserGroupInformation.createRemoteUser(nonKerberosUser.getUserName()));
}
private static TimelineEntity createEntity(String id, String type) {
@ -241,21 +281,8 @@ private static TimelineEntity readEntityFile(File entityFile)
}
}
@Test
public void testPutTimelineEntities() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
auxService.addApplication(appId);
final String entityType = "dummy_type";
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
File.separator + "entities" + File.separator +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" +
File.separator + "test_flow_name" + File.separator +
"test_flow_version" + File.separator + "1" + File.separator +
appId.toString() + File.separator + entityType);
try {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
String entityType) throws Exception {
TimelineV2Client client = createTimelineClientForUGI(appId);
try {
// Sync call. Results available immediately.
@ -264,12 +291,34 @@ public Void call() throws Exception {
verifyEntity(entityTypeDir, "entity1", entityType);
// Async call.
client.putEntitiesAsync(createEntity("entity2", entityType));
return null;
} finally {
client.stop();
}
}
@Test
public void testPutTimelineEntities() throws Exception {
final String entityType = "dummy_type";
ApplicationId appId = ApplicationId.newInstance(0, 1);
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
File.separator + "entities" + File.separator +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
UserGroupInformation.getCurrentUser().getUserName() +
File.separator + "test_flow_name" + File.separator +
"test_flow_version" + File.separator + "1" + File.separator +
appId.toString() + File.separator + entityType);
try {
if (withKerberosLogin) {
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
@Override
public Void call() throws Exception {
publishAndVerifyEntity(appId, entityTypeDir, entityType);
return null;
}
});
} else {
publishAndVerifyEntity(appId, entityTypeDir, entityType);
}
// Wait for async entity to be published.
for (int i = 0; i < 50; i++) {
if (entityTypeDir.listFiles().length == 2) {
@ -279,6 +328,11 @@ public Void call() throws Exception {
}
assertEquals(2, entityTypeDir.listFiles().length);
verifyEntity(entityTypeDir, "entity2", entityType);
AppLevelTimelineCollector collector =
(AppLevelTimelineCollector)collectorManager.get(appId);
auxService.removeApplication(appId);
verify(collectorManager.getTokenManagerService()).cancelToken(
eq(collector.getDelegationTokenForApp()), any(String.class));
} finally {
FileUtils.deleteQuietly(entityTypeDir);
}
@ -290,13 +344,20 @@ private static class DummyNodeTimelineCollectorManager extends
super();
}
@Override
protected TimelineV2DelegationTokenSecretManagerService
createTokenManagerService() {
return spy(new TimelineV2DelegationTokenSecretManagerService());
}
@Override
protected CollectorNodemanagerProtocol getNMCollectorService() {
CollectorNodemanagerProtocol protocol =
mock(CollectorNodemanagerProtocol.class);
try {
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance("test_user",
GetTimelineCollectorContextResponse.newInstance(
UserGroupInformation.getCurrentUser().getUserName(),
"test_flow_name", "test_flow_version", 1L);
when(protocol.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);

View File

@ -22,9 +22,12 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,13 +45,20 @@ public class AppLevelTimelineCollector extends TimelineCollector {
LoggerFactory.getLogger(TimelineCollector.class);
private final ApplicationId appId;
private final String appUser;
private final TimelineCollectorContext context;
private UserGroupInformation currentUser;
private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
public AppLevelTimelineCollector(ApplicationId appId) {
this(appId, null);
}
public AppLevelTimelineCollector(ApplicationId appId, String user) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
Preconditions.checkNotNull(appId, "AppId shouldn't be null");
this.appId = appId;
this.appUser = user;
context = new TimelineCollectorContext();
}
@ -56,6 +66,20 @@ public UserGroupInformation getCurrentUser() {
return currentUser;
}
public String getAppUser() {
return appUser;
}
void setDelegationTokenForApp(
Token<TimelineDelegationTokenIdentifier> token) {
this.delegationTokenForApp = token;
}
@VisibleForTesting
public Token<TimelineDelegationTokenIdentifier> getDelegationTokenForApp() {
return this.delegationTokenForApp;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,

View File

@ -56,8 +56,8 @@ public class AppLevelTimelineCollectorWithAgg
private ScheduledThreadPoolExecutor appAggregationExecutor;
private AppLevelAggregator appAggregator;
public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
super(appId);
public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) {
super(appId, user);
}
private static Set<String> initializeSkipSet() {

View File

@ -28,14 +28,17 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@ -71,6 +74,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private final boolean runningAsAuxService;
private UserGroupInformation loginUGI;
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
@VisibleForTesting
@ -85,25 +90,40 @@ protected NodeTimelineCollectorManager(boolean asAuxService) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
tokenMgrService = new TimelineV2DelegationTokenSecretManagerService();
tokenMgrService = createTokenManagerService();
addService(tokenMgrService);
this.loginUGI = UserGroupInformation.getCurrentUser();
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) {
if (UserGroupInformation.isSecurityEnabled()) {
// Do security login for cases where collector is running outside NM.
if (!runningAsAuxService) {
try {
doSecureLogin();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
}
this.loginUGI = UserGroupInformation.getLoginUser();
}
super.serviceStart();
startWebApp();
}
protected TimelineV2DelegationTokenSecretManagerService
createTokenManagerService() {
return new TimelineV2DelegationTokenSecretManagerService();
}
@VisibleForTesting
public TimelineV2DelegationTokenSecretManagerService
getTokenManagerService() {
return tokenMgrService;
}
private void doSecureLogin() throws IOException {
Configuration conf = getConfig();
InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
@ -122,13 +142,45 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
@VisibleForTesting
public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector(
String user) {
Token<TimelineDelegationTokenIdentifier> token = tokenMgrService.
generateToken(UserGroupInformation.createRemoteUser(user),
loginUGI.getShortUserName());
token.setService(new Text(timelineRestServerBindAddress));
return token;
}
@VisibleForTesting
public void cancelTokenForAppCollector(
AppLevelTimelineCollector appCollector) throws IOException {
if (appCollector.getDelegationTokenForApp() != null) {
tokenMgrService.cancelToken(appCollector.getDelegationTokenForApp(),
appCollector.getAppUser());
}
}
@Override
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
try {
// Get context info from NM
updateTimelineCollectorContext(appId, collector);
// Generate token for app collector.
org.apache.hadoop.yarn.api.records.Token token = null;
if (UserGroupInformation.isSecurityEnabled() &&
collector instanceof AppLevelTimelineCollector) {
AppLevelTimelineCollector appCollector =
(AppLevelTimelineCollector)collector;
Token<TimelineDelegationTokenIdentifier> timelineToken =
generateTokenForAppCollector(appCollector.getAppUser());
appCollector.setDelegationTokenForApp(timelineToken);
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
timelineToken.getPassword(), timelineToken.getService().toString());
}
// Report to NM if a new collector is added.
reportNewCollectorToNM(appId);
reportNewCollectorToNM(appId, token);
} catch (YarnException | IOException e) {
// throw exception here as it cannot be used if failed communicate with NM
LOG.error("Failed to communicate with NM Collector Service for " + appId);
@ -136,6 +188,18 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
}
}
@Override
protected void postRemove(ApplicationId appId, TimelineCollector collector) {
if (collector instanceof AppLevelTimelineCollector) {
try {
cancelTokenForAppCollector((AppLevelTimelineCollector)collector);
} catch (IOException e) {
LOG.warn("Failed to cancel token for app collector with appId " +
appId, e);
}
}
}
/**
* Launch the REST web server for this collector manager.
*/
@ -180,11 +244,12 @@ private void startWebApp() {
timelineRestServerBindAddress);
}
private void reportNewCollectorToNM(ApplicationId appId)
private void reportNewCollectorToNM(ApplicationId appId,
org.apache.hadoop.yarn.api.records.Token token)
throws YarnException, IOException {
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
this.timelineRestServerBindAddress, token);
LOG.info("Report a new collector for application: " + appId +
" to the NM Collector Service.");
getNMCollectorService().reportNewCollectorInfo(request);

View File

@ -114,11 +114,12 @@ protected void serviceStop() throws Exception {
* exists, no new service is created.
*
* @param appId Application Id to be added.
* @param user Application Master container user.
* @return whether it was added successfully
*/
public boolean addApplication(ApplicationId appId) {
public boolean addApplication(ApplicationId appId, String user) {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollectorWithAgg(appId);
new AppLevelTimelineCollectorWithAgg(appId, user);
return (collectorManager.putIfAbsent(appId, collector)
== collector);
}
@ -147,7 +148,7 @@ public void initializeContainer(ContainerInitializationContext context) {
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
ApplicationId appId = context.getContainerId().
getApplicationAttemptId().getApplicationId();
addApplication(appId);
addApplication(appId, context.getUser());
}
}

View File

@ -18,8 +18,13 @@
package org.apache.hadoop.yarn.server.timelineservice.security;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
@ -43,6 +48,17 @@ public TimelineV2DelegationTokenSecretManagerService() {
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
}
public Token<TimelineDelegationTokenIdentifier> generateToken(
UserGroupInformation ugi, String renewer) {
return ((TimelineV2DelegationTokenSecretManager)
getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer);
}
public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
String canceller) throws IOException {
getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);
}
/**
* Delegation token secret manager for ATSv2.
*/
@ -70,6 +86,21 @@ public TimelineV2DelegationTokenSecretManager(
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
}
public Token<TimelineDelegationTokenIdentifier> generateToken(
UserGroupInformation ugi, String renewer) {
Text realUser = null;
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}
TimelineDelegationTokenIdentifier identifier = createIdentifier();
identifier.setOwner(new Text(ugi.getUserName()));
identifier.setRenewer(new Text(renewer));
identifier.setRealUser(realUser);
byte[] password = createPassword(identifier);
return new Token<TimelineDelegationTokenIdentifier>(identifier.getBytes(),
password, identifier.getKind(), null);
}
@Override
public TimelineDelegationTokenIdentifier createIdentifier() {
return new TimelineDelegationTokenIdentifier();

View File

@ -95,7 +95,7 @@ public void testMultithreadedAdd() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollectorWithAgg(appId);
new AppLevelTimelineCollectorWithAgg(appId, "user");
return (collectorManager.putIfAbsent(appId, collector) == collector);
}
};
@ -126,7 +126,7 @@ public void testMultithreadedAddAndRemove() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollectorWithAgg(appId);
new AppLevelTimelineCollectorWithAgg(appId, "user");
boolean successPut =
(collectorManager.putIfAbsent(appId, collector) == collector);
return successPut && collectorManager.remove(appId);