diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 09527977de..969ec4cdfc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -878,13 +878,16 @@ private List getResources() throws Exception { handleUpdatedNodes(response); handleJobPriorityChange(response); // handle receiving the timeline collector address for this app - String collectorAddr = response.getCollectorAddr(); + String collectorAddr = null; + if (response.getCollectorInfo() != null) { + collectorAddr = response.getCollectorInfo().getCollectorAddr(); + } + MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); if (collectorAddr != null && !collectorAddr.isEmpty() && appContext.getTimelineV2Client() != null) { - appContext.getTimelineV2Client().setTimelineServiceAddress( - response.getCollectorAddr()); + appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr); } for (ContainerStatus cont : finishedContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index d3ca765f91..9b254ae907 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -92,21 +93,21 @@ public static AllocateResponse newInstance(int responseId, .preemptionMessage(preempt).nmTokens(nmTokens).build(); } - @Public + @Private @Unstable public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, - List updatedContainers) { + CollectorInfo collectorInfo) { return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) .responseId(responseId) .completedContainersStatuses(completedContainers) .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) .availableResources(availResources).amCommand(command) .preemptionMessage(preempt).nmTokens(nmTokens) - .updatedContainers(updatedContainers).build(); + .collectorInfo(collectorInfo).build(); } @Private @@ -133,7 +134,7 @@ public static AllocateResponse newInstance(int responseId, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, - List updatedContainers, String collectorAddr) { + List updatedContainers, CollectorInfo collectorInfo) { return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) .responseId(responseId) .completedContainersStatuses(completedContainers) @@ -141,7 +142,7 @@ public static AllocateResponse newInstance(int responseId, .availableResources(availResources).amCommand(command) .preemptionMessage(preempt).nmTokens(nmTokens) .updatedContainers(updatedContainers).amRmToken(amRMToken) - .collectorAddr(collectorAddr).build(); + .collectorInfo(collectorInfo).build(); } /** @@ -333,17 +334,18 @@ public abstract void setUpdatedContainers( public abstract void setApplicationPriority(Priority priority); /** - * The address of collector that belong to this app + * The data associated with the collector that belongs to this app. Contains + * address and token alongwith identification information. * - * @return The address of collector that belong to this attempt + * @return The data of collector that belong to this attempt */ @Public @Unstable - public abstract String getCollectorAddr(); + public abstract CollectorInfo getCollectorInfo(); @Private @Unstable - public abstract void setCollectorAddr(String collectorAddr); + public abstract void setCollectorInfo(CollectorInfo info); /** * Get the list of container update errors to inform the @@ -559,15 +561,17 @@ public AllocateResponseBuilder applicationPriority( } /** - * Set the collectorAddr of the response. - * @see AllocateResponse#setCollectorAddr(String) - * @param collectorAddr collectorAddr of the response + * Set the collectorInfo of the response. + * @see AllocateResponse#setCollectorInfo(CollectorInfo) + * @param collectorInfo collectorInfo of the response which + * contains collector address, RM id, version and collector token. * @return {@link AllocateResponseBuilder} */ @Private @Unstable - public AllocateResponseBuilder collectorAddr(String collectorAddr) { - allocateResponse.setCollectorAddr(collectorAddr); + public AllocateResponseBuilder collectorInfo( + CollectorInfo collectorInfo) { + allocateResponse.setCollectorInfo(collectorInfo); return this; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java new file mode 100644 index 0000000000..960c992be2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.util.Records; + +/** + * Collector info containing collector address and collector token passed from + * RM to AM in Allocate Response. + */ +@Private +@InterfaceStability.Unstable +public abstract class CollectorInfo { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static CollectorInfo newInstance(String collectorAddr, Token token) { + CollectorInfo amCollectorInfo = + Records.newRecord(CollectorInfo.class); + amCollectorInfo.setCollectorAddr(collectorAddr); + amCollectorInfo.setCollectorToken(token); + return amCollectorInfo; + } + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 81ebd798bb..c5f485fc3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -613,3 +613,8 @@ message StringBytesMapProto { optional string key = 1; optional bytes value = 2; } + +message CollectorInfoProto { + optional string collector_addr = 1; + optional hadoop.common.TokenProto collector_token = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index b92c46e945..7a7f03503c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -112,7 +112,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; - optional string collector_addr = 14; + optional CollectorInfoProto collector_info = 14; repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 6711da28cf..265badb13a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -325,7 +325,11 @@ public void run() { } AllocateResponse response = (AllocateResponse) object; - String collectorAddress = response.getCollectorAddr(); + String collectorAddress = null; + if (response.getCollectorInfo() != null) { + collectorAddress = response.getCollectorInfo().getCollectorAddr(); + } + TimelineV2Client timelineClient = client.getRegisteredTimelineV2Client(); if (timelineClient != null && collectorAddress != null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java new file mode 100644 index 0000000000..45210188b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java @@ -0,0 +1,72 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; + +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; + +/** + * Test Base for Application Master Service Protocol. + */ +public abstract class ApplicationMasterServiceProtoTestBase + extends ProtocolHATestBase { + + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId; + + protected void startupHAAndSetupClient() throws Exception { + attemptId = this.cluster.createFakeApplicationAttemptId(); + + Token appToken = + this.cluster.getResourceManager().getRMContext() + .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); + appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + protected ApplicationMasterProtocol getAMClient() { + return amClient; + } + + private void syncToken(Token token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index a8e91323a3..f4005e9021 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -804,11 +805,20 @@ public FinishApplicationMasterResponse finishApplicationMaster( } public AllocateResponse createFakeAllocateResponse() { - return AllocateResponse.newInstance(-1, - new ArrayList(), - new ArrayList(), new ArrayList(), - Resource.newInstance(1024, 2), null, 1, - null, new ArrayList()); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + return AllocateResponse.newInstance(-1, + new ArrayList(), new ArrayList(), + new ArrayList(), Resource.newInstance(1024, 2), null, 1, + null, new ArrayList(), CollectorInfo.newInstance( + "host:port", Token.newInstance(new byte[] {0}, "TIMELINE", + new byte[] {0}, "rm"))); + } else { + return AllocateResponse.newInstance(-1, + new ArrayList(), + new ArrayList(), new ArrayList(), + Resource.newInstance(1024, 2), null, 1, + null, new ArrayList()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java new file mode 100644 index 0000000000..be8c3023d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java @@ -0,0 +1,71 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests Application Master Protocol with timeline service v2 enabled. + */ +public class TestApplicationMasterServiceProtocolForTimelineV2 + extends ApplicationMasterServiceProtoTestBase { + + @Before + public void initialize() throws Exception { + HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf); + HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + startHACluster(0, false, false, true); + super.startupHAAndSetupClient(); + } + + @Test(timeout = 15000) + public void testAllocateForTimelineV2OnHA() + throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = getAMClient().allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + Assert.assertNotNull(response.getCollectorInfo()); + Assert.assertEquals("host:port", + response.getCollectorInfo().getCollectorAddr()); + Assert.assertNotNull(response.getCollectorInfo().getCollectorToken()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java index ad86fb377b..c2f39a1d4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java @@ -23,10 +23,6 @@ import org.junit.Assert; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestApplicationMasterServiceProtocolOnHA - extends ProtocolHATestBase { - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - + extends ApplicationMasterServiceProtoTestBase { @Before public void initialize() throws Exception { startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - - Token appToken = - this.cluster.getResourceManager().getRMContext() - .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } + super.startupHAAndSetupClient(); } @Test(timeout = 15000) @@ -81,7 +52,7 @@ public void testRegisterApplicationMasterOnHA() throws YarnException, RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); RegisterApplicationMasterResponse response = - amClient.registerApplicationMaster(request); + getAMClient().registerApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeRegisterApplicationMasterResponse()); } @@ -93,7 +64,7 @@ public void testFinishApplicationMasterOnHA() throws YarnException, FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); FinishApplicationMasterResponse response = - amClient.finishApplicationMaster(request); + getAMClient().finishApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeFinishApplicationMasterResponse()); } @@ -105,14 +76,7 @@ public void testAllocateOnHA() throws YarnException, IOException { new ArrayList(), ResourceBlacklistRequest.newInstance(new ArrayList(), new ArrayList())); - AllocateResponse response = amClient.allocate(request); + AllocateResponse response = getAMClient().allocate(request); Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); } - - private void syncToken(Token token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index ba38340975..56826c431c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -426,7 +426,7 @@ private AllocateResponse createAllocateResponse( } AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null, nmTokens, + new ArrayList(), null, null, 1, null, nmTokens, null, updatedContainers); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 1455d6a82a..ff35da81eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -80,6 +83,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { private PreemptionMessage preempt; private Token amrmToken = null; private Priority appPriority = null; + private CollectorInfo collectorInfo = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -162,6 +166,9 @@ private synchronized void mergeLocalToBuilder() { if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); } + if (this.collectorInfo != null) { + builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo)); + } if (this.appPriority != null) { builder.setApplicationPriority(convertToProtoFormat(this.appPriority)); } @@ -398,19 +405,25 @@ public synchronized void setAMRMToken(Token amRMToken) { @Override - public synchronized String getCollectorAddr() { + public synchronized CollectorInfo getCollectorInfo() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - return p.getCollectorAddr(); + if (this.collectorInfo != null) { + return this.collectorInfo; + } + if (!p.hasCollectorInfo()) { + return null; + } + this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo()); + return this.collectorInfo; } @Override - public synchronized void setCollectorAddr(String collectorAddr) { + public synchronized void setCollectorInfo(CollectorInfo info) { maybeInitBuilder(); - if (collectorAddr == null) { - builder.clearCollectorAddr(); - return; + if (info == null) { + builder.clearCollectorInfo(); } - builder.setCollectorAddr(collectorAddr); + this.collectorInfo = info; } @Override @@ -718,6 +731,16 @@ private synchronized NodeReportProto convertToProtoFormat(NodeReport t) { return ((NodeReportPBImpl)t).getProto(); } + private synchronized CollectorInfoPBImpl convertFromProtoFormat( + CollectorInfoProto p) { + return new CollectorInfoPBImpl(p); + } + + private synchronized CollectorInfoProto convertToProtoFormat( + CollectorInfo t) { + return ((CollectorInfoPBImpl)t).getProto(); + } + private synchronized ContainerPBImpl convertFromProtoFormat( ContainerProto p) { return new ContainerPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java new file mode 100644 index 0000000000..bb541336e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java @@ -0,0 +1,148 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +/** + * Protocol record implementation of {@link CollectorInfo}. + */ +public class CollectorInfoPBImpl extends CollectorInfo { + + private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance(); + + private CollectorInfoProto.Builder builder = null; + private boolean viaProto = false; + + private String collectorAddr = null; + private Token collectorToken = null; + + + public CollectorInfoPBImpl() { + builder = CollectorInfoProto.newBuilder(); + } + + public CollectorInfoPBImpl(CollectorInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public CollectorInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CollectorInfoProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public String getCollectorAddr() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorAddr == null && p.hasCollectorAddr()) { + this.collectorAddr = p.getCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setCollectorAddr(String addr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearCollectorAddr(); + } + this.collectorAddr = addr; + } + + @Override + public Token getCollectorToken() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasCollectorToken()) { + this.collectorToken = convertFromProtoFormat(p.getCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearCollectorToken(); + } + this.collectorToken = token; + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private void mergeLocalToBuilder() { + if (this.collectorAddr != null) { + builder.setCollectorAddr(this.collectorAddr); + } + if (this.collectorToken != null) { + builder.setCollectorToken(convertToProtoFormat(this.collectorToken)); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index bb688c93a9..a3f5491cdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -406,6 +407,7 @@ public static void setup() throws Exception { generateByNewInstance(CommitResponse.class); generateByNewInstance(ApplicationTimeout.class); generateByNewInstance(QueueConfigurations.class); + generateByNewInstance(CollectorInfo.class); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 1503eca6e8..a4c1a384c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @@ -37,11 +38,11 @@ public static ReportNewCollectorInfoRequest newInstance( } public static ReportNewCollectorInfoRequest newInstance( - ApplicationId id, String collectorAddr) { + ApplicationId id, String collectorAddr, Token token) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token))); return request; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 73a8abe0fc..c07a6ebac4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -26,23 +26,26 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -164,9 +167,13 @@ private void addRegisteringCollectorsToProto() { builder.clearRegisteringCollectors(); for (Map.Entry entry : registeringCollectors.entrySet()) { + AppCollectorData data = entry.getValue(); builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue().getCollectorAddr())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken())) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -267,8 +274,10 @@ private void initRegisteredCollectors() { this.registeringCollectors = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.registeringCollectors.put(appId, data); } } @@ -309,6 +318,14 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + @Override public Set getNodeLabels() { initNodeLabels(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index ad81a1f981..a1e6a21de0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,31 +26,34 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -154,6 +157,8 @@ private void addAppCollectorsMapToProto() { builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken( + convertToProtoFormat(entry.getValue().getCollectorToken())) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion())); } @@ -599,8 +604,10 @@ private void initAppCollectorsMap() { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.appCollectorsMap.put(appId, data); } } @@ -780,5 +787,13 @@ private SignalContainerRequestProto convertToProtoFormat( SignalContainerRequest t) { return ((SignalContainerRequestPBImpl)t).getProto(); } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index 3f3dcf52e9..5ffc3a2408 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java index da2e5de8aa..5266dca158 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; @@ -31,20 +32,32 @@ public abstract class AppCollectorData { protected static final long DEFAULT_TIMESTAMP_VALUE = -1; public static AppCollectorData newInstance( - ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + ApplicationId id, String collectorAddr, long rmIdentifier, long version, + Token token) { AppCollectorData appCollectorData = Records.newRecord(AppCollectorData.class); appCollectorData.setApplicationId(id); appCollectorData.setCollectorAddr(collectorAddr); appCollectorData.setRMIdentifier(rmIdentifier); appCollectorData.setVersion(version); + appCollectorData.setCollectorToken(token); return appCollectorData; } + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + return newInstance(id, collectorAddr, rmIdentifier, version, null); + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr, Token token) { + return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, + DEFAULT_TIMESTAMP_VALUE, token); + } + public static AppCollectorData newInstance(ApplicationId id, String collectorAddr) { - return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, - DEFAULT_TIMESTAMP_VALUE); + return newInstance(id, collectorAddr, null); } /** @@ -101,4 +114,12 @@ public boolean isStamped() { public abstract void setVersion(long version); + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java index 7d3a80545b..7144f51e01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -19,10 +19,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; - +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; @@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData { private String collectorAddr = null; private Long rmIdentifier = null; private Long version = null; + private Token collectorToken = null; public AppCollectorDataPBImpl() { builder = AppCollectorDataProto.newBuilder(); @@ -158,6 +160,24 @@ public void setVersion(long version) { builder.setVersion(version); } + @Override + public Token getCollectorToken() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasAppCollectorToken()) { + this.collectorToken = new TokenPBImpl(p.getAppCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearAppCollectorToken(); + } + this.collectorToken = token; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -195,6 +215,9 @@ private void mergeLocalToBuilder() { if (this.version != null) { builder.setVersion(this.version); } + if (this.collectorToken != null) { + builder.setAppCollectorToken( + ((TokenPBImpl)this.collectorToken).getProto()); + } } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c33b913b4d..c2ba677226 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -22,6 +22,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.yarn; +import "Security.proto"; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; @@ -139,6 +140,7 @@ message AppCollectorDataProto { optional string app_collector_addr = 2; optional int64 rm_identifier = 3 [default = -1]; optional int64 version = 4 [default = -1]; + optional hadoop.common.TokenProto app_collector_token = 5; } ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 8291197403..82dfaea32a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; @@ -93,6 +95,21 @@ public class TestRPC { "collectors' number in ReportNewCollectorInfoRequest is not ONE."; public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0"; + private static final Token DEFAULT_COLLECTOR_TOKEN; + static { + TimelineDelegationTokenIdentifier identifier = + new TimelineDelegationTokenIdentifier(); + identifier.setOwner(new Text("user")); + identifier.setRenewer(new Text("user")); + identifier.setRealUser(new Text("user")); + long now = Time.now(); + identifier.setIssueDate(now); + identifier.setMaxDate(now + 1000L); + identifier.setMasterKeyId(500); + identifier.setSequenceNumber(5); + DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(), + identifier.getKind().toString(), identifier.getBytes(), "localhost:0"); + } public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, 0); @@ -173,7 +190,16 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException { try { ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance( - DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR); + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null); + proxy.reportNewCollectorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + try { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN); proxy.reportNewCollectorInfo(request); } catch (YarnException e) { Assert.fail("RPC call failured is not expected here."); @@ -437,6 +463,8 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), DEFAULT_COLLECTOR_ADDR); + Assert.assertTrue(appCollector.getCollectorToken() == null || + appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN)); } else { throw new YarnException(ILLEGAL_NUMBER_MESSAGE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 15071f3749..a1890dd8be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -351,7 +352,8 @@ private HashSet getValidNodeLabels() { private Map getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr, + Token.newInstance(new byte[0], "kind", new byte[0], "s")); Map collectorMap = new HashMap<>(); collectorMap.put(appID, data); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 43cd135079..35b7cb0e5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; - import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 0097cd28f1..39be7a790c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a245a97b9d..479aa43df0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -45,8 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - - import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 2d677a7ac4..a993d694e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; @@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt @@ -294,9 +294,9 @@ public void allocate(ApplicationAttemptId appAttemptId, // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled( getRmContext().getYarnConfiguration())) { - AppCollectorData data = app.getCollectorData(); - if (data != null) { - response.setCollectorAddr(data.getCollectorAddr()); + CollectorInfo collectorInfo = app.getCollectorInfo(); + if (collectorInfo != null) { + response.setCollectorInfo(collectorInfo); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 1a0b920812..93c41b6747 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -187,13 +188,23 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * only if the timeline service v.2 is enabled. * * @return the data for the application's collector, including collector - * address, collector ID. Return null if the timeline service v.2 is not - * enabled. + * address, RM ID, version and collector token. Return null if the timeline + * service v.2 is not enabled. */ @InterfaceAudience.Private @InterfaceStability.Unstable AppCollectorData getCollectorData(); + /** + * The timeline collector information to be sent to AM. It should be used + * only if the timeline service v.2 is enabled. + * + * @return collector info, including collector address and collector token. + * Return null if the timeline service v.2 is not enabled. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + CollectorInfo getCollectorInfo(); /** * The original tracking url for the application master. * @return the original tracking url for the application master. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index dbddc5f471..13b079286f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable { private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; private AppCollectorData collectorData; + private CollectorInfo collectorInfo; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -530,7 +532,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, */ public void startTimelineCollector() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(applicationId); + new AppLevelTimelineCollector(applicationId, user); rmContext.getRMTimelineCollectorManager().putIfAbsent( applicationId, collector); } @@ -618,6 +620,12 @@ public AppCollectorData getCollectorData() { public void setCollectorData(AppCollectorData incomingData) { this.collectorData = incomingData; + this.collectorInfo = CollectorInfo.newInstance( + incomingData.getCollectorAddr(), incomingData.getCollectorToken()); + } + + public CollectorInfo getCollectorInfo() { + return this.collectorInfo; } public void removeCollectorData() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 3234d6fe48..f826631a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -239,6 +240,11 @@ public Priority getApplicationPriority() { public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public CollectorInfo getCollectorInfo() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b0d47a19e5..39a7f995ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -325,4 +326,9 @@ public Priority getApplicationPriority() { public boolean isAppInCompletedStates() { return false; } + + @Override + public CollectorInfo getCollectorInfo() { + throw new UnsupportedOperationException("Not supported yet."); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 07058f62ef..eb4381d3de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -80,7 +80,7 @@ public static void setupClass() throws Exception { auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], collectorManager, conf); - auxService.addApplication(ApplicationId.newInstance(0, 1)); + auxService.addApplication(ApplicationId.newInstance(0, 1), "user"); } catch (ExitUtil.ExitException e) { fail(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index 608ef67861..0ddf287f35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -23,7 +23,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.BufferedReader; @@ -40,6 +43,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.Text; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -51,10 +55,12 @@ import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; @@ -76,7 +82,6 @@ public class TestTimelineAuthFilterForV2 { private static final String FOO_USER = "foo"; private static final String HTTP_USER = "HTTP"; - private static final File TEST_ROOT_DIR = new File( System.getProperty("test.build.dir", "target" + File.separator + "test-dir"), UUID.randomUUID().toString()); @@ -88,21 +93,35 @@ public class TestTimelineAuthFilterForV2 { private static String httpSpnegoPrincipal = KerberosTestUtils. getServerPrincipal(); + // First param indicates whether HTTPS access or HTTP access and second param + // indicates whether it is kerberos access or token based access. @Parameterized.Parameters - public static Collection withSsl() { - return Arrays.asList(new Object[][] {{false}, {true}}); + public static Collection params() { + return Arrays.asList(new Object[][] {{false, true}, {false, false}, + {true, false}, {true, true}}); } private static MiniKdc testMiniKDC; private static String keystoresDir; private static String sslConfDir; private static Configuration conf; + private static UserGroupInformation nonKerberosUser; + static { + try { + nonKerberosUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) {} + } + // Indicates whether HTTPS or HTTP access. private boolean withSsl; + // Indicates whether Kerberos based login is used or token based access is + // done. + private boolean withKerberosLogin; private NodeTimelineCollectorManager collectorManager; private PerNodeTimelineCollectorsAuxService auxService; - - public TestTimelineAuthFilterForV2(boolean withSsl) { + public TestTimelineAuthFilterForV2(boolean withSsl, + boolean withKerberosLogin) { this.withSsl = withSsl; + this.withKerberosLogin = withKerberosLogin; } @BeforeClass @@ -143,8 +162,6 @@ public static void setup() { conf.set("hadoop.proxyuser.HTTP.hosts", "*"); conf.set("hadoop.proxyuser.HTTP.users", FOO_USER); UserGroupInformation.setConfiguration(conf); - SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB, - YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost"); } catch (Exception e) { fail("Couldn't setup TimelineServer V2."); } @@ -166,9 +183,27 @@ public void initialize() throws Exception { conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name()); } + UserGroupInformation.setConfiguration(conf); collectorManager = new DummyNodeTimelineCollectorManager(); - auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], - collectorManager, conf); + auxService = PerNodeTimelineCollectorsAuxService.launchServer( + new String[0], collectorManager, conf); + if (withKerberosLogin) { + SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB, + YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost"); + } + ApplicationId appId = ApplicationId.newInstance(0, 1); + auxService.addApplication( + appId, UserGroupInformation.getCurrentUser().getUserName()); + if (!withKerberosLogin) { + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + org.apache.hadoop.security.token.Token + token = + collector.getDelegationTokenForApp(); + token.setService(new Text("localhost" + token.getService().toString(). + substring(token.getService().toString().indexOf(":")))); + UserGroupInformation.getCurrentUser().addToken(token); + } } private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) { @@ -199,9 +234,14 @@ public void destroy() throws Exception { } if (withSsl) { KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); - File base = new File(BASEDIR); - FileUtil.fullyDelete(base); + FileUtil.fullyDelete(new File(BASEDIR)); } + if (withKerberosLogin) { + UserGroupInformation.getCurrentUser().logoutUserFromKeytab(); + } + // Reset the user for next run. + UserGroupInformation.setLoginUser( + UserGroupInformation.createRemoteUser(nonKerberosUser.getUserName())); } private static TimelineEntity createEntity(String id, String type) { @@ -241,35 +281,44 @@ private static TimelineEntity readEntityFile(File entityFile) } } + private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir, + String entityType) throws Exception { + TimelineV2Client client = createTimelineClientForUGI(appId); + try { + // Sync call. Results available immediately. + client.putEntities(createEntity("entity1", entityType)); + assertEquals(1, entityTypeDir.listFiles().length); + verifyEntity(entityTypeDir, "entity1", entityType); + // Async call. + client.putEntitiesAsync(createEntity("entity2", entityType)); + } finally { + client.stop(); + } + } + @Test public void testPutTimelineEntities() throws Exception { - ApplicationId appId = ApplicationId.newInstance(0, 1); - auxService.addApplication(appId); final String entityType = "dummy_type"; + ApplicationId appId = ApplicationId.newInstance(0, 1); File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() + File.separator + "entities" + File.separator + - YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + + UserGroupInformation.getCurrentUser().getUserName() + File.separator + "test_flow_name" + File.separator + "test_flow_version" + File.separator + "1" + File.separator + appId.toString() + File.separator + entityType); try { - KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { - @Override - public Void call() throws Exception { - TimelineV2Client client = createTimelineClientForUGI(appId); - try { - // Sync call. Results available immediately. - client.putEntities(createEntity("entity1", entityType)); - assertEquals(1, entityTypeDir.listFiles().length); - verifyEntity(entityTypeDir, "entity1", entityType); - // Async call. - client.putEntitiesAsync(createEntity("entity2", entityType)); + if (withKerberosLogin) { + KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable() { + @Override + public Void call() throws Exception { + publishAndVerifyEntity(appId, entityTypeDir, entityType); return null; - } finally { - client.stop(); } - } - }); + }); + } else { + publishAndVerifyEntity(appId, entityTypeDir, entityType); + } // Wait for async entity to be published. for (int i = 0; i < 50; i++) { if (entityTypeDir.listFiles().length == 2) { @@ -279,6 +328,11 @@ public Void call() throws Exception { } assertEquals(2, entityTypeDir.listFiles().length); verifyEntity(entityTypeDir, "entity2", entityType); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + auxService.removeApplication(appId); + verify(collectorManager.getTokenManagerService()).cancelToken( + eq(collector.getDelegationTokenForApp()), any(String.class)); } finally { FileUtils.deleteQuietly(entityTypeDir); } @@ -290,13 +344,20 @@ private static class DummyNodeTimelineCollectorManager extends super(); } + @Override + protected TimelineV2DelegationTokenSecretManagerService + createTokenManagerService() { + return spy(new TimelineV2DelegationTokenSecretManagerService()); + } + @Override protected CollectorNodemanagerProtocol getNMCollectorService() { CollectorNodemanagerProtocol protocol = mock(CollectorNodemanagerProtocol.class); try { GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance("test_user", + GetTimelineCollectorContextResponse.newInstance( + UserGroupInformation.getCurrentUser().getUserName(), "test_flow_name", "test_flow_version", 1L); when(protocol.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 10d68bbe92..08ac89487e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -22,9 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,13 +45,20 @@ public class AppLevelTimelineCollector extends TimelineCollector { LoggerFactory.getLogger(TimelineCollector.class); private final ApplicationId appId; + private final String appUser; private final TimelineCollectorContext context; private UserGroupInformation currentUser; + private Token delegationTokenForApp; public AppLevelTimelineCollector(ApplicationId appId) { + this(appId, null); + } + + public AppLevelTimelineCollector(ApplicationId appId, String user) { super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); Preconditions.checkNotNull(appId, "AppId shouldn't be null"); this.appId = appId; + this.appUser = user; context = new TimelineCollectorContext(); } @@ -56,6 +66,20 @@ public UserGroupInformation getCurrentUser() { return currentUser; } + public String getAppUser() { + return appUser; + } + + void setDelegationTokenForApp( + Token token) { + this.delegationTokenForApp = token; + } + + @VisibleForTesting + public Token getDelegationTokenForApp() { + return this.delegationTokenForApp; + } + @Override protected void serviceInit(Configuration conf) throws Exception { context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java index ac91275d2e..6c0d693f00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java @@ -56,8 +56,8 @@ public class AppLevelTimelineCollectorWithAgg private ScheduledThreadPoolExecutor appAggregationExecutor; private AppLevelAggregator appAggregator; - public AppLevelTimelineCollectorWithAgg(ApplicationId appId) { - super(appId); + public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) { + super(appId, user); } private static Set initializeSkipSet() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index bb51734f53..02362b2dc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -28,14 +28,17 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; @@ -71,6 +74,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { private final boolean runningAsAuxService; + private UserGroupInformation loginUGI; + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; @VisibleForTesting @@ -85,25 +90,40 @@ protected NodeTimelineCollectorManager(boolean asAuxService) { @Override protected void serviceInit(Configuration conf) throws Exception { - tokenMgrService = new TimelineV2DelegationTokenSecretManagerService(); + tokenMgrService = createTokenManagerService(); addService(tokenMgrService); + this.loginUGI = UserGroupInformation.getCurrentUser(); super.serviceInit(conf); } @Override protected void serviceStart() throws Exception { - if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) { + if (UserGroupInformation.isSecurityEnabled()) { // Do security login for cases where collector is running outside NM. - try { - doSecureLogin(); - } catch(IOException ie) { - throw new YarnRuntimeException("Failed to login", ie); + if (!runningAsAuxService) { + try { + doSecureLogin(); + } catch(IOException ie) { + throw new YarnRuntimeException("Failed to login", ie); + } } + this.loginUGI = UserGroupInformation.getLoginUser(); } super.serviceStart(); startWebApp(); } + protected TimelineV2DelegationTokenSecretManagerService + createTokenManagerService() { + return new TimelineV2DelegationTokenSecretManagerService(); + } + + @VisibleForTesting + public TimelineV2DelegationTokenSecretManagerService + getTokenManagerService() { + return tokenMgrService; + } + private void doSecureLogin() throws IOException { Configuration conf = getConfig(); InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed( @@ -122,13 +142,45 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + @VisibleForTesting + public Token generateTokenForAppCollector( + String user) { + Token token = tokenMgrService. + generateToken(UserGroupInformation.createRemoteUser(user), + loginUGI.getShortUserName()); + token.setService(new Text(timelineRestServerBindAddress)); + return token; + } + + @VisibleForTesting + public void cancelTokenForAppCollector( + AppLevelTimelineCollector appCollector) throws IOException { + if (appCollector.getDelegationTokenForApp() != null) { + tokenMgrService.cancelToken(appCollector.getDelegationTokenForApp(), + appCollector.getAppUser()); + } + } + @Override protected void doPostPut(ApplicationId appId, TimelineCollector collector) { try { // Get context info from NM updateTimelineCollectorContext(appId, collector); + // Generate token for app collector. + org.apache.hadoop.yarn.api.records.Token token = null; + if (UserGroupInformation.isSecurityEnabled() && + collector instanceof AppLevelTimelineCollector) { + AppLevelTimelineCollector appCollector = + (AppLevelTimelineCollector)collector; + Token timelineToken = + generateTokenForAppCollector(appCollector.getAppUser()); + appCollector.setDelegationTokenForApp(timelineToken); + token = org.apache.hadoop.yarn.api.records.Token.newInstance( + timelineToken.getIdentifier(), timelineToken.getKind().toString(), + timelineToken.getPassword(), timelineToken.getService().toString()); + } // Report to NM if a new collector is added. - reportNewCollectorToNM(appId); + reportNewCollectorToNM(appId, token); } catch (YarnException | IOException e) { // throw exception here as it cannot be used if failed communicate with NM LOG.error("Failed to communicate with NM Collector Service for " + appId); @@ -136,6 +188,18 @@ protected void doPostPut(ApplicationId appId, TimelineCollector collector) { } } + @Override + protected void postRemove(ApplicationId appId, TimelineCollector collector) { + if (collector instanceof AppLevelTimelineCollector) { + try { + cancelTokenForAppCollector((AppLevelTimelineCollector)collector); + } catch (IOException e) { + LOG.warn("Failed to cancel token for app collector with appId " + + appId, e); + } + } + } + /** * Launch the REST web server for this collector manager. */ @@ -180,11 +244,12 @@ private void startWebApp() { timelineRestServerBindAddress); } - private void reportNewCollectorToNM(ApplicationId appId) + private void reportNewCollectorToNM(ApplicationId appId, + org.apache.hadoop.yarn.api.records.Token token) throws YarnException, IOException { ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); + this.timelineRestServerBindAddress, token); LOG.info("Report a new collector for application: " + appId + " to the NM Collector Service."); getNMCollectorService().reportNewCollectorInfo(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index d4cde648ad..66f9aab034 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -114,11 +114,12 @@ protected void serviceStop() throws Exception { * exists, no new service is created. * * @param appId Application Id to be added. + * @param user Application Master container user. * @return whether it was added successfully */ - public boolean addApplication(ApplicationId appId) { + public boolean addApplication(ApplicationId appId, String user) { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, user); return (collectorManager.putIfAbsent(appId, collector) == collector); } @@ -147,7 +148,7 @@ public void initializeContainer(ContainerInitializationContext context) { if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { ApplicationId appId = context.getContainerId(). getApplicationAttemptId().getApplicationId(); - addApplication(appId); + addApplication(appId, context.getUser()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java index eef8436452..de7db58211 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.timelineservice.security; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService; @@ -43,6 +48,17 @@ public TimelineV2DelegationTokenSecretManagerService() { tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval); } + public Token generateToken( + UserGroupInformation ugi, String renewer) { + return ((TimelineV2DelegationTokenSecretManager) + getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer); + } + + public void cancelToken(Token token, + String canceller) throws IOException { + getTimelineDelegationTokenSecretManager().cancelToken(token, canceller); + } + /** * Delegation token secret manager for ATSv2. */ @@ -70,6 +86,21 @@ public TimelineV2DelegationTokenSecretManager( delegationTokenRenewInterval, delegationTokenRemoverScanInterval); } + public Token generateToken( + UserGroupInformation ugi, String renewer) { + Text realUser = null; + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + TimelineDelegationTokenIdentifier identifier = createIdentifier(); + identifier.setOwner(new Text(ugi.getUserName())); + identifier.setRenewer(new Text(renewer)); + identifier.setRealUser(realUser); + byte[] password = createPassword(identifier); + return new Token(identifier.getBytes(), + password, identifier.getKind(), null); + } + @Override public TimelineDelegationTokenIdentifier createIdentifier() { return new TimelineDelegationTokenIdentifier(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index a59f8c1e68..af9acce265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -95,7 +95,7 @@ public void testMultithreadedAdd() throws Exception { Callable task = new Callable() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, "user"); return (collectorManager.putIfAbsent(appId, collector) == collector); } }; @@ -126,7 +126,7 @@ public void testMultithreadedAddAndRemove() throws Exception { Callable task = new Callable() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, "user"); boolean successPut = (collectorManager.putIfAbsent(appId, collector) == collector); return successPut && collectorManager.remove(appId);