From 50c4045fde06a561b66b2ad39f7f11d69ce07224 Mon Sep 17 00:00:00 2001 From: Ajay Kumar Date: Wed, 19 Dec 2018 16:34:51 -0800 Subject: [PATCH] HDDS-805. Block token: Client api changes for block token. Contributed by Ajay Kumar. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 61 ++- .../hadoop/hdds/scm/XceiverClientManager.java | 16 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 6 + .../scm/client/ContainerOperationClient.java | 10 +- .../apache/hadoop/hdds/HddsConfigKeys.java | 4 + .../apache/hadoop/hdds/client/BlockID.java | 8 +- .../hadoop/hdds/client/ContainerBlockID.java | 10 +- .../hadoop/hdds/scm/XceiverClientSpi.java | 6 + .../scm/storage/ContainerProtocolCalls.java | 162 ++++++-- .../security/token/BlockTokenVerifier.java | 12 +- .../token/OzoneBlockTokenSelector.java | 22 +- .../hdds/security/x509/SecurityConfig.java | 22 +- .../proto/DatanodeContainerProtocol.proto | 1 + .../src/main/resources/ozone-default.xml | 14 +- .../client/io/BlockOutputStreamEntry.java | 359 ++++++++++++++++++ .../ozone/client/io/KeyInputStream.java | 5 + .../ozone/client/io/KeyOutputStream.java | 207 ++-------- .../ozone/om/helpers/OmKeyLocationInfo.java | 44 ++- .../OzoneBlockTokenSecretManager.java | 11 +- .../OzoneDelegationTokenSecretManager.java | 4 +- .../ozone/security/OzoneSecretManager.java | 7 +- .../src/main/proto/OzoneManagerProtocol.proto | 1 + .../TestOzoneBlockTokenSecretManager.java | 3 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 14 + .../hadoop/ozone/MiniOzoneClusterImpl.java | 1 + .../TestContainerStateMachineIdempotency.java | 9 +- .../client/CertificateClientTestImpl.java | 148 ++++++++ .../ozone/client/rpc/TestOzoneRpcClient.java | 1 + .../rpc/TestOzoneRpcClientAbstract.java | 29 +- .../client/rpc/TestSecureOzoneRpcClient.java | 238 ++++++++++++ .../ozoneimpl/TestOzoneContainer.java | 13 +- .../ozoneimpl/TestSecureOzoneContainer.java | 2 +- .../ozone/scm/TestContainerSmallFile.java | 8 +- .../TestGetCommittedBlockLengthAndPutKey.java | 8 +- .../ozone/scm/TestXceiverClientManager.java | 6 +- .../ozone/web/OzoneHddsDatanodeService.java | 4 +- .../hadoop/ozone/om/KeyManagerImpl.java | 75 +++- .../apache/hadoop/ozone/om/OzoneManager.java | 122 ++++-- .../ozone/om/TestKeyDeletingService.java | 6 +- .../hadoop/ozone/om/TestKeyManagerImpl.java | 2 +- 40 files changed, 1332 insertions(+), 349 deletions(-) create mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 496ef91b09..3ffe54e212 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -32,15 +32,10 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector; import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import 
org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.Status; @@ -53,7 +48,6 @@ import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -76,6 +70,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private Map channels; private final Semaphore semaphore; private boolean closed = false; + private SecurityConfig secConfig; /** * Constructs a client that can communicate with the Container framework on @@ -90,6 +85,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { Preconditions.checkNotNull(config); this.pipeline = pipeline; this.config = config; + this.secConfig = new SecurityConfig(config); this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -97,17 +93,30 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { this.asyncStubs = new HashMap<>(); } + /** + * To be used when the grpc block token is not enabled. + */ @Override public void connect() throws Exception { - // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning - connectToDatanode(dn); + connectToDatanode(dn, null); } + /** + * Passes the encoded token in the GRPC header when security is enabled. + */ + @Override + public void connect(String encodedToken) throws Exception { + // leader by default is the 1st datanode in the datanode list of pipeline + DatanodeDetails dn = this.pipeline.getFirstNode(); + // just make a connection to the 1st datanode at the beginning + connectToDatanode(dn, encodedToken); + } - private void connectToDatanode(DatanodeDetails dn) throws IOException { + private void connectToDatanode(DatanodeDetails dn, String encodedToken) + throws IOException { // read port from the data node, on failure use default configured // port.
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); @@ -119,19 +128,6 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException { // Add credential context to the client call String userName = UserGroupInformation.getCurrentUser() .getShortUserName(); - - // Add block token if block token (mutual auth) is required but the client - // does not have a mTLS (private key and ca signed certificate) - String encodedToken = null; - SecurityConfig secConfig = new SecurityConfig(config); - if (secConfig.isGrpcBlockTokenEnabled()) { - InetSocketAddress addr = new InetSocketAddress(dn.getIpAddress(), port); - encodedToken = getEncodedBlockToken(addr); - if (encodedToken == null) { - throw new SCMSecurityException("No Block token available to access " + - "service at : " + addr.toString()); - } - } LOG.debug("Connecting to server Port : " + dn.getIpAddress()); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn .getIpAddress(), port).usePlaintext() @@ -166,20 +162,6 @@ private void connectToDatanode(DatanodeDetails dn) throws IOException { channels.put(dn.getUuid(), channel); } - private String getEncodedBlockToken(InetSocketAddress addr) - throws IOException{ - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - OzoneBlockTokenSelector tokenSelector = new OzoneBlockTokenSelector(); - Text service = SecurityUtil.buildTokenService(addr); - Token token = tokenSelector.selectToken( - service, ugi.getTokens()); - if (token != null) { - token.setService(service); - return token.encodeToUrlString(); - } - return null; - } - /** * Returns if the xceiver client connects to all servers in the pipeline. * @@ -301,8 +283,9 @@ private XceiverClientAsyncReply sendCommandAsync( ManagedChannel channel = channels.get(dnId); // If the channel doesn't exist for this specific datanode or the channel // is closed, just reconnect + String token = request.getEncodedToken(); if (!isConnected(channel)) { - reconnect(dn); + reconnect(dn, token); } final CompletableFuture replyFuture = @@ -347,11 +330,11 @@ public void onCompleted() { return new XceiverClientAsyncReply(replyFuture); } - private void reconnect(DatanodeDetails dn) + private void reconnect(DatanodeDetails dn, String encodedToken) throws IOException { ManagedChannel channel; try { - connectToDatanode(dn); + connectToDatanode(dn, encodedToken); channel = channels.get(dn.getUuid()); } catch (Exception e) { LOG.error("Error while connecting: ", e); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index b2735bc79f..e9ca93db9f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneSecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import java.io.Closeable; import java.io.IOException; @@ -62,6 +64,7 @@ public class XceiverClientManager implements Closeable { private final boolean useRatis; private static XceiverClientMetrics metrics; + private boolean isSecurityEnabled; /** * Creates a new XceiverClientManager. 
* @@ -78,6 +81,7 @@ public XceiverClientManager(Configuration conf) { ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); this.conf = conf; + this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf); this.clientCache = CacheBuilder.newBuilder() .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .maximumSize(maxSize) @@ -141,14 +145,19 @@ private XceiverClientSpi getClient(Pipeline pipeline) throws IOException { HddsProtos.ReplicationType type = pipeline.getType(); try { - return clientCache.get(pipeline.getId().getId().toString() + type, - new Callable() { - @Override + String key = pipeline.getId().getId().toString() + type; + // Append user short name to key to prevent a different user + // from using the same instance of xceiverClient. + key = isSecurityEnabled ? + key + UserGroupInformation.getCurrentUser().getShortUserName() : key; + return clientCache.get(key, new Callable() { + @Override public XceiverClientSpi call() throws Exception { XceiverClientSpi client = null; switch (type) { case RATIS: client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); + client.connect(); break; case STAND_ALONE: client = new XceiverClientGrpc(pipeline, conf); @@ -157,7 +166,6 @@ public XceiverClientSpi call() throws Exception { default: throw new IOException("not implemented" + pipeline.getType()); } - client.connect(); return client; } }); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index f8d02deb26..1980b474ad 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -150,6 +150,12 @@ public void connect() throws Exception { } } + @Override + public void connect(String encodedToken) throws Exception { + throw new UnsupportedOperationException("Block tokens are not " + + "implemented for Ratis clients."); + } + @Override public void close() { final RaftClient c = client.getAndSet(null); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 85b5d29f32..1689e07988 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -115,7 +115,7 @@ public ContainerWithPipeline createContainer(String owner) public void createContainer(XceiverClientSpi client, long containerId) throws IOException { String traceID = UUID.randomUUID().toString(); - ContainerProtocolCalls.createContainer(client, containerId, traceID); + ContainerProtocolCalls.createContainer(client, containerId, traceID, null); // Let us log this info after we let SCM know that we have completed the // creation state.
@@ -260,7 +260,7 @@ public void deleteContainer(long containerId, Pipeline pipeline, client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); ContainerProtocolCalls - .deleteContainer(client, containerId, force, traceID); + .deleteContainer(client, containerId, force, traceID, null); storageContainerLocationClient .deleteContainer(containerId); if (LOG.isDebugEnabled()) { @@ -310,7 +310,8 @@ public ContainerDataProto readContainer(long containerID, client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); ReadContainerResponseProto response = - ContainerProtocolCalls.readContainer(client, containerID, traceID); + ContainerProtocolCalls.readContainer(client, containerID, traceID, + null); if (LOG.isDebugEnabled()) { LOG.debug("Read container {}, machines: {} ", containerID, pipeline.getNodes()); @@ -401,7 +402,8 @@ public void closeContainer(long containerId, Pipeline pipeline) ObjectStageChangeRequestProto.Op.close, ObjectStageChangeRequestProto.Stage.begin); - ContainerProtocolCalls.closeContainer(client, containerId, traceID); + ContainerProtocolCalls.closeContainer(client, containerId, traceID, + null); // Notify SCM to close the container storageContainerLocationClient.notifyObjectStageChange( ObjectStageChangeRequestProto.Type.container, diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index ba84f44096..895f627bb2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -111,6 +111,10 @@ public final class HddsConfigKeys { public static final String HDDS_PUBLIC_KEY_FILE_NAME = "hdds.public.key.file" + ".name"; public static final String HDDS_PUBLIC_KEY_FILE_NAME_DEFAULT = "public.pem"; + + public static final String HDDS_BLOCK_TOKEN_EXPIRY_TIME = + "hdds.block.token.expiry.time"; + public static final String HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT = "1d"; /** * Maximum duration of certificates issued by SCM including Self-Signed Roots. 
* The formats accepted are based on the ISO-8601 duration format PnDTnHnMn.nS diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java index a863437891..a49f8ae79f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java @@ -16,7 +16,6 @@ */ package org.apache.hadoop.hdds.client; -import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -75,10 +74,9 @@ public void setContainerBlockID(ContainerBlockID containerBlockID) { @Override public String toString() { - return new ToStringBuilder(this) - .append("containerID", containerBlockID.getContainerID()) - .append("localID", containerBlockID.getLocalID()) - .append("blockCommitSequenceId", blockCommitSequenceId) + return new StringBuffer().append(getContainerBlockID().toString()) + .append(" bcId: ") + .append(blockCommitSequenceId) .toString(); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java index 82084f22a9..1e30cc351f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java @@ -16,7 +16,6 @@ */ package org.apache.hadoop.hdds.client; -import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.util.Objects; @@ -43,10 +42,11 @@ public long getLocalID() { @Override public String toString() { - return new ToStringBuilder(this). - append("containerID", containerID). - append("localID", localID). - toString(); + return new StringBuffer() + .append("conID: ") + .append(containerID) + .append(" locID: ") + .append(localID).toString(); } public HddsProtos.ContainerBlockID getProtobuf() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 87cda56b41..9da74afefb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -78,6 +78,12 @@ public int getRefcount() { */ public abstract void connect() throws Exception; + /** + * Connects to the leader in the pipeline using encoded token. To be used + * in a secure cluster. 
+ */ + public abstract void connect(String encodedToken) throws Exception; + @Override public abstract void close(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 977a784c77..114b6e6275 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -21,8 +21,13 @@ import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -97,14 +102,19 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setBlockID(datanodeBlockID); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.GetBlock) .setContainerID(datanodeBlockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setGetBlock(readBlockRequest) - .build(); + .setGetBlock(readBlockRequest); + String encodedToken = getEncodedBlockToken(getService(datanodeBlockID)); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); @@ -129,13 +139,19 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder(). setBlockID(blockID.getDatanodeBlockIDProtobuf()); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder() .setCmdType(Type.GetCommittedBlockLength) .setContainerID(blockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setGetCommittedBlockLength(getBlockLengthRequestBuilder).build(); + .setGetCommittedBlockLength(getBlockLengthRequestBuilder); + String encodedToken = getEncodedBlockToken(new Text(blockID. 
+ getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); return response.getGetCommittedBlockLength(); @@ -156,11 +172,17 @@ public static ContainerProtos.PutBlockResponseProto putBlock( PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) .setContainerID(containerBlockData.getBlockID().getContainerID()) .setTraceID(traceID).setDatanodeUuid(id) - .setPutBlock(createBlockRequest).build(); + .setPutBlock(createBlockRequest); + String encodedToken = + getEncodedBlockToken(getService(containerBlockData.getBlockID())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); return response.getPutBlock(); @@ -184,11 +206,16 @@ public static XceiverClientAsyncReply putBlockAsync( PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) .setContainerID(containerBlockData.getBlockID().getContainerID()) .setTraceID(traceID).setDatanodeUuid(id) - .setPutBlock(createBlockRequest).build(); + .setPutBlock(createBlockRequest); + String encodedToken = + getEncodedBlockToken(getService(containerBlockData.getBlockID())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); } @@ -203,20 +232,25 @@ public static XceiverClientAsyncReply putBlockAsync( * @throws IOException if there is an I/O error while performing the call */ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, BlockID blockID, String traceID) throws IOException { + ChunkInfo chunk, BlockID blockID, String traceID) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .setChunkData(chunk); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.ReadChunk) .setContainerID(blockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setReadChunk(readChunkRequest) - .build(); + .setReadChunk(readChunkRequest); + String encodedToken = getEncodedBlockToken(new Text(blockID.
+ getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); return response.getReadChunk(); @@ -241,14 +275,19 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, .setChunkData(chunk) .setData(data); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.WriteChunk) .setContainerID(blockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setWriteChunk(writeChunkRequest) - .build(); + .setWriteChunk(writeChunkRequest); + String encodedToken = getEncodedBlockToken(new Text(blockID. + getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); } @@ -272,10 +311,16 @@ public static XceiverClientAsyncReply writeChunkAsync( .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .setChunkData(chunk).setData(data); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk) .setContainerID(blockID.getContainerID()).setTraceID(traceID) - .setDatanodeUuid(id).setWriteChunk(writeChunkRequest).build(); + .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); + String encodedToken = getEncodedBlockToken(new Text(blockID. + getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); } @@ -293,8 +338,8 @@ public static XceiverClientAsyncReply writeChunkAsync( * @throws IOException */ public static PutSmallFileResponseProto writeSmallFile( - XceiverClientSpi client, BlockID blockID, byte[] data, String traceID) - throws IOException { + XceiverClientSpi client, BlockID blockID, byte[] data, + String traceID) throws IOException { BlockData containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -323,14 +368,19 @@ public static PutSmallFileResponseProto writeSmallFile( .build(); String id = client.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder() .setCmdType(Type.PutSmallFile) .setContainerID(blockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setPutSmallFile(putSmallFileRequest) - .build(); + .setPutSmallFile(putSmallFileRequest); + String encodedToken = getEncodedBlockToken(new Text(blockID. 
+ getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = client.sendCommand(request); validateContainerResponse(response); return response.getPutSmallFile(); @@ -341,10 +391,11 @@ public static PutSmallFileResponseProto writeSmallFile( * @param client - client * @param containerID - ID of container * @param traceID - traceID + * @param encodedToken - encodedToken if security is enabled * @throws IOException */ public static void createContainer(XceiverClientSpi client, long containerID, - String traceID) throws IOException { + String traceID, String encodedToken) throws IOException { ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto .newBuilder(); @@ -354,6 +405,9 @@ public static void createContainer(XceiverClientSpi client, long containerID, String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto.newBuilder(); + if (encodedToken != null) { + request.setEncodedToken(encodedToken); + } request.setCmdType(ContainerProtos.Type.CreateContainer); request.setContainerID(containerID); request.setCreateContainer(createRequest.build()); @@ -370,10 +424,11 @@ public static void createContainer(XceiverClientSpi client, long containerID, * @param client * @param force whether or not to forcibly delete the container. * @param traceID + * @param encodedToken - encodedToken if security is enabled * @throws IOException */ public static void deleteContainer(XceiverClientSpi client, long containerID, - boolean force, String traceID) throws IOException { + boolean force, String traceID, String encodedToken) throws IOException { ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest = ContainerProtos.DeleteContainerRequestProto.newBuilder(); deleteRequest.setForceDelete(force); @@ -386,6 +441,9 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, request.setDeleteContainer(deleteRequest); request.setTraceID(traceID); request.setDatanodeUuid(id); + if(encodedToken != null) { + request.setEncodedToken(encodedToken); + } ContainerCommandResponseProto response = client.sendCommand(request.build()); validateContainerResponse(response); @@ -397,10 +455,12 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, * @param client * @param containerID * @param traceID + * @param encodedToken - encodedToken if security is enabled * @throws IOException */ public static void closeContainer(XceiverClientSpi client, - long containerID, String traceID) throws IOException { + long containerID, String traceID, String encodedToken) + throws IOException { String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = @@ -410,6 +470,9 @@ public static void closeContainer(XceiverClientSpi client, request.setCloseContainer(CloseContainerRequestProto.getDefaultInstance()); request.setTraceID(traceID); request.setDatanodeUuid(id); + if(encodedToken != null) { + request.setEncodedToken(encodedToken); + } ContainerCommandResponseProto response = client.sendCommand(request.build()); validateContainerResponse(response); @@ -418,13 +481,14 @@ public static void closeContainer(XceiverClientSpi client, /** * readContainer call that gets meta data from an existing container. 
* - * @param client - client - * @param traceID - trace ID + * @param client - client + * @param traceID - trace ID + * @param encodedToken - encodedToken if security is enabled * @throws IOException */ public static ReadContainerResponseProto readContainer( XceiverClientSpi client, long containerID, - String traceID) throws IOException { + String traceID, String encodedToken) throws IOException { String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = @@ -434,6 +498,9 @@ public static ReadContainerResponseProto readContainer( request.setReadContainer(ReadContainerRequestProto.getDefaultInstance()); request.setDatanodeUuid(id); request.setTraceID(traceID); + if(encodedToken != null) { + request.setEncodedToken(encodedToken); + } ContainerCommandResponseProto response = client.sendCommand(request.build()); validateContainerResponse(response); @@ -461,14 +528,19 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, .build(); String id = client.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.GetSmallFile) .setContainerID(blockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setGetSmallFile(getSmallFileRequest) - .build(); + .setGetSmallFile(getSmallFileRequest); + String encodedToken = getEncodedBlockToken(new Text(blockID. + getContainerBlockID().toString())); + if (encodedToken != null) { + builder.setEncodedToken(encodedToken); + } + ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = client.sendCommand(request); validateContainerResponse(response); @@ -494,4 +566,30 @@ public static void validateContainerResponse( throw new StorageContainerException( response.getMessage(), response.getResult()); } + + /** + * Returns a url encoded block token. Service param should match the service + * field of token. 
+ * @param service service name that must match the token's service field + * + */ + private static String getEncodedBlockToken(Text service) + throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Token token = + OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens()); + if (token != null) { + return token.encodeToUrlString(); + } + return null; + } + + private static Text getService(DatanodeBlockID blockId) { + return new Text(new StringBuffer() + .append("conID: ") + .append(blockId.getContainerID()) + .append(" locID: ") + .append(blockId.getLocalID()) + .toString()); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java index 8c0d19ec78..9867879588 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java @@ -39,6 +39,7 @@ public class BlockTokenVerifier implements TokenVerifier { private final CertificateClient caClient; private final SecurityConfig conf; + private static boolean testStub = false; public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) { this.conf = conf; @@ -53,7 +54,7 @@ private boolean isExpired(long expiryDate) { public UserGroupInformation verify(String user, String tokenStr) throws SCMSecurityException { if (conf.isGrpcBlockTokenEnabled()) { - if (Strings.isNullOrEmpty(tokenStr)) { + if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) { throw new BlockTokenException("Fail to find any token (empty or " + "null."); } @@ -110,4 +111,13 @@ public UserGroupInformation verify(String user, String tokenStr) return UserGroupInformation.createRemoteUser(user); } } + + public static boolean isTestStub() { + return testStub; + } + + // For testing purpose only. + public static void setTestStub(boolean isTestStub) { + BlockTokenVerifier.testStub = isTestStub; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java index 1943cce9de..83797c3c96 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java @@ -45,7 +45,27 @@ public Token selectToken(Text service, return null; } for (Token token : tokens) { - if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())) { + if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind()) + && token.getService().equals(service)) { LOG.trace("Getting token for service:{}", service); return (Token) token; } } return null; } + + /** + * Static version of selectToken, usable without instantiating a selector.
+ * */ + @SuppressWarnings("unchecked") + public static Token selectBlockToken(Text service, + Collection> tokens) { + if (service == null) { + return null; + } + for (Token token : tokens) { + if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind()) + && token.getService().equals(service)) { LOG.trace("Getting token for service:{}", service); return (Token) token; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java index c0ae070c4b..cb7d87d214 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java @@ -72,6 +72,9 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_SIGNATURE_ALGO; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_SIGNATURE_ALGO_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY; /** * A class that deals with all Security related configs in HDDS. @@ -101,6 +104,7 @@ public class SecurityConfig { private String trustStoreFileName; private String serverCertChainFileName; private String clientCertChainFileName; + private final boolean isSecurityEnabled; /** * Constructs a SecurityConfig. @@ -120,8 +124,8 @@ public SecurityConfig(Configuration configuration) { // HDDS metadata dir and if that is not set, we will use Ozone directory. // TODO: We might want to fix this later. this.metadatDir = this.configuration.get(HDDS_METADATA_DIR_NAME, - configuration.get(OZONE_METADATA_DIRS)); - + configuration.get(OZONE_METADATA_DIRS, + configuration.get(HDDS_DATANODE_DIR_KEY))); Preconditions.checkNotNull(this.metadatDir, "Metadata directory can't be" + " null. Please check configs."); this.keyDir = this.configuration.get(HDDS_KEY_DIR_NAME, @@ -164,6 +168,10 @@ public SecurityConfig(Configuration configuration) { HDDS_GRPC_TLS_TEST_CERT, HDDS_GRPC_TLS_TEST_CERT_DEFAULT); } + this.isSecurityEnabled = this.configuration.getBoolean( + OZONE_SECURITY_ENABLED_KEY, + OZONE_SECURITY_ENABLED_DEFAULT); + // First Startup -- if the provider is null, check for the provider. if (SecurityConfig.provider == null) { synchronized (SecurityConfig.class) { @@ -177,6 +185,16 @@ public SecurityConfig(Configuration configuration) { } } + /** + * Returns true if security is enabled for OzoneCluster. This is determined + * by value of OZONE_SECURITY_ENABLED_KEY. + * + * @return true if security is enabled for OzoneCluster. + */ + public boolean isSecurityEnabled() { + return isSecurityEnabled; + } + /** * Returns the Standard Certificate file name. 
* diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 661d910f1c..3e4f64de43 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -194,6 +194,7 @@ message ContainerCommandRequestProto { optional PutSmallFileRequestProto putSmallFile = 20; optional GetSmallFileRequestProto getSmallFile = 21; optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 22; + optional string encodedToken = 23; } message ContainerCommandResponseProto { diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 6975843c42..6384d1fbab 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1139,7 +1139,7 @@ ozone.tags.system OZONE,MANAGEMENT,SECURITY,PERFORMANCE,DEBUG,CLIENT,SERVER,OM,SCM, - CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY,ACL + CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY,ACL,TOKEN @@ -1645,6 +1645,18 @@ Directory to store public/private key for SCM CA. This is relative to ozone/hdds meteadata dir. + + hdds.block.token.expiry.time + 1d + OZONE, HDDS, SECURITY, TOKEN + + Default value for expiry time of block token. This + setting supports multiple time unit suffixes as described in + dfs.heartbeat.interval. If no suffix is specified, then milliseconds is + assumed. + + + hdds.metadata.dir diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java new file mode 100644 index 0000000000..6dcdad9c10 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Helper class used inside {@link BlockOutputStream}. 
+ * */ +public class BlockOutputStreamEntry extends OutputStream { + + private OutputStream outputStream; + private BlockID blockID; + private final String key; + private final XceiverClientManager xceiverClientManager; + private final XceiverClientSpi xceiverClient; + private final Checksum checksum; + private final String requestId; + private final int chunkSize; + // total number of bytes that should be written to this stream + private final long length; + // the current position of this stream 0 <= currentPosition < length + private long currentPosition; + private Token token; + + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private List bufferList; + + private BlockOutputStreamEntry(BlockID blockID, String key, + XceiverClientManager xceiverClientManager, + XceiverClientSpi xceiverClient, String requestId, int chunkSize, + long length, long streamBufferFlushSize, long streamBufferMaxSize, + long watchTimeout, List bufferList, Checksum checksum, + Token token) { + this.outputStream = null; + this.blockID = blockID; + this.key = key; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.requestId = requestId; + this.chunkSize = chunkSize; + this.token = token; + this.length = length; + this.currentPosition = 0; + this.streamBufferFlushSize = streamBufferFlushSize; + this.streamBufferMaxSize = streamBufferMaxSize; + this.watchTimeout = watchTimeout; + this.bufferList = bufferList; + this.checksum = checksum; + } + + /** + * For testing purposes: wraps an existing, already-created stream instance. + * + * @param outputStream an existing writable output stream + * @param length the length of data to write to the stream + */ + BlockOutputStreamEntry(OutputStream outputStream, long length, + Checksum checksum) { + this.outputStream = outputStream; + this.blockID = null; + this.key = null; + this.xceiverClientManager = null; + this.xceiverClient = null; + this.requestId = null; + this.chunkSize = -1; + this.token = null; + this.length = length; + this.currentPosition = 0; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + bufferList = null; + watchTimeout = 0; + this.checksum = checksum; + } + + long getLength() { + return length; + } + + Token getToken() { + return token; + } + + long getRemaining() { + return length - currentPosition; + } + + private void checkStream() throws IOException { + if (this.outputStream == null) { + if (getToken() != null) { + UserGroupInformation.getCurrentUser().addToken(getToken()); + } + this.outputStream = + new BlockOutputStream(blockID, key, xceiverClientManager, + xceiverClient, requestId, chunkSize, streamBufferFlushSize, + streamBufferMaxSize, watchTimeout, bufferList, checksum); + } + } + + @Override + public void write(int b) throws IOException { + checkStream(); + outputStream.write(b); + this.currentPosition += 1; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkStream(); + outputStream.write(b, off, len); + this.currentPosition += len; + } + + @Override + public void flush() throws IOException { + if (this.outputStream != null) { + this.outputStream.flush(); + } + } + + @Override + public void close() throws IOException { + if (this.outputStream != null) { + this.outputStream.close(); + // after closing the chunkOutPutStream, blockId would have been + // reconstructed with updated bcsId + if (this.outputStream instanceof BlockOutputStream) { + this.blockID = ((BlockOutputStream)
outputStream).getBlockID(); + } + } + } + + long getTotalSuccessfulFlushedData() throws IOException { + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; + blockID = out.getBlockID(); + return out.getTotalSuccessfulFlushedData(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + throw new IOException("Invalid Output Stream for Key: " + key); + } + + long getWrittenDataLength() throws IOException { + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; + return out.getWrittenDataLength(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + throw new IOException("Invalid Output Stream for Key: " + key); + } + + void cleanup() throws IOException { + checkStream(); + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; + out.cleanup(); + } + } + + void writeOnRetry(long len) throws IOException { + checkStream(); + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; + out.writeOnRetry(len); + this.currentPosition += len; + } else { + throw new IOException("Invalid Output Stream for Key: " + key); + } + } + + /** + * Builder class for BlockOutputStreamEntry. + * */ + public static class Builder { + + private BlockID blockID; + private String key; + private XceiverClientManager xceiverClientManager; + private XceiverClientSpi xceiverClient; + private String requestId; + private int chunkSize; + private long length; + private long streamBufferFlushSize; + private long streamBufferMaxSize; + private long watchTimeout; + private List bufferList; + private Token token; + private Checksum checksum; + + public Builder setChecksum(Checksum cs) { + this.checksum = cs; + return this; + } + + public Builder setBlockID(BlockID bID) { + this.blockID = bID; + return this; + } + + public Builder setKey(String keys) { + this.key = keys; + return this; + } + + public Builder setXceiverClientManager(XceiverClientManager + xClientManager) { + this.xceiverClientManager = xClientManager; + return this; + } + + public Builder setXceiverClient(XceiverClientSpi client) { + this.xceiverClient = client; + return this; + } + + public Builder setRequestId(String request) { + this.requestId = request; + return this; + } + + public Builder setChunkSize(int cSize) { + this.chunkSize = cSize; + return this; + } + + public Builder setLength(long len) { + this.length = len; + return this; + } + + public Builder setStreamBufferFlushSize(long bufferFlushSize) { + this.streamBufferFlushSize = bufferFlushSize; + return this; + } + + public Builder setStreamBufferMaxSize(long bufferMaxSize) { + this.streamBufferMaxSize = bufferMaxSize; + return this; + } + + public Builder setWatchTimeout(long timeout) { + this.watchTimeout = timeout; + return this; + } + + public Builder setBufferList(List bufferList) { + this.bufferList = bufferList; + return this; + } + + public Builder setToken(Token bToken) { + this.token = bToken; + return this; + } + + public BlockOutputStreamEntry build() { + return new
BlockOutputStreamEntry(blockID, key, + xceiverClientManager, xceiverClient, requestId, chunkSize, + length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout, + bufferList, checksum, token); + } + } + + public OutputStream getOutputStream() { + return outputStream; + } + + public BlockID getBlockID() { + return blockID; + } + + public String getKey() { + return key; + } + + public XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + public XceiverClientSpi getXceiverClient() { + return xceiverClient; + } + + public Checksum getChecksum() { + return checksum; + } + + public String getRequestId() { + return requestId; + } + + public int getChunkSize() { + return chunkSize; + } + + public long getCurrentPosition() { + return currentPosition; + } + + public long getStreamBufferFlushSize() { + return streamBufferFlushSize; + } + + public long getStreamBufferMaxSize() { + return streamBufferMaxSize; + } + + public long getWatchTimeout() { + return watchTimeout; + } + + public List getBufferList() { + return bufferList; + } + + public void setCurrentPosition(long curPosition) { + this.currentPosition = curPosition; + } +} + + diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 99817fbbbc..72b7cca065 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,6 +297,10 @@ public static LengthInputStream getFromOmKeyInfo( groupInputStream.streamOffset[i] = length; ContainerProtos.DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); + if (omKeyLocationInfo.getToken() != null) { + UserGroupInformation.getCurrentUser(). 
+ addToken(omKeyLocationInfo.getToken()); + } ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls .getBlock(xceiverClient, datanodeBlockID, requestId); List chunks = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 66e419906d..1874017810 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -36,6 +36,7 @@ .StorageContainerException; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,12 +134,13 @@ public List getLocationInfoList() throws IOException { List locationInfoList = new ArrayList<>(); for (BlockOutputStreamEntry streamEntry : streamEntries) { OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) - .setLength(streamEntry.currentPosition).setOffset(0) + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) .build(); - LOG.debug("block written " + streamEntry.blockID + ", length " - + streamEntry.currentPosition + " bcsID " + streamEntry.blockID - .getBlockCommitSequenceId()); + LOG.debug("block written " + streamEntry.getBlockID() + ", length " + + streamEntry.getCurrentPosition() + " bcsID " + + streamEntry.getBlockID().getBlockCommitSequenceId()); locationInfoList.add(info); } return locationInfoList; @@ -213,15 +215,27 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) throws IOException { ContainerWithPipeline containerWithPipeline = scmClient .getContainerWithPipeline(subKeyInfo.getContainerID()); + UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); - streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(), - keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, bufferList, checksum)); + BlockOutputStreamEntry.Builder builder = + new BlockOutputStreamEntry.Builder() + .setBlockID(subKeyInfo.getBlockID()) + .setKey(keyArgs.getKeyName()) + .setXceiverClientManager(xceiverClientManager) + .setXceiverClient(xceiverClient) + .setRequestId(requestID) + .setChunkSize(chunkSize) + .setLength(subKeyInfo.getLength()) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setBufferList(bufferList) + .setChecksum(checksum) + .setToken(subKeyInfo.getToken()); + streamEntries.add(builder.build()); } - @Override public void write(int b) throws IOException { byte[] buf = new byte[1]; @@ -329,7 +343,7 @@ private void discardPreallocatedBlocks(long containerID) { ListIterator streamEntryIterator = streamEntries.listIterator(currentStreamIndex); while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().blockID.getContainerID() + if (streamEntryIterator.next().getBlockID().getContainerID() == containerID) { streamEntryIterator.remove(); } @@ -348,7 +362,7 @@ 
private void removeEmptyBlocks() { ListIterator streamEntryIterator = streamEntries.listIterator(currentStreamIndex); while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().currentPosition == 0) { + if (streamEntryIterator.next().getCurrentPosition() == 0) { streamEntryIterator.remove(); } } @@ -369,7 +383,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, long totalSuccessfulFlushedData = streamEntry.getTotalSuccessfulFlushedData(); //set the correct length for the current stream - streamEntry.currentPosition = totalSuccessfulFlushedData; + streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); // just clean up the current stream. streamEntry.cleanup(); @@ -385,7 +399,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, } // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.blockID.getContainerID()); + discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID()); } private boolean checkIfContainerIsClosed(IOException ioe) { @@ -423,7 +437,7 @@ private boolean checkIfTimeoutException(IOException ioe) { } private long getKeyLength() { - return streamEntries.stream().mapToLong(e -> e.currentPosition) + return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition()) .sum(); } @@ -638,169 +652,6 @@ public KeyOutputStream build() throws IOException { } } - private static class BlockOutputStreamEntry extends OutputStream { - private OutputStream outputStream; - private BlockID blockID; - private final String key; - private final XceiverClientManager xceiverClientManager; - private final XceiverClientSpi xceiverClient; - private final Checksum checksum; - private final String requestId; - private final int chunkSize; - // total number of bytes that should be written to this stream - private final long length; - // the current position of this stream 0 <= currentPosition < length - private long currentPosition; - - private final long streamBufferFlushSize; - private final long streamBufferMaxSize; - private final long watchTimeout; - private List bufferList; - - @SuppressWarnings("parameternumber") - BlockOutputStreamEntry(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length, long streamBufferFlushSize, long streamBufferMaxSize, - long watchTimeout, List bufferList, Checksum checksum) { - this.outputStream = null; - this.blockID = blockID; - this.key = key; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.requestId = requestId; - this.chunkSize = chunkSize; - - this.length = length; - this.currentPosition = 0; - this.streamBufferFlushSize = streamBufferFlushSize; - this.streamBufferMaxSize = streamBufferMaxSize; - this.watchTimeout = watchTimeout; - this.checksum = checksum; - this.bufferList = bufferList; - } - - /** - * For testing purpose, taking a some random created stream instance. 
- * @param outputStream a existing writable output stream - * @param length the length of data to write to the stream - */ - BlockOutputStreamEntry(OutputStream outputStream, long length, - Checksum checksum) { - this.outputStream = outputStream; - this.blockID = null; - this.key = null; - this.xceiverClientManager = null; - this.xceiverClient = null; - this.requestId = null; - this.chunkSize = -1; - - this.length = length; - this.currentPosition = 0; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferList = null; - watchTimeout = 0; - this.checksum = checksum; - } - - long getLength() { - return length; - } - - long getRemaining() { - return length - currentPosition; - } - - private void checkStream() { - if (this.outputStream == null) { - this.outputStream = - new BlockOutputStream(blockID, key, xceiverClientManager, - xceiverClient, requestId, chunkSize, streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, bufferList, checksum); - } - } - - @Override - public void write(int b) throws IOException { - checkStream(); - outputStream.write(b); - this.currentPosition += 1; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - checkStream(); - outputStream.write(b, off, len); - this.currentPosition += len; - } - - @Override - public void flush() throws IOException { - if (this.outputStream != null) { - this.outputStream.flush(); - } - } - - @Override - public void close() throws IOException { - if (this.outputStream != null) { - this.outputStream.close(); - // after closing the chunkOutPutStream, blockId would have been - // reconstructed with updated bcsId - if (this.outputStream instanceof BlockOutputStream) { - this.blockID = ((BlockOutputStream) outputStream).getBlockID(); - } - } - } - - long getTotalSuccessfulFlushedData() throws IOException { - if (this.outputStream instanceof BlockOutputStream) { - BlockOutputStream out = (BlockOutputStream) this.outputStream; - blockID = out.getBlockID(); - return out.getTotalSuccessfulFlushedData(); - } else if (outputStream == null) { - // For a pre allocated block for which no write has been initiated, - // the OutputStream will be null here. - // In such cases, the default blockCommitSequenceId will be 0 - return 0; - } - throw new IOException("Invalid Output Stream for Key: " + key); - } - - long getWrittenDataLength() throws IOException { - if (this.outputStream instanceof BlockOutputStream) { - BlockOutputStream out = (BlockOutputStream) this.outputStream; - return out.getWrittenDataLength(); - } else if (outputStream == null) { - // For a pre allocated block for which no write has been initiated, - // the OutputStream will be null here. - // In such cases, the default blockCommitSequenceId will be 0 - return 0; - } - throw new IOException("Invalid Output Stream for Key: " + key); - } - - void cleanup() { - checkStream(); - if (this.outputStream instanceof BlockOutputStream) { - BlockOutputStream out = (BlockOutputStream) this.outputStream; - out.cleanup(); - } - } - - void writeOnRetry(long len) throws IOException { - checkStream(); - if (this.outputStream instanceof BlockOutputStream) { - BlockOutputStream out = (BlockOutputStream) this.outputStream; - out.writeOnRetry(len); - this.currentPosition += len; - } else { - throw new IOException("Invalid Output Stream for Key: " + key); - } - } - } - /** * Verify that the output stream is open. Non blocking; this gives * the last state of the volatile {@link #closed} field. 
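The inner class removed above now lives in the top-level BlockOutputStreamEntry added by this patch, which gains a builder and a token field so each block's output stream can carry the block token that OM issued for it. Below is a minimal, illustrative sketch of the resulting client-side wiring, using only the builder setters that appear in the hunks above; the surrounding variables (subKeyInfo, keyArgs, xceiverClientManager, xceiverClient) are assumed to be in scope as in addKeyLocationInfo(), and the remaining setters (request id, chunk size, buffer and timeout settings) are elided for brevity:

    // Make the OM-issued token visible to the connection layer through the
    // caller's UGI, then also attach it to the per-block stream entry.
    UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
    BlockOutputStreamEntry entry = new BlockOutputStreamEntry.Builder()
        .setBlockID(subKeyInfo.getBlockID())
        .setKey(keyArgs.getKeyName())
        .setXceiverClientManager(xceiverClientManager)
        .setXceiverClient(xceiverClient)
        .setLength(subKeyInfo.getLength())
        .setToken(subKeyInfo.getToken())   // null when security is disabled
        .build();

The builder keeps the token optional, so the same write path serves both secure and insecure clusters; only the connect step differs.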
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index cf61f3ce7b..e4eb0f5f0e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -17,7 +17,9 @@ package org.apache.hadoop.ozone.om.helpers; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; +import org.apache.hadoop.security.token.Token; /** * One key can be too huge to fit in one container. In which case it gets split @@ -28,6 +30,8 @@ public final class OmKeyLocationInfo { // the id of this subkey in all the subkeys. private long length; private final long offset; + // Block token, required for client authentication when security is enabled. + private Token token; // the version number indicating when this block was added private long createVersion; @@ -37,6 +41,14 @@ private OmKeyLocationInfo(BlockID blockID, long length, long offset) { this.offset = offset; } + private OmKeyLocationInfo(BlockID blockID, long length, long offset, + Token token) { + this.blockID = blockID; + this.length = length; + this.offset = offset; + this.token = token; + } + public void setCreateVersion(long version) { createVersion = version; } @@ -73,6 +85,13 @@ public long getBlockCommitSequenceId() { return blockID.getBlockCommitSequenceId(); } + public Token getToken() { + return token; + } + + public void setToken(Token token) { + this.token = token; + } /** * Builder of OmKeyLocationInfo. 
*/ @@ -80,6 +99,7 @@ public static class Builder { private BlockID blockID; private long length; private long offset; + private Token token; public Builder setBlockID(BlockID blockId) { this.blockID = blockId; @@ -96,18 +116,30 @@ public Builder setOffset(long off) { return this; } + public Builder setToken(Token bToken) { + this.token = bToken; + return this; + } + public OmKeyLocationInfo build() { - return new OmKeyLocationInfo(blockID, length, offset); + if (token == null) { + return new OmKeyLocationInfo(blockID, length, offset); + } else { + return new OmKeyLocationInfo(blockID, length, offset, token); + } } } public KeyLocation getProtobuf() { - return KeyLocation.newBuilder() + KeyLocation.Builder builder = KeyLocation.newBuilder() .setBlockID(blockID.getProtobuf()) .setLength(length) .setOffset(offset) - .setCreateVersion(createVersion) - .build(); + .setCreateVersion(createVersion); + if (this.token != null) { + builder.setToken(this.token.toTokenProto()); + } + return builder.build(); } public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { @@ -115,6 +147,9 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { BlockID.getFromProtobuf(keyLocation.getBlockID()), keyLocation.getLength(), keyLocation.getOffset()); + if (keyLocation.hasToken()) { + info.token = new Token<>(keyLocation.getToken()); + } info.setCreateVersion(keyLocation.getCreateVersion()); return info; } @@ -125,6 +160,7 @@ public String toString() { ", localID=" + blockID.getLocalID() + "}" + ", length=" + length + ", offset=" + offset + + ", token=" + token + ", createVersion=" + createVersion + '}'; } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java index 3b833cb140..611d2515bc 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java @@ -19,9 +19,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -55,7 +55,7 @@ public class OzoneBlockTokenSecretManager extends * @param blockTokenExpirytime token expiry time for expired tokens in * milliseconds */ - public OzoneBlockTokenSecretManager(OzoneConfiguration conf, + public OzoneBlockTokenSecretManager(SecurityConfig conf, long blockTokenExpirytime, String omCertSerialId) { super(conf, blockTokenExpirytime, blockTokenExpirytime, SERVICE, LOG); this.omCertSerialId = omCertSerialId; @@ -74,7 +74,8 @@ public OzoneBlockTokenIdentifier createIdentifier(String owner, } /** - * Generate an block token for specified user, blockId. + * Generate a block token for the specified user and blockId. The service + * field of the token is set to the blockId.
* * @param user * @param blockId * @@ -92,8 +93,10 @@ public Token generateToken(String user, LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}", expiryTime, tokenId); } + // Pass blockId as service. return new Token<>(tokenIdentifier.getBytes(), - createPassword(tokenIdentifier), tokenIdentifier.getKind(), SERVICE); + createPassword(tokenIdentifier), tokenIdentifier.getKind(), + new Text(blockId)); } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java index 1b9414b592..b13c5e7502 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo; @@ -75,7 +76,8 @@ public class OzoneDelegationTokenSecretManager public OzoneDelegationTokenSecretManager(OzoneConfiguration conf, long tokenMaxLifetime, long tokenRenewInterval, long dtRemoverScanInterval, Text service) throws IOException { - super(conf, tokenMaxLifetime, tokenRenewInterval, service, LOG); + super(new SecurityConfig(conf), tokenMaxLifetime, tokenRenewInterval, + service, LOG); currentTokens = new ConcurrentHashMap(); this.tokenRemoverScanInterval = dtRemoverScanInterval; this.store = new OzoneSecretStore(conf); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java index 5f909bbdb7..12be4c9ec5 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; @@ -66,16 +65,16 @@ public abstract class OzoneSecretManager /** * Create a secret manager. * - * @param conf configuration. + * @param secureConf security configuration.
* @param tokenMaxLifetime the maximum lifetime of the delegation tokens in * milliseconds * @param tokenRenewInterval how often the tokens must be renewed in * milliseconds * @param service name of service */ - public OzoneSecretManager(OzoneConfiguration conf, long tokenMaxLifetime, + public OzoneSecretManager(SecurityConfig secureConf, long tokenMaxLifetime, long tokenRenewInterval, Text service, Logger logger) { - this.securityConfig = new SecurityConfig(conf); + this.securityConfig = secureConf; this.tokenMaxLifetime = tokenMaxLifetime; this.tokenRenewInterval = tokenRenewInterval; currentKeyId = new AtomicInteger(); diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 80462c83bd..7a8d5237b2 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -411,6 +411,7 @@ message KeyLocation { required uint64 length = 4; // indicated at which version this block gets created. optional uint64 createVersion = 5; + optional hadoop.common.TokenProto token = 6; } message KeyLocationList { diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java index 469226693e..fca67e1821 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; @@ -64,7 +65,7 @@ public void setUp() throws Exception { x509Certificate = KeyStoreTestUtil .generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA"); omCertSerialId = x509Certificate.getSerialNumber().toString(); - secretManager = new OzoneBlockTokenSecretManager(conf, + secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf), expiryTime, omCertSerialId); secretManager.start(keyPair); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index a38787009f..b348b50c4d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.hdds.scm.protocolPB @@ -240,6 +241,7 @@ abstract class Builder { protected int numOfScmHandlers = 20; protected int numOfDatanodes = 1; protected boolean startDataNodes = true; + protected 
CertificateClient certClient; protected Builder(OzoneConfiguration conf) { this.conf = conf; @@ -265,6 +267,18 @@ public Builder setStartDataNodes(boolean nodes) { return this; } + /** + * Sets the certificate client. + * + * @param client + * + * @return MiniOzoneCluster.Builder + */ + public Builder setCertificateClient(CertificateClient client) { + this.certClient = client; + return this; + } + /** * Sets the SCM id. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index cc9459b13d..78f4e88c37 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -378,6 +378,7 @@ public MiniOzoneCluster build() throws IOException { scm = createSCM(); scm.start(); om = createOM(); + om.setCertClient(certClient); } catch (AuthenticationException ex) { throw new IOException("Unable to build MiniOzoneCluster. ", ex); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java index 78a85110d3..98ff8dcc7d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java @@ -89,7 +89,8 @@ public void testContainerStateMachineIdempotency() throws Exception { XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); try { //create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); + ContainerProtocolCalls.createContainer(client, containerID, traceID, + null); // call create Container again BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); byte[] data = @@ -111,8 +112,10 @@ public void testContainerStateMachineIdempotency() throws Exception { client.sendCommand(putKeyRequest); // close container call - ContainerProtocolCalls.closeContainer(client, containerID, traceID); - ContainerProtocolCalls.closeContainer(client, containerID, traceID); + ContainerProtocolCalls.closeContainer(client, containerID, traceID, + null); + ContainerProtocolCalls.closeContainer(client, containerID, traceID, + null); } catch (IOException ioe) { Assert.fail("Container operation failed" + ioe); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java new file mode 100644 index 0000000000..ceb02003fc --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.client; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; +import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest; +import org.apache.hadoop.hdds.security.x509.certificates.utils.SelfSignedCertificate; +import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException; +import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; + +import java.io.InputStream; +import java.security.KeyPair; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.cert.CertStore; +import java.security.cert.X509Certificate; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; +import java.util.List; + +/** + * Test implementation for CertificateClient. To be used only for test + * purposes. + */ + +public class CertificateClientTestImpl implements CertificateClient { + + private final SecurityConfig securityConfig; + private final KeyPair keyPair; + private final X509Certificate cert; + + public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception{ + securityConfig = new SecurityConfig(conf); + HDDSKeyGenerator keyGen = + new HDDSKeyGenerator(securityConfig.getConfiguration()); + keyPair = keyGen.generateKey(); + + SelfSignedCertificate.Builder builder = + SelfSignedCertificate.newBuilder() + .setBeginDate(LocalDate.now()) + .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS)) + .setClusterID("cluster1") + .setKey(keyPair) + .setSubject("TestCertSub") + .setConfiguration(conf) + .setScmID("TestScmId1") + .makeCA(); + + X509CertificateHolder certificateHolder = builder.build(); + cert = new JcaX509CertificateConverter().getCertificate(certificateHolder); + } + + @Override + public PrivateKey getPrivateKey(String component) { + return keyPair.getPrivate(); + } + + @Override + public PublicKey getPublicKey(String component) { + return keyPair.getPublic(); + } + + @Override + public X509Certificate getCertificate(String component) { + return cert; + } + + @Override + public boolean verifyCertificate(X509Certificate certificate) { + return true; + } + + @Override + public byte[] signDataStream(InputStream stream, String component) + throws CertificateException { + return new byte[0]; + } + + @Override + public boolean verifySignature(InputStream stream, byte[] signature, + X509Certificate x509Certificate) { + return true; + } + + @Override + public boolean verifySignature(byte[] data, byte[] signature, + X509Certificate x509Certificate) { + return true; + } + + @Override + public CertificateSignRequest.Builder getCSRBuilder() { + return null; + } + + @Override + public X509Certificate queryCertificate(String query) { + return null; + } + + @Override + public void storePrivateKey(PrivateKey key, String component) + throws CertificateException { + + } + + @Override + public void storePublicKey(PublicKey key, String component) + throws CertificateException { + + } + + @Override + 
public void storeCertificate(X509Certificate certificate, String component) + throws CertificateException { + + } + + @Override + public void storeTrustChain(CertStore certStore, String component) + throws CertificateException { + + } + + @Override + public void storeTrustChain(List certificates, + String component) throws CertificateException { + + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 9d652b461a..7e0a6703c9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -25,6 +25,7 @@ import java.io.IOException; + /** * This class is to test all the public facing APIs of Ozone Client. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index e7bca5e78d..2a80367b12 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -109,7 +109,7 @@ public abstract class TestOzoneRpcClientAbstract { private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient; - private static final String SCM_ID = UUID.randomUUID().toString(); + private static String SCM_ID = UUID.randomUUID().toString(); /** * Create a MiniOzoneCluster for testing. @@ -146,6 +146,33 @@ static void shutdownCluster() throws IOException { } } + public static void setCluster(MiniOzoneCluster cluster) { + TestOzoneRpcClientAbstract.cluster = cluster; + } + + public static void setOzClient(OzoneClient ozClient) { + TestOzoneRpcClientAbstract.ozClient = ozClient; + } + + public static void setOzoneManager(OzoneManager ozoneManager){ + TestOzoneRpcClientAbstract.ozoneManager = ozoneManager; + } + + public static void setStorageContainerLocationClient( + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient) { + TestOzoneRpcClientAbstract.storageContainerLocationClient = + storageContainerLocationClient; + } + + public static void setStore(ObjectStore store) { + TestOzoneRpcClientAbstract.store = store; + } + + public static void setScmId(String scmId){ + TestOzoneRpcClientAbstract.SCM_ID = scmId; + } + @Test public void testSetVolumeQuota() throws IOException, OzoneException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java new file mode 100644 index 0000000000..16df75df87 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.security.token.BlockTokenVerifier; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.CertificateClientTestImpl; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * This class tests the public facing APIs of the Ozone client when gRPC + * block tokens are enabled. + */ +public class TestSecureOzoneRpcClient extends TestOzoneRpcClient { + + private static MiniOzoneCluster cluster = null; + private static OzoneClient ozClient = null; + private static ObjectStore store = null; + private static OzoneManager ozoneManager; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + + private static final String SCM_ID = UUID.randomUUID().toString(); + private static File testDir; + private static OzoneConfiguration conf; + + /** + * Create a MiniOzoneCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + testDir = GenericTestUtils.getTestDir( + TestSecureOzoneRpcClient.class.getSimpleName()); + OzoneManager.setTestSecureOmFlag(true); + conf = new OzoneConfiguration(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); + conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true); + CertificateClientTestImpl certificateClientTest = + new CertificateClientTestImpl(conf); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(10) + .setScmId(SCM_ID) + .setCertificateClient(certificateClientTest) + .build(); + cluster.getOzoneManager().startSecretManager(); + cluster.waitForClusterToBeReady(); + ozClient = OzoneClientFactory.getRpcClient(conf); + store = ozClient.getObjectStore(); + storageContainerLocationClient = + cluster.getStorageContainerLocationClient(); + ozoneManager = cluster.getOzoneManager(); + TestOzoneRpcClient.setCluster(cluster); + TestOzoneRpcClient.setOzClient(ozClient); + TestOzoneRpcClient.setOzoneManager(ozoneManager); + TestOzoneRpcClient.setStorageContainerLocationClient( + storageContainerLocationClient); + TestOzoneRpcClient.setStore(store); + TestOzoneRpcClient.setScmId(SCM_ID); + } + + /** + * Tests successful completion of the following operations when a grpc + * block token is used. + * 1. getKey + * 2. writeChunk + */ + @Test + public void testPutKeySuccessWithBlockToken() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + long currentTime = Time.now(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < 10; i++) { + String keyName = UUID.randomUUID().toString(); + + try (OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE)) { + out.write(value.getBytes()); + } + + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + byte[] fileContent; + try (OzoneInputStream is = bucket.readKey(keyName)) { + fileContent = new byte[value.getBytes().length]; + is.read(fileContent); + } + + Assert.assertTrue(verifyRatisReplication(volumeName, bucketName, + keyName, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE)); + Assert.assertEquals(value, new String(fileContent)); + Assert.assertTrue(key.getCreationTime() >= currentTime); + Assert.assertTrue(key.getModificationTime() >= currentTime); + } + } + + /** + * Tests failure of the following operations when a grpc block token is + * not present. + * 1. getKey + * 2. writeChunk + */ + @Test + public void testKeyOpFailureWithoutBlockToken() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String value = "sample value"; + BlockTokenVerifier.setTestStub(true); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < 10; i++) { + String keyName = UUID.randomUUID().toString(); + + try (OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE)) { + LambdaTestUtils.intercept(IOException.class, "UNAUTHENTICATED: Fail " + + "to find any token ", + () -> out.write(value.getBytes())); + } + + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + LambdaTestUtils.intercept(IOException.class, "Failed to authenticate" + + " with GRPC XceiverServer with Ozone block token.", + () -> bucket.readKey(keyName)); + } + BlockTokenVerifier.setTestStub(false); + } + + private boolean verifyRatisReplication(String volumeName, String bucketName, + String keyName, ReplicationType type, ReplicationFactor factor) + throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + HddsProtos.ReplicationType replicationType = + HddsProtos.ReplicationType.valueOf(type.toString()); + HddsProtos.ReplicationFactor replicationFactor = + HddsProtos.ReplicationFactor.valueOf(factor.getValue()); + OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs); + for (OmKeyLocationInfo info: + keyInfo.getLatestVersionLocations().getLocationList()) { + ContainerInfo container = + storageContainerLocationClient.getContainer(info.getContainerID()); + if (!container.getReplicationFactor().equals(replicationFactor) || ( + container.getReplicationType() != replicationType)) { + return false; + } + } + return true; + } + + /** + * Close OzoneClient and shutdown MiniOzoneCluster.
+ */ + @AfterClass + public static void shutdown() throws IOException { + if(ozClient != null) { + ozClient.close(); + } + + if (storageContainerLocationClient != null) { + storageContainerLocationClient.close(); + } + + if (cluster != null) { + cluster.shutdown(); + } + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index d58466f887..9e687e5df0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -228,7 +229,8 @@ public void testBothGetandPutSmallFile() throws Exception { XceiverClientGrpc client = null; try { OzoneConfiguration conf = newOzoneConfiguration(); - + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + tempFolder.getRoot().getPath()); client = createClientForTesting(conf); cluster = MiniOzoneCluster.newBuilder(conf) .setRandomContainerPort(false) @@ -286,7 +288,8 @@ public void testCloseContainer() throws Exception { try { OzoneConfiguration conf = newOzoneConfiguration(); - + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + tempFolder.getRoot().getPath()); client = createClientForTesting(conf); cluster = MiniOzoneCluster.newBuilder(conf) .setRandomContainerPort(false) @@ -383,7 +386,8 @@ public void testDeleteContainer() throws Exception { writeChunkRequest, putBlockRequest; try { OzoneConfiguration conf = newOzoneConfiguration(); - + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + tempFolder.getRoot().getPath()); client = createClientForTesting(conf); cluster = MiniOzoneCluster.newBuilder(conf) .setRandomContainerPort(false) @@ -501,7 +505,8 @@ public void testXcieverClientAsync() throws Exception { XceiverClientGrpc client = null; try { OzoneConfiguration conf = newOzoneConfiguration(); - + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + tempFolder.getRoot().getPath()); client = createClientForTesting(conf); cluster = MiniOzoneCluster.newBuilder(conf) .setRandomContainerPort(false) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java index 2224300899..74ce908405 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java @@ -172,7 +172,7 @@ public void testCreateOzoneContainer() throws Exception { public Void run() { try { XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf); - client.connect(); + client.connect(token.encodeToUrlString()); createContainerForTesting(client, containerID); } catch (Exception e) { if (requireBlockToken && hasBlockToken && !blockTokeExpired) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index ecf0d846b1..45490ab8b9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -88,7 +88,7 @@ public void testAllocateWrite() throws Exception { XceiverClientSpi client = xceiverClientManager .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, - container.getContainerInfo().getContainerID(), traceID); + container.getContainerInfo().getContainerID(), traceID, null); BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); @@ -111,7 +111,7 @@ public void testInvalidBlockRead() throws Exception { XceiverClientSpi client = xceiverClientManager .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, - container.getContainerInfo().getContainerID(), traceID); + container.getContainerInfo().getContainerID(), traceID, null); thrown.expect(StorageContainerException.class); thrown.expectMessage("Unable to find the block"); @@ -135,7 +135,7 @@ public void testInvalidContainerRead() throws Exception { XceiverClientSpi client = xceiverClientManager .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, - container.getContainerInfo().getContainerID(), traceID); + container.getContainerInfo().getContainerID(), traceID, null); BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); ContainerProtocolCalls.writeSmallFile(client, blockID, @@ -162,7 +162,7 @@ public void testReadWriteWithBCSId() throws Exception { XceiverClientSpi client = xceiverClientManager .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, - container.getContainerInfo().getContainerID(), traceID); + container.getContainerInfo().getContainerID(), traceID, null); BlockID blockID1 = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index b4601fa4d0..d097cb5b1f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -93,7 +93,7 @@ public void tesGetCommittedBlockLength() throws Exception { Pipeline pipeline = container.getPipeline(); XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); //create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); + ContainerProtocolCalls.createContainer(client, containerID, traceID, null); BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); byte[] data = @@ -126,11 +126,11 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { long containerID = container.getContainerInfo().getContainerID(); XceiverClientSpi client = xceiverClientManager .acquireClient(container.getPipeline()); - ContainerProtocolCalls.createContainer(client, containerID, traceID); + ContainerProtocolCalls.createContainer(client, containerID, traceID, null); BlockID blockID = 
ContainerTestHelper.getTestBlockID(containerID); // move the container to closed state - ContainerProtocolCalls.closeContainer(client, containerID, traceID); + ContainerProtocolCalls.closeContainer(client, containerID, traceID, null); try { // There is no block written inside the container. The request should // fail. @@ -153,7 +153,7 @@ public void tesPutKeyResposne() throws Exception { Pipeline pipeline = container.getPipeline(); XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); //create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); + ContainerProtocolCalls.createContainer(client, containerID, traceID, null); BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); byte[] data = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index c8ba7618bc..f6947c1c40 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -146,7 +146,7 @@ public void testFreeByReference() throws IOException { // However container call should succeed because of refcount on the client. String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); ContainerProtocolCalls.createContainer(client1, - container1.getContainerInfo().getContainerID(), traceID1); + container1.getContainerInfo().getContainerID(), traceID1, null); // After releasing the client, this connection should be closed // and any container operations should fail @@ -155,7 +155,7 @@ public void testFreeByReference() throws IOException { String expectedMessage = "This channel is not connected."; try { ContainerProtocolCalls.createContainer(client1, - container1.getContainerInfo().getContainerID(), traceID1); + container1.getContainerInfo().getContainerID(), traceID1, null); Assert.fail("Create container should throw exception on closed" + "client"); } catch (Exception e) { @@ -206,7 +206,7 @@ public void testFreeByEviction() throws IOException { String expectedMessage = "This channel is not connected."; try { ContainerProtocolCalls.createContainer(client1, - container1.getContainerInfo().getContainerID(), traceID2); + container1.getContainerInfo().getContainerID(), traceID2, null); Assert.fail("Create container should throw exception on closed" + "client"); } catch (Exception e) { diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/OzoneHddsDatanodeService.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/OzoneHddsDatanodeService.java index 87b1e21309..621b659711 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/OzoneHddsDatanodeService.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/OzoneHddsDatanodeService.java @@ -72,7 +72,9 @@ public void start(Object service) { @Override public void stop() { try { - handler.close(); + if (handler != null) { + handler.close(); + } } catch (Exception e) { throw new RuntimeException("Can't stop the Object Store Rest server", e); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index cdc645736f..53dd7c3a29 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -36,6 +37,10 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto; +import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; @@ -57,6 +62,9 @@ import org.apache.hadoop.utils.db.DBStore; import com.google.common.base.Preconditions; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; @@ -89,13 +97,14 @@ public class KeyManagerImpl implements KeyManager { private final long preallocateMax; private final String omId; + private final OzoneBlockTokenSecretManager secretManager; + private final boolean grpcBlockTokenEnabled; private BackgroundService keyDeletingService; public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, - OMMetadataManager metadataManager, - OzoneConfiguration conf, - String omId) { + OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, + OzoneBlockTokenSecretManager secretManager) { this.scmBlockClient = scmBlockClient; this.metadataManager = metadataManager; this.scmBlockSize = (long) conf @@ -108,6 +117,10 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); this.omId = omId; start(conf); + this.secretManager = secretManager; + this.grpcBlockTokenEnabled = conf.getBoolean( + HDDS_GRPC_BLOCK_TOKEN_ENABLED, + HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT); } @Override @@ -185,11 +198,18 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) } throw ex; } - OmKeyLocationInfo info = new OmKeyLocationInfo.Builder() + OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setLength(scmBlockSize) - .setOffset(0) - .build(); + .setOffset(0); + if (grpcBlockTokenEnabled) { + String remoteUser = getRemoteUser().getShortUserName(); + builder.setToken(secretManager.generateToken(remoteUser, + allocatedBlock.getBlockID().toString(), + getAclForUser(remoteUser), + scmBlockSize)); + } + OmKeyLocationInfo info = builder.build(); // current version not committed, so new blocks coming now are added to // the same version keyInfo.appendNewBlocks(Collections.singletonList(info)); @@ -199,6 +219,24 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) return info; } + /* Optimize ugi lookup for RPC operations to avoid a trip through + * 
UGI.getCurrentUser which is synch'ed. + */ + public static UserGroupInformation getRemoteUser() throws IOException { + UserGroupInformation ugi = Server.getRemoteUser(); + return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); + } + + /** + * Return acl for user. + * @param user + */ + private EnumSet getAclForUser(String user) { + // TODO: Return correct acl for user. + return EnumSet.allOf(AccessModeProto.class); + } + @Override public OpenKeySession openKey(OmKeyArgs args) throws IOException { Preconditions.checkNotNull(args); @@ -268,11 +306,19 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } throw ex; } - OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder() + OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setLength(allocateSize) - .setOffset(0) - .build(); + .setOffset(0); + if (grpcBlockTokenEnabled) { + String remoteUser = getRemoteUser().getShortUserName(); + builder.setToken(secretManager.generateToken(remoteUser, + allocatedBlock.getBlockID().toString(), + getAclForUser(remoteUser), + scmBlockSize)); + } + + OmKeyLocationInfo subKeyInfo = builder.build(); locations.add(subKeyInfo); requestedSize -= allocateSize; } @@ -422,6 +468,17 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { throw new OMException("Key not found", OMException.ResultCodes.FAILED_KEY_NOT_FOUND); } + if (grpcBlockTokenEnabled) { + String remoteUser = getRemoteUser().getShortUserName(); + for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) { + key.getLocationList().forEach(k -> { + k.setToken(secretManager.generateToken(remoteUser, + k.getBlockID().getContainerBlockID().toString(), + getAclForUser(remoteUser), + k.getLength())); + }); + } + } return value; } catch (IOException ex) { LOG.debug("Get key failed for volume:{} bucket:{} key:{}", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index b754cc8710..196ba29005 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -24,8 +24,10 @@ import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import java.security.KeyPair; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -37,6 +39,7 @@ import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdfs.DFSUtil; @@ -95,6 +98,7 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType; import org.apache.hadoop.ozone.security.acl.OzoneObjInfo; import org.apache.hadoop.ozone.security.acl.RequestContext; +import
org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -180,10 +184,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private static final String OM_DAEMON = "om"; private static boolean securityEnabled = false; private static OzoneDelegationTokenSecretManager - secretManager; + delegationTokenMgr; + private OzoneBlockTokenSecretManager blockTokenMgr; private KeyPair keyPair; private CertificateClient certClient; private static boolean testSecureOmFlag = false; + private final Text omRpcAddressTxt; private final OzoneConfiguration configuration; private RPC.Server omRpcServer; private InetSocketAddress omRpcAddress; @@ -213,6 +219,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final boolean isAclEnabled; private final IAccessAuthorizer accessAuthorizer; private JvmPauseMonitor jvmPauseMonitor; + private final SecurityConfig secConfig; private OzoneManager(OzoneConfiguration conf) throws IOException { Preconditions.checkNotNull(conf); @@ -224,7 +231,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { ResultCodes.OM_NOT_INITIALIZED); } - if (!testSecureOmFlag) { + if (!testSecureOmFlag || !isOzoneSecurityEnabled()) { scmContainerClient = getScmContainerClient(configuration); // verifies that the SCM info in the OM Version file is correct. scmBlockClient = getScmBlockClient(configuration); @@ -271,7 +278,14 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { BlockingService omService = newReflectiveBlockingService( new OzoneManagerProtocolServerSideTranslatorPB( this, omRatisClient, omRatisEnabled)); - secretManager = createSecretManager(configuration); + omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration)); + secConfig = new SecurityConfig(configuration); + if (secConfig.isGrpcBlockTokenEnabled()) { + blockTokenMgr = createBlockTokenSecretManager(configuration); + } + if(secConfig.isSecurityEnabled()){ + delegationTokenMgr = createDelegationTokenSecretManager(configuration); + } omRpcServer = startRpcServer(configuration, omNodeRpcAddr, OzoneManagerProtocolPB.class, omService, @@ -285,9 +299,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager, volumeManager, bucketManager); - keyManager = - new KeyManagerImpl(scmBlockClient, metadataManager, configuration, - omStorage.getOmId()); + keyManager = new KeyManagerImpl(scmBlockClient, metadataManager, + configuration, omStorage.getOmId(), blockTokenMgr); shutdownHook = () -> { saveOmMetrics(); @@ -368,7 +381,7 @@ private File getMetricsStorageFile() { } - private OzoneDelegationTokenSecretManager createSecretManager( + private OzoneDelegationTokenSecretManager createDelegationTokenSecretManager( OzoneConfiguration conf) throws IOException { long tokenRemoverScanInterval = conf.getTimeDuration(OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_KEY, @@ -387,30 +400,78 @@ private OzoneDelegationTokenSecretManager createSecretManager( tokenRenewInterval, tokenRemoverScanInterval, omRpcAddressTxt); } - private void stopSecretManager() throws IOException { - if (secretManager != null) { - LOG.info("Stopping OM secret manager"); - secretManager.stop(); + private OzoneBlockTokenSecretManager createBlockTokenSecretManager( + OzoneConfiguration conf) { + + long expiryTime = 
conf.getTimeDuration( + HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME, + HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT, + TimeUnit.MILLISECONDS); + // TODO: Pass OM cert serial ID. + if (testSecureOmFlag) { + return new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1"); + } + Objects.requireNonNull(certClient); + return new OzoneBlockTokenSecretManager(secConfig, expiryTime, + certClient.getCertificate(OM_DAEMON).getSerialNumber().toString()); + } + + private void stopSecretManager() { + if (blockTokenMgr != null) { + LOG.info("Stopping OM block token manager."); + try { + blockTokenMgr.stop(); + } catch (IOException e) { + LOG.error("Failed to stop block token manager", e); + } + } + + if (delegationTokenMgr != null) { + LOG.info("Stopping OM delegation token secret manager."); + try { + delegationTokenMgr.stop(); + } catch (IOException e) { + LOG.error("Failed to stop delegation token manager", e); + } + } } - private void startSecretManager() { - if (secretManager != null) { + @VisibleForTesting + public void startSecretManager() { + try { + readKeyPair(); + } catch (OzoneSecurityException e) { + LOG.error("Unable to read key pair for OM.", e); + throw new RuntimeException(e); + } + if (secConfig.isGrpcBlockTokenEnabled() && blockTokenMgr != null) { try { - readKeyPair(); - LOG.info("Starting OM secret manager"); - secretManager.start(keyPair); + LOG.info("Starting OM block token secret manager"); + blockTokenMgr.start(keyPair); } catch (IOException e) { - // Inability to start secret manager - // can't be recovered from. - LOG.error("Error starting secret manager.", e); + // Unable to start secret manager. + LOG.error("Error starting block token secret manager.", e); + throw new RuntimeException(e); + } + } + + if (delegationTokenMgr != null) { + try { + LOG.info("Starting OM delegation token secret manager"); + delegationTokenMgr.start(keyPair); + } catch (IOException e) { + // Unable to start secret manager. + LOG.error("Error starting delegation token secret manager.", e); throw new RuntimeException(e); } } } + /** + * For testing purposes only. + */ public void setCertClient(CertificateClient certClient) { - // TODO: Initialize it in contructor with implementation for certClient. + // TODO: Initialize it in constructor with implementation for certClient.
     this.certClient = certClient;
   }

@@ -524,7 +585,7 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
         .setPort(addr.getPort())
         .setNumHandlers(handlerCount)
         .setVerbose(false)
-        .setSecretManager(secretManager)
+        .setSecretManager(delegationTokenMgr)
         .build();

     DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
@@ -844,10 +905,13 @@ public void join() {
   }

   private void startSecretManagerIfNecessary() {
-    boolean shouldRun = shouldUseDelegationTokens() && isOzoneSecurityEnabled();
-    boolean running = secretManager.isRunning();
-    if (shouldRun && !running) {
-      startSecretManager();
+    boolean shouldRun = isOzoneSecurityEnabled();
+    if (shouldRun) {
+      boolean running = delegationTokenMgr.isRunning()
+          && (blockTokenMgr == null || blockTokenMgr.isRunning());
+      if (!running) {
+        startSecretManager();
+      }
     }
   }

@@ -910,7 +974,7 @@ public Token getDelegationToken(Text renewer)
       throw new IOException("Delegation Token can be issued only with "
           + "kerberos or web authentication");
     }
-    if (secretManager == null || !secretManager.isRunning()) {
+    if (delegationTokenMgr == null || !delegationTokenMgr.isRunning()) {
      LOG.warn("trying to get DT with no secret manager running in OM.");
      return null;
    }
@@ -923,7 +987,7 @@ public Token getDelegationToken(Text renewer)
       realUser = new Text(ugi.getRealUser().getUserName());
     }

-    token = secretManager.createToken(owner, renewer, realUser);
+    token = delegationTokenMgr.createToken(owner, renewer, realUser);
     return token;
   }

@@ -946,7 +1010,7 @@ public long renewDelegationToken(Token token)
             + "kerberos or web authentication");
       }
       String renewer = getRemoteUser().getShortUserName();
-      expiryTime = secretManager.renewToken(token, renewer);
+      expiryTime = delegationTokenMgr.renewToken(token, renewer);

     } catch (AccessControlException ace) {
       final OzoneTokenIdentifier id = OzoneTokenIdentifier.readProtoBuf(
@@ -969,7 +1033,7 @@ public void cancelDelegationToken(Token token)
     OzoneTokenIdentifier id = null;
     try {
       String canceller = getRemoteUser().getUserName();
-      id = secretManager.cancelToken(token, canceller);
+      id = delegationTokenMgr.cancelToken(token, canceller);
       LOG.trace("Delegation token renewed for dt: {}", id);
     } catch (AccessControlException ace) {
       LOG.error("Delegation token renewal failed for dt: {}, cause: {}", id,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 07f1600496..c86c12a881 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -90,7 +90,7 @@ public void checkIfDeleteServiceisDeletingKeys()
     KeyManager keyManager =
         new KeyManagerImpl(
             new ScmBlockLocationTestIngClient(null, null, 0),
-            metaMgr, conf, UUID.randomUUID().toString());
+            metaMgr, conf, UUID.randomUUID().toString(), null);
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 1);
     KeyDeletingService keyDeletingService =
@@ -112,7 +112,7 @@ public void checkIfDeleteServiceWithFailingSCM()
     KeyManager keyManager =
         new KeyManagerImpl(
             new ScmBlockLocationTestIngClient(null, null, 1),
-            metaMgr, conf, UUID.randomUUID().toString());
+            metaMgr, conf, UUID.randomUUID().toString(), null);
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 1);
     KeyDeletingService keyDeletingService =
@@ -139,7 +139,7 @@ public void checkDeletionForEmptyKey()
     KeyManager keyManager =
         new KeyManagerImpl(
             new ScmBlockLocationTestIngClient(null, null, 1),
-            metaMgr, conf, UUID.randomUUID().toString());
+            metaMgr, conf, UUID.randomUUID().toString(), null);
     final int keyCount = 100;
     createAndDeleteKeys(keyManager, keyCount, 0);
     KeyDeletingService keyDeletingService =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 82ecd2ec20..4531dbbfa3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -84,7 +84,7 @@ public void setUp() throws Exception {
     scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
     metadataManager = Mockito.mock(OMMetadataManager.class);
     keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager,
-        conf, "om1");
+        conf, "om1", null);
     setupMocks();
   }
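
Note for reviewers: the OzoneManager hunks above introduce a second secret manager whose lifecycle (create from config, start with the OM key pair, stop on shutdown) is easy to lose among the renames. The sketch below condenses that lifecycle, assuming only the constructor, start(KeyPair), stop(), and getTimeDuration(...) calls that appear verbatim in the diff; the class itself, its main() wrapper, and loadOmKeyPair() are hypothetical scaffolding, not code from this patch.

import java.io.IOException;
import java.security.KeyPair;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;

/**
 * Illustrative-only sketch of the block token manager lifecycle wired up
 * by this patch; this class is not part of the change.
 */
public final class BlockTokenLifecycleSketch {

  public static void main(String[] args) throws IOException {
    OzoneConfiguration conf = new OzoneConfiguration();
    SecurityConfig secConfig = new SecurityConfig(conf);

    // Mirrors createBlockTokenSecretManager(): the token lifetime comes
    // from the HDDS block token expiry setting, read in milliseconds.
    long expiryTime = conf.getTimeDuration(
        HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
        HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
        TimeUnit.MILLISECONDS);

    // "1" is the placeholder certificate serial ID the patch uses on the
    // testSecureOmFlag path; a real OM passes its own cert serial number.
    OzoneBlockTokenSecretManager blockTokenMgr =
        new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1");

    // Hypothetical stand-in for the OM's readKeyPair(); the manager signs
    // tokens with the OM private key, so it cannot start without one.
    KeyPair keyPair = loadOmKeyPair();

    // Same pairing as startSecretManager()/stopSecretManager() above.
    blockTokenMgr.start(keyPair);
    try {
      // ... serve key requests; KeyManagerImpl, which now receives this
      // manager in its constructor, can attach block tokens to the key
      // locations it hands back to clients ...
    } finally {
      blockTokenMgr.stop();
    }
  }

  private static KeyPair loadOmKeyPair() {
    throw new UnsupportedOperationException("illustration only");
  }
}

The null fourth argument added to the KeyManagerImpl constructors in the tests above corresponds to this manager: tests that do not exercise block tokens pass no manager, while a secured OM passes the instance it builds at startup.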