HDDS-8. Add OzoneManager Delegation Token support. Contributed by Ajay Kumar.
This commit is contained in:
parent
6d6b1a00c2
commit
bb4a26ca32
@ -350,6 +350,10 @@ public final class OzoneConfigKeys {
|
||||
public static final String OZONE_CONTAINER_COPY_WORKDIR =
|
||||
"hdds.datanode.replication.work.dir";
|
||||
|
||||
public static final String OZONE_MAX_KEY_LEN =
|
||||
"ozone.max.key.len";
|
||||
public static final int OZONE_MAX_KEY_LEN_DEFAULT = 1024 * 1024;
|
||||
|
||||
/**
|
||||
* Config properties to set client side checksum properties.
|
||||
*/
|
||||
|
@ -108,6 +108,7 @@ public final class OzoneConsts {
|
||||
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
|
||||
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
|
||||
public static final String OM_DB_NAME = "om.db";
|
||||
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
|
||||
|
||||
public static final String STORAGE_DIR_CHUNKS = "chunks";
|
||||
|
||||
|
@ -1014,6 +1014,15 @@
|
||||
every principal specified in the keytab file.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.max.key.len</name>
|
||||
<value>1048576</value>
|
||||
<tag>OZONE, SECURITY</tag>
|
||||
<description>
|
||||
Maximum length of private key in Ozone. Used in Ozone delegation and
|
||||
block tokens.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!--Client Settings-->
|
||||
<property>
|
||||
@ -1691,4 +1700,28 @@
|
||||
Name of file which stores public key generated for SCM CA.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.manager.delegation.remover.scan.interval</name>
|
||||
<value>3600000</value>
|
||||
<description>
|
||||
Time interval after which ozone secret manger scans for expired
|
||||
delegation token.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.manager.delegation.token.renew-interval</name>
|
||||
<value>1d</value>
|
||||
<description>
|
||||
Default time interval after which ozone delegation token will
|
||||
require renewal before any further use.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.manager.delegation.token.max-lifetime</name>
|
||||
<value>7d</value>
|
||||
<description>
|
||||
Default max time interval after which ozone delegation token will
|
||||
not be renewed.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
@ -43,7 +43,7 @@
|
||||
import java.security.NoSuchProviderException;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
|
||||
|
||||
public class TestCertificateSignRequest {
|
||||
|
||||
|
@ -176,9 +176,10 @@ public static InetSocketAddress getScmSecurityInetAddress(
|
||||
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY);
|
||||
|
||||
return NetUtils.createSocketAddr(
|
||||
host.or(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT) +
|
||||
":" + port
|
||||
.or(conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
|
||||
host.orElse(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT)
|
||||
+ ":" + port
|
||||
.orElse(conf.getInt(ScmConfigKeys
|
||||
.OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
|
||||
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT)));
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.client.*;
|
||||
import org.apache.hadoop.hdds.client.OzoneQuota;
|
||||
@ -33,7 +34,9 @@
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* An implementer of this interface is capable of connecting to Ozone Cluster
|
||||
@ -391,7 +394,6 @@ List<OzoneBucket> listS3Buckets(String userName, String bucketPrefix,
|
||||
*/
|
||||
void close() throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Initiate Multipart upload.
|
||||
* @param volumeName
|
||||
@ -448,4 +450,33 @@ OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName,
|
||||
void abortMultipartUpload(String volumeName,
|
||||
String bucketName, String keyName, String uploadID) throws IOException;
|
||||
|
||||
/**
|
||||
* Get a valid Delegation Token.
|
||||
*
|
||||
* @param renewer the designated renewer for the token
|
||||
* @return Token<OzoneDelegationTokenSelector>
|
||||
* @throws IOException
|
||||
*/
|
||||
Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token.
|
||||
*
|
||||
* @param token delegation token obtained earlier
|
||||
* @return the new expiration time
|
||||
* @throws IOException
|
||||
*/
|
||||
long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Cancel an existing delegation token.
|
||||
*
|
||||
* @param token delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException;
|
||||
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
@ -46,10 +47,12 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.ozone.web.response.ListBuckets;
|
||||
import org.apache.hadoop.ozone.web.response.ListKeys;
|
||||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpHeaders;
|
||||
@ -670,6 +673,44 @@ public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a valid Delegation Token. Not supported for RestClient.
|
||||
*
|
||||
* @param renewer the designated renewer for the token
|
||||
* @return Token<OzoneDelegationTokenSelector>
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
throw new IOException("Method not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token. Not supported for RestClient.
|
||||
*
|
||||
* @param token delegation token obtained earlier
|
||||
* @return the new expiration time
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
throw new IOException("Method not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an existing delegation token. Not supported for RestClient.
|
||||
*
|
||||
* @param token delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
throw new IOException("Method not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public OzoneInputStream getKey(
|
||||
String volumeName, String bucketName, String keyName)
|
||||
|
@ -70,7 +70,10 @@
|
||||
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB
|
||||
.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.apache.ratis.protocol.ClientId;
|
||||
import org.slf4j.Logger;
|
||||
@ -411,6 +414,44 @@ public void removeBucketAcls(
|
||||
ozoneManagerClient.setBucketProperty(builder.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a valid Delegation Token.
|
||||
*
|
||||
* @param renewer the designated renewer for the token
|
||||
* @return Token<OzoneDelegationTokenSelector>
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
return ozoneManagerClient.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token.
|
||||
*
|
||||
* @param token delegation token obtained earlier
|
||||
* @return the new expiration time
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
return ozoneManagerClient.renewDelegationToken(token);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an existing delegation token.
|
||||
*
|
||||
* @param token delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
ozoneManagerClient.cancelDelegationToken(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBucketVersioning(
|
||||
String volumeName, String bucketName, Boolean versioning)
|
||||
|
@ -84,7 +84,7 @@ public void testGetScmClientAddress() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOmAddress() {
|
||||
public void testgetOmSocketAddress() {
|
||||
final Configuration conf = new OzoneConfiguration();
|
||||
|
||||
// First try a client address with just a host name. Verify it falls
|
||||
|
@ -54,14 +54,21 @@ private OmUtils() {
|
||||
* @param conf
|
||||
* @return Target InetSocketAddress for the SCM service endpoint.
|
||||
*/
|
||||
public static InetSocketAddress getOmAddress(
|
||||
Configuration conf) {
|
||||
public static InetSocketAddress getOmAddress(Configuration conf) {
|
||||
return NetUtils.createSocketAddr(getOmRpcAddress(conf));
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the socket address that is used by OM.
|
||||
* @param conf
|
||||
* @return Target InetSocketAddress for the SCM service endpoint.
|
||||
*/
|
||||
public static String getOmRpcAddress(Configuration conf) {
|
||||
final Optional<String> host = getHostNameFromConfigKeys(conf,
|
||||
OZONE_OM_ADDRESS_KEY);
|
||||
|
||||
return NetUtils.createSocketAddr(
|
||||
host.orElse(OZONE_OM_BIND_HOST_DEFAULT) + ":" +
|
||||
getOmRpcPort(conf));
|
||||
return host.orElse(OZONE_OM_BIND_HOST_DEFAULT) + ":" +
|
||||
getOmRpcPort(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -184,4 +184,17 @@ private OMConfigKeys() {
|
||||
"ozone.om.http.kerberos.keytab.file";
|
||||
public static final String OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY
|
||||
= "ozone.om.http.kerberos.principal";
|
||||
// Delegation token related keys
|
||||
public static final String DELEGATION_REMOVER_SCAN_INTERVAL_KEY =
|
||||
"ozone.manager.delegation.remover.scan.interval";
|
||||
public static final long DELEGATION_REMOVER_SCAN_INTERVAL_DEFAULT =
|
||||
60*60*1000;
|
||||
public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
|
||||
"ozone.manager.delegation.token.renew-interval";
|
||||
public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
|
||||
24*60*60*1000; // 1 day = 86400000 ms
|
||||
public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
|
||||
"ozone.manager.delegation.token.max-lifetime";
|
||||
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
|
||||
7*24*60*60*1000; // 7 days
|
||||
}
|
||||
|
@ -41,7 +41,7 @@
|
||||
*/
|
||||
@KerberosInfo(
|
||||
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
|
||||
public interface OzoneManagerProtocol {
|
||||
public interface OzoneManagerProtocol extends OzoneManagerSecurityProtocol {
|
||||
|
||||
/**
|
||||
* Creates a volume.
|
||||
|
@ -0,0 +1,67 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* Security protocol for a secure OzoneManager.
|
||||
*/
|
||||
@KerberosInfo(
|
||||
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
|
||||
public interface OzoneManagerSecurityProtocol {
|
||||
|
||||
/**
|
||||
* Get a valid Delegation Token.
|
||||
*
|
||||
* @param renewer the designated renewer for the token
|
||||
* @return Token<OzoneDelegationTokenSelector>
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token.
|
||||
*
|
||||
* @param token delegation token obtained earlier
|
||||
* @return the new expiration time
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Cancel an existing delegation token.
|
||||
*
|
||||
* @param token delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException;
|
||||
|
||||
}
|
@ -23,6 +23,7 @@
|
||||
import com.google.protobuf.ServiceException;
|
||||
import java.util.ArrayList;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||
@ -168,6 +169,18 @@
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.GetDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* The client side implementation of OzoneManagerProtocol.
|
||||
@ -1107,4 +1120,90 @@ public List<ServiceInfo> getServiceList() throws IOException {
|
||||
+ resp.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a valid Delegation Token.
|
||||
*
|
||||
* @param renewer the designated renewer for the token
|
||||
* @return Token<OzoneDelegationTokenSelector>
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
GetDelegationTokenRequestProto req = GetDelegationTokenRequestProto
|
||||
.newBuilder()
|
||||
.setRenewer(renewer == null ? "" : renewer.toString())
|
||||
.build();
|
||||
|
||||
OMRequest omRequest = createOMRequest(Type.GetDelegationToken)
|
||||
.setGetDelegationTokenRequest(req)
|
||||
.build();
|
||||
|
||||
final GetDelegationTokenResponseProto resp = submitRequest(omRequest)
|
||||
.getGetDelegationTokenResponse();
|
||||
if (resp.getStatus() == Status.OK) {
|
||||
return resp.getResponse().hasToken() ?
|
||||
OMPBHelper.convertToDelegationToken(resp.getResponse().getToken())
|
||||
: null;
|
||||
} else {
|
||||
throw new IOException("Get Delegation Token failed, error : " + resp
|
||||
.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token.
|
||||
*
|
||||
* @param token delegation token obtained earlier
|
||||
* @return the new expiration time
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
RenewDelegationTokenRequestProto req =
|
||||
RenewDelegationTokenRequestProto.newBuilder().
|
||||
setToken(OMPBHelper.convertToTokenProto(token)).
|
||||
build();
|
||||
|
||||
OMRequest omRequest = createOMRequest(Type.RenewDelegationToken)
|
||||
.setRenewDelegationTokenRequest(req)
|
||||
.build();
|
||||
|
||||
final RenewDelegationTokenResponseProto resp = submitRequest(omRequest)
|
||||
.getRenewDelegationTokenResponse();
|
||||
if (resp.getStatus() == Status.OK) {
|
||||
return resp.getResponse().getNewExpiryTime();
|
||||
} else {
|
||||
throw new IOException("Renew Delegation Token failed, error : " + resp
|
||||
.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an existing delegation token.
|
||||
*
|
||||
* @param token delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
|
||||
.newBuilder()
|
||||
.setToken(OMPBHelper.convertToTokenProto(token))
|
||||
.build();
|
||||
|
||||
OMRequest omRequest = createOMRequest(Type.CancelDelegationToken)
|
||||
.setCancelDelegationTokenRequest(req)
|
||||
.build();
|
||||
|
||||
final CancelDelegationTokenResponseProto resp = submitRequest(omRequest)
|
||||
.getCancelDelegationTokenResponse();
|
||||
if (resp.getStatus() != Status.OK) {
|
||||
throw new IOException("Cancel Delegation Token failed, error : " + resp
|
||||
.getStatus());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,8 @@
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.OzoneManagerService;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.token.TokenInfo;
|
||||
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
|
||||
|
||||
/**
|
||||
* Protocol used to communicate with OM.
|
||||
@ -32,6 +34,7 @@
|
||||
protocolVersion = 1)
|
||||
@KerberosInfo(
|
||||
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
|
||||
@TokenInfo(OzoneDelegationTokenSelector.class)
|
||||
@InterfaceAudience.Private
|
||||
public interface OzoneManagerProtocolPB
|
||||
extends OzoneManagerService.BlockingInterface {
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.protocolPB;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.OzoneAclInfo;
|
||||
@ -24,6 +26,9 @@
|
||||
.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* Utilities for converting protobuf classes.
|
||||
@ -110,4 +115,37 @@ public static OzoneAcl convertOzoneAcl(OzoneAclInfo aclInfo) {
|
||||
|
||||
return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts Ozone delegation token to @{@link TokenProto}.
|
||||
* @return tokenProto
|
||||
*/
|
||||
public static TokenProto convertToTokenProto(Token<?> tok) {
|
||||
if(tok == null){
|
||||
throw new IllegalArgumentException("Invalid argument: token is null");
|
||||
}
|
||||
|
||||
return TokenProto.newBuilder().
|
||||
setIdentifier(getByteString(tok.getIdentifier())).
|
||||
setPassword(getByteString(tok.getPassword())).
|
||||
setKind(tok.getKind().toString()).
|
||||
setService(tok.getService().toString()).build();
|
||||
}
|
||||
|
||||
public static ByteString getByteString(byte[] bytes) {
|
||||
// return singleton to reduce object allocation
|
||||
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts @{@link TokenProto} to Ozone delegation token.
|
||||
*
|
||||
* @return Ozone
|
||||
*/
|
||||
public static Token<OzoneTokenIdentifier> convertToDelegationToken(
|
||||
TokenProto tokenProto) {
|
||||
return new Token<>(tokenProto.getIdentifier()
|
||||
.toByteArray(), tokenProto.getPassword().toByteArray(), new Text(
|
||||
tokenProto.getKind()), new Text(tokenProto.getService()));
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@
|
||||
|
||||
import java.util.Collection;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
@ -32,7 +31,7 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class OzoneDelegationTokenSelector
|
||||
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||
extends AbstractDelegationTokenSelector<OzoneTokenIdentifier> {
|
||||
|
||||
public OzoneDelegationTokenSelector() {
|
||||
super(OzoneTokenIdentifier.KIND_NAME);
|
||||
|
@ -0,0 +1,598 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.security;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.KeyPair;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.Signature;
|
||||
import java.security.SignatureException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.HadoopKerberosName;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* SecretManager for Ozone Master. Responsible for signing identifiers with
|
||||
* private key,
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class OzoneSecretManager<T extends OzoneTokenIdentifier>
|
||||
extends SecretManager<T> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(OzoneSecretManager.class);
|
||||
/**
|
||||
* The name of the Private/Public Key based hashing algorithm.
|
||||
*/
|
||||
private static final String DEFAULT_SIGNATURE_ALGORITHM = "SHA256withRSA";
|
||||
private final long tokenMaxLifetime;
|
||||
private final long tokenRenewInterval;
|
||||
private final long tokenRemoverScanInterval;
|
||||
private final Text service;
|
||||
private final Map<Integer, OzoneSecretKey> allKeys;
|
||||
private final Map<T, TokenInfo> currentTokens;
|
||||
private final OzoneSecretStore store;
|
||||
private Thread tokenRemoverThread;
|
||||
private volatile boolean running;
|
||||
private AtomicInteger tokenSequenceNumber;
|
||||
private OzoneSecretKey currentKey;
|
||||
private AtomicInteger currentKeyId;
|
||||
/**
|
||||
* If the delegation token update thread holds this lock, it will not get
|
||||
* interrupted.
|
||||
*/
|
||||
private Object noInterruptsLock = new Object();
|
||||
private int maxKeyLength;
|
||||
|
||||
/**
|
||||
* Create a secret manager.
|
||||
*
|
||||
* @param conf configuration.
|
||||
* @param tokenMaxLifetime the maximum lifetime of the delegation tokens in
|
||||
* milliseconds
|
||||
* @param tokenRenewInterval how often the tokens must be renewed in
|
||||
* milliseconds
|
||||
* @param dtRemoverScanInterval how often the tokens are scanned for expired
|
||||
* tokens in milliseconds
|
||||
*/
|
||||
public OzoneSecretManager(OzoneConfiguration conf, long tokenMaxLifetime,
|
||||
long tokenRenewInterval, long dtRemoverScanInterval, Text service)
|
||||
throws IOException {
|
||||
this.tokenMaxLifetime = tokenMaxLifetime;
|
||||
this.tokenRenewInterval = tokenRenewInterval;
|
||||
this.tokenRemoverScanInterval = dtRemoverScanInterval;
|
||||
|
||||
currentTokens = new ConcurrentHashMap();
|
||||
allKeys = new ConcurrentHashMap<>();
|
||||
currentKeyId = new AtomicInteger();
|
||||
tokenSequenceNumber = new AtomicInteger();
|
||||
this.store = new OzoneSecretStore(conf);
|
||||
loadTokenSecretState(store.loadState());
|
||||
this.service = service;
|
||||
this.maxKeyLength = conf.getInt(OzoneConfigKeys.OZONE_MAX_KEY_LEN,
|
||||
OzoneConfigKeys.OZONE_MAX_KEY_LEN_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T createIdentifier() {
|
||||
return (T) T.newInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new Identifier with given,owner,renwer and realUser.
|
||||
*
|
||||
* @return T
|
||||
*/
|
||||
public T createIdentifier(Text owner, Text renewer, Text realUser) {
|
||||
return (T) T.newInstance(owner, renewer, realUser);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@link Token} for given identifier.
|
||||
*
|
||||
* @param owner
|
||||
* @param renewer
|
||||
* @param realUser
|
||||
* @return Token
|
||||
* @throws IOException to allow future exceptions to be added without breaking
|
||||
* compatibility
|
||||
*/
|
||||
public Token<T> createToken(Text owner, Text renewer, Text realUser)
|
||||
throws IOException {
|
||||
T identifier = createIdentifier(owner, renewer, realUser);
|
||||
updateIdentifierDetails(identifier);
|
||||
|
||||
byte[] password = createPassword(identifier.getBytes(),
|
||||
currentKey.getPrivateKey());
|
||||
addToTokenStore(identifier, password);
|
||||
Token<T> token = new Token<>(identifier.getBytes(), password,
|
||||
identifier.getKind(), service);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long expiryTime = identifier.getIssueDate() + tokenRenewInterval;
|
||||
String tokenId = identifier.toStringStable();
|
||||
LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}",
|
||||
expiryTime, tokenId);
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores given identifier in token store.
|
||||
*
|
||||
* @param identifier
|
||||
* @param password
|
||||
* @throws IOException
|
||||
*/
|
||||
private void addToTokenStore(T identifier, byte[] password)
|
||||
throws IOException {
|
||||
TokenInfo tokenInfo = new TokenInfo(identifier.getIssueDate()
|
||||
+ tokenRenewInterval, password, identifier.getTrackingId());
|
||||
currentTokens.put(identifier, tokenInfo);
|
||||
store.storeToken(identifier, tokenInfo.getRenewDate());
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates issue date, master key id and sequence number for identifier.
|
||||
*
|
||||
* @param identifier the identifier to validate
|
||||
*/
|
||||
private void updateIdentifierDetails(T identifier) {
|
||||
int sequenceNum;
|
||||
long now = Time.monotonicNow();
|
||||
sequenceNum = incrementDelegationTokenSeqNum();
|
||||
identifier.setIssueDate(now);
|
||||
identifier.setMasterKeyId(currentKey.getKeyId());
|
||||
identifier.setSequenceNumber(sequenceNum);
|
||||
identifier.setMaxDate(Time.monotonicNow() + tokenMaxLifetime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute HMAC of the identifier using the private key and return the output
|
||||
* as password.
|
||||
*
|
||||
* @param identifier
|
||||
* @param privateKey
|
||||
* @return byte[] signed byte array
|
||||
*/
|
||||
public byte[] createPassword(byte[] identifier, PrivateKey privateKey)
|
||||
throws OzoneSecurityException {
|
||||
try {
|
||||
Signature rsaSignature = Signature.getInstance(
|
||||
DEFAULT_SIGNATURE_ALGORITHM);
|
||||
rsaSignature.initSign(privateKey);
|
||||
rsaSignature.update(identifier);
|
||||
return rsaSignature.sign();
|
||||
} catch (InvalidKeyException | NoSuchAlgorithmException |
|
||||
SignatureException ex) {
|
||||
throw new OzoneSecurityException("Error while creating HMAC hash for " +
|
||||
"token.", ex, OzoneSecurityException.ResultCodes
|
||||
.SECRET_MANAGER_HMAC_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] createPassword(T identifier) {
|
||||
LOG.debug("Creating password for identifier: {}, currentKey: {}",
|
||||
formatTokenId(identifier), currentKey.getKeyId());
|
||||
byte[] password = null;
|
||||
try {
|
||||
password = createPassword(identifier.getBytes(),
|
||||
currentKey.getPrivateKey());
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not store token {}!!", formatTokenId(identifier),
|
||||
ioe);
|
||||
}
|
||||
return password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(T identifier) throws InvalidToken {
|
||||
return checkToken(identifier).getPassword();
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew a delegation token.
|
||||
*
|
||||
* @param token the token to renew
|
||||
* @param renewer the full principal name of the user doing the renewal
|
||||
* @return the new expiration time
|
||||
* @throws InvalidToken if the token is invalid
|
||||
* @throws AccessControlException if the user can't renew token
|
||||
*/
|
||||
public synchronized long renewToken(Token<T> token, String renewer)
|
||||
throws IOException {
|
||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||
DataInputStream in = new DataInputStream(buf);
|
||||
T id = (T) T.readProtoBuf(in);
|
||||
LOG.debug("Token renewal for identifier: {}, total currentTokens: {}",
|
||||
formatTokenId(id), currentTokens.size());
|
||||
|
||||
long now = Time.monotonicNow();
|
||||
if (id.getMaxDate() < now) {
|
||||
throw new InvalidToken(renewer + " tried to renew an expired token "
|
||||
+ formatTokenId(id) + " max expiration date: "
|
||||
+ Time.formatTime(id.getMaxDate())
|
||||
+ " currentTime: " + Time.formatTime(now));
|
||||
}
|
||||
checkToken(id);
|
||||
if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
|
||||
throw new AccessControlException(renewer +
|
||||
" tried to renew a token " + formatTokenId(id)
|
||||
+ " without a renewer");
|
||||
}
|
||||
if (!id.getRenewer().toString().equals(renewer)) {
|
||||
throw new AccessControlException(renewer
|
||||
+ " tries to renew a token " + formatTokenId(id)
|
||||
+ " with non-matching renewer " + id.getRenewer());
|
||||
}
|
||||
OzoneSecretKey key = allKeys.get(id.getMasterKeyId());
|
||||
if (key == null) {
|
||||
throw new InvalidToken("Unable to find master key for keyId="
|
||||
+ id.getMasterKeyId()
|
||||
+ " from cache. Failed to renew an unexpired token "
|
||||
+ formatTokenId(id) + " with sequenceNumber="
|
||||
+ id.getSequenceNumber());
|
||||
}
|
||||
byte[] password = createPassword(token.getIdentifier(),
|
||||
key.getPrivateKey());
|
||||
|
||||
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
|
||||
try {
|
||||
addToTokenStore(id, password);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to update token " + id.getSequenceNumber(), e);
|
||||
}
|
||||
return renewTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a token by removing it from store and cache.
|
||||
*
|
||||
* @return Identifier of the canceled token
|
||||
* @throws InvalidToken for invalid token
|
||||
* @throws AccessControlException if the user isn't allowed to cancel
|
||||
*/
|
||||
public T cancelToken(Token<T> token, String canceller) throws IOException {
|
||||
T id = (T) T.readProtoBuf(token.getIdentifier());
|
||||
LOG.debug("Token cancellation requested for identifier: {}",
|
||||
formatTokenId(id));
|
||||
|
||||
if (id.getUser() == null) {
|
||||
throw new InvalidToken("Token with no owner " + formatTokenId(id));
|
||||
}
|
||||
String owner = id.getUser().getUserName();
|
||||
Text renewer = id.getRenewer();
|
||||
HadoopKerberosName cancelerKrbName = new HadoopKerberosName(canceller);
|
||||
String cancelerShortName = cancelerKrbName.getShortName();
|
||||
if (!canceller.equals(owner)
|
||||
&& (renewer == null || renewer.toString().isEmpty()
|
||||
|| !cancelerShortName
|
||||
.equals(renewer.toString()))) {
|
||||
throw new AccessControlException(canceller
|
||||
+ " is not authorized to cancel the token " + formatTokenId(id));
|
||||
}
|
||||
try {
|
||||
store.removeToken(id);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove token " + id.getSequenceNumber(), e);
|
||||
}
|
||||
TokenInfo info = currentTokens.remove(id);
|
||||
if (info == null) {
|
||||
throw new InvalidToken("Token not found " + formatTokenId(id));
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
public int getCurrentKeyId() {
|
||||
return currentKeyId.get();
|
||||
}
|
||||
|
||||
public void setCurrentKeyId(int keyId) {
|
||||
currentKeyId.set(keyId);
|
||||
}
|
||||
|
||||
public int incrementCurrentKeyId() {
|
||||
return currentKeyId.incrementAndGet();
|
||||
}
|
||||
|
||||
public int getDelegationTokenSeqNum() {
|
||||
return tokenSequenceNumber.get();
|
||||
}
|
||||
|
||||
public void setDelegationTokenSeqNum(int seqNum) {
|
||||
tokenSequenceNumber.set(seqNum);
|
||||
}
|
||||
|
||||
public int incrementDelegationTokenSeqNum() {
|
||||
return tokenSequenceNumber.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates if given token is valid.
|
||||
*
|
||||
* @param identifier
|
||||
* @param password
|
||||
*/
|
||||
private boolean validateToken(T identifier, byte[] password) {
|
||||
try {
|
||||
Signature rsaSignature = Signature.getInstance("SHA256withRSA");
|
||||
rsaSignature.initVerify(currentKey.getPublicKey());
|
||||
rsaSignature.update(identifier.getBytes());
|
||||
return rsaSignature.verify(password);
|
||||
} catch (NoSuchAlgorithmException | SignatureException |
|
||||
InvalidKeyException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if TokenInfo for the given identifier exists in database and if the
|
||||
* token is expired.
|
||||
*/
|
||||
public TokenInfo checkToken(T identifier) throws InvalidToken {
|
||||
TokenInfo info = currentTokens.get(identifier);
|
||||
if (info == null) {
|
||||
throw new InvalidToken("token " + formatTokenId(identifier)
|
||||
+ " can't be found in cache");
|
||||
}
|
||||
long now = Time.monotonicNow();
|
||||
if (info.getRenewDate() < now) {
|
||||
throw new InvalidToken("token " + formatTokenId(identifier) + " is " +
|
||||
"expired, current time: " + Time.formatTime(now) +
|
||||
" expected renewal time: " + Time.formatTime(info.getRenewDate()));
|
||||
}
|
||||
if (!validateToken(identifier, info.getPassword())) {
|
||||
throw new InvalidToken("Tampared/Inavalid token.");
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
// TODO: handle roll private key/certificate
|
||||
private synchronized void removeExpiredKeys() {
|
||||
long now = Time.monotonicNow();
|
||||
for (Iterator<Map.Entry<Integer, OzoneSecretKey>> it = allKeys.entrySet()
|
||||
.iterator(); it.hasNext();) {
|
||||
Map.Entry<Integer, OzoneSecretKey> e = it.next();
|
||||
OzoneSecretKey key = e.getValue();
|
||||
if (key.getExpiryDate() < now && key.getExpiryDate() != -1) {
|
||||
if (!key.equals(currentKey)) {
|
||||
it.remove();
|
||||
try {
|
||||
store.removeTokenMasterKey(key);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Unable to remove master key " + key.getKeyId(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void loadTokenSecretState(OzoneManagerSecretState<T> state)
|
||||
throws IOException {
|
||||
LOG.info("Loading token state into token manager.");
|
||||
for (OzoneSecretKey key : state.ozoneManagerSecretState()) {
|
||||
allKeys.putIfAbsent(key.getKeyId(), key);
|
||||
}
|
||||
for (Map.Entry<T, Long> entry : state.getTokenState().entrySet()) {
|
||||
addPersistedDelegationToken(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private String formatTokenId(T id) {
|
||||
return "(" + id + ")";
|
||||
}
|
||||
|
||||
private void addPersistedDelegationToken(
|
||||
T identifier, long renewDate)
|
||||
throws IOException {
|
||||
if (running) {
|
||||
// a safety check
|
||||
throw new IOException(
|
||||
"Can't add persisted delegation token to a running SecretManager.");
|
||||
}
|
||||
int keyId = identifier.getMasterKeyId();
|
||||
OzoneSecretKey dKey = allKeys.get(keyId);
|
||||
if (dKey == null) {
|
||||
LOG.warn("No KEY found for persisted identifier "
|
||||
+ formatTokenId(identifier));
|
||||
return;
|
||||
}
|
||||
|
||||
PrivateKey privateKey = dKey.getPrivateKey();
|
||||
byte[] password = createPassword(identifier.getBytes(), privateKey);
|
||||
if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
|
||||
setDelegationTokenSeqNum(identifier.getSequenceNumber());
|
||||
}
|
||||
if (currentTokens.get(identifier) == null) {
|
||||
currentTokens.put(identifier, new TokenInfo(renewDate,
|
||||
password, identifier.getTrackingId()));
|
||||
} else {
|
||||
throw new IOException("Same delegation token being added twice: "
|
||||
+ formatTokenId(identifier));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be called before this object is used.
|
||||
*/
|
||||
public void startThreads(KeyPair keyPair) throws IOException {
|
||||
Preconditions.checkState(!running);
|
||||
updateCurrentKey(keyPair);
|
||||
removeExpiredKeys();
|
||||
synchronized (this) {
|
||||
running = true;
|
||||
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
|
||||
tokenRemoverThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void stopThreads() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Stopping expired delegation token remover thread");
|
||||
}
|
||||
running = false;
|
||||
|
||||
if (tokenRemoverThread != null) {
|
||||
synchronized (noInterruptsLock) {
|
||||
tokenRemoverThread.interrupt();
|
||||
}
|
||||
try {
|
||||
tokenRemoverThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(
|
||||
"Unable to join on token removal thread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the OzoneSecretManager.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void stop() throws IOException {
|
||||
stopThreads();
|
||||
if (this.store != null) {
|
||||
this.store.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current master key. This is called once by startThreads before
|
||||
* tokenRemoverThread is created,
|
||||
*/
|
||||
private void updateCurrentKey(KeyPair keyPair) throws IOException {
|
||||
LOG.info("Updating the current master key for generating tokens");
|
||||
|
||||
// TODO: fix me based on the certificate expire time to set the key
|
||||
// expire time.
|
||||
int newCurrentId = incrementCurrentKeyId();
|
||||
OzoneSecretKey newKey = new OzoneSecretKey(newCurrentId, -1,
|
||||
keyPair, maxKeyLength);
|
||||
|
||||
store.storeTokenMasterKey(newKey);
|
||||
if (!allKeys.containsKey(newKey.getKeyId())) {
|
||||
allKeys.put(newKey.getKeyId(), newKey);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
currentKey = newKey;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove expired delegation tokens from cache and persisted store.
|
||||
*/
|
||||
private void removeExpiredToken() throws IOException {
|
||||
long now = Time.monotonicNow();
|
||||
synchronized (this) {
|
||||
Iterator<Map.Entry<T,
|
||||
TokenInfo>> i = currentTokens.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Map.Entry<T,
|
||||
TokenInfo> entry = i.next();
|
||||
long renewDate = entry.getValue().getRenewDate();
|
||||
if (renewDate < now) {
|
||||
i.remove();
|
||||
store.removeToken(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is Secret Manager running.
|
||||
*
|
||||
* @return true if secret mgr is running
|
||||
*/
|
||||
public synchronized boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns expiry time of a token given its identifier.
|
||||
*
|
||||
* @param dtId DelegationTokenIdentifier of a token
|
||||
* @return Expiry time of the token
|
||||
* @throws IOException
|
||||
*/
|
||||
public long getTokenExpiryTime(T dtId)
|
||||
throws IOException {
|
||||
TokenInfo info = currentTokens.get(dtId);
|
||||
if (info != null) {
|
||||
return info.getRenewDate();
|
||||
} else {
|
||||
throw new IOException("No delegation token found for this identifier");
|
||||
}
|
||||
}
|
||||
|
||||
private class ExpiredTokenRemover extends Thread {
|
||||
private long lastTokenCacheCleanup;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Starting expired delegation token remover thread, "
|
||||
+ "tokenRemoverScanInterval=" + tokenRemoverScanInterval
|
||||
/ (60 * 1000) + " min(s)");
|
||||
try {
|
||||
while (running) {
|
||||
long now = Time.monotonicNow();
|
||||
if (lastTokenCacheCleanup + tokenRemoverScanInterval
|
||||
< now) {
|
||||
removeExpiredToken();
|
||||
lastTokenCacheCleanup = now;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(Math.min(5000,
|
||||
tokenRemoverScanInterval)); // 5 seconds
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("ExpiredTokenRemover received " + ie);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("ExpiredTokenRemover thread received unexpected exception",
|
||||
t);
|
||||
Runtime.getRuntime().exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,250 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.security;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MANAGER_TOKEN_DB_NAME;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_MB;
|
||||
|
||||
/**
|
||||
* SecretStore for Ozone Master.
|
||||
*/
|
||||
public class OzoneSecretStore<T extends OzoneTokenIdentifier>
|
||||
implements Closeable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(OzoneSecretStore.class);
|
||||
private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
|
||||
private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (store != null) {
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Support class to maintain state of OzoneSecretStore.
|
||||
*/
|
||||
public static class OzoneManagerSecretState<T> {
|
||||
|
||||
private Map<T, Long> tokenState = new HashMap<>();
|
||||
private Set<OzoneSecretKey> tokenMasterKeyState = new HashSet<>();
|
||||
|
||||
public Map<T, Long> getTokenState() {
|
||||
return tokenState;
|
||||
}
|
||||
|
||||
public Set<OzoneSecretKey> ozoneManagerSecretState() {
|
||||
return tokenMasterKeyState;
|
||||
}
|
||||
}
|
||||
|
||||
private MetadataStore store;
|
||||
|
||||
public OzoneSecretStore(OzoneConfiguration conf)
|
||||
throws IOException {
|
||||
File metaDir = getOzoneMetaDirPath(conf);
|
||||
final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
|
||||
OZONE_OM_DB_CACHE_SIZE_DEFAULT);
|
||||
File omTokenDBFile = new File(metaDir.getPath(),
|
||||
OZONE_MANAGER_TOKEN_DB_NAME);
|
||||
this.store = MetadataStoreBuilder.newBuilder()
|
||||
.setConf(conf)
|
||||
.setDbFile(omTokenDBFile)
|
||||
.setCacheSize(cacheSize * OzoneConsts.MB)
|
||||
.build();
|
||||
}
|
||||
|
||||
public OzoneManagerSecretState loadState() throws IOException {
|
||||
OzoneManagerSecretState state = new OzoneManagerSecretState();
|
||||
int numKeys = loadMasterKeys(state);
|
||||
LOG.info("Loaded " + numKeys + " token master keys");
|
||||
int numTokens = loadTokens(state);
|
||||
LOG.info("Loaded " + numTokens + " tokens");
|
||||
return state;
|
||||
}
|
||||
|
||||
public void storeTokenMasterKey(OzoneSecretKey key) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Storing master key " + key.getKeyId());
|
||||
}
|
||||
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
|
||||
DataOutputStream dataStream = new DataOutputStream(memStream);
|
||||
try {
|
||||
key.write(dataStream);
|
||||
dataStream.close();
|
||||
dataStream = null;
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, dataStream);
|
||||
}
|
||||
try {
|
||||
byte[] dbKey = getMasterKeyDBKey(key);
|
||||
store.put(dbKey, memStream.toByteArray());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to store master key " + key.getKeyId(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void removeTokenMasterKey(OzoneSecretKey key)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing master key " + key.getKeyId());
|
||||
}
|
||||
|
||||
byte[] dbKey = getMasterKeyDBKey(key);
|
||||
try {
|
||||
store.delete(dbKey);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to delete master key " + key.getKeyId(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public void storeToken(T tokenId, Long renewDate)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Storing token " + tokenId.getSequenceNumber());
|
||||
}
|
||||
|
||||
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
|
||||
DataOutputStream dataStream = new DataOutputStream(memStream);
|
||||
try {
|
||||
tokenId.write(dataStream);
|
||||
dataStream.writeLong(renewDate);
|
||||
dataStream.close();
|
||||
dataStream = null;
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, dataStream);
|
||||
}
|
||||
|
||||
byte[] dbKey = getTokenDBKey(tokenId);
|
||||
try {
|
||||
store.put(dbKey, memStream.toByteArray());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to store token " + tokenId.toString(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public void updateToken(T tokenId, Long renewDate)
|
||||
throws IOException {
|
||||
storeToken(tokenId, renewDate);
|
||||
}
|
||||
|
||||
public void removeToken(T tokenId)
|
||||
throws IOException {
|
||||
byte[] dbKey = getTokenDBKey(tokenId);
|
||||
try {
|
||||
store.delete(dbKey);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove token " + tokenId.toString(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public int loadMasterKeys(OzoneManagerSecretState state) throws IOException {
|
||||
MetadataKeyFilters.MetadataKeyFilter filter =
|
||||
(preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
|
||||
.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX);
|
||||
List<Map.Entry<byte[], byte[]>> kvs = store
|
||||
.getRangeKVs(null, Integer.MAX_VALUE, filter);
|
||||
kvs.forEach(entry -> {
|
||||
try {
|
||||
loadTokenMasterKey(state, entry.getValue());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to load master key ",
|
||||
DFSUtil.bytes2String(entry.getKey()), e);
|
||||
}
|
||||
});
|
||||
return kvs.size();
|
||||
}
|
||||
|
||||
private void loadTokenMasterKey(OzoneManagerSecretState state, byte[] data)
|
||||
throws IOException {
|
||||
OzoneSecretKey key = OzoneSecretKey.readProtoBuf(data);
|
||||
state.tokenMasterKeyState.add(key);
|
||||
}
|
||||
|
||||
public int loadTokens(OzoneManagerSecretState state) throws IOException {
|
||||
MetadataKeyFilters.MetadataKeyFilter filter =
|
||||
(preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
|
||||
.startsWith(TOKEN_STATE_KEY_PREFIX);
|
||||
List<Map.Entry<byte[], byte[]>> kvs =
|
||||
store.getRangeKVs(null, Integer.MAX_VALUE, filter);
|
||||
kvs.forEach(entry -> {
|
||||
try {
|
||||
loadToken(state, entry.getValue());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to load token ",
|
||||
DFSUtil.bytes2String(entry.getKey()), e);
|
||||
}
|
||||
});
|
||||
return kvs.size();
|
||||
}
|
||||
|
||||
private void loadToken(OzoneManagerSecretState state, byte[] data)
|
||||
throws IOException {
|
||||
long renewDate;
|
||||
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
|
||||
T tokenId = (T) T.readProtoBuf(in);
|
||||
try {
|
||||
tokenId.readFields(in);
|
||||
renewDate = in.readLong();
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, in);
|
||||
}
|
||||
state.tokenState.put(tokenId, renewDate);
|
||||
}
|
||||
|
||||
private byte[] getMasterKeyDBKey(OzoneSecretKey masterKey) {
|
||||
return DFSUtil.string2Bytes(
|
||||
TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId());
|
||||
}
|
||||
|
||||
private byte[] getTokenDBKey(T tokenId) {
|
||||
return DFSUtil.string2Bytes(
|
||||
TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber());
|
||||
}
|
||||
}
|
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.security;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Security exceptions thrown at Ozone layer.
|
||||
*/
|
||||
public class OzoneSecurityException extends IOException {
|
||||
private final OzoneSecurityException.ResultCodes result;
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with {@code null}
|
||||
* as its error detail message.
|
||||
*/
|
||||
public OzoneSecurityException(OzoneSecurityException.ResultCodes result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with the specified detail message.
|
||||
*
|
||||
* @param message The detail message (which is saved for later retrieval by
|
||||
* the
|
||||
* {@link #getMessage()} method)
|
||||
*/
|
||||
public OzoneSecurityException(String message,
|
||||
OzoneSecurityException.ResultCodes result) {
|
||||
super(message);
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with the specified detail message
|
||||
* and cause.
|
||||
* <p>
|
||||
* <p> Note that the detail message associated with {@code cause} is
|
||||
* <i>not</i> automatically incorporated into this exception's detail
|
||||
* message.
|
||||
*
|
||||
* @param message The detail message (which is saved for later retrieval by
|
||||
* the
|
||||
* {@link #getMessage()} method)
|
||||
* @param cause The cause (which is saved for later retrieval by the {@link
|
||||
* #getCause()} method). (A null value is permitted, and indicates that the
|
||||
* cause is nonexistent or unknown.)
|
||||
* @since 1.6
|
||||
*/
|
||||
public OzoneSecurityException(String message, Throwable cause,
|
||||
OzoneSecurityException.ResultCodes result) {
|
||||
super(message, cause);
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with the specified cause and a
|
||||
* detail message of {@code (cause==null ? null : cause.toString())}
|
||||
* (which typically contains the class and detail message of {@code cause}).
|
||||
* This constructor is useful for IO exceptions that are little more
|
||||
* than wrappers for other throwables.
|
||||
*
|
||||
* @param cause The cause (which is saved for later retrieval by the {@link
|
||||
* #getCause()} method). (A null value is permitted, and indicates that the
|
||||
* cause is nonexistent or unknown.)
|
||||
* @since 1.6
|
||||
*/
|
||||
public OzoneSecurityException(Throwable cause,
|
||||
OzoneSecurityException.ResultCodes result) {
|
||||
super(cause);
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns resultCode.
|
||||
* @return ResultCode
|
||||
*/
|
||||
public OzoneSecurityException.ResultCodes getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Error codes to make it easy to decode these exceptions.
|
||||
*/
|
||||
public enum ResultCodes {
|
||||
OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST,
|
||||
SECRET_MANAGER_HMAC_ERROR
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@ This is similar to Namenode for Ozone.
|
||||
*/
|
||||
|
||||
import "hdds.proto";
|
||||
import "Security.proto";
|
||||
|
||||
enum Type {
|
||||
CreateVolume = 11;
|
||||
@ -69,6 +70,10 @@ enum Type {
|
||||
AbortMultiPartUpload = 48;
|
||||
|
||||
ServiceList = 51;
|
||||
|
||||
GetDelegationToken = 61;
|
||||
RenewDelegationToken = 62;
|
||||
CancelDelegationToken = 63;
|
||||
}
|
||||
|
||||
message OMRequest {
|
||||
@ -111,6 +116,11 @@ message OMRequest {
|
||||
optional MultipartUploadAbortRequest abortMultiPartUploadRequest = 48;
|
||||
|
||||
optional ServiceListRequest serviceListRequest = 51;
|
||||
|
||||
optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
|
||||
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
|
||||
optional hadoop.common.CancelDelegationTokenRequestProto cancelDelegationTokenRequest = 63;
|
||||
|
||||
}
|
||||
|
||||
message OMResponse {
|
||||
@ -154,6 +164,10 @@ message OMResponse {
|
||||
optional MultipartUploadAbortResponse abortMultiPartUploadResponse = 48;
|
||||
|
||||
optional ServiceListResponse ServiceListResponse = 51;
|
||||
|
||||
optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
|
||||
optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
|
||||
optional CancelDelegationTokenResponseProto cancelDelegationTokenResponse = 63;
|
||||
}
|
||||
|
||||
enum Status {
|
||||
@ -640,6 +654,21 @@ message MultipartUploadAbortResponse {
|
||||
required Status status = 1;
|
||||
}
|
||||
|
||||
message GetDelegationTokenResponseProto{
|
||||
required Status status = 1;
|
||||
optional hadoop.common.GetDelegationTokenResponseProto response = 2;
|
||||
}
|
||||
|
||||
message RenewDelegationTokenResponseProto{
|
||||
required Status status = 1;
|
||||
optional hadoop.common.RenewDelegationTokenResponseProto response = 2;
|
||||
}
|
||||
|
||||
message CancelDelegationTokenResponseProto{
|
||||
required Status status = 1;
|
||||
optional hadoop.common.CancelDelegationTokenResponseProto response = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
The OM service that takes care of Ozone namespace.
|
||||
*/
|
||||
|
@ -0,0 +1,216 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.security;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyPair;
|
||||
import java.security.Signature;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test class for {@link OzoneSecretManager}.
|
||||
*/
|
||||
public class TestOzoneSecretManager {
|
||||
|
||||
private OzoneSecretManager<OzoneTokenIdentifier> secretManager;
|
||||
private SecurityConfig securityConfig;
|
||||
private KeyPair keyPair;
|
||||
private long expiryTime;
|
||||
private Text serviceRpcAdd;
|
||||
private OzoneConfiguration conf;
|
||||
private static final String BASEDIR = GenericTestUtils
|
||||
.getTempPath(TestOzoneSecretManager.class.getSimpleName());
|
||||
private final static Text TEST_USER = new Text("testUser");
|
||||
private long tokenMaxLifetime = 1000 * 20;
|
||||
private long tokenRemoverScanInterval = 1000 * 20;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
|
||||
securityConfig = new SecurityConfig(conf);
|
||||
// Create Ozone Master key pair.
|
||||
keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
|
||||
expiryTime = Time.monotonicNow() + 60 * 60 * 24;
|
||||
serviceRpcAdd = new Text("localhost");
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
secretManager.stop();
|
||||
FileUtils.deleteQuietly(new File(BASEDIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateToken() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
OzoneTokenIdentifier identifier =
|
||||
OzoneTokenIdentifier.readProtoBuf(token.getIdentifier());
|
||||
// Check basic details.
|
||||
Assert.assertTrue(identifier.getRealUser().equals(TEST_USER));
|
||||
Assert.assertTrue(identifier.getRenewer().equals(TEST_USER));
|
||||
Assert.assertTrue(identifier.getOwner().equals(TEST_USER));
|
||||
|
||||
validateHash(token.getPassword(), token.getIdentifier());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenewTokenSuccess() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
Thread.sleep(10 * 5);
|
||||
long renewalTime = secretManager.renewToken(token, TEST_USER.toString());
|
||||
Assert.assertTrue(renewalTime > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests failure for mismatch in renewer.
|
||||
*/
|
||||
@Test
|
||||
public void testRenewTokenFailure() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
LambdaTestUtils.intercept(AccessControlException.class,
|
||||
"rougeUser tries to renew a token", () -> {
|
||||
secretManager.renewToken(token, "rougeUser");
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests token renew failure due to max time.
|
||||
*/
|
||||
@Test
|
||||
public void testRenewTokenFailureMaxTime() throws Exception {
|
||||
secretManager = createSecretManager(conf, 100,
|
||||
100, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
Thread.sleep(101);
|
||||
LambdaTestUtils.intercept(IOException.class,
|
||||
"testUser tried to renew an expired token", () -> {
|
||||
secretManager.renewToken(token, TEST_USER.toString());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests token renew failure due to renewal time.
|
||||
*/
|
||||
@Test
|
||||
public void testRenewTokenFailureRenewalTime() throws Exception {
|
||||
secretManager = createSecretManager(conf, 1000 * 10,
|
||||
10, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
Thread.sleep(15);
|
||||
LambdaTestUtils.intercept(IOException.class, "is expired", () -> {
|
||||
secretManager.renewToken(token, TEST_USER.toString());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateIdentifier() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
OzoneTokenIdentifier identifier = secretManager.createIdentifier();
|
||||
// Check basic details.
|
||||
Assert.assertTrue(identifier.getOwner().equals(new Text("")));
|
||||
Assert.assertTrue(identifier.getRealUser().equals(new Text("")));
|
||||
Assert.assertTrue(identifier.getRenewer().equals(new Text("")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelTokenSuccess() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
secretManager.cancelToken(token, TEST_USER.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelTokenFailure() throws Exception {
|
||||
secretManager = createSecretManager(conf, tokenMaxLifetime,
|
||||
expiryTime, tokenRemoverScanInterval);
|
||||
secretManager.startThreads(keyPair);
|
||||
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
|
||||
TEST_USER,
|
||||
TEST_USER);
|
||||
LambdaTestUtils.intercept(AccessControlException.class,
|
||||
"rougeUser is not authorized to cancel the token", () -> {
|
||||
secretManager.cancelToken(token, "rougeUser");
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate hash using public key of KeyPair.
|
||||
*/
|
||||
private void validateHash(byte[] hash, byte[] identifier) throws Exception {
|
||||
Signature rsaSignature =
|
||||
Signature.getInstance(securityConfig.getSignatureAlgo(),
|
||||
securityConfig.getProvider());
|
||||
rsaSignature.initVerify(keyPair.getPublic());
|
||||
rsaSignature.update(identifier);
|
||||
Assert.assertTrue(rsaSignature.verify(hash));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create instance of {@link OzoneSecretManager}.
|
||||
*/
|
||||
private OzoneSecretManager<OzoneTokenIdentifier> createSecretManager(
|
||||
OzoneConfiguration config, long tokenMaxLife, long expiry, long
|
||||
tokenRemoverScanTime) throws IOException {
|
||||
return new OzoneSecretManager<>(config, tokenMaxLife,
|
||||
expiry, tokenRemoverScanTime, serviceRpcAdd);
|
||||
}
|
||||
}
|
@ -17,17 +17,24 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
|
||||
import static org.slf4j.event.Level.INFO;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.KeyPair;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
@ -37,14 +44,29 @@
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
|
||||
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
|
||||
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyPEMWriter;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMStorage;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.KerberosAuthException;
|
||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.util.KerberosUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
@ -54,6 +76,7 @@
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -63,6 +86,8 @@
|
||||
@InterfaceAudience.Private
|
||||
public final class TestSecureOzoneCluster {
|
||||
|
||||
private static final String TEST_USER = "testUgiUser";
|
||||
private static final int CLIENT_TIMEOUT = 2 * 1000;
|
||||
private Logger LOGGER = LoggerFactory
|
||||
.getLogger(TestSecureOzoneCluster.class);
|
||||
|
||||
@ -83,14 +108,24 @@ public final class TestSecureOzoneCluster {
|
||||
private static String clusterId;
|
||||
private static String scmId;
|
||||
private static String omId;
|
||||
private OzoneManagerProtocolClientSideTranslatorPB omClient;
|
||||
private KeyPair keyPair;
|
||||
private Path metaDirPath;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
try {
|
||||
conf = new OzoneConfiguration();
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
|
||||
DefaultMetricsSystem.setMiniClusterMode(true);
|
||||
final String path = GenericTestUtils
|
||||
.getTempPath(UUID.randomUUID().toString());
|
||||
metaDirPath = Paths.get(path, "om-meta");
|
||||
conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
|
||||
startMiniKdc();
|
||||
setSecureConfig(conf);
|
||||
createCredentialsInKDC(conf, miniKdc);
|
||||
generateKeyPair(conf);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Failed to initialize TestSecureOzoneCluster", e);
|
||||
} catch (Exception e) {
|
||||
@ -108,6 +143,10 @@ public void stop() {
|
||||
if (om != null) {
|
||||
om.stop();
|
||||
}
|
||||
if (omClient != null) {
|
||||
omClient.close();
|
||||
}
|
||||
FileUtils.deleteQuietly(metaDirPath.toFile());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to stop TestSecureOzoneCluster", e);
|
||||
}
|
||||
@ -117,11 +156,11 @@ private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
|
||||
throws Exception {
|
||||
createPrincipal(scmKeytab,
|
||||
conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
|
||||
createPrincipal(spnegoKeytab,
|
||||
conf.get(ScmConfigKeys
|
||||
.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
|
||||
conf.get(OMConfigKeys
|
||||
.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
|
||||
createPrincipal(spnegoKeytab,
|
||||
conf.get(ScmConfigKeys
|
||||
.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
|
||||
conf.get(OMConfigKeys
|
||||
.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
|
||||
createPrincipal(omKeyTab,
|
||||
conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
|
||||
}
|
||||
@ -139,12 +178,13 @@ private void startMiniKdc() throws Exception {
|
||||
miniKdc.start();
|
||||
}
|
||||
|
||||
private void stopMiniKdc() throws Exception {
|
||||
private void stopMiniKdc() {
|
||||
miniKdc.stop();
|
||||
}
|
||||
|
||||
private void setSecureConfig(Configuration conf) throws IOException {
|
||||
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
|
||||
conf.setBoolean(OZONE_ENABLED, true);
|
||||
String host = InetAddress.getLocalHost().getCanonicalHostName();
|
||||
String realm = miniKdc.getRealm();
|
||||
curUser = UserGroupInformation.getCurrentUser()
|
||||
@ -247,60 +287,262 @@ private void testCommonKerberosFailures(Callable callable) throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the secure KSM Initialization Failure.
|
||||
* Tests the secure om Initialization Failure.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testSecureKsmInitializationFailure() throws Exception {
|
||||
public void testSecureOMInitializationFailure() throws Exception {
|
||||
initSCM();
|
||||
// Create a secure SCM instance as om client will connect to it
|
||||
scm = StorageContainerManager.createSCM(null, conf);
|
||||
|
||||
final String path = GenericTestUtils
|
||||
.getTempPath(UUID.randomUUID().toString());
|
||||
OMStorage ksmStore = new OMStorage(conf);
|
||||
ksmStore.setClusterId("testClusterId");
|
||||
ksmStore.setScmId("testScmId");
|
||||
// writes the version file properties
|
||||
ksmStore.initialize();
|
||||
setupOm(conf);
|
||||
conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
|
||||
"non-existent-user@EXAMPLE.com");
|
||||
testCommonKerberosFailures(() -> OzoneManager.createOm(null, conf));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the secure KSM Initialization success.
|
||||
* Tests the secure om Initialization success.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testSecureKsmInitializationSuccess() throws Exception {
|
||||
public void testSecureOmInitializationSuccess() throws Exception {
|
||||
initSCM();
|
||||
// Create a secure SCM instance as om client will connect to it
|
||||
scm = StorageContainerManager.createSCM(null, conf);
|
||||
LogCapturer logs = LogCapturer.captureLogs(OzoneManager.LOG);
|
||||
GenericTestUtils.setLogLevel(OzoneManager.LOG, INFO);
|
||||
|
||||
setupOm(conf);
|
||||
try {
|
||||
om.start();
|
||||
} catch (Exception ex) {
|
||||
// Expects timeout failure from scmClient in om but om user login via
|
||||
// kerberos should succeed.
|
||||
Assert.assertTrue(logs.getOutput().contains("Ozone Manager login"
|
||||
+ " successful"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs following tests for delegation token.
|
||||
* 1. Get valid delegation token
|
||||
* 2. Test successful token renewal.
|
||||
* 3. Client can authenticate using token.
|
||||
* 4. Delegation token renewal without Kerberos auth fails.
|
||||
* 5. Test success of token cancellation.
|
||||
* 5. Test failure of token cancellation.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDelegationToken() throws Exception {
|
||||
|
||||
// Capture logs for assertions
|
||||
LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
|
||||
GenericTestUtils
|
||||
.setLogLevel(LoggerFactory.getLogger(OzoneManager.class.getName()),
|
||||
org.slf4j.event.Level.INFO);
|
||||
.setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
|
||||
|
||||
final String path = GenericTestUtils
|
||||
.getTempPath(UUID.randomUUID().toString());
|
||||
Path metaDirPath = Paths.get(path, "om-meta");
|
||||
// Setup secure OM for start
|
||||
setupOm(conf);
|
||||
long omVersion =
|
||||
RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
|
||||
// Start OM
|
||||
om.start();
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
String username = ugi.getUserName();
|
||||
ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
||||
|
||||
OMStorage omStore = new OMStorage(conf);
|
||||
// Get first OM client which will authenticate via Kerberos
|
||||
omClient = new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
|
||||
OmUtils.getOmAddress(conf), ugi, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf),
|
||||
CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
|
||||
|
||||
// Assert if auth was successful via Kerberos
|
||||
Assert.assertFalse(logs.getOutput().contains(
|
||||
"Auth successful for " + username + " (auth:KERBEROS)"));
|
||||
|
||||
// Case 1: Test successful delegation token.
|
||||
Token<OzoneTokenIdentifier> token = omClient
|
||||
.getDelegationToken(new Text("om"));
|
||||
|
||||
// Case 2: Test successful token renewal.
|
||||
long renewalTime = omClient.renewDelegationToken(token);
|
||||
Assert.assertTrue(renewalTime > 0);
|
||||
|
||||
// Check if token is of right kind and renewer is running om instance
|
||||
Assert.assertEquals(token.getKind().toString(), "OzoneToken");
|
||||
Assert.assertEquals(token.getService().toString(),
|
||||
OmUtils.getOmRpcAddress(conf));
|
||||
omClient.close();
|
||||
|
||||
// Create a remote ugi and set its authentication method to Token
|
||||
UserGroupInformation testUser = UserGroupInformation
|
||||
.createRemoteUser(TEST_USER);
|
||||
testUser.addToken(token);
|
||||
testUser.setAuthenticationMethod(AuthMethod.TOKEN);
|
||||
UserGroupInformation.setLoginUser(testUser);
|
||||
|
||||
// Get Om client, this time authentication should happen via Token
|
||||
testUser.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
omClient = new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
|
||||
OmUtils.getOmAddress(conf), testUser, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf), CLIENT_TIMEOUT),
|
||||
RandomStringUtils.randomAscii(5));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// Case 3: Test Client can authenticate using token.
|
||||
Assert.assertFalse(logs.getOutput().contains(
|
||||
"Auth successful for " + username + " (auth:TOKEN)"));
|
||||
LambdaTestUtils.intercept(IOException.class, "Delete Volume failed," +
|
||||
" error:VOLUME_NOT_FOUND",
|
||||
() -> omClient.deleteVolume("vol1"));
|
||||
Assert.assertTrue(logs.getOutput().contains(
|
||||
"Auth successful for " + username + " (auth:TOKEN)"));
|
||||
|
||||
// Case 4: Test failure of token renewal.
|
||||
// Call to renewDelegationToken will fail but it will confirm that
|
||||
// initial connection via DT succeeded
|
||||
LambdaTestUtils.intercept(RemoteException.class, "Delegation "
|
||||
+ "Token can be renewed only with kerberos or web authentication",
|
||||
() -> omClient.renewDelegationToken(token));
|
||||
Assert.assertTrue(logs.getOutput().contains(
|
||||
"Auth successful for " + username + " (auth:TOKEN)"));
|
||||
//testUser.setAuthenticationMethod(AuthMethod.KERBEROS);
|
||||
UserGroupInformation.setLoginUser(ugi);
|
||||
omClient = new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
|
||||
OmUtils.getOmAddress(conf), ugi, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf),
|
||||
Client.getRpcTimeout(conf)), RandomStringUtils.randomAscii(5));
|
||||
|
||||
// Case 5: Test success of token cancellation.
|
||||
omClient.cancelDelegationToken(token);
|
||||
omClient.close();
|
||||
|
||||
// Wait for client to timeout
|
||||
Thread.sleep(CLIENT_TIMEOUT);
|
||||
|
||||
Assert.assertFalse(logs.getOutput().contains("Auth failed for"));
|
||||
|
||||
// Case 6: Test failure of token cancellation.
|
||||
// Get Om client, this time authentication using Token will fail as
|
||||
// token is expired
|
||||
omClient = new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
|
||||
OmUtils.getOmAddress(conf), testUser, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf),
|
||||
Client.getRpcTimeout(conf)), RandomStringUtils.randomAscii(5));
|
||||
LambdaTestUtils.intercept(RemoteException.class, "can't be found in cache",
|
||||
() -> omClient.cancelDelegationToken(token));
|
||||
Assert.assertTrue(logs.getOutput().contains(
|
||||
"Auth failed for"));
|
||||
}
|
||||
|
||||
private void generateKeyPair(OzoneConfiguration config) throws Exception {
|
||||
HDDSKeyGenerator keyGenerator = new HDDSKeyGenerator(conf);
|
||||
keyPair = keyGenerator.generateKey();
|
||||
HDDSKeyPEMWriter pemWriter = new HDDSKeyPEMWriter(config);
|
||||
pemWriter.writeKey(keyPair, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests delegation token renewal.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDelegationTokenRenewal() throws Exception {
|
||||
// Capture logs for assertions.
|
||||
LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
|
||||
GenericTestUtils
|
||||
.setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
|
||||
|
||||
// Setup secure OM for start.
|
||||
OzoneConfiguration newConf = new OzoneConfiguration(conf);
|
||||
newConf.setLong(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY, 500);
|
||||
setupOm(newConf);
|
||||
long omVersion =
|
||||
RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
|
||||
OzoneManager.setTestSecureOmFlag(true);
|
||||
// Start OM
|
||||
|
||||
om.start();
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
String username = ugi.getUserName();
|
||||
|
||||
// Get first OM client which will authenticate via Kerberos
|
||||
omClient = new OzoneManagerProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
|
||||
OmUtils.getOmAddress(conf), ugi, conf,
|
||||
NetUtils.getDefaultSocketFactory(conf),
|
||||
CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
|
||||
|
||||
// Since client is already connected get a delegation token
|
||||
Token<OzoneTokenIdentifier> token = omClient
|
||||
.getDelegationToken(new Text("om"));
|
||||
|
||||
// Check if token is of right kind and renewer is running om instance
|
||||
Assert.assertEquals(token.getKind().toString(), "OzoneToken");
|
||||
Assert.assertEquals(token.getService().toString(),
|
||||
OmUtils.getOmRpcAddress(conf));
|
||||
|
||||
// Renew delegation token
|
||||
long expiryTime = omClient.renewDelegationToken(token);
|
||||
Assert.assertTrue(expiryTime > 0);
|
||||
|
||||
// Test failure of delegation renewal
|
||||
// 1. When renewer doesn't match (implicitly covers when renewer is
|
||||
// null or empty )
|
||||
Token token2 = omClient.getDelegationToken(new Text("randomService"));
|
||||
LambdaTestUtils.intercept(RemoteException.class,
|
||||
" with non-matching renewer randomService",
|
||||
() -> omClient.renewDelegationToken(token2));
|
||||
|
||||
// 2. Test tampered token
|
||||
OzoneTokenIdentifier tokenId = OzoneTokenIdentifier
|
||||
.readProtoBuf(token.getIdentifier());
|
||||
tokenId.setRenewer(new Text("om"));
|
||||
tokenId.setMaxDate(System.currentTimeMillis() * 2);
|
||||
Token<OzoneTokenIdentifier> tamperedToken = new Token<>(
|
||||
tokenId.getBytes(), token2.getPassword(), token2.getKind(),
|
||||
token2.getService());
|
||||
LambdaTestUtils
|
||||
.intercept(RemoteException.class, "can't be found in cache",
|
||||
() -> omClient.renewDelegationToken(tamperedToken));
|
||||
|
||||
// 3. When token maxExpiryTime exceeds
|
||||
Thread.sleep(500);
|
||||
LambdaTestUtils
|
||||
.intercept(RemoteException.class, "om tried to renew an expired"
|
||||
+ " token", () -> omClient.renewDelegationToken(token));
|
||||
}
|
||||
|
||||
private void setupOm(OzoneConfiguration config) throws Exception {
|
||||
OMStorage omStore = new OMStorage(config);
|
||||
omStore.setClusterId("testClusterId");
|
||||
omStore.setScmId("testScmId");
|
||||
// writes the version file properties
|
||||
omStore.initialize();
|
||||
try {
|
||||
om = OzoneManager.createOm(null, conf);
|
||||
} catch (Exception ex) {
|
||||
// Expects timeout failure from scmClient in KSM but KSM user login via
|
||||
// kerberos should succeed
|
||||
Assert.assertTrue(
|
||||
logs.getOutput().contains("Ozone Manager login successful."));
|
||||
}
|
||||
OzoneManager.setTestSecureOmFlag(true);
|
||||
om = OzoneManager.createOm(null, config);
|
||||
CertificateClient certClient = Mockito.mock(CertificateClient.class);
|
||||
Mockito.when(certClient.getPrivateKey("om"))
|
||||
.thenReturn(keyPair.getPrivate());
|
||||
Mockito.when(certClient.getPublicKey("om"))
|
||||
.thenReturn(keyPair.getPublic());
|
||||
om.setCertClient(certClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,6 +23,8 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import java.security.KeyPair;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
@ -35,16 +37,23 @@
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
|
||||
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
||||
import org.apache.hadoop.ozone.security.OzoneSecurityException;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.ozone.security.OzoneSecretManager;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.audit.AuditAction;
|
||||
import org.apache.hadoop.ozone.audit.AuditEventStatus;
|
||||
@ -53,7 +62,6 @@
|
||||
import org.apache.hadoop.ozone.audit.AuditMessage;
|
||||
import org.apache.hadoop.ozone.audit.Auditor;
|
||||
import org.apache.hadoop.ozone.audit.OMAction;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.common.Storage.StorageState;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||
@ -91,6 +99,8 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
@ -101,7 +111,6 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import javax.ws.rs.HEAD;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
@ -118,8 +127,8 @@
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.ozone.security.OzoneSecurityException.ResultCodes.*;
|
||||
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
|
||||
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
|
||||
import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
|
||||
@ -145,7 +154,6 @@
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.OzoneManagerService
|
||||
.newReflectiveBlockingService;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys
|
||||
.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys
|
||||
@ -161,13 +169,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneManager.class);
|
||||
|
||||
private static final AuditLogger AUDIT =
|
||||
new AuditLogger(AuditLoggerType.OMLOGGER);
|
||||
private static final AuditLogger AUDIT = new AuditLogger(
|
||||
AuditLoggerType.OMLOGGER);
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: \n ozone om [genericOptions] " + "[ "
|
||||
+ StartupOption.INIT.getName() + " ]\n " + "ozone om [ "
|
||||
+ StartupOption.HELP.getName() + " ]\n";
|
||||
private static final String OM_DAEMON = "om";
|
||||
private static boolean securityEnabled = false;
|
||||
private static OzoneSecretManager secretManager;
|
||||
// TO DO: For testing purpose only, remove before commiting
|
||||
private KeyPair keyPair;
|
||||
private CertificateClient certClient;
|
||||
private static boolean testSecureOmFlag = false;
|
||||
private final OzoneConfiguration configuration;
|
||||
private RPC.Server omRpcServer;
|
||||
private InetSocketAddress omRpcAddress;
|
||||
@ -208,21 +223,60 @@ private OzoneManager(OzoneConfiguration conf) throws IOException {
|
||||
ResultCodes.OM_NOT_INITIALIZED);
|
||||
}
|
||||
|
||||
scmContainerClient = getScmContainerClient(configuration);
|
||||
if (!testSecureOmFlag) {
|
||||
scmContainerClient = getScmContainerClient(configuration);
|
||||
// verifies that the SCM info in the OM Version file is correct.
|
||||
scmBlockClient = getScmBlockClient(configuration);
|
||||
ScmInfo scmInfo = scmBlockClient.getScmInfo();
|
||||
if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
|
||||
.getScmId().equals(omStorage.getScmId()))) {
|
||||
throw new OMException("SCM version info mismatch.",
|
||||
ResultCodes.SCM_VERSION_MISMATCH_ERROR);
|
||||
}
|
||||
} else {
|
||||
// For testing purpose only
|
||||
scmContainerClient = null;
|
||||
scmBlockClient = null;
|
||||
}
|
||||
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
|
||||
int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
|
||||
OZONE_OM_HANDLER_COUNT_DEFAULT);
|
||||
|
||||
// verifies that the SCM info in the OM Version file is correct.
|
||||
scmBlockClient = getScmBlockClient(configuration);
|
||||
// This is a temporary check. Once fully implemented, all OM state change
|
||||
// should go through Ratis - either standalone (for non-HA) or replicated
|
||||
// (for HA).
|
||||
boolean omRatisEnabled = configuration.getBoolean(
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
|
||||
if (omRatisEnabled) {
|
||||
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
|
||||
omNodeRpcAddr.getAddress(), configuration);
|
||||
omRatisServer.start();
|
||||
|
||||
ScmInfo scmInfo = scmBlockClient.getScmInfo();
|
||||
if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
|
||||
.getScmId().equals(omStorage.getScmId()))) {
|
||||
throw new OMException("SCM version info mismatch.",
|
||||
ResultCodes.SCM_VERSION_MISMATCH_ERROR);
|
||||
LOG.info("OzoneManager Ratis server started at port {}",
|
||||
omRatisServer.getServerPort());
|
||||
|
||||
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
|
||||
omId, omRatisServer.getRaftGroup(), configuration);
|
||||
omRatisClient.connect();
|
||||
} else {
|
||||
omRatisServer = null;
|
||||
omRatisClient = null;
|
||||
}
|
||||
|
||||
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
BlockingService omService = newReflectiveBlockingService(
|
||||
new OzoneManagerProtocolServerSideTranslatorPB(
|
||||
this, omRatisClient, omRatisEnabled));
|
||||
secretManager = createSecretManager(configuration);
|
||||
|
||||
omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
|
||||
OzoneManagerProtocolPB.class, omService,
|
||||
handlerCount);
|
||||
omRpcAddress = updateRPCListenAddress(configuration,
|
||||
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
|
||||
metadataManager = new OmMetadataManagerImpl(configuration);
|
||||
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
|
||||
bucketManager = new BucketManagerImpl(metadataManager);
|
||||
@ -313,6 +367,67 @@ private File getMetricsStorageFile() {
|
||||
}
|
||||
|
||||
|
||||
private OzoneSecretManager createSecretManager(
|
||||
OzoneConfiguration conf)
|
||||
throws IOException {
|
||||
long tokenRemoverScanInterval =
|
||||
conf.getTimeDuration(OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_KEY,
|
||||
OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long tokenMaxLifetime =
|
||||
conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
||||
OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long tokenRenewInterval =
|
||||
conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
||||
OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
Text omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
|
||||
|
||||
return new OzoneSecretManager(conf, tokenMaxLifetime, tokenRenewInterval,
|
||||
tokenRemoverScanInterval, omRpcAddressTxt);
|
||||
}
|
||||
|
||||
private void stopSecretManager() throws IOException {
|
||||
if (secretManager != null) {
|
||||
LOG.info("Stopping OM secret manager");
|
||||
secretManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void startSecretManager() {
|
||||
if (secretManager != null) {
|
||||
try {
|
||||
readKeyPair();
|
||||
LOG.info("Starting OM secret manager");
|
||||
secretManager.startThreads(keyPair);
|
||||
} catch (IOException e) {
|
||||
// Inability to start secret manager
|
||||
// can't be recovered from.
|
||||
LOG.error("Error starting secret manager.", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setCertClient(CertificateClient certClient) {
|
||||
// TODO: Initialize it in contructor with implementation for certClient.
|
||||
this.certClient = certClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read private key from file.
|
||||
*/
|
||||
private void readKeyPair() throws OzoneSecurityException {
|
||||
try {
|
||||
keyPair = new KeyPair(certClient.getPublicKey(OM_DAEMON),
|
||||
certClient.getPrivateKey(OM_DAEMON));
|
||||
} catch (Exception e) {
|
||||
throw new OzoneSecurityException("Error reading private file for "
|
||||
+ "OzoneManager", e, OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Login OM service user if security and Kerberos are enabled.
|
||||
*
|
||||
@ -322,8 +437,8 @@ private File getMetricsStorageFile() {
|
||||
private static void loginOMUser(OzoneConfiguration conf)
|
||||
throws IOException, AuthenticationException {
|
||||
|
||||
if (SecurityUtil.getAuthenticationMethod(conf).equals
|
||||
(AuthenticationMethod.KERBEROS)) {
|
||||
if (SecurityUtil.getAuthenticationMethod(conf).equals(
|
||||
AuthenticationMethod.KERBEROS)) {
|
||||
LOG.debug("Ozone security is enabled. Attempting login for OM user. "
|
||||
+ "Principal: {},keytab: {}", conf.get(
|
||||
OZONE_OM_KERBEROS_PRINCIPAL_KEY),
|
||||
@ -335,8 +450,8 @@ private static void loginOMUser(OzoneConfiguration conf)
|
||||
SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
|
||||
OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
|
||||
} else {
|
||||
throw new AuthenticationException(SecurityUtil.getAuthenticationMethod
|
||||
(conf) + " authentication method not supported. OM user login "
|
||||
throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
|
||||
conf) + " authentication method not supported. OM user login "
|
||||
+ "failed.");
|
||||
}
|
||||
LOG.info("Ozone Manager login successful.");
|
||||
@ -409,7 +524,7 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
|
||||
.setPort(addr.getPort())
|
||||
.setNumHandlers(handlerCount)
|
||||
.setVerbose(false)
|
||||
.setSecretManager(null)
|
||||
.setSecretManager(secretManager)
|
||||
.build();
|
||||
|
||||
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
|
||||
@ -449,6 +564,10 @@ private static void printUsage(PrintStream out) {
|
||||
out.println(USAGE + "\n");
|
||||
}
|
||||
|
||||
private static boolean isOzoneSecurityEnabled() {
|
||||
return securityEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs OM instance based on command line arguments.
|
||||
*
|
||||
@ -493,8 +612,10 @@ private static OzoneManager createOm(String[] argv,
|
||||
terminate(1);
|
||||
return null;
|
||||
}
|
||||
|
||||
securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
|
||||
// Authenticate KSM if security is enabled
|
||||
if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, true)) {
|
||||
if (securityEnabled) {
|
||||
loginOMUser(conf);
|
||||
}
|
||||
switch (startOpt) {
|
||||
@ -637,49 +758,13 @@ public OMMetrics getMetrics() {
|
||||
*/
|
||||
public void start() throws IOException {
|
||||
|
||||
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
|
||||
int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
|
||||
OZONE_OM_HANDLER_COUNT_DEFAULT);
|
||||
|
||||
// This is a temporary check. Once fully implemented, all OM state change
|
||||
// should go through Ratis - either standalone (for non-HA) or replicated
|
||||
// (for HA).
|
||||
boolean omRatisEnabled = configuration.getBoolean(
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
|
||||
if (omRatisEnabled) {
|
||||
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
|
||||
omNodeRpcAddr.getAddress(), configuration);
|
||||
omRatisServer.start();
|
||||
|
||||
LOG.info("OzoneManager Ratis server started at port {}",
|
||||
omRatisServer.getServerPort());
|
||||
|
||||
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
|
||||
omId, omRatisServer.getRaftGroup(), configuration);
|
||||
omRatisClient.connect();
|
||||
} else {
|
||||
omRatisServer = null;
|
||||
omRatisClient = null;
|
||||
}
|
||||
|
||||
BlockingService omService = newReflectiveBlockingService(
|
||||
new OzoneManagerProtocolServerSideTranslatorPB(
|
||||
this, omRatisClient, omRatisEnabled));
|
||||
omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
|
||||
OzoneManagerProtocolPB.class, omService,
|
||||
handlerCount);
|
||||
omRpcAddress = updateRPCListenAddress(configuration,
|
||||
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
|
||||
omRpcServer.start();
|
||||
|
||||
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
|
||||
omRpcAddress));
|
||||
|
||||
DefaultMetricsSystem.initialize("OzoneManager");
|
||||
|
||||
metadataManager.start(configuration);
|
||||
|
||||
startSecretManagerIfNecessary();
|
||||
|
||||
// Set metrics and start metrics back ground thread
|
||||
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
|
||||
@ -700,8 +785,7 @@ public void start() throws IOException {
|
||||
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
|
||||
|
||||
keyManager.start(configuration);
|
||||
|
||||
httpServer = new OzoneManagerHttpServer(configuration, this);
|
||||
omRpcServer.start();
|
||||
try {
|
||||
httpServer.start();
|
||||
} catch (Exception ex) {
|
||||
@ -731,6 +815,7 @@ public void stop() {
|
||||
omRatisServer.stop();
|
||||
}
|
||||
keyManager.stop();
|
||||
stopSecretManager();
|
||||
httpServer.stop();
|
||||
metadataManager.stop();
|
||||
metrics.unRegister();
|
||||
@ -755,6 +840,140 @@ public void join() {
|
||||
}
|
||||
}
|
||||
|
||||
private void startSecretManagerIfNecessary() {
|
||||
boolean shouldRun = shouldUseDelegationTokens() && isOzoneSecurityEnabled();
|
||||
boolean running = secretManager.isRunning();
|
||||
if (shouldRun && !running) {
|
||||
startSecretManager();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldUseDelegationTokens() {
|
||||
return UserGroupInformation.isSecurityEnabled();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if delegation token operation is allowed
|
||||
*/
|
||||
private boolean isAllowedDelegationTokenOp() throws IOException {
|
||||
AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& (authMethod != AuthenticationMethod.KERBEROS)
|
||||
&& (authMethod != AuthenticationMethod.KERBEROS_SSL)
|
||||
&& (authMethod != AuthenticationMethod.CERTIFICATE)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns authentication method used to establish the connection.
|
||||
* @return AuthenticationMethod used to establish connection
|
||||
* @throws IOException
|
||||
*/
|
||||
private AuthenticationMethod getConnectionAuthenticationMethod()
|
||||
throws IOException {
|
||||
UserGroupInformation ugi = getRemoteUser();
|
||||
AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
|
||||
if (authMethod == AuthenticationMethod.PROXY) {
|
||||
authMethod = ugi.getRealUser().getAuthenticationMethod();
|
||||
}
|
||||
return authMethod;
|
||||
}
|
||||
|
||||
// optimize ugi lookup for RPC operations to avoid a trip through
|
||||
// UGI.getCurrentUser which is synch'ed
|
||||
private static UserGroupInformation getRemoteUser() throws IOException {
|
||||
UserGroupInformation ugi = Server.getRemoteUser();
|
||||
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get delegation token from OzoneManager.
|
||||
* @param renewer Renewer information
|
||||
* @return delegationToken DelegationToken signed by OzoneManager
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
final boolean success;
|
||||
final String tokenId;
|
||||
Token<OzoneTokenIdentifier> token;
|
||||
|
||||
if (!isAllowedDelegationTokenOp()) {
|
||||
throw new IOException("Delegation Token can be issued only with "
|
||||
+ "kerberos or web authentication");
|
||||
}
|
||||
if (secretManager == null || !secretManager.isRunning()) {
|
||||
LOG.warn("trying to get DT with no secret manager running in OM.");
|
||||
return null;
|
||||
}
|
||||
|
||||
UserGroupInformation ugi = getRemoteUser();
|
||||
String user = ugi.getUserName();
|
||||
Text owner = new Text(user);
|
||||
Text realUser = null;
|
||||
if (ugi.getRealUser() != null) {
|
||||
realUser = new Text(ugi.getRealUser().getUserName());
|
||||
}
|
||||
|
||||
token = secretManager.createToken(owner, renewer, realUser);
|
||||
return token;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to renew a delegationToken issued by OzoneManager.
|
||||
* @param token token to renew
|
||||
* @return new expiryTime of the token
|
||||
* @throws InvalidToken if {@code token} is invalid
|
||||
* @throws IOException on other errors
|
||||
*/
|
||||
@Override
|
||||
public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws InvalidToken, IOException {
|
||||
long expiryTime;
|
||||
|
||||
try {
|
||||
|
||||
if (!isAllowedDelegationTokenOp()) {
|
||||
throw new IOException("Delegation Token can be renewed only with "
|
||||
+ "kerberos or web authentication");
|
||||
}
|
||||
String renewer = getRemoteUser().getShortUserName();
|
||||
expiryTime = secretManager.renewToken(token, renewer);
|
||||
|
||||
} catch (AccessControlException ace) {
|
||||
final OzoneTokenIdentifier id = OzoneTokenIdentifier.readProtoBuf(
|
||||
token.getIdentifier());
|
||||
LOG.error("Delegation token renewal failed for dt: {}, cause: {}",
|
||||
id.toString(), ace.getMessage());
|
||||
throw ace;
|
||||
}
|
||||
return expiryTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels a delegation token.
|
||||
* @param token token to cancel
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
|
||||
throws IOException {
|
||||
OzoneTokenIdentifier id = null;
|
||||
try {
|
||||
String canceller = getRemoteUser().getUserName();
|
||||
id = secretManager.cancelToken(token, canceller);
|
||||
LOG.trace("Delegation token renewed for dt: {}", id);
|
||||
} catch (AccessControlException ace) {
|
||||
LOG.error("Delegation token renewal failed for dt: {}, cause: {}", id,
|
||||
ace.getMessage());
|
||||
throw ace;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Creates a volume.
|
||||
*
|
||||
@ -1760,4 +1979,8 @@ public String getName() {
|
||||
public static Logger getLogger() {
|
||||
return LOG;
|
||||
}
|
||||
|
||||
public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
|
||||
OzoneManager.testSecureOmFlag = testSecureOmFlag;
|
||||
}
|
||||
}
|
||||
|
@ -22,7 +22,9 @@
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
@ -153,6 +155,15 @@
|
||||
.SetVolumePropertyRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.SetVolumePropertyResponse;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
|
||||
import org.slf4j.Logger;
|
||||
@ -317,6 +328,21 @@ public OMResponse handle(OMRequest request) {
|
||||
request.getServiceListRequest());
|
||||
responseBuilder.setServiceListResponse(serviceListResponse);
|
||||
break;
|
||||
case GetDelegationToken:
|
||||
GetDelegationTokenResponseProto getDtResp = getDelegationToken(
|
||||
request.getGetDelegationTokenRequest());
|
||||
responseBuilder.setGetDelegationTokenResponse(getDtResp);
|
||||
break;
|
||||
case RenewDelegationToken:
|
||||
RenewDelegationTokenResponseProto renewDtResp = renewDelegationToken(
|
||||
request.getRenewDelegationTokenRequest());
|
||||
responseBuilder.setRenewDelegationTokenResponse(renewDtResp);
|
||||
break;
|
||||
case CancelDelegationToken:
|
||||
CancelDelegationTokenResponseProto cancelDtResp = cancelDelegationToken(
|
||||
request.getCancelDelegationTokenRequest());
|
||||
responseBuilder.setCancelDelegationTokenResponse(cancelDtResp);
|
||||
break;
|
||||
default:
|
||||
responseBuilder.setSuccess(false);
|
||||
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
|
||||
@ -915,4 +941,61 @@ private MultipartUploadAbortResponse abortMultipartUpload(
|
||||
}
|
||||
return response.build();
|
||||
}
|
||||
|
||||
private GetDelegationTokenResponseProto getDelegationToken(
|
||||
GetDelegationTokenRequestProto request){
|
||||
GetDelegationTokenResponseProto.Builder rb =
|
||||
GetDelegationTokenResponseProto.newBuilder();
|
||||
try {
|
||||
Token<OzoneTokenIdentifier> token = impl
|
||||
.getDelegationToken(new Text(request.getRenewer()));
|
||||
if (token != null) {
|
||||
rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
|
||||
.GetDelegationTokenResponseProto.newBuilder().setToken(OMPBHelper
|
||||
.convertToTokenProto(token)).build());
|
||||
}
|
||||
rb.setStatus(Status.OK);
|
||||
} catch (IOException ex) {
|
||||
rb.setStatus(exceptionToResponseStatus(ex));
|
||||
}
|
||||
return rb.build();
|
||||
}
|
||||
|
||||
private RenewDelegationTokenResponseProto renewDelegationToken(
|
||||
RenewDelegationTokenRequestProto request) {
|
||||
RenewDelegationTokenResponseProto.Builder rb =
|
||||
RenewDelegationTokenResponseProto.newBuilder();
|
||||
try {
|
||||
if(request.hasToken()) {
|
||||
long expiryTime = impl
|
||||
.renewDelegationToken(
|
||||
OMPBHelper.convertToDelegationToken(request.getToken()));
|
||||
rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
|
||||
.RenewDelegationTokenResponseProto.newBuilder()
|
||||
.setNewExpiryTime(expiryTime).build());
|
||||
}
|
||||
rb.setStatus(Status.OK);
|
||||
} catch (IOException ex) {
|
||||
rb.setStatus(exceptionToResponseStatus(ex));
|
||||
}
|
||||
return rb.build();
|
||||
}
|
||||
|
||||
private CancelDelegationTokenResponseProto cancelDelegationToken(
|
||||
CancelDelegationTokenRequestProto req) {
|
||||
CancelDelegationTokenResponseProto.Builder rb =
|
||||
CancelDelegationTokenResponseProto.newBuilder();
|
||||
try {
|
||||
if(req.hasToken()) {
|
||||
impl.cancelDelegationToken(
|
||||
OMPBHelper.convertToDelegationToken(req.getToken()));
|
||||
}
|
||||
rb.setResponse(org.apache.hadoop.security.proto.SecurityProtos
|
||||
.CancelDelegationTokenResponseProto.getDefaultInstance());
|
||||
rb.setStatus(Status.OK);
|
||||
} catch (IOException ex) {
|
||||
rb.setStatus(exceptionToResponseStatus(ex));
|
||||
}
|
||||
return rb.build();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user