host = getHostNameFromConfigKeys(conf,
OZONE_OM_ADDRESS_KEY);
- return NetUtils.createSocketAddr(
- host.orElse(OZONE_OM_BIND_HOST_DEFAULT) + ":" +
- getOmRpcPort(conf));
+ return host.orElse(OZONE_OM_BIND_HOST_DEFAULT) + ":" +
+ getOmRpcPort(conf);
}
/**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 5917a11494..867a3e160f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -184,4 +184,17 @@ private OMConfigKeys() {
"ozone.om.http.kerberos.keytab.file";
public static final String OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY
= "ozone.om.http.kerberos.principal";
+ // Delegation token related keys
+ public static final String DELEGATION_REMOVER_SCAN_INTERVAL_KEY =
+ "ozone.manager.delegation.remover.scan.interval";
+ public static final long DELEGATION_REMOVER_SCAN_INTERVAL_DEFAULT =
+ 60*60*1000;
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ "ozone.manager.delegation.token.renew-interval";
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day = 86400000 ms
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ "ozone.manager.delegation.token.max-lifetime";
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ 7*24*60*60*1000; // 7 days
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index e4bfc83921..337fd227be 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -41,7 +41,7 @@
*/
@KerberosInfo(
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
-public interface OzoneManagerProtocol {
+public interface OzoneManagerProtocol extends OzoneManagerSecurityProtocol {
/**
* Creates a volume.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerSecurityProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerSecurityProtocol.java
new file mode 100644
index 0000000000..15873d0b97
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerSecurityProtocol.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocol;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Security protocol for a secure OzoneManager.
+ */
+@KerberosInfo(
+ serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+public interface OzoneManagerSecurityProtocol {
+
+ /**
+ * Get a valid Delegation Token.
+ *
+ * @param renewer the designated renewer for the token
+ * @return Token
+ * @throws IOException
+ */
+ @Idempotent
+ Token getDelegationToken(Text renewer)
+ throws IOException;
+
+ /**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws IOException
+ */
+ @Idempotent
+ long renewDelegationToken(Token token)
+ throws IOException;
+
+ /**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws IOException
+ */
+ @Idempotent
+ void cancelDelegationToken(Token token)
+ throws IOException;
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 460fcb3816..2a54cb8d34 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -23,6 +23,7 @@
import com.google.protobuf.ServiceException;
import java.util.ArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -168,6 +169,18 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.security.token.Token;
/**
* The client side implementation of OzoneManagerProtocol.
@@ -1107,4 +1120,90 @@ public List getServiceList() throws IOException {
+ resp.getStatus());
}
}
+
+ /**
+ * Get a valid Delegation Token.
+ *
+ * @param renewer the designated renewer for the token
+ * @return Token
+ * @throws IOException
+ */
+ @Override
+ public Token getDelegationToken(Text renewer)
+ throws IOException {
+ GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto
+ .newBuilder()
+ .setRenewer(renewer == null ? "" : renewer.toString())
+ .build();
+
+ OMRequest omRequest = createOMRequest(Type.GetDelegationToken)
+ .setGetDelegationTokenRequest(req)
+ .build();
+
+ final GetDelegationTokenResponseProto resp = submitRequest(omRequest)
+ .getGetDelegationTokenResponse();
+ if (resp.getStatus() == Status.OK) {
+ return resp.getResponse().hasToken() ?
+ OMPBHelper.convertToDelegationToken(resp.getResponse().getToken())
+ : null;
+ } else {
+ throw new IOException("Get Delegation Token failed, error : " + resp
+ .getStatus());
+ }
+ }
+
+ /**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws IOException
+ */
+ @Override
+ public long renewDelegationToken(Token token)
+ throws IOException {
+ RenewDelegationTokenRequestProto req =
+ RenewDelegationTokenRequestProto.newBuilder().
+ setToken(OMPBHelper.convertToTokenProto(token)).
+ build();
+
+ OMRequest omRequest = createOMRequest(Type.RenewDelegationToken)
+ .setRenewDelegationTokenRequest(req)
+ .build();
+
+ final RenewDelegationTokenResponseProto resp = submitRequest(omRequest)
+ .getRenewDelegationTokenResponse();
+ if (resp.getStatus() == Status.OK) {
+ return resp.getResponse().getNewExpiryTime();
+ } else {
+ throw new IOException("Renew Delegation Token failed, error : " + resp
+ .getStatus());
+ }
+ }
+
+ /**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws IOException
+ */
+ @Override
+ public void cancelDelegationToken(Token token)
+ throws IOException {
+ CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
+ .newBuilder()
+ .setToken(OMPBHelper.convertToTokenProto(token))
+ .build();
+
+ OMRequest omRequest = createOMRequest(Type.CancelDelegationToken)
+ .setCancelDelegationTokenRequest(req)
+ .build();
+
+ final CancelDelegationTokenResponseProto resp = submitRequest(omRequest)
+ .getCancelDelegationTokenResponse();
+ if (resp.getStatus() != Status.OK) {
+ throw new IOException("Cancel Delegation Token failed, error : " + resp
+ .getStatus());
+ }
+ }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
index 27e8f226ba..175527b482 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
@@ -23,6 +23,8 @@
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneManagerService;
import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
/**
* Protocol used to communicate with OM.
@@ -32,6 +34,7 @@
protocolVersion = 1)
@KerberosInfo(
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(OzoneDelegationTokenSelector.class)
@InterfaceAudience.Private
public interface OzoneManagerProtocolPB
extends OzoneManagerService.BlockingInterface {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
index d57d32e0c4..df069ce544 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.protocolPB;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneAclInfo;
@@ -24,6 +26,9 @@
.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
/**
* Utilities for converting protobuf classes.
@@ -110,4 +115,37 @@ public static OzoneAcl convertOzoneAcl(OzoneAclInfo aclInfo) {
return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
}
+
+ /**
+ * Converts Ozone delegation token to @{@link TokenProto}.
+ * @return tokenProto
+ */
+ public static TokenProto convertToTokenProto(Token> tok) {
+ if(tok == null){
+ throw new IllegalArgumentException("Invalid argument: token is null");
+ }
+
+ return TokenProto.newBuilder().
+ setIdentifier(getByteString(tok.getIdentifier())).
+ setPassword(getByteString(tok.getPassword())).
+ setKind(tok.getKind().toString()).
+ setService(tok.getService().toString()).build();
+ }
+
+ public static ByteString getByteString(byte[] bytes) {
+ // return singleton to reduce object allocation
+ return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
+ }
+
+ /**
+ * Converts @{@link TokenProto} to Ozone delegation token.
+ *
+ * @return Ozone
+ */
+ public static Token convertToDelegationToken(
+ TokenProto tokenProto) {
+ return new Token<>(tokenProto.getIdentifier()
+ .toByteArray(), tokenProto.getPassword().toByteArray(), new Text(
+ tokenProto.getKind()), new Text(tokenProto.getService()));
+ }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSelector.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSelector.java
index 0d63c4ec65..8e35f22185 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSelector.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSelector.java
@@ -19,7 +19,6 @@
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -32,7 +31,7 @@
*/
@InterfaceAudience.Private
public class OzoneDelegationTokenSelector
- extends AbstractDelegationTokenSelector {
+ extends AbstractDelegationTokenSelector {
public OzoneDelegationTokenSelector() {
super(OzoneTokenIdentifier.KIND_NAME);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java
new file mode 100644
index 0000000000..0c84404d65
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.security;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SecretManager for Ozone Master. Responsible for signing identifiers with
+ * private key,
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class OzoneSecretManager
+ extends SecretManager {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(OzoneSecretManager.class);
+ /**
+ * The name of the Private/Public Key based hashing algorithm.
+ */
+ private static final String DEFAULT_SIGNATURE_ALGORITHM = "SHA256withRSA";
+ private final long tokenMaxLifetime;
+ private final long tokenRenewInterval;
+ private final long tokenRemoverScanInterval;
+ private final Text service;
+ private final Map allKeys;
+ private final Map currentTokens;
+ private final OzoneSecretStore store;
+ private Thread tokenRemoverThread;
+ private volatile boolean running;
+ private AtomicInteger tokenSequenceNumber;
+ private OzoneSecretKey currentKey;
+ private AtomicInteger currentKeyId;
+ /**
+ * If the delegation token update thread holds this lock, it will not get
+ * interrupted.
+ */
+ private Object noInterruptsLock = new Object();
+ private int maxKeyLength;
+
+ /**
+ * Create a secret manager.
+ *
+ * @param conf configuration.
+ * @param tokenMaxLifetime the maximum lifetime of the delegation tokens in
+ * milliseconds
+ * @param tokenRenewInterval how often the tokens must be renewed in
+ * milliseconds
+ * @param dtRemoverScanInterval how often the tokens are scanned for expired
+ * tokens in milliseconds
+ */
+ public OzoneSecretManager(OzoneConfiguration conf, long tokenMaxLifetime,
+ long tokenRenewInterval, long dtRemoverScanInterval, Text service)
+ throws IOException {
+ this.tokenMaxLifetime = tokenMaxLifetime;
+ this.tokenRenewInterval = tokenRenewInterval;
+ this.tokenRemoverScanInterval = dtRemoverScanInterval;
+
+ currentTokens = new ConcurrentHashMap();
+ allKeys = new ConcurrentHashMap<>();
+ currentKeyId = new AtomicInteger();
+ tokenSequenceNumber = new AtomicInteger();
+ this.store = new OzoneSecretStore(conf);
+ loadTokenSecretState(store.loadState());
+ this.service = service;
+ this.maxKeyLength = conf.getInt(OzoneConfigKeys.OZONE_MAX_KEY_LEN,
+ OzoneConfigKeys.OZONE_MAX_KEY_LEN_DEFAULT);
+ }
+
+ @Override
+ public T createIdentifier() {
+ return (T) T.newInstance();
+ }
+
+ /**
+ * Create new Identifier with given,owner,renwer and realUser.
+ *
+ * @return T
+ */
+ public T createIdentifier(Text owner, Text renewer, Text realUser) {
+ return (T) T.newInstance(owner, renewer, realUser);
+ }
+
+ /**
+ * Returns {@link Token} for given identifier.
+ *
+ * @param owner
+ * @param renewer
+ * @param realUser
+ * @return Token
+ * @throws IOException to allow future exceptions to be added without breaking
+ * compatibility
+ */
+ public Token createToken(Text owner, Text renewer, Text realUser)
+ throws IOException {
+ T identifier = createIdentifier(owner, renewer, realUser);
+ updateIdentifierDetails(identifier);
+
+ byte[] password = createPassword(identifier.getBytes(),
+ currentKey.getPrivateKey());
+ addToTokenStore(identifier, password);
+ Token token = new Token<>(identifier.getBytes(), password,
+ identifier.getKind(), service);
+ if (LOG.isTraceEnabled()) {
+ long expiryTime = identifier.getIssueDate() + tokenRenewInterval;
+ String tokenId = identifier.toStringStable();
+ LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}",
+ expiryTime, tokenId);
+ }
+
+ return token;
+ }
+
+ /**
+ * Stores given identifier in token store.
+ *
+ * @param identifier
+ * @param password
+ * @throws IOException
+ */
+ private void addToTokenStore(T identifier, byte[] password)
+ throws IOException {
+ TokenInfo tokenInfo = new TokenInfo(identifier.getIssueDate()
+ + tokenRenewInterval, password, identifier.getTrackingId());
+ currentTokens.put(identifier, tokenInfo);
+ store.storeToken(identifier, tokenInfo.getRenewDate());
+ }
+
+ /**
+ * Updates issue date, master key id and sequence number for identifier.
+ *
+ * @param identifier the identifier to validate
+ */
+ private void updateIdentifierDetails(T identifier) {
+ int sequenceNum;
+ long now = Time.monotonicNow();
+ sequenceNum = incrementDelegationTokenSeqNum();
+ identifier.setIssueDate(now);
+ identifier.setMasterKeyId(currentKey.getKeyId());
+ identifier.setSequenceNumber(sequenceNum);
+ identifier.setMaxDate(Time.monotonicNow() + tokenMaxLifetime);
+ }
+
+ /**
+ * Compute HMAC of the identifier using the private key and return the output
+ * as password.
+ *
+ * @param identifier
+ * @param privateKey
+ * @return byte[] signed byte array
+ */
+ public byte[] createPassword(byte[] identifier, PrivateKey privateKey)
+ throws OzoneSecurityException {
+ try {
+ Signature rsaSignature = Signature.getInstance(
+ DEFAULT_SIGNATURE_ALGORITHM);
+ rsaSignature.initSign(privateKey);
+ rsaSignature.update(identifier);
+ return rsaSignature.sign();
+ } catch (InvalidKeyException | NoSuchAlgorithmException |
+ SignatureException ex) {
+ throw new OzoneSecurityException("Error while creating HMAC hash for " +
+ "token.", ex, OzoneSecurityException.ResultCodes
+ .SECRET_MANAGER_HMAC_ERROR);
+ }
+ }
+
+ @Override
+ public byte[] createPassword(T identifier) {
+ LOG.debug("Creating password for identifier: {}, currentKey: {}",
+ formatTokenId(identifier), currentKey.getKeyId());
+ byte[] password = null;
+ try {
+ password = createPassword(identifier.getBytes(),
+ currentKey.getPrivateKey());
+ } catch (IOException ioe) {
+ LOG.error("Could not store token {}!!", formatTokenId(identifier),
+ ioe);
+ }
+ return password;
+ }
+
+ @Override
+ public byte[] retrievePassword(T identifier) throws InvalidToken {
+ return checkToken(identifier).getPassword();
+ }
+
+ /**
+ * Renew a delegation token.
+ *
+ * @param token the token to renew
+ * @param renewer the full principal name of the user doing the renewal
+ * @return the new expiration time
+ * @throws InvalidToken if the token is invalid
+ * @throws AccessControlException if the user can't renew token
+ */
+ public synchronized long renewToken(Token token, String renewer)
+ throws IOException {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ T id = (T) T.readProtoBuf(in);
+ LOG.debug("Token renewal for identifier: {}, total currentTokens: {}",
+ formatTokenId(id), currentTokens.size());
+
+ long now = Time.monotonicNow();
+ if (id.getMaxDate() < now) {
+ throw new InvalidToken(renewer + " tried to renew an expired token "
+ + formatTokenId(id) + " max expiration date: "
+ + Time.formatTime(id.getMaxDate())
+ + " currentTime: " + Time.formatTime(now));
+ }
+ checkToken(id);
+ if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
+ throw new AccessControlException(renewer +
+ " tried to renew a token " + formatTokenId(id)
+ + " without a renewer");
+ }
+ if (!id.getRenewer().toString().equals(renewer)) {
+ throw new AccessControlException(renewer
+ + " tries to renew a token " + formatTokenId(id)
+ + " with non-matching renewer " + id.getRenewer());
+ }
+ OzoneSecretKey key = allKeys.get(id.getMasterKeyId());
+ if (key == null) {
+ throw new InvalidToken("Unable to find master key for keyId="
+ + id.getMasterKeyId()
+ + " from cache. Failed to renew an unexpired token "
+ + formatTokenId(id) + " with sequenceNumber="
+ + id.getSequenceNumber());
+ }
+ byte[] password = createPassword(token.getIdentifier(),
+ key.getPrivateKey());
+
+ long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
+ try {
+ addToTokenStore(id, password);
+ } catch (IOException e) {
+ LOG.error("Unable to update token " + id.getSequenceNumber(), e);
+ }
+ return renewTime;
+ }
+
+ /**
+ * Cancel a token by removing it from store and cache.
+ *
+ * @return Identifier of the canceled token
+ * @throws InvalidToken for invalid token
+ * @throws AccessControlException if the user isn't allowed to cancel
+ */
+ public T cancelToken(Token token, String canceller) throws IOException {
+ T id = (T) T.readProtoBuf(token.getIdentifier());
+ LOG.debug("Token cancellation requested for identifier: {}",
+ formatTokenId(id));
+
+ if (id.getUser() == null) {
+ throw new InvalidToken("Token with no owner " + formatTokenId(id));
+ }
+ String owner = id.getUser().getUserName();
+ Text renewer = id.getRenewer();
+ HadoopKerberosName cancelerKrbName = new HadoopKerberosName(canceller);
+ String cancelerShortName = cancelerKrbName.getShortName();
+ if (!canceller.equals(owner)
+ && (renewer == null || renewer.toString().isEmpty()
+ || !cancelerShortName
+ .equals(renewer.toString()))) {
+ throw new AccessControlException(canceller
+ + " is not authorized to cancel the token " + formatTokenId(id));
+ }
+ try {
+ store.removeToken(id);
+ } catch (IOException e) {
+ LOG.error("Unable to remove token " + id.getSequenceNumber(), e);
+ }
+ TokenInfo info = currentTokens.remove(id);
+ if (info == null) {
+ throw new InvalidToken("Token not found " + formatTokenId(id));
+ }
+ return id;
+ }
+
+ public int getCurrentKeyId() {
+ return currentKeyId.get();
+ }
+
+ public void setCurrentKeyId(int keyId) {
+ currentKeyId.set(keyId);
+ }
+
+ public int incrementCurrentKeyId() {
+ return currentKeyId.incrementAndGet();
+ }
+
+ public int getDelegationTokenSeqNum() {
+ return tokenSequenceNumber.get();
+ }
+
+ public void setDelegationTokenSeqNum(int seqNum) {
+ tokenSequenceNumber.set(seqNum);
+ }
+
+ public int incrementDelegationTokenSeqNum() {
+ return tokenSequenceNumber.incrementAndGet();
+ }
+
+ /**
+ * Validates if given token is valid.
+ *
+ * @param identifier
+ * @param password
+ */
+ private boolean validateToken(T identifier, byte[] password) {
+ try {
+ Signature rsaSignature = Signature.getInstance("SHA256withRSA");
+ rsaSignature.initVerify(currentKey.getPublicKey());
+ rsaSignature.update(identifier.getBytes());
+ return rsaSignature.verify(password);
+ } catch (NoSuchAlgorithmException | SignatureException |
+ InvalidKeyException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Checks if TokenInfo for the given identifier exists in database and if the
+ * token is expired.
+ */
+ public TokenInfo checkToken(T identifier) throws InvalidToken {
+ TokenInfo info = currentTokens.get(identifier);
+ if (info == null) {
+ throw new InvalidToken("token " + formatTokenId(identifier)
+ + " can't be found in cache");
+ }
+ long now = Time.monotonicNow();
+ if (info.getRenewDate() < now) {
+ throw new InvalidToken("token " + formatTokenId(identifier) + " is " +
+ "expired, current time: " + Time.formatTime(now) +
+ " expected renewal time: " + Time.formatTime(info.getRenewDate()));
+ }
+ if (!validateToken(identifier, info.getPassword())) {
+ throw new InvalidToken("Tampared/Inavalid token.");
+ }
+ return info;
+ }
+
+ // TODO: handle roll private key/certificate
+ private synchronized void removeExpiredKeys() {
+ long now = Time.monotonicNow();
+ for (Iterator> it = allKeys.entrySet()
+ .iterator(); it.hasNext();) {
+ Map.Entry e = it.next();
+ OzoneSecretKey key = e.getValue();
+ if (key.getExpiryDate() < now && key.getExpiryDate() != -1) {
+ if (!key.equals(currentKey)) {
+ it.remove();
+ try {
+ store.removeTokenMasterKey(key);
+ } catch (IOException ex) {
+ LOG.error("Unable to remove master key " + key.getKeyId(), ex);
+ }
+ }
+ }
+ }
+ }
+
+ private void loadTokenSecretState(OzoneManagerSecretState state)
+ throws IOException {
+ LOG.info("Loading token state into token manager.");
+ for (OzoneSecretKey key : state.ozoneManagerSecretState()) {
+ allKeys.putIfAbsent(key.getKeyId(), key);
+ }
+ for (Map.Entry entry : state.getTokenState().entrySet()) {
+ addPersistedDelegationToken(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private String formatTokenId(T id) {
+ return "(" + id + ")";
+ }
+
+ private void addPersistedDelegationToken(
+ T identifier, long renewDate)
+ throws IOException {
+ if (running) {
+ // a safety check
+ throw new IOException(
+ "Can't add persisted delegation token to a running SecretManager.");
+ }
+ int keyId = identifier.getMasterKeyId();
+ OzoneSecretKey dKey = allKeys.get(keyId);
+ if (dKey == null) {
+ LOG.warn("No KEY found for persisted identifier "
+ + formatTokenId(identifier));
+ return;
+ }
+
+ PrivateKey privateKey = dKey.getPrivateKey();
+ byte[] password = createPassword(identifier.getBytes(), privateKey);
+ if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
+ setDelegationTokenSeqNum(identifier.getSequenceNumber());
+ }
+ if (currentTokens.get(identifier) == null) {
+ currentTokens.put(identifier, new TokenInfo(renewDate,
+ password, identifier.getTrackingId()));
+ } else {
+ throw new IOException("Same delegation token being added twice: "
+ + formatTokenId(identifier));
+ }
+ }
+
+ /**
+ * Should be called before this object is used.
+ */
+ public void startThreads(KeyPair keyPair) throws IOException {
+ Preconditions.checkState(!running);
+ updateCurrentKey(keyPair);
+ removeExpiredKeys();
+ synchronized (this) {
+ running = true;
+ tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+ tokenRemoverThread.start();
+ }
+ }
+
+ public void stopThreads() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping expired delegation token remover thread");
+ }
+ running = false;
+
+ if (tokenRemoverThread != null) {
+ synchronized (noInterruptsLock) {
+ tokenRemoverThread.interrupt();
+ }
+ try {
+ tokenRemoverThread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Unable to join on token removal thread", e);
+ }
+ }
+ }
+
+ /**
+ * Stops the OzoneSecretManager.
+ *
+ * @throws IOException
+ */
+ public void stop() throws IOException {
+ stopThreads();
+ if (this.store != null) {
+ this.store.close();
+ }
+ }
+
+ /**
+ * Update the current master key. This is called once by startThreads before
+ * tokenRemoverThread is created,
+ */
+ private void updateCurrentKey(KeyPair keyPair) throws IOException {
+ LOG.info("Updating the current master key for generating tokens");
+
+ // TODO: fix me based on the certificate expire time to set the key
+ // expire time.
+ int newCurrentId = incrementCurrentKeyId();
+ OzoneSecretKey newKey = new OzoneSecretKey(newCurrentId, -1,
+ keyPair, maxKeyLength);
+
+ store.storeTokenMasterKey(newKey);
+ if (!allKeys.containsKey(newKey.getKeyId())) {
+ allKeys.put(newKey.getKeyId(), newKey);
+ }
+
+ synchronized (this) {
+ currentKey = newKey;
+ }
+ }
+
+ /**
+ * Remove expired delegation tokens from cache and persisted store.
+ */
+ private void removeExpiredToken() throws IOException {
+ long now = Time.monotonicNow();
+ synchronized (this) {
+ Iterator> i = currentTokens.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry entry = i.next();
+ long renewDate = entry.getValue().getRenewDate();
+ if (renewDate < now) {
+ i.remove();
+ store.removeToken(entry.getKey());
+ }
+ }
+ }
+ }
+
+ /**
+ * Is Secret Manager running.
+ *
+ * @return true if secret mgr is running
+ */
+ public synchronized boolean isRunning() {
+ return running;
+ }
+
+ /**
+ * Returns expiry time of a token given its identifier.
+ *
+ * @param dtId DelegationTokenIdentifier of a token
+ * @return Expiry time of the token
+ * @throws IOException
+ */
+ public long getTokenExpiryTime(T dtId)
+ throws IOException {
+ TokenInfo info = currentTokens.get(dtId);
+ if (info != null) {
+ return info.getRenewDate();
+ } else {
+ throw new IOException("No delegation token found for this identifier");
+ }
+ }
+
+ private class ExpiredTokenRemover extends Thread {
+ private long lastTokenCacheCleanup;
+
+ @Override
+ public void run() {
+ LOG.info("Starting expired delegation token remover thread, "
+ + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+ / (60 * 1000) + " min(s)");
+ try {
+ while (running) {
+ long now = Time.monotonicNow();
+ if (lastTokenCacheCleanup + tokenRemoverScanInterval
+ < now) {
+ removeExpiredToken();
+ lastTokenCacheCleanup = now;
+ }
+ try {
+ Thread.sleep(Math.min(5000,
+ tokenRemoverScanInterval)); // 5 seconds
+ } catch (InterruptedException ie) {
+ LOG.error("ExpiredTokenRemover received " + ie);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("ExpiredTokenRemover thread received unexpected exception",
+ t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+}
+
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java
new file mode 100644
index 0000000000..6528bcff06
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MANAGER_TOKEN_DB_NAME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_MB;
+
+/**
+ * SecretStore for Ozone Master.
+ */
+public class OzoneSecretStore
+ implements Closeable {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(OzoneSecretStore.class);
+ private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
+ private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
+
+ @Override
+ public void close() throws IOException {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+
+ /**
+ * Support class to maintain state of OzoneSecretStore.
+ */
+ public static class OzoneManagerSecretState {
+
+ private Map tokenState = new HashMap<>();
+ private Set tokenMasterKeyState = new HashSet<>();
+
+ public Map getTokenState() {
+ return tokenState;
+ }
+
+ public Set ozoneManagerSecretState() {
+ return tokenMasterKeyState;
+ }
+ }
+
+ private MetadataStore store;
+
+ public OzoneSecretStore(OzoneConfiguration conf)
+ throws IOException {
+ File metaDir = getOzoneMetaDirPath(conf);
+ final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
+ OZONE_OM_DB_CACHE_SIZE_DEFAULT);
+ File omTokenDBFile = new File(metaDir.getPath(),
+ OZONE_MANAGER_TOKEN_DB_NAME);
+ this.store = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(omTokenDBFile)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
+ }
+
+ public OzoneManagerSecretState loadState() throws IOException {
+ OzoneManagerSecretState state = new OzoneManagerSecretState();
+ int numKeys = loadMasterKeys(state);
+ LOG.info("Loaded " + numKeys + " token master keys");
+ int numTokens = loadTokens(state);
+ LOG.info("Loaded " + numTokens + " tokens");
+ return state;
+ }
+
+ public void storeTokenMasterKey(OzoneSecretKey key) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing master key " + key.getKeyId());
+ }
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ key.write(dataStream);
+ dataStream.close();
+ dataStream = null;
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, dataStream);
+ }
+ try {
+ byte[] dbKey = getMasterKeyDBKey(key);
+ store.put(dbKey, memStream.toByteArray());
+ } catch (IOException e) {
+ LOG.error("Unable to store master key " + key.getKeyId(), e);
+ throw e;
+ }
+ }
+
+
+ public void removeTokenMasterKey(OzoneSecretKey key)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing master key " + key.getKeyId());
+ }
+
+ byte[] dbKey = getMasterKeyDBKey(key);
+ try {
+ store.delete(dbKey);
+ } catch (IOException e) {
+ LOG.error("Unable to delete master key " + key.getKeyId(), e);
+ throw e;
+ }
+ }
+
+ public void storeToken(T tokenId, Long renewDate)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token " + tokenId.getSequenceNumber());
+ }
+
+ ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+ DataOutputStream dataStream = new DataOutputStream(memStream);
+ try {
+ tokenId.write(dataStream);
+ dataStream.writeLong(renewDate);
+ dataStream.close();
+ dataStream = null;
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, dataStream);
+ }
+
+ byte[] dbKey = getTokenDBKey(tokenId);
+ try {
+ store.put(dbKey, memStream.toByteArray());
+ } catch (IOException e) {
+ LOG.error("Unable to store token " + tokenId.toString(), e);
+ throw e;
+ }
+ }
+
+ public void updateToken(T tokenId, Long renewDate)
+ throws IOException {
+ storeToken(tokenId, renewDate);
+ }
+
+ public void removeToken(T tokenId)
+ throws IOException {
+ byte[] dbKey = getTokenDBKey(tokenId);
+ try {
+ store.delete(dbKey);
+ } catch (IOException e) {
+ LOG.error("Unable to remove token " + tokenId.toString(), e);
+ throw e;
+ }
+ }
+
+ public int loadMasterKeys(OzoneManagerSecretState state) throws IOException {
+ MetadataKeyFilters.MetadataKeyFilter filter =
+ (preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
+ .startsWith(TOKEN_MASTER_KEY_KEY_PREFIX);
+ List> kvs = store
+ .getRangeKVs(null, Integer.MAX_VALUE, filter);
+ kvs.forEach(entry -> {
+ try {
+ loadTokenMasterKey(state, entry.getValue());
+ } catch (IOException e) {
+ LOG.warn("Failed to load master key ",
+ DFSUtil.bytes2String(entry.getKey()), e);
+ }
+ });
+ return kvs.size();
+ }
+
+ private void loadTokenMasterKey(OzoneManagerSecretState state, byte[] data)
+ throws IOException {
+ OzoneSecretKey key = OzoneSecretKey.readProtoBuf(data);
+ state.tokenMasterKeyState.add(key);
+ }
+
+ public int loadTokens(OzoneManagerSecretState state) throws IOException {
+ MetadataKeyFilters.MetadataKeyFilter filter =
+ (preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
+ .startsWith(TOKEN_STATE_KEY_PREFIX);
+ List> kvs =
+ store.getRangeKVs(null, Integer.MAX_VALUE, filter);
+ kvs.forEach(entry -> {
+ try {
+ loadToken(state, entry.getValue());
+ } catch (IOException e) {
+ LOG.warn("Failed to load token ",
+ DFSUtil.bytes2String(entry.getKey()), e);
+ }
+ });
+ return kvs.size();
+ }
+
+ private void loadToken(OzoneManagerSecretState state, byte[] data)
+ throws IOException {
+ long renewDate;
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+ T tokenId = (T) T.readProtoBuf(in);
+ try {
+ tokenId.readFields(in);
+ renewDate = in.readLong();
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+ state.tokenState.put(tokenId, renewDate);
+ }
+
+ private byte[] getMasterKeyDBKey(OzoneSecretKey masterKey) {
+ return DFSUtil.string2Bytes(
+ TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId());
+ }
+
+ private byte[] getTokenDBKey(T tokenId) {
+ return DFSUtil.string2Bytes(
+ TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber());
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecurityException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecurityException.java
new file mode 100644
index 0000000000..66533f39bb
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecurityException.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.security;
+
+import java.io.IOException;
+
+/**
+ * Security exceptions thrown at Ozone layer.
+ */
+public class OzoneSecurityException extends IOException {
+ private final OzoneSecurityException.ResultCodes result;
+
+ /**
+ * Constructs an {@code IOException} with {@code null}
+ * as its error detail message.
+ */
+ public OzoneSecurityException(OzoneSecurityException.ResultCodes result) {
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code IOException} with the specified detail message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the
+ * {@link #getMessage()} method)
+ */
+ public OzoneSecurityException(String message,
+ OzoneSecurityException.ResultCodes result) {
+ super(message);
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code IOException} with the specified detail message
+ * and cause.
+ *
+ *
Note that the detail message associated with {@code cause} is
+ * not automatically incorporated into this exception's detail
+ * message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the
+ * {@link #getMessage()} method)
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and indicates that the
+ * cause is nonexistent or unknown.)
+ * @since 1.6
+ */
+ public OzoneSecurityException(String message, Throwable cause,
+ OzoneSecurityException.ResultCodes result) {
+ super(message, cause);
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code IOException} with the specified cause and a
+ * detail message of {@code (cause==null ? null : cause.toString())}
+ * (which typically contains the class and detail message of {@code cause}).
+ * This constructor is useful for IO exceptions that are little more
+ * than wrappers for other throwables.
+ *
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and indicates that the
+ * cause is nonexistent or unknown.)
+ * @since 1.6
+ */
+ public OzoneSecurityException(Throwable cause,
+ OzoneSecurityException.ResultCodes result) {
+ super(cause);
+ this.result = result;
+ }
+
+ /**
+ * Returns resultCode.
+ * @return ResultCode
+ */
+ public OzoneSecurityException.ResultCodes getResult() {
+ return result;
+ }
+
+ /**
+ * Error codes to make it easy to decode these exceptions.
+ */
+ public enum ResultCodes {
+ OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST,
+ SECRET_MANAGER_HMAC_ERROR
+ }
+}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 643ba6d92f..cfa1e4373f 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -36,6 +36,7 @@ This is similar to Namenode for Ozone.
*/
import "hdds.proto";
+import "Security.proto";
enum Type {
CreateVolume = 11;
@@ -69,6 +70,10 @@ enum Type {
AbortMultiPartUpload = 48;
ServiceList = 51;
+
+ GetDelegationToken = 61;
+ RenewDelegationToken = 62;
+ CancelDelegationToken = 63;
}
message OMRequest {
@@ -111,6 +116,11 @@ message OMRequest {
optional MultipartUploadAbortRequest abortMultiPartUploadRequest = 48;
optional ServiceListRequest serviceListRequest = 51;
+
+ optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
+ optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
+ optional hadoop.common.CancelDelegationTokenRequestProto cancelDelegationTokenRequest = 63;
+
}
message OMResponse {
@@ -154,6 +164,10 @@ message OMResponse {
optional MultipartUploadAbortResponse abortMultiPartUploadResponse = 48;
optional ServiceListResponse ServiceListResponse = 51;
+
+ optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
+ optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
+ optional CancelDelegationTokenResponseProto cancelDelegationTokenResponse = 63;
}
enum Status {
@@ -640,6 +654,21 @@ message MultipartUploadAbortResponse {
required Status status = 1;
}
+message GetDelegationTokenResponseProto{
+ required Status status = 1;
+ optional hadoop.common.GetDelegationTokenResponseProto response = 2;
+}
+
+message RenewDelegationTokenResponseProto{
+ required Status status = 1;
+ optional hadoop.common.RenewDelegationTokenResponseProto response = 2;
+}
+
+message CancelDelegationTokenResponseProto{
+ required Status status = 1;
+ optional hadoop.common.CancelDelegationTokenResponseProto response = 2;
+}
+
/**
The OM service that takes care of Ozone namespace.
*/
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneSecretManager.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneSecretManager.java
new file mode 100644
index 0000000000..e4a8f2b40c
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneSecretManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.KeyPair;
+import java.security.Signature;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for {@link OzoneSecretManager}.
+ */
+public class TestOzoneSecretManager {
+
+ private OzoneSecretManager secretManager;
+ private SecurityConfig securityConfig;
+ private KeyPair keyPair;
+ private long expiryTime;
+ private Text serviceRpcAdd;
+ private OzoneConfiguration conf;
+ private static final String BASEDIR = GenericTestUtils
+ .getTempPath(TestOzoneSecretManager.class.getSimpleName());
+ private final static Text TEST_USER = new Text("testUser");
+ private long tokenMaxLifetime = 1000 * 20;
+ private long tokenRemoverScanInterval = 1000 * 20;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
+ securityConfig = new SecurityConfig(conf);
+ // Create Ozone Master key pair.
+ keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+ expiryTime = Time.monotonicNow() + 60 * 60 * 24;
+ serviceRpcAdd = new Text("localhost");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ secretManager.stop();
+ FileUtils.deleteQuietly(new File(BASEDIR));
+ }
+
+ @Test
+ public void testCreateToken() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ OzoneTokenIdentifier identifier =
+ OzoneTokenIdentifier.readProtoBuf(token.getIdentifier());
+ // Check basic details.
+ Assert.assertTrue(identifier.getRealUser().equals(TEST_USER));
+ Assert.assertTrue(identifier.getRenewer().equals(TEST_USER));
+ Assert.assertTrue(identifier.getOwner().equals(TEST_USER));
+
+ validateHash(token.getPassword(), token.getIdentifier());
+ }
+
+ @Test
+ public void testRenewTokenSuccess() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ Thread.sleep(10 * 5);
+ long renewalTime = secretManager.renewToken(token, TEST_USER.toString());
+ Assert.assertTrue(renewalTime > 0);
+ }
+
+ /**
+ * Tests failure for mismatch in renewer.
+ */
+ @Test
+ public void testRenewTokenFailure() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ LambdaTestUtils.intercept(AccessControlException.class,
+ "rougeUser tries to renew a token", () -> {
+ secretManager.renewToken(token, "rougeUser");
+ });
+ }
+
+ /**
+ * Tests token renew failure due to max time.
+ */
+ @Test
+ public void testRenewTokenFailureMaxTime() throws Exception {
+ secretManager = createSecretManager(conf, 100,
+ 100, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ Thread.sleep(101);
+ LambdaTestUtils.intercept(IOException.class,
+ "testUser tried to renew an expired token", () -> {
+ secretManager.renewToken(token, TEST_USER.toString());
+ });
+ }
+
+ /**
+ * Tests token renew failure due to renewal time.
+ */
+ @Test
+ public void testRenewTokenFailureRenewalTime() throws Exception {
+ secretManager = createSecretManager(conf, 1000 * 10,
+ 10, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ Thread.sleep(15);
+ LambdaTestUtils.intercept(IOException.class, "is expired", () -> {
+ secretManager.renewToken(token, TEST_USER.toString());
+ });
+ }
+
+ @Test
+ public void testCreateIdentifier() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ OzoneTokenIdentifier identifier = secretManager.createIdentifier();
+ // Check basic details.
+ Assert.assertTrue(identifier.getOwner().equals(new Text("")));
+ Assert.assertTrue(identifier.getRealUser().equals(new Text("")));
+ Assert.assertTrue(identifier.getRenewer().equals(new Text("")));
+ }
+
+ @Test
+ public void testCancelTokenSuccess() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ secretManager.cancelToken(token, TEST_USER.toString());
+ }
+
+ @Test
+ public void testCancelTokenFailure() throws Exception {
+ secretManager = createSecretManager(conf, tokenMaxLifetime,
+ expiryTime, tokenRemoverScanInterval);
+ secretManager.startThreads(keyPair);
+ Token token = secretManager.createToken(TEST_USER,
+ TEST_USER,
+ TEST_USER);
+ LambdaTestUtils.intercept(AccessControlException.class,
+ "rougeUser is not authorized to cancel the token", () -> {
+ secretManager.cancelToken(token, "rougeUser");
+ });
+ }
+
+ /**
+ * Validate hash using public key of KeyPair.
+ */
+ private void validateHash(byte[] hash, byte[] identifier) throws Exception {
+ Signature rsaSignature =
+ Signature.getInstance(securityConfig.getSignatureAlgo(),
+ securityConfig.getProvider());
+ rsaSignature.initVerify(keyPair.getPublic());
+ rsaSignature.update(identifier);
+ Assert.assertTrue(rsaSignature.verify(hash));
+ }
+
+ /**
+ * Create instance of {@link OzoneSecretManager}.
+ */
+ private OzoneSecretManager createSecretManager(
+ OzoneConfiguration config, long tokenMaxLife, long expiry, long
+ tokenRemoverScanTime) throws IOException {
+ return new OzoneSecretManager<>(config, tokenMaxLife,
+ expiry, tokenRemoverScanTime, serviceRpcAdd);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 3d3462650e..8340be59f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -17,17 +17,24 @@
*/
package org.apache.hadoop.ozone;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.slf4j.event.Level.INFO;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -37,14 +44,29 @@
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
+import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyPEMWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.KerberosAuthException;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.LambdaTestUtils;
@@ -54,6 +76,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +86,8 @@
@InterfaceAudience.Private
public final class TestSecureOzoneCluster {
+ private static final String TEST_USER = "testUgiUser";
+ private static final int CLIENT_TIMEOUT = 2 * 1000;
private Logger LOGGER = LoggerFactory
.getLogger(TestSecureOzoneCluster.class);
@@ -83,14 +108,24 @@ public final class TestSecureOzoneCluster {
private static String clusterId;
private static String scmId;
private static String omId;
+ private OzoneManagerProtocolClientSideTranslatorPB omClient;
+ private KeyPair keyPair;
+ private Path metaDirPath;
@Before
public void init() {
try {
conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ final String path = GenericTestUtils
+ .getTempPath(UUID.randomUUID().toString());
+ metaDirPath = Paths.get(path, "om-meta");
+ conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
startMiniKdc();
setSecureConfig(conf);
createCredentialsInKDC(conf, miniKdc);
+ generateKeyPair(conf);
} catch (IOException e) {
LOGGER.error("Failed to initialize TestSecureOzoneCluster", e);
} catch (Exception e) {
@@ -108,6 +143,10 @@ public void stop() {
if (om != null) {
om.stop();
}
+ if (omClient != null) {
+ omClient.close();
+ }
+ FileUtils.deleteQuietly(metaDirPath.toFile());
} catch (Exception e) {
LOGGER.error("Failed to stop TestSecureOzoneCluster", e);
}
@@ -117,11 +156,11 @@ private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
throws Exception {
createPrincipal(scmKeytab,
conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
- createPrincipal(spnegoKeytab,
- conf.get(ScmConfigKeys
- .HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
- conf.get(OMConfigKeys
- .OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
+ createPrincipal(spnegoKeytab,
+ conf.get(ScmConfigKeys
+ .HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
+ conf.get(OMConfigKeys
+ .OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
createPrincipal(omKeyTab,
conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
}
@@ -139,12 +178,13 @@ private void startMiniKdc() throws Exception {
miniKdc.start();
}
- private void stopMiniKdc() throws Exception {
+ private void stopMiniKdc() {
miniKdc.stop();
}
private void setSecureConfig(Configuration conf) throws IOException {
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ conf.setBoolean(OZONE_ENABLED, true);
String host = InetAddress.getLocalHost().getCanonicalHostName();
String realm = miniKdc.getRealm();
curUser = UserGroupInformation.getCurrentUser()
@@ -247,60 +287,262 @@ private void testCommonKerberosFailures(Callable callable) throws Exception {
}
/**
- * Tests the secure KSM Initialization Failure.
+ * Tests the secure om Initialization Failure.
*
* @throws IOException
*/
@Test
- public void testSecureKsmInitializationFailure() throws Exception {
+ public void testSecureOMInitializationFailure() throws Exception {
initSCM();
// Create a secure SCM instance as om client will connect to it
scm = StorageContainerManager.createSCM(null, conf);
- final String path = GenericTestUtils
- .getTempPath(UUID.randomUUID().toString());
- OMStorage ksmStore = new OMStorage(conf);
- ksmStore.setClusterId("testClusterId");
- ksmStore.setScmId("testScmId");
- // writes the version file properties
- ksmStore.initialize();
+ setupOm(conf);
conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
"non-existent-user@EXAMPLE.com");
testCommonKerberosFailures(() -> OzoneManager.createOm(null, conf));
}
/**
- * Tests the secure KSM Initialization success.
+ * Tests the secure om Initialization success.
*
* @throws IOException
*/
@Test
- public void testSecureKsmInitializationSuccess() throws Exception {
+ public void testSecureOmInitializationSuccess() throws Exception {
initSCM();
// Create a secure SCM instance as om client will connect to it
scm = StorageContainerManager.createSCM(null, conf);
LogCapturer logs = LogCapturer.captureLogs(OzoneManager.LOG);
+ GenericTestUtils.setLogLevel(OzoneManager.LOG, INFO);
+
+ setupOm(conf);
+ try {
+ om.start();
+ } catch (Exception ex) {
+ // Expects timeout failure from scmClient in om but om user login via
+ // kerberos should succeed.
+ Assert.assertTrue(logs.getOutput().contains("Ozone Manager login"
+ + " successful"));
+ }
+ }
+
+ /**
+ * Performs following tests for delegation token.
+ * 1. Get valid delegation token
+ * 2. Test successful token renewal.
+ * 3. Client can authenticate using token.
+ * 4. Delegation token renewal without Kerberos auth fails.
+ * 5. Test success of token cancellation.
+ * 5. Test failure of token cancellation.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDelegationToken() throws Exception {
+
+ // Capture logs for assertions
+ LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
GenericTestUtils
- .setLogLevel(LoggerFactory.getLogger(OzoneManager.class.getName()),
- org.slf4j.event.Level.INFO);
+ .setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
- final String path = GenericTestUtils
- .getTempPath(UUID.randomUUID().toString());
- Path metaDirPath = Paths.get(path, "om-meta");
+ // Setup secure OM for start
+ setupOm(conf);
+ long omVersion =
+ RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+ // Start OM
+ om.start();
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String username = ugi.getUserName();
+ ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
- OMStorage omStore = new OMStorage(conf);
+ // Get first OM client which will authenticate via Kerberos
+ omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+ OmUtils.getOmAddress(conf), ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
+
+ // Assert if auth was successful via Kerberos
+ Assert.assertFalse(logs.getOutput().contains(
+ "Auth successful for " + username + " (auth:KERBEROS)"));
+
+ // Case 1: Test successful delegation token.
+ Token token = omClient
+ .getDelegationToken(new Text("om"));
+
+ // Case 2: Test successful token renewal.
+ long renewalTime = omClient.renewDelegationToken(token);
+ Assert.assertTrue(renewalTime > 0);
+
+ // Check if token is of right kind and renewer is running om instance
+ Assert.assertEquals(token.getKind().toString(), "OzoneToken");
+ Assert.assertEquals(token.getService().toString(),
+ OmUtils.getOmRpcAddress(conf));
+ omClient.close();
+
+ // Create a remote ugi and set its authentication method to Token
+ UserGroupInformation testUser = UserGroupInformation
+ .createRemoteUser(TEST_USER);
+ testUser.addToken(token);
+ testUser.setAuthenticationMethod(AuthMethod.TOKEN);
+ UserGroupInformation.setLoginUser(testUser);
+
+ // Get Om client, this time authentication should happen via Token
+ testUser.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public Void run() throws Exception {
+ omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+ OmUtils.getOmAddress(conf), testUser, conf,
+ NetUtils.getDefaultSocketFactory(conf), CLIENT_TIMEOUT),
+ RandomStringUtils.randomAscii(5));
+ return null;
+ }
+ });
+
+ // Case 3: Test Client can authenticate using token.
+ Assert.assertFalse(logs.getOutput().contains(
+ "Auth successful for " + username + " (auth:TOKEN)"));
+ LambdaTestUtils.intercept(IOException.class, "Delete Volume failed," +
+ " error:VOLUME_NOT_FOUND",
+ () -> omClient.deleteVolume("vol1"));
+ Assert.assertTrue(logs.getOutput().contains(
+ "Auth successful for " + username + " (auth:TOKEN)"));
+
+ // Case 4: Test failure of token renewal.
+ // Call to renewDelegationToken will fail but it will confirm that
+ // initial connection via DT succeeded
+ LambdaTestUtils.intercept(RemoteException.class, "Delegation "
+ + "Token can be renewed only with kerberos or web authentication",
+ () -> omClient.renewDelegationToken(token));
+ Assert.assertTrue(logs.getOutput().contains(
+ "Auth successful for " + username + " (auth:TOKEN)"));
+ //testUser.setAuthenticationMethod(AuthMethod.KERBEROS);
+ UserGroupInformation.setLoginUser(ugi);
+ omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+ OmUtils.getOmAddress(conf), ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)), RandomStringUtils.randomAscii(5));
+
+ // Case 5: Test success of token cancellation.
+ omClient.cancelDelegationToken(token);
+ omClient.close();
+
+ // Wait for client to timeout
+ Thread.sleep(CLIENT_TIMEOUT);
+
+ Assert.assertFalse(logs.getOutput().contains("Auth failed for"));
+
+ // Case 6: Test failure of token cancellation.
+ // Get Om client, this time authentication using Token will fail as
+ // token is expired
+ omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+ OmUtils.getOmAddress(conf), testUser, conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)), RandomStringUtils.randomAscii(5));
+ LambdaTestUtils.intercept(RemoteException.class, "can't be found in cache",
+ () -> omClient.cancelDelegationToken(token));
+ Assert.assertTrue(logs.getOutput().contains(
+ "Auth failed for"));
+ }
+
+ private void generateKeyPair(OzoneConfiguration config) throws Exception {
+ HDDSKeyGenerator keyGenerator = new HDDSKeyGenerator(conf);
+ keyPair = keyGenerator.generateKey();
+ HDDSKeyPEMWriter pemWriter = new HDDSKeyPEMWriter(config);
+ pemWriter.writeKey(keyPair, true);
+ }
+
+ /**
+ * Tests delegation token renewal.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDelegationTokenRenewal() throws Exception {
+ // Capture logs for assertions.
+ LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
+ GenericTestUtils
+ .setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
+
+ // Setup secure OM for start.
+ OzoneConfiguration newConf = new OzoneConfiguration(conf);
+ newConf.setLong(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY, 500);
+ setupOm(newConf);
+ long omVersion =
+ RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+ OzoneManager.setTestSecureOmFlag(true);
+ // Start OM
+
+ om.start();
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String username = ugi.getUserName();
+
+ // Get first OM client which will authenticate via Kerberos
+ omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+ OmUtils.getOmAddress(conf), ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
+
+ // Since client is already connected get a delegation token
+ Token token = omClient
+ .getDelegationToken(new Text("om"));
+
+ // Check if token is of right kind and renewer is running om instance
+ Assert.assertEquals(token.getKind().toString(), "OzoneToken");
+ Assert.assertEquals(token.getService().toString(),
+ OmUtils.getOmRpcAddress(conf));
+
+ // Renew delegation token
+ long expiryTime = omClient.renewDelegationToken(token);
+ Assert.assertTrue(expiryTime > 0);
+
+ // Test failure of delegation renewal
+ // 1. When renewer doesn't match (implicitly covers when renewer is
+ // null or empty )
+ Token token2 = omClient.getDelegationToken(new Text("randomService"));
+ LambdaTestUtils.intercept(RemoteException.class,
+ " with non-matching renewer randomService",
+ () -> omClient.renewDelegationToken(token2));
+
+ // 2. Test tampered token
+ OzoneTokenIdentifier tokenId = OzoneTokenIdentifier
+ .readProtoBuf(token.getIdentifier());
+ tokenId.setRenewer(new Text("om"));
+ tokenId.setMaxDate(System.currentTimeMillis() * 2);
+ Token tamperedToken = new Token<>(
+ tokenId.getBytes(), token2.getPassword(), token2.getKind(),
+ token2.getService());
+ LambdaTestUtils
+ .intercept(RemoteException.class, "can't be found in cache",
+ () -> omClient.renewDelegationToken(tamperedToken));
+
+ // 3. When token maxExpiryTime exceeds
+ Thread.sleep(500);
+ LambdaTestUtils
+ .intercept(RemoteException.class, "om tried to renew an expired"
+ + " token", () -> omClient.renewDelegationToken(token));
+ }
+
+ private void setupOm(OzoneConfiguration config) throws Exception {
+ OMStorage omStore = new OMStorage(config);
omStore.setClusterId("testClusterId");
omStore.setScmId("testScmId");
// writes the version file properties
omStore.initialize();
- try {
- om = OzoneManager.createOm(null, conf);
- } catch (Exception ex) {
- // Expects timeout failure from scmClient in KSM but KSM user login via
- // kerberos should succeed
- Assert.assertTrue(
- logs.getOutput().contains("Ozone Manager login successful."));
- }
+ OzoneManager.setTestSecureOmFlag(true);
+ om = OzoneManager.createOm(null, config);
+ CertificateClient certClient = Mockito.mock(CertificateClient.class);
+ Mockito.when(certClient.getPrivateKey("om"))
+ .thenReturn(keyPair.getPrivate());
+ Mockito.when(certClient.getPublicKey("om"))
+ .thenReturn(keyPair.getPublic());
+ om.setCertClient(certClient);
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 80b9d7a582..f4b85ef100 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -23,6 +23,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
+import java.security.KeyPair;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -35,16 +37,23 @@
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.security.OzoneSecurityException;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.OzoneSecretManager;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -53,7 +62,6 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -91,6 +99,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
@@ -101,7 +111,6 @@
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
-import javax.ws.rs.HEAD;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
@@ -118,8 +127,8 @@
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.ozone.security.OzoneSecurityException.ResultCodes.*;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
@@ -145,7 +154,6 @@
import static org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneManagerService
.newReflectiveBlockingService;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys
@@ -161,13 +169,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
public static final Logger LOG =
LoggerFactory.getLogger(OzoneManager.class);
- private static final AuditLogger AUDIT =
- new AuditLogger(AuditLoggerType.OMLOGGER);
+ private static final AuditLogger AUDIT = new AuditLogger(
+ AuditLoggerType.OMLOGGER);
private static final String USAGE =
"Usage: \n ozone om [genericOptions] " + "[ "
+ StartupOption.INIT.getName() + " ]\n " + "ozone om [ "
+ StartupOption.HELP.getName() + " ]\n";
+ private static final String OM_DAEMON = "om";
+ private static boolean securityEnabled = false;
+ private static OzoneSecretManager secretManager;
+ // TO DO: For testing purpose only, remove before commiting
+ private KeyPair keyPair;
+ private CertificateClient certClient;
+ private static boolean testSecureOmFlag = false;
private final OzoneConfiguration configuration;
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
@@ -208,21 +223,60 @@ private OzoneManager(OzoneConfiguration conf) throws IOException {
ResultCodes.OM_NOT_INITIALIZED);
}
- scmContainerClient = getScmContainerClient(configuration);
+ if (!testSecureOmFlag) {
+ scmContainerClient = getScmContainerClient(configuration);
+ // verifies that the SCM info in the OM Version file is correct.
+ scmBlockClient = getScmBlockClient(configuration);
+ ScmInfo scmInfo = scmBlockClient.getScmInfo();
+ if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
+ .getScmId().equals(omStorage.getScmId()))) {
+ throw new OMException("SCM version info mismatch.",
+ ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+ }
+ } else {
+ // For testing purpose only
+ scmContainerClient = null;
+ scmBlockClient = null;
+ }
+ InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
+ int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+ OZONE_OM_HANDLER_COUNT_DEFAULT);
- // verifies that the SCM info in the OM Version file is correct.
- scmBlockClient = getScmBlockClient(configuration);
+ // This is a temporary check. Once fully implemented, all OM state change
+ // should go through Ratis - either standalone (for non-HA) or replicated
+ // (for HA).
+ boolean omRatisEnabled = configuration.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+ if (omRatisEnabled) {
+ omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
+ omNodeRpcAddr.getAddress(), configuration);
+ omRatisServer.start();
- ScmInfo scmInfo = scmBlockClient.getScmInfo();
- if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
- .getScmId().equals(omStorage.getScmId()))) {
- throw new OMException("SCM version info mismatch.",
- ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+ LOG.info("OzoneManager Ratis server started at port {}",
+ omRatisServer.getServerPort());
+
+ omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
+ omId, omRatisServer.getRaftGroup(), configuration);
+ omRatisClient.connect();
+ } else {
+ omRatisServer = null;
+ omRatisClient = null;
}
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
+ BlockingService omService = newReflectiveBlockingService(
+ new OzoneManagerProtocolServerSideTranslatorPB(
+ this, omRatisClient, omRatisEnabled));
+ secretManager = createSecretManager(configuration);
+
+ omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
+ OzoneManagerProtocolPB.class, omService,
+ handlerCount);
+ omRpcAddress = updateRPCListenAddress(configuration,
+ OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
metadataManager = new OmMetadataManagerImpl(configuration);
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
bucketManager = new BucketManagerImpl(metadataManager);
@@ -313,6 +367,67 @@ private File getMetricsStorageFile() {
}
+ private OzoneSecretManager createSecretManager(
+ OzoneConfiguration conf)
+ throws IOException {
+ long tokenRemoverScanInterval =
+ conf.getTimeDuration(OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_KEY,
+ OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long tokenMaxLifetime =
+ conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+ OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long tokenRenewInterval =
+ conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ Text omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
+
+ return new OzoneSecretManager(conf, tokenMaxLifetime, tokenRenewInterval,
+ tokenRemoverScanInterval, omRpcAddressTxt);
+ }
+
+ private void stopSecretManager() throws IOException {
+ if (secretManager != null) {
+ LOG.info("Stopping OM secret manager");
+ secretManager.stop();
+ }
+ }
+
+ private void startSecretManager() {
+ if (secretManager != null) {
+ try {
+ readKeyPair();
+ LOG.info("Starting OM secret manager");
+ secretManager.startThreads(keyPair);
+ } catch (IOException e) {
+ // Inability to start secret manager
+ // can't be recovered from.
+ LOG.error("Error starting secret manager.", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void setCertClient(CertificateClient certClient) {
+ // TODO: Initialize it in contructor with implementation for certClient.
+ this.certClient = certClient;
+ }
+
+ /**
+ * Read private key from file.
+ */
+ private void readKeyPair() throws OzoneSecurityException {
+ try {
+ keyPair = new KeyPair(certClient.getPublicKey(OM_DAEMON),
+ certClient.getPrivateKey(OM_DAEMON));
+ } catch (Exception e) {
+ throw new OzoneSecurityException("Error reading private file for "
+ + "OzoneManager", e, OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
+ }
+ }
+
/**
* Login OM service user if security and Kerberos are enabled.
*
@@ -322,8 +437,8 @@ private File getMetricsStorageFile() {
private static void loginOMUser(OzoneConfiguration conf)
throws IOException, AuthenticationException {
- if (SecurityUtil.getAuthenticationMethod(conf).equals
- (AuthenticationMethod.KERBEROS)) {
+ if (SecurityUtil.getAuthenticationMethod(conf).equals(
+ AuthenticationMethod.KERBEROS)) {
LOG.debug("Ozone security is enabled. Attempting login for OM user. "
+ "Principal: {},keytab: {}", conf.get(
OZONE_OM_KERBEROS_PRINCIPAL_KEY),
@@ -335,8 +450,8 @@ private static void loginOMUser(OzoneConfiguration conf)
SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
} else {
- throw new AuthenticationException(SecurityUtil.getAuthenticationMethod
- (conf) + " authentication method not supported. OM user login "
+ throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
+ conf) + " authentication method not supported. OM user login "
+ "failed.");
}
LOG.info("Ozone Manager login successful.");
@@ -409,7 +524,7 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
- .setSecretManager(null)
+ .setSecretManager(secretManager)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
@@ -449,6 +564,10 @@ private static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
}
+ private static boolean isOzoneSecurityEnabled() {
+ return securityEnabled;
+ }
+
/**
* Constructs OM instance based on command line arguments.
*
@@ -493,8 +612,10 @@ private static OzoneManager createOm(String[] argv,
terminate(1);
return null;
}
+
+ securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
// Authenticate KSM if security is enabled
- if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, true)) {
+ if (securityEnabled) {
loginOMUser(conf);
}
switch (startOpt) {
@@ -637,49 +758,13 @@ public OMMetrics getMetrics() {
*/
public void start() throws IOException {
- InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
- int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
- OZONE_OM_HANDLER_COUNT_DEFAULT);
-
- // This is a temporary check. Once fully implemented, all OM state change
- // should go through Ratis - either standalone (for non-HA) or replicated
- // (for HA).
- boolean omRatisEnabled = configuration.getBoolean(
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
- if (omRatisEnabled) {
- omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
- omNodeRpcAddr.getAddress(), configuration);
- omRatisServer.start();
-
- LOG.info("OzoneManager Ratis server started at port {}",
- omRatisServer.getServerPort());
-
- omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
- omId, omRatisServer.getRaftGroup(), configuration);
- omRatisClient.connect();
- } else {
- omRatisServer = null;
- omRatisClient = null;
- }
-
- BlockingService omService = newReflectiveBlockingService(
- new OzoneManagerProtocolServerSideTranslatorPB(
- this, omRatisClient, omRatisEnabled));
- omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
- OzoneManagerProtocolPB.class, omService,
- handlerCount);
- omRpcAddress = updateRPCListenAddress(configuration,
- OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
- omRpcServer.start();
-
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
omRpcAddress));
DefaultMetricsSystem.initialize("OzoneManager");
metadataManager.start(configuration);
-
+ startSecretManagerIfNecessary();
// Set metrics and start metrics back ground thread
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
@@ -700,8 +785,7 @@ public void start() throws IOException {
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
keyManager.start(configuration);
-
- httpServer = new OzoneManagerHttpServer(configuration, this);
+ omRpcServer.start();
try {
httpServer.start();
} catch (Exception ex) {
@@ -731,6 +815,7 @@ public void stop() {
omRatisServer.stop();
}
keyManager.stop();
+ stopSecretManager();
httpServer.stop();
metadataManager.stop();
metrics.unRegister();
@@ -755,6 +840,140 @@ public void join() {
}
}
+ private void startSecretManagerIfNecessary() {
+ boolean shouldRun = shouldUseDelegationTokens() && isOzoneSecurityEnabled();
+ boolean running = secretManager.isRunning();
+ if (shouldRun && !running) {
+ startSecretManager();
+ }
+ }
+
+ private boolean shouldUseDelegationTokens() {
+ return UserGroupInformation.isSecurityEnabled();
+ }
+
+
+ /**
+ *
+ * @return true if delegation token operation is allowed
+ */
+ private boolean isAllowedDelegationTokenOp() throws IOException {
+ AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
+ if (UserGroupInformation.isSecurityEnabled()
+ && (authMethod != AuthenticationMethod.KERBEROS)
+ && (authMethod != AuthenticationMethod.KERBEROS_SSL)
+ && (authMethod != AuthenticationMethod.CERTIFICATE)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns authentication method used to establish the connection.
+ * @return AuthenticationMethod used to establish connection
+ * @throws IOException
+ */
+ private AuthenticationMethod getConnectionAuthenticationMethod()
+ throws IOException {
+ UserGroupInformation ugi = getRemoteUser();
+ AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+ if (authMethod == AuthenticationMethod.PROXY) {
+ authMethod = ugi.getRealUser().getAuthenticationMethod();
+ }
+ return authMethod;
+ }
+
+ // optimize ugi lookup for RPC operations to avoid a trip through
+ // UGI.getCurrentUser which is synch'ed
+ private static UserGroupInformation getRemoteUser() throws IOException {
+ UserGroupInformation ugi = Server.getRemoteUser();
+ return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
+ }
+
+ /**
+ * Get delegation token from OzoneManager.
+ * @param renewer Renewer information
+ * @return delegationToken DelegationToken signed by OzoneManager
+ * @throws IOException on error
+ */
+ @Override
+ public Token getDelegationToken(Text renewer)
+ throws IOException {
+ final boolean success;
+ final String tokenId;
+ Token token;
+
+ if (!isAllowedDelegationTokenOp()) {
+ throw new IOException("Delegation Token can be issued only with "
+ + "kerberos or web authentication");
+ }
+ if (secretManager == null || !secretManager.isRunning()) {
+ LOG.warn("trying to get DT with no secret manager running in OM.");
+ return null;
+ }
+
+ UserGroupInformation ugi = getRemoteUser();
+ String user = ugi.getUserName();
+ Text owner = new Text(user);
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+
+ token = secretManager.createToken(owner, renewer, realUser);
+ return token;
+ }
+
+ /**
+ * Method to renew a delegationToken issued by OzoneManager.
+ * @param token token to renew
+ * @return new expiryTime of the token
+ * @throws InvalidToken if {@code token} is invalid
+ * @throws IOException on other errors
+ */
+ @Override
+ public long renewDelegationToken(Token token)
+ throws InvalidToken, IOException {
+ long expiryTime;
+
+ try {
+
+ if (!isAllowedDelegationTokenOp()) {
+ throw new IOException("Delegation Token can be renewed only with "
+ + "kerberos or web authentication");
+ }
+ String renewer = getRemoteUser().getShortUserName();
+ expiryTime = secretManager.renewToken(token, renewer);
+
+ } catch (AccessControlException ace) {
+ final OzoneTokenIdentifier id = OzoneTokenIdentifier.readProtoBuf(
+ token.getIdentifier());
+ LOG.error("Delegation token renewal failed for dt: {}, cause: {}",
+ id.toString(), ace.getMessage());
+ throw ace;
+ }
+ return expiryTime;
+ }
+
+ /**
+ * Cancels a delegation token.
+ * @param token token to cancel
+ * @throws IOException on error
+ */
+ @Override
+ public void cancelDelegationToken(Token token)
+ throws IOException {
+ OzoneTokenIdentifier id = null;
+ try {
+ String canceller = getRemoteUser().getUserName();
+ id = secretManager.cancelToken(token, canceller);
+ LOG.trace("Delegation token renewed for dt: {}", id);
+ } catch (AccessControlException ace) {
+ LOG.error("Delegation token renewal failed for dt: {}, cause: {}", id,
+ ace.getMessage());
+ throw ace;
+ }
+ }
/**
* Creates a volume.
*
@@ -1760,4 +1979,8 @@ public String getName() {
public static Logger getLogger() {
return LOG;
}
+
+ public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
+ OzoneManager.testSecureOmFlag = testSecureOmFlag;
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 894297f52a..6db6cd6dab 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -22,7 +22,9 @@
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -153,6 +155,15 @@
.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
+import org.apache.hadoop.security.token.Token;
+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.slf4j.Logger;
@@ -317,6 +328,21 @@ public OMResponse handle(OMRequest request) {
request.getServiceListRequest());
responseBuilder.setServiceListResponse(serviceListResponse);
break;
+ case GetDelegationToken:
+ GetDelegationTokenResponseProto getDtResp = getDelegationToken(
+ request.getGetDelegationTokenRequest());
+ responseBuilder.setGetDelegationTokenResponse(getDtResp);
+ break;
+ case RenewDelegationToken:
+ RenewDelegationTokenResponseProto renewDtResp = renewDelegationToken(
+ request.getRenewDelegationTokenRequest());
+ responseBuilder.setRenewDelegationTokenResponse(renewDtResp);
+ break;
+ case CancelDelegationToken:
+ CancelDelegationTokenResponseProto cancelDtResp = cancelDelegationToken(
+ request.getCancelDelegationTokenRequest());
+ responseBuilder.setCancelDelegationTokenResponse(cancelDtResp);
+ break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -915,4 +941,61 @@ private MultipartUploadAbortResponse abortMultipartUpload(
}
return response.build();
}
+
+ private GetDelegationTokenResponseProto getDelegationToken(
+ GetDelegationTokenRequestProto request){
+ GetDelegationTokenResponseProto.Builder rb =
+ GetDelegationTokenResponseProto.newBuilder();
+ try {
+ Token token = impl
+ .getDelegationToken(new Text(request.getRenewer()));
+ if (token != null) {
+ rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
+ .GetDelegationTokenResponseProto.newBuilder().setToken(OMPBHelper
+ .convertToTokenProto(token)).build());
+ }
+ rb.setStatus(Status.OK);
+ } catch (IOException ex) {
+ rb.setStatus(exceptionToResponseStatus(ex));
+ }
+ return rb.build();
+ }
+
+ private RenewDelegationTokenResponseProto renewDelegationToken(
+ RenewDelegationTokenRequestProto request) {
+ RenewDelegationTokenResponseProto.Builder rb =
+ RenewDelegationTokenResponseProto.newBuilder();
+ try {
+ if(request.hasToken()) {
+ long expiryTime = impl
+ .renewDelegationToken(
+ OMPBHelper.convertToDelegationToken(request.getToken()));
+ rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
+ .RenewDelegationTokenResponseProto.newBuilder()
+ .setNewExpiryTime(expiryTime).build());
+ }
+ rb.setStatus(Status.OK);
+ } catch (IOException ex) {
+ rb.setStatus(exceptionToResponseStatus(ex));
+ }
+ return rb.build();
+ }
+
+ private CancelDelegationTokenResponseProto cancelDelegationToken(
+ CancelDelegationTokenRequestProto req) {
+ CancelDelegationTokenResponseProto.Builder rb =
+ CancelDelegationTokenResponseProto.newBuilder();
+ try {
+ if(req.hasToken()) {
+ impl.cancelDelegationToken(
+ OMPBHelper.convertToDelegationToken(req.getToken()));
+ }
+ rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
+ .CancelDelegationTokenResponseProto.getDefaultInstance());
+ rb.setStatus(Status.OK);
+ } catch (IOException ex) {
+ rb.setStatus(exceptionToResponseStatus(ex));
+ }
+ return rb.build();
+ }
}