From 1962851356745d2f395215aab8289c386376567e Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 14 Oct 2022 07:52:22 +0800 Subject: [PATCH] YARN-11294. [Federation] Router Support DelegationToken store/update/remove Token With MemoryStateStore. (#4915) --- .../client/YARNDelegationTokenIdentifier.java | 12 ++ .../YARNDelegationTokenIdentifierPBImpl.java | 200 ++++++++++++++++++ .../security/client/impl/pb/package-info.java | 20 ++ .../yarn/api/BasePBImplRecordsTest.java | 5 +- .../FederationDelegationTokenStateStore.java | 46 ++++ .../impl/MemoryFederationStateStore.java | 69 ++++++ .../store/impl/SQLFederationStateStore.java | 26 +++ .../impl/ZookeeperFederationStateStore.java | 26 +++ .../store/records/RouterRMTokenRequest.java | 44 ++++ .../store/records/RouterRMTokenResponse.java | 44 ++++ .../store/records/RouterStoreToken.java | 56 +++++ .../impl/pb/RouterRMTokenRequestPBImpl.java | 129 +++++++++++ .../impl/pb/RouterRMTokenResponsePBImpl.java | 131 ++++++++++++ .../impl/pb/RouterStoreTokenPBImpl.java | 171 +++++++++++++++ .../utils/FederationStateStoreFacade.java | 71 +++++++ .../impl/FederationStateStoreBaseTest.java | 108 ++++++++++ .../impl/TestSQLFederationStateStore.java | 20 ++ .../TestZookeeperFederationStateStore.java | 20 ++ .../TestFederationProtocolRecords.java | 26 ++- .../utils/TestFederationStateStoreFacade.java | 101 +++++++++ .../FederationStateStoreService.java | 33 ++- 21 files changed, 1352 insertions(+), 6 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/YARNDelegationTokenIdentifierPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/YARNDelegationTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/YARNDelegationTokenIdentifier.java index 6d8bc4bc1d..95fe4bbc64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/YARNDelegationTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/YARNDelegationTokenIdentifier.java @@ -24,9 +24,11 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto; +import org.apache.hadoop.yarn.util.Records; @Private public abstract class YARNDelegationTokenIdentifier extends @@ -112,4 +114,14 @@ public YARNDelegationTokenIdentifierProto getProto() { setBuilderFields(); return builder.build(); } + + @Private + @Unstable + public static YARNDelegationTokenIdentifier newInstance(Text owner, Text renewer, Text realUser) { + YARNDelegationTokenIdentifier policy = Records.newRecord(YARNDelegationTokenIdentifier.class); + policy.setOwner(owner); + policy.setRenewer(renewer); + policy.setRenewer(realUser); + return policy; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/YARNDelegationTokenIdentifierPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/YARNDelegationTokenIdentifierPBImpl.java new file mode 100644 index 0000000000..f977a3391f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/YARNDelegationTokenIdentifierPBImpl.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.security.client.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProtoOrBuilder; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; + +@Private +@Unstable +public class YARNDelegationTokenIdentifierPBImpl extends YARNDelegationTokenIdentifier { + + private YARNDelegationTokenIdentifierProto proto = + YARNDelegationTokenIdentifierProto.getDefaultInstance(); + private YARNDelegationTokenIdentifierProto.Builder builder = null; + private boolean viaProto = false; + + public YARNDelegationTokenIdentifierPBImpl() { + builder = YARNDelegationTokenIdentifierProto.newBuilder(); + } + + public YARNDelegationTokenIdentifierPBImpl(YARNDelegationTokenIdentifierProto identifierProto) { + this.proto = identifierProto; + viaProto = true; + } + + public YARNDelegationTokenIdentifierProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + // mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + if (proto == null) { + proto = YARNDelegationTokenIdentifierProto.getDefaultInstance(); + } + builder = YARNDelegationTokenIdentifierProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public Text getOwner() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return new Text(p.getOwner()); + } + + @Override + public void setOwner(Text owner) { + super.setOwner(owner); + maybeInitBuilder(); + if (owner == null) { + builder.clearOwner(); + return; + } + builder.setOwner(owner.toString()); + } + + @Override + public Text getRenewer() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return new Text(p.getRenewer()); + } + + @Override + public void setRenewer(Text renewer) { + super.setRenewer(renewer); + maybeInitBuilder(); + if (renewer == null) { + builder.clearRenewer(); + return; + } + builder.setOwner(renewer.toString()); + } + + @Override + public Text getRealUser() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return new Text(p.getRealUser()); + } + + @Override + public void setRealUser(Text realUser) { + super.setRealUser(realUser); + maybeInitBuilder(); + if (realUser == null) { + builder.clearRealUser(); + return; + } + builder.setRealUser(realUser.toString()); + } + + @Override + public void setIssueDate(long issueDate) { + super.setIssueDate(issueDate); + maybeInitBuilder(); + builder.setIssueDate(issueDate); + } + + @Override + public long getIssueDate() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return p.getIssueDate(); + } + + @Override + public void setMaxDate(long maxDate) { + super.setMaxDate(maxDate); + maybeInitBuilder(); + builder.setMaxDate(maxDate); + } + + @Override + public long getMaxDate() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxDate(); + } + + @Override + public void setSequenceNumber(int seqNum) { + super.setSequenceNumber(seqNum); + maybeInitBuilder(); + builder.setSequenceNumber(seqNum); + } + + @Override + public int getSequenceNumber() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return p.getSequenceNumber(); + } + + @Override + public void setMasterKeyId(int newId) { + super.setMasterKeyId(newId); + maybeInitBuilder(); + builder.setMasterKeyId(newId); + } + + @Override + public int getMasterKeyId() { + YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder; + return p.getMasterKeyId(); + } + + @Override + public Text getKind() { + return null; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/package-info.java new file mode 100644 index 0000000000..cb6b1be880 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/impl/pb/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@Public +package org.apache.hadoop.yarn.security.client.impl.pb; +import org.apache.hadoop.classification.InterfaceAudience.Public; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java index 5697923c99..6a908d57b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api; import org.apache.commons.lang3.Range; +import org.apache.hadoop.io.Text; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; @@ -80,6 +81,8 @@ private static Object genTypeValue(Type type) { 'a' + rand.nextInt(26)); } else if (type.equals(Float.class)) { return rand.nextFloat(); + } else if (type.equals(Text.class)) { + return new Text('a' + String.valueOf(rand.nextInt(1000000))); } else if (type instanceof Class) { Class clazz = (Class)type; if (clazz.isArray()) { @@ -167,7 +170,7 @@ protected static Object generateByNewInstance(Class clazz) throws Exception { " does not have newInstance method"); } Object [] args = new Object[paramTypes.length]; - for (int i=0;i rmDTState = routerRMSecretManagerState.getTokenState(); + rmDTState.remove(tokenIdentifier); + storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, true); + return RouterRMTokenResponse.newInstance(storeToken); + } + + @Override + public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + RouterStoreToken storeToken = request.getRouterStoreToken(); + RMDelegationTokenIdentifier tokenIdentifier = + (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); + Map rmDTState = routerRMSecretManagerState.getTokenState(); + rmDTState.remove(tokenIdentifier); + return RouterRMTokenResponse.newInstance(storeToken); + } + + @Override + public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) + throws YarnException, IOException { + RouterStoreToken storeToken = request.getRouterStoreToken(); + RMDelegationTokenIdentifier tokenIdentifier = + (RMDelegationTokenIdentifier) storeToken.getTokenIdentifier(); + Map rmDTState = routerRMSecretManagerState.getTokenState(); + if (!rmDTState.containsKey(tokenIdentifier)) { + LOG.info("Router RMDelegationToken: {} does not exist.", tokenIdentifier); + throw new IOException("Router RMDelegationToken: " + tokenIdentifier + " does not exist."); + } + RouterStoreToken resultToken = + RouterStoreToken.newInstance(tokenIdentifier, rmDTState.get(tokenIdentifier)); + return RouterRMTokenResponse.newInstance(resultToken); + } + + private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier, + Long renewDate, boolean isUpdate) throws IOException { + Map rmDTState = routerRMSecretManagerState.getTokenState(); + if (rmDTState.containsKey(rmDTIdentifier)) { + LOG.info("Error storing info for RMDelegationToken: {}.", rmDTIdentifier); + throw new IOException("Router RMDelegationToken: " + rmDTIdentifier + "is already stored."); + } + rmDTState.put(rmDTIdentifier, renewDate); + if (!isUpdate) { + routerRMSecretManagerState.setDtSequenceNumber(rmDTIdentifier.getSequenceNumber()); + } + LOG.info("Store Router RM-RMDT with sequence number {}.", rmDTIdentifier.getSequenceNumber()); + } + /** * Get DelegationKey By based on MasterKey. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 889c1e0641..d7b5d27324 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -84,6 +84,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; @@ -1427,4 +1429,28 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + @Override + public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 3215333cbb..f4d45f5a72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; @@ -901,4 +903,28 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + @Override + public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) + throws YarnException, IOException { + throw new NotImplementedException("Code is not implemented"); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenRequest.java new file mode 100644 index 0000000000..790ee513bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenRequest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public abstract class RouterRMTokenRequest { + + @Private + @Unstable + public static RouterRMTokenRequest newInstance(RouterStoreToken routerStoreToken) { + RouterRMTokenRequest request = Records.newRecord(RouterRMTokenRequest.class); + request.setRouterStoreToken(routerStoreToken); + return request; + } + + @Public + @Unstable + public abstract RouterStoreToken getRouterStoreToken(); + + @Private + @Unstable + public abstract void setRouterStoreToken(RouterStoreToken routerStoreToken); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenResponse.java new file mode 100644 index 0000000000..c629e46a04 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterRMTokenResponse.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public abstract class RouterRMTokenResponse { + + @Private + @Unstable + public static RouterRMTokenResponse newInstance(RouterStoreToken routerStoreToken) { + RouterRMTokenResponse request = Records.newRecord(RouterRMTokenResponse.class); + request.setRouterStoreToken(routerStoreToken); + return request; + } + + @Public + @Unstable + public abstract RouterStoreToken getRouterStoreToken(); + + @Private + @Unstable + public abstract void setRouterStoreToken(RouterStoreToken routerStoreToken); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java new file mode 100644 index 0000000000..d6eade8780 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; + +import java.io.IOException; + +@Private +@Unstable +public abstract class RouterStoreToken { + + @Private + @Unstable + public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identifier, + Long renewdate) { + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + storeToken.setIdentifier(identifier); + storeToken.setRenewDate(renewdate); + return storeToken; + } + + @Private + @Unstable + public abstract YARNDelegationTokenIdentifier getTokenIdentifier() throws IOException; + + @Private + @Unstable + public abstract void setIdentifier(YARNDelegationTokenIdentifier identifier); + + @Private + @Unstable + public abstract Long getRenewDate(); + + @Private + @Unstable + public abstract void setRenewDate(Long renewDate); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenRequestPBImpl.java new file mode 100644 index 0000000000..1358a78326 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenRequestPBImpl.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProtoOrBuilder; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; + +public class RouterRMTokenRequestPBImpl extends RouterRMTokenRequest { + + private RouterRMTokenRequestProto proto = RouterRMTokenRequestProto.getDefaultInstance(); + private RouterRMTokenRequestProto.Builder builder = null; + private boolean viaProto = false; + private RouterStoreToken routerStoreToken = null; + + public RouterRMTokenRequestPBImpl() { + builder = RouterRMTokenRequestProto.newBuilder(); + } + + public RouterRMTokenRequestPBImpl(RouterRMTokenRequestProto requestProto) { + this.proto = requestProto; + viaProto = true; + } + + public RouterRMTokenRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RouterRMTokenRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.routerStoreToken != null) { + RouterStoreTokenPBImpl routerStoreTokenPBImpl = + (RouterStoreTokenPBImpl) this.routerStoreToken; + RouterStoreTokenProto storeTokenProto = routerStoreTokenPBImpl.getProto(); + if (!storeTokenProto.equals(builder.getRouterStoreToken())) { + builder.setRouterStoreToken(convertToProtoFormat(this.routerStoreToken)); + } + } + } + + @Override + public RouterStoreToken getRouterStoreToken() { + RouterRMTokenRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.routerStoreToken != null) { + return this.routerStoreToken; + } + if (!p.hasRouterStoreToken()) { + return null; + } + this.routerStoreToken = convertFromProtoFormat(p.getRouterStoreToken()); + return this.routerStoreToken; + } + + @Override + public void setRouterStoreToken(RouterStoreToken storeToken) { + maybeInitBuilder(); + if (storeToken == null) { + builder.clearRouterStoreToken(); + return; + } + this.routerStoreToken = storeToken; + this.builder.setRouterStoreToken(convertToProtoFormat(storeToken)); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private RouterStoreTokenProto convertToProtoFormat(RouterStoreToken storeToken) { + return ((RouterStoreTokenPBImpl) storeToken).getProto(); + } + + private RouterStoreToken convertFromProtoFormat(RouterStoreTokenProto storeTokenProto) { + return new RouterStoreTokenPBImpl(storeTokenProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenResponsePBImpl.java new file mode 100644 index 0000000000..f50967d352 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterRMTokenResponsePBImpl.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProtoOrBuilder; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; + +@Private +@Unstable +public class RouterRMTokenResponsePBImpl extends RouterRMTokenResponse { + + private RouterRMTokenResponseProto proto = RouterRMTokenResponseProto.getDefaultInstance(); + private RouterRMTokenResponseProto.Builder builder = null; + private boolean viaProto = false; + private RouterStoreToken routerStoreToken = null; + + public RouterRMTokenResponsePBImpl() { + builder = RouterRMTokenResponseProto.newBuilder(); + } + + public RouterRMTokenResponsePBImpl(RouterRMTokenResponseProto requestProto) { + this.proto = requestProto; + viaProto = true; + } + + public RouterRMTokenResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RouterRMTokenResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.routerStoreToken != null) { + RouterStoreTokenPBImpl routerStoreTokenPBImpl = + (RouterStoreTokenPBImpl) this.routerStoreToken; + RouterStoreTokenProto storeTokenProto = routerStoreTokenPBImpl.getProto(); + if (!storeTokenProto.equals(builder.getRouterStoreToken())) { + builder.setRouterStoreToken(convertToProtoFormat(this.routerStoreToken)); + } + } + } + + @Override + public RouterStoreToken getRouterStoreToken() { + RouterRMTokenResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.routerStoreToken != null) { + return this.routerStoreToken; + } + if (!p.hasRouterStoreToken()) { + return null; + } + this.routerStoreToken = convertFromProtoFormat(p.getRouterStoreToken()); + return this.routerStoreToken; + } + + @Override + public void setRouterStoreToken(RouterStoreToken storeToken) { + maybeInitBuilder(); + if (storeToken == null) { + builder.clearRouterStoreToken(); + } + this.routerStoreToken = storeToken; + } + + private RouterStoreTokenProto convertToProtoFormat(RouterStoreToken storeToken) { + return ((RouterStoreTokenPBImpl) storeToken).getProto(); + } + + private RouterStoreToken convertFromProtoFormat(RouterStoreTokenProto storeTokenProto) { + return new RouterStoreTokenPBImpl(storeTokenProto); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java new file mode 100644 index 0000000000..32e148cb5b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProtoOrBuilder; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Protocol buffer based implementation of {@link RouterStoreToken}. + */ +@Private +@Unstable +public class RouterStoreTokenPBImpl extends RouterStoreToken { + + private RouterStoreTokenProto proto = RouterStoreTokenProto.getDefaultInstance(); + + private RouterStoreTokenProto.Builder builder = null; + + private boolean viaProto = false; + + private YARNDelegationTokenIdentifier rMDelegationTokenIdentifier = null; + private Long renewDate; + + public RouterStoreTokenPBImpl() { + builder = RouterStoreTokenProto.newBuilder(); + } + + public RouterStoreTokenPBImpl(RouterStoreTokenProto storeTokenProto) { + this.proto = storeTokenProto; + viaProto = true; + } + + public RouterStoreTokenProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.rMDelegationTokenIdentifier != null) { + YARNDelegationTokenIdentifierProto idProto = this.rMDelegationTokenIdentifier.getProto(); + if (!idProto.equals(builder.getTokenIdentifier())) { + builder.setTokenIdentifier(convertToProtoFormat(this.rMDelegationTokenIdentifier)); + } + } + + if (this.renewDate != null) { + builder.setRenewDate(this.renewDate); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RouterStoreTokenProto.newBuilder(proto); + } + viaProto = false; + } + + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public YARNDelegationTokenIdentifier getTokenIdentifier() throws IOException { + RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder; + if (rMDelegationTokenIdentifier != null) { + return rMDelegationTokenIdentifier; + } + if(!p.hasTokenIdentifier()){ + return null; + } + YARNDelegationTokenIdentifierProto identifierProto = p.getTokenIdentifier(); + ByteArrayInputStream in = new ByteArrayInputStream(identifierProto.toByteArray()); + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); + identifier.readFields(new DataInputStream(in)); + this.rMDelegationTokenIdentifier = identifier; + return identifier; + } + + @Override + public Long getRenewDate() { + RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder; + if (this.renewDate != null) { + return this.renewDate; + } + if (!p.hasRenewDate()) { + return null; + } + this.renewDate = p.getRenewDate(); + return this.renewDate; + } + + @Override + public void setIdentifier(YARNDelegationTokenIdentifier identifier) { + maybeInitBuilder(); + if(identifier == null) { + builder.clearTokenIdentifier(); + return; + } + this.rMDelegationTokenIdentifier = identifier; + this.builder.setTokenIdentifier(identifier.getProto()); + } + + @Override + public void setRenewDate(Long renewDate) { + maybeInitBuilder(); + if(renewDate == null) { + builder.clearRenewDate(); + return; + } + this.renewDate = renewDate; + this.builder.setRenewDate(renewDate); + } + + private YARNDelegationTokenIdentifierProto convertToProtoFormat( + YARNDelegationTokenIdentifier delegationTokenIdentifier) { + return delegationTokenIdentifier.getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 2044f29099..47cb9e9e35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -80,6 +80,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -778,4 +782,71 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(DelegationKey newKey) RouterMasterKeyRequest keyRequest = RouterMasterKeyRequest.newInstance(masterKey); return stateStore.getMasterKeyByDelegationKey(keyRequest); } + + /** + * The Router Supports Store RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM + * @param renewDate renewDate + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ + public void storeNewToken(RMDelegationTokenIdentifier identifier, + long renewDate) throws YarnException, IOException { + LOG.info("storing RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + stateStore.storeNewToken(request); + } + + /** + * The Router Supports Update RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM + * @param renewDate renewDate + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ + public void updateStoredToken(RMDelegationTokenIdentifier identifier, + long renewDate) throws YarnException, IOException { + LOG.info("updating RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + stateStore.updateStoredToken(request); + } + + /** + * The Router Supports Remove RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ + public void removeStoredToken(RMDelegationTokenIdentifier identifier) + throws YarnException, IOException{ + LOG.info("removing RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + stateStore.removeStoredToken(request); + } + + /** + * The Router Supports GetTokenByRouterStoreToken{@link RMDelegationTokenIdentifier}. + * + * @param identifier delegation tokens from the RM + * @return RouterStoreToken + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ + public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentifier identifier) + throws YarnException, IOException { + LOG.info("get RouterStoreToken token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + return stateStore.getTokenByRouterStoreToken(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index d5493f6614..3786f7cccc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -26,12 +26,14 @@ import java.util.TimeZone; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; @@ -74,6 +76,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.util.MonotonicClock; import org.junit.After; import org.junit.Assert; @@ -922,4 +927,107 @@ public void testRemoveStoredMasterKey() throws YarnException, IOException { Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes()); Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate()); } + + @Test + public void testStoreNewToken() throws IOException, YarnException { + // prepare parameters + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner1"), new Text("renewer1"), new Text("realuser1")); + int sequenceNumber = 1; + identifier.setSequenceNumber(sequenceNumber); + Long renewDate = Time.now(); + + // store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + + // Verify the returned result to ensure that the returned Response is not empty + // and the returned result is consistent with the input parameters. + Assert.assertNotNull(routerRMTokenResponse); + RouterStoreToken storeTokenResp = routerRMTokenResponse.getRouterStoreToken(); + Assert.assertNotNull(storeTokenResp); + Assert.assertEquals(storeToken.getRenewDate(), storeTokenResp.getRenewDate()); + Assert.assertEquals(storeToken.getTokenIdentifier(), storeTokenResp.getTokenIdentifier()); + } + + @Test + public void testUpdateStoredToken() throws IOException, YarnException { + // prepare saveToken parameters + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + int sequenceNumber = 2; + identifier.setSequenceNumber(sequenceNumber); + Long renewDate = Time.now(); + + // store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + + // prepare updateToken parameters + Long renewDate2 = Time.now(); + int sequenceNumber2 = 3; + identifier.setSequenceNumber(sequenceNumber2); + + // update rm-token + RouterStoreToken updateToken = RouterStoreToken.newInstance(identifier, renewDate2); + RouterRMTokenRequest updateTokenRequest = RouterRMTokenRequest.newInstance(updateToken); + RouterRMTokenResponse updateTokenResponse = stateStore.updateStoredToken(updateTokenRequest); + + Assert.assertNotNull(updateTokenResponse); + RouterStoreToken updateTokenResp = updateTokenResponse.getRouterStoreToken(); + Assert.assertNotNull(updateTokenResp); + Assert.assertEquals(updateToken.getRenewDate(), updateTokenResp.getRenewDate()); + Assert.assertEquals(updateToken.getTokenIdentifier(), updateTokenResp.getTokenIdentifier()); + } + + @Test + public void testRemoveStoredToken() throws IOException, YarnException { + // prepare saveToken parameters + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner3"), new Text("renewer3"), new Text("realuser3")); + int sequenceNumber = 3; + identifier.setSequenceNumber(sequenceNumber); + Long renewDate = Time.now(); + + // store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + + // remove rm-token + RouterRMTokenResponse removeTokenResponse = stateStore.removeStoredToken(request); + Assert.assertNotNull(removeTokenResponse); + RouterStoreToken removeTokenResp = removeTokenResponse.getRouterStoreToken(); + Assert.assertNotNull(removeTokenResp); + Assert.assertEquals(removeTokenResp.getRenewDate(), storeToken.getRenewDate()); + Assert.assertEquals(removeTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier()); + } + + @Test + public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + // prepare saveToken parameters + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner4"), new Text("renewer4"), new Text("realuser4")); + int sequenceNumber = 4; + identifier.setSequenceNumber(sequenceNumber); + Long renewDate = Time.now(); + + // store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + + // getTokenByRouterStoreToken + RouterRMTokenResponse getRouterRMTokenResp = stateStore.getTokenByRouterStoreToken(request); + Assert.assertNotNull(getRouterRMTokenResp); + RouterStoreToken getStoreTokenResp = getRouterRMTokenResp.getRouterStoreToken(); + Assert.assertNotNull(getStoreTokenResp); + Assert.assertEquals(getStoreTokenResp.getRenewDate(), storeToken.getRenewDate()); + Assert.assertEquals(getStoreTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index 6f5f19877c..4d405a1cd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -572,4 +572,24 @@ public void testGetMasterKeyByDelegationKey() throws YarnException, IOException public void testRemoveStoredMasterKey() throws YarnException, IOException { super.testRemoveStoredMasterKey(); } + + @Test(expected = NotImplementedException.class) + public void testStoreNewToken() throws IOException, YarnException { + super.testStoreNewToken(); + } + + @Test(expected = NotImplementedException.class) + public void testUpdateStoredToken() throws IOException, YarnException { + super.testUpdateStoredToken(); + } + + @Test(expected = NotImplementedException.class) + public void testRemoveStoredToken() throws IOException, YarnException { + super.testRemoveStoredToken(); + } + + @Test(expected = NotImplementedException.class) + public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + super.testGetTokenByRouterStoreToken(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index 4571371eb6..18396cb821 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -185,4 +185,24 @@ public void testGetMasterKeyByDelegationKey() throws YarnException, IOException public void testRemoveStoredMasterKey() throws YarnException, IOException { super.testRemoveStoredMasterKey(); } + + @Test(expected = NotImplementedException.class) + public void testStoreNewToken() throws IOException, YarnException { + super.testStoreNewToken(); + } + + @Test(expected = NotImplementedException.class) + public void testUpdateStoredToken() throws IOException, YarnException { + super.testUpdateStoredToken(); + } + + @Test(expected = NotImplementedException.class) + public void testRemoveStoredToken() throws IOException, YarnException { + super.testRemoveStoredToken(); + } + + @Test(expected = NotImplementedException.class) + public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + super.testGetTokenByRouterStoreToken(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/records/TestFederationProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/records/TestFederationProtocolRecords.java index 174b428852..6398a3dac8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/records/TestFederationProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/records/TestFederationProtocolRecords.java @@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterRegisterResponseProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateApplicationHomeSubClusterRequestProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateApplicationHomeSubClusterResponseProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetReservationHomeSubClusterRequestProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterMasterKeyProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterMasterKeyRequestProto; @@ -84,6 +87,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyRequestPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyResponsePBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterStoreTokenPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterRMTokenRequestPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterRMTokenResponsePBImpl; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.ApplicationHomeSubClusterPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.GetReservationHomeSubClusterRequestPBImpl; import org.apache.hadoop.yarn.server.records.Version; @@ -104,6 +111,8 @@ public static void setup() throws Exception { generateByNewInstance(ApplicationHomeSubCluster.class); generateByNewInstance(SubClusterPolicyConfiguration.class); generateByNewInstance(RouterMasterKey.class); + generateByNewInstance(YARNDelegationTokenIdentifier.class); + generateByNewInstance(RouterStoreToken.class); generateByNewInstance(ReservationId.class); } @@ -291,6 +300,21 @@ public void testRouterMasterKeyResponse() throws Exception { validatePBImplRecord(RouterMasterKeyResponsePBImpl.class, RouterMasterKeyResponseProto.class); } + @Test + public void testRouterStoreToken() throws Exception { + validatePBImplRecord(RouterStoreTokenPBImpl.class, RouterStoreTokenProto.class); + } + + @Test + public void testRouterRMTokenRequest() throws Exception { + validatePBImplRecord(RouterRMTokenRequestPBImpl.class, RouterRMTokenRequestProto.class); + } + + @Test + public void testRouterRMTokenResponse() throws Exception { + validatePBImplRecord(RouterRMTokenResponsePBImpl.class, RouterRMTokenResponseProto.class); + } + @Test public void testApplicationHomeSubCluster() throws Exception { validatePBImplRecord(ApplicationHomeSubClusterPBImpl.class, @@ -302,4 +326,4 @@ public void testGetReservationHomeSubClusterRequest() throws Exception { validatePBImplRecord(GetReservationHomeSubClusterRequestPBImpl.class, GetReservationHomeSubClusterRequestProto.class); } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java index 1bfa6b90ff..92dd426f51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -26,10 +26,15 @@ import java.util.HashSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -37,6 +42,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -269,4 +277,97 @@ public void testRemoveStoredMasterKey() throws YarnException, IOException { federationStateStore.getRouterRMSecretManagerState(); Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); } + + @Test + public void testStoreNewToken() throws YarnException, IOException { + // store new rm-token + RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier( + new Text("owner1"), new Text("renewer1"), new Text("realuser1")); + int sequenceNumber = 1; + dtId1.setSequenceNumber(sequenceNumber); + Long renewDate1 = Time.now(); + facade.storeNewToken(dtId1, renewDate1); + + // get RouterStoreToken from StateStore + RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1); + RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken); + RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest); + Assert.assertNotNull(rmTokenResponse); + + RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken(); + YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier(); + Assert.assertNotNull(resultStoreToken); + Assert.assertNotNull(resultTokenIdentifier); + Assert.assertNotNull(resultStoreToken.getRenewDate()); + + Assert.assertEquals(dtId1, resultTokenIdentifier); + Assert.assertEquals(renewDate1, resultStoreToken.getRenewDate()); + Assert.assertEquals(sequenceNumber, resultTokenIdentifier.getSequenceNumber()); + } + + @Test + public void testUpdateNewToken() throws YarnException, IOException { + // store new rm-token + RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + int sequenceNumber = 2; + dtId1.setSequenceNumber(sequenceNumber); + Long renewDate1 = Time.now(); + facade.storeNewToken(dtId1, renewDate1); + + Long renewDate2 = Time.now(); + int sequenceNumber2 = 3; + dtId1.setSequenceNumber(sequenceNumber2); + facade.updateStoredToken(dtId1, renewDate2); + + // get RouterStoreToken from StateStore + RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1); + RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken); + RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest); + Assert.assertNotNull(rmTokenResponse); + + RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken(); + YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier(); + Assert.assertNotNull(resultStoreToken); + Assert.assertNotNull(resultTokenIdentifier); + Assert.assertNotNull(resultStoreToken.getRenewDate()); + + Assert.assertEquals(dtId1, resultTokenIdentifier); + Assert.assertEquals(renewDate2, resultStoreToken.getRenewDate()); + Assert.assertEquals(sequenceNumber2, resultTokenIdentifier.getSequenceNumber()); + } + + @Test + public void testRemoveStoredToken() throws Exception { + // store new rm-token + RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier( + new Text("owner3"), new Text("renewer3"), new Text("realuser3")); + int sequenceNumber = 3; + dtId1.setSequenceNumber(sequenceNumber); + Long renewDate1 = Time.now(); + facade.storeNewToken(dtId1, renewDate1); + + // get RouterStoreToken from StateStore + RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1); + RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken); + RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest); + Assert.assertNotNull(rmTokenResponse); + + RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken(); + YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier(); + Assert.assertNotNull(resultStoreToken); + Assert.assertNotNull(resultTokenIdentifier); + Assert.assertNotNull(resultStoreToken.getRenewDate()); + + Assert.assertEquals(dtId1, resultTokenIdentifier); + Assert.assertEquals(renewDate1, resultStoreToken.getRenewDate()); + Assert.assertEquals(sequenceNumber, resultTokenIdentifier.getSequenceNumber()); + + // remove rm-token + facade.removeStoredToken(dtId1); + + // Call again(getTokenByRouterStoreToken) after remove will throw IOException(not exist) + LambdaTestUtils.intercept(IOException.class, "RMDelegationToken: " + dtId1 + " does not exist.", + () -> stateStore.getTokenByRouterStoreToken(rmTokenRequest)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 1d67af926d..38f6dd46f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.List; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; @@ -79,6 +78,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.records.Version; @@ -383,19 +384,43 @@ public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( @Override public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + return stateStoreClient.storeNewMasterKey(request); } @Override public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + return stateStoreClient.removeStoredMasterKey(request); } @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + return stateStoreClient.getMasterKeyByDelegationKey(request); + } + + @Override + public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) + throws YarnException, IOException { + return stateStoreClient.storeNewToken(request); + } + + @Override + public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + return stateStoreClient.updateStoredToken(request); + } + + @Override + public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) + throws YarnException, IOException { + return stateStoreClient.removeStoredToken(request); + } + + @Override + public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) + throws YarnException, IOException { + return stateStoreClient.getTokenByRouterStoreToken(request); } /**