diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index a3a45fe90b..0d48805e81 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -126,9 +126,9 @@ public final class HddsConfigKeys { public static final String HDDS_X509_SIGNATURE_ALGO = "hdds.x509.signature.algorithm"; public static final String HDDS_X509_SIGNATURE_ALGO_DEFAULT = "SHA256withRSA"; - public static final String HDDS_GRPC_BLOCK_TOKEN_ENABLED = - "hdds.grpc.block.token.enabled"; - public static final boolean HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT = false; + public static final String HDDS_BLOCK_TOKEN_ENABLED = + "hdds.block.token.enabled"; + public static final boolean HDDS_BLOCK_TOKEN_ENABLED_DEFAULT = false; public static final String HDDS_X509_DIR_NAME = "hdds.x509.dir.name"; public static final String HDDS_X509_DIR_NAME_DEFAULT = "certs"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java index 21bc45badc..f5453254ad 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java @@ -24,6 +24,7 @@ * Root Security Exception call for all Certificate related Execptions. */ public class SCMSecurityException extends IOException { + private final ErrorCode errorCode; /** * Ctor. @@ -31,6 +32,7 @@ public class SCMSecurityException extends IOException { */ public SCMSecurityException(String message) { super(message); + this.errorCode = ErrorCode.DEFAULT; } /** @@ -40,6 +42,17 @@ public SCMSecurityException(String message) { */ public SCMSecurityException(String message, Throwable cause) { super(message, cause); + this.errorCode = ErrorCode.DEFAULT; + } + + /** + * Ctor. + * @param message - Message. + * @param error - error code. + */ + public SCMSecurityException(String message, ErrorCode error) { + super(message); + this.errorCode = error; } /** @@ -48,5 +61,18 @@ public SCMSecurityException(String message, Throwable cause) { */ public SCMSecurityException(Throwable cause) { super(cause); + this.errorCode = ErrorCode.DEFAULT; + } + + public ErrorCode getErrorCode() { + return errorCode; + } + + /** + * Error codes to make it easy to decode these exceptions. 
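A note on the new error-code plumbing above: because SCMSecurityException extends IOException, callers can catch broadly and still branch on getErrorCode(). A minimal, hypothetical caller (the class and method below are illustrative only, not part of this patch; the ErrorCode enum is completed just below) might look like:

```java
import java.io.IOException;

import org.apache.hadoop.hdds.security.exception.SCMSecurityException;

/** Hypothetical helper showing how callers can decode the new error code. */
final class SecurityFailureHandler {
  static void rethrowUnlessMissingToken(IOException ioe) throws IOException {
    if (ioe instanceof SCMSecurityException
        && ((SCMSecurityException) ioe).getErrorCode()
            == SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN) {
      // A client could fetch a fresh block token from OM here and retry.
      return;
    }
    throw ioe; // DEFAULT and all other causes propagate unchanged.
  }
}
```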
+ */ + public enum ErrorCode { + DEFAULT, + MISSING_BLOCK_TOKEN } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java index 9867879588..f76dac477c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java @@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -40,6 +42,8 @@ public class BlockTokenVerifier implements TokenVerifier { private final CertificateClient caClient; private final SecurityConfig conf; private static boolean testStub = false; + private final static Logger LOGGER = + LoggerFactory.getLogger(BlockTokenVerifier.class); public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) { this.conf = conf; @@ -53,7 +57,9 @@ private boolean isExpired(long expiryDate) { @Override public UserGroupInformation verify(String user, String tokenStr) throws SCMSecurityException { - if (conf.isGrpcBlockTokenEnabled()) { + if (conf.isBlockTokenEnabled()) { + // TODO: add audit logs. + if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) { throw new BlockTokenException("Fail to find any token (empty or " + "null."); @@ -62,10 +68,12 @@ public UserGroupInformation verify(String user, String tokenStr) OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); try { token.decodeFromUrlString(tokenStr); + LOGGER.debug("Verifying token:{} for user:{} ", token, user); ByteArrayInputStream buf = new ByteArrayInputStream( token.getIdentifier()); DataInputStream in = new DataInputStream(buf); tokenId.readFields(in); + } catch (IOException ex) { throw new BlockTokenException("Failed to decode token : " + tokenStr); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java index 10a63fdf0c..91bf2911eb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java @@ -36,8 +36,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_ALGORITHM; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_LEN; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_SECURITY_PROVIDER; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER; @@ -98,7 +98,7 @@ public class SecurityConfig { private final String publicKeyFileName; private final Duration certDuration; private final String x509SignatureAlgo; - private final boolean 
grpcBlockTokenEnabled; + private final boolean blockTokenEnabled; private final String certificateDir; private final String certificateFileName; private final boolean grpcTlsEnabled; @@ -147,9 +147,9 @@ public SecurityConfig(Configuration configuration) { this.certificateFileName = this.configuration.get(HDDS_X509_FILE_NAME, HDDS_X509_FILE_NAME_DEFAULT); - this.grpcBlockTokenEnabled = this.configuration.getBoolean( - HDDS_GRPC_BLOCK_TOKEN_ENABLED, - HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT); + this.blockTokenEnabled = this.configuration.getBoolean( + HDDS_BLOCK_TOKEN_ENABLED, + HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); this.grpcTlsEnabled = this.configuration.getBoolean(HDDS_GRPC_TLS_ENABLED, HDDS_GRPC_TLS_ENABLED_DEFAULT); @@ -357,8 +357,8 @@ public Duration getMaxCertificateDuration() { return this.certDuration; } - public boolean isGrpcBlockTokenEnabled() { - return this.grpcBlockTokenEnabled; + public boolean isBlockTokenEnabled() { + return this.blockTokenEnabled; } /** diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 77d75d8767..61b8496d4f 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1658,7 +1658,7 @@ - hdds.grpc.block.token.enabled + hdds.block.token.enabled false OZONE, HDDS, SECURITY, TOKEN True if block tokens are enabled, else false. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java index 37b7d5dcff..dc5f5bc854 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -23,7 +23,9 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto .XceiverClientProtocolServiceGrpc; +import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +41,18 @@ public class GrpcXceiverService extends LOG = LoggerFactory.getLogger(GrpcXceiverService.class); private final ContainerDispatcher dispatcher; + private final boolean isGrpcTokenEnabled; + private final TokenVerifier tokenVerifier; public GrpcXceiverService(ContainerDispatcher dispatcher) { + this(dispatcher, false, null); + } + + public GrpcXceiverService(ContainerDispatcher dispatcher, + boolean grpcTokenEnabled, TokenVerifier tokenVerifier) { this.dispatcher = dispatcher; + this.isGrpcTokenEnabled = grpcTokenEnabled; + this.tokenVerifier = tokenVerifier; } @Override @@ -53,6 +64,11 @@ public StreamObserver send( @Override public void onNext(ContainerCommandRequestProto request) { try { + if(isGrpcTokenEnabled) { + // ServerInterceptors intercepts incoming request and creates ugi. 
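The ugi mentioned in the comment above is established by ServerCredentialInterceptor before the verify call that continues below. As a rough sketch of that gRPC interception pattern (plain io.grpc API for clarity; Ozone actually uses the Ratis-shaded gRPC classes, and the real interceptor populates Hadoop's UserGroupInformation rather than a Context key, so treat every name here as illustrative):

```java
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

/** Simplified, hypothetical stand-in for ServerCredentialInterceptor. */
class UserContextInterceptor implements ServerInterceptor {
  private static final Metadata.Key<String> USER_HEADER =
      Metadata.Key.of("ozone-user", Metadata.ASCII_STRING_MARSHALLER);
  static final Context.Key<String> USER_CTX = Context.key("ozone-user");

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call, Metadata headers,
      ServerCallHandler<ReqT, RespT> next) {
    // Capture the caller identity from request metadata so the handler
    // (the verify call below) can resolve the remote user.
    Context ctx =
        Context.current().withValue(USER_CTX, headers.get(USER_HEADER));
    return Contexts.interceptCall(ctx, call, headers, next);
  }
}
```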
+ tokenVerifier.verify(UserGroupInformation.getCurrentUser() + .getShortUserName(), request.getEncodedToken()); + } ContainerCommandResponseProto resp = dispatcher.dispatch(request, null); responseObserver.onNext(resp); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java new file mode 100644 index 0000000000..7691bdda1e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.token.BlockTokenVerifier; +import org.apache.hadoop.hdds.security.token.TokenVerifier; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; + +import java.io.IOException; +import java.util.Objects; + +/** + * A server endpoint that acts as the communication layer for Ozone containers. + */ +public abstract class XceiverServer implements XceiverServerSpi { + + private final SecurityConfig secConfig; + private final TokenVerifier tokenVerifier; + + public XceiverServer(Configuration conf) { + Objects.requireNonNull(conf); + this.secConfig = new SecurityConfig(conf); + tokenVerifier = new BlockTokenVerifier(secConfig, getCaClient()); + } + + /** + * Default implementation which just validates security token if security is + * enabled.
+ * + * @param request ContainerCommandRequest + */ + @Override + public void submitRequest(ContainerCommandRequestProto request, + HddsProtos.PipelineID pipelineID) throws IOException { + if (secConfig.isSecurityEnabled()) { + String encodedToken = request.getEncodedToken(); + if (encodedToken == null) { + throw new SCMSecurityException("Security is enabled but client " + + "request is missing block token.", + SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN); + } + tokenVerifier.verify("", encodedToken); + } + } + + @VisibleForTesting + protected CertificateClient getCaClient() { + // TODO: instantiate CertificateClient + return null; + } + + protected SecurityConfig getSecurityConfig() { + return secConfig; + } + + protected TokenVerifier getBlockTokenVerifier() { + return tokenVerifier; + } + + public SecurityConfig getSecConfig() { + return secConfig; + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 30c7c26f23..b38d99f1b4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.common.transport.server; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -31,9 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.common.helpers. StorageContainerException; -import org.apache.hadoop.hdds.security.token.BlockTokenVerifier; -import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -62,7 +58,7 @@ * Creates a Grpc server endpoint that acts as the communication layer for * Ozone containers. */ -public final class XceiverServerGrpc implements XceiverServerSpi { +public final class XceiverServerGrpc extends XceiverServer { private static final Logger LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); private int port; @@ -77,6 +73,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { */ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, ContainerDispatcher dispatcher, BindableService...
additionalServices) { + super(conf); Preconditions.checkNotNull(conf); this.id = datanodeDetails.getUuid(); @@ -103,33 +100,35 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, ((NettyServerBuilder) ServerBuilder.forPort(port)) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); - // Populate UGI context via ServerCredentialInterceptor - SecurityConfig secConfig = new SecurityConfig(conf); ServerCredentialInterceptor credInterceptor = - new ServerCredentialInterceptor( - new BlockTokenVerifier(secConfig, getCaClient())); + new ServerCredentialInterceptor(getBlockTokenVerifier()); nettyServerBuilder.addService(ServerInterceptors.intercept( - new GrpcXceiverService(dispatcher), credInterceptor)); + new GrpcXceiverService(dispatcher, + getSecurityConfig().isBlockTokenEnabled(), + getBlockTokenVerifier()), credInterceptor)); + for (BindableService service : additionalServices) { nettyServerBuilder.addService(service); } - if (secConfig.isGrpcTlsEnabled()) { - File privateKeyFilePath = secConfig.getServerPrivateKeyFile(); - File serverCertChainFilePath = secConfig.getServerCertChainFile(); - File clientCertChainFilePath = secConfig.getClientCertChainFile(); + if (getSecConfig().isGrpcTlsEnabled()) { + File privateKeyFilePath = getSecurityConfig().getServerPrivateKeyFile(); + File serverCertChainFilePath = + getSecurityConfig().getServerCertChainFile(); + File clientCertChainFilePath = + getSecurityConfig().getClientCertChainFile(); try { SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer( serverCertChainFilePath, privateKeyFilePath); - if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFilePath - != null) { + if (getSecurityConfig().isGrpcMutualTlsRequired() && + clientCertChainFilePath != null) { // Only needed for mutual TLS sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE); sslClientContextBuilder.trustManager(clientCertChainFilePath); } SslContextBuilder sslContextBuilder = GrpcSslContexts.configure( - sslClientContextBuilder, secConfig.getGrpcSslProvider()); + sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider()); nettyServerBuilder.sslContext(sslContextBuilder.build()); } catch (Exception ex) { LOG.error("Unable to setup TLS for secure datanode GRPC endpoint.", ex); @@ -139,12 +138,6 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, storageContainer = dispatcher; } - @VisibleForTesting - public CertificateClient getCaClient() { - // TODO: instantiate CertificateClient - return null; - } - @Override public int getIPCPort() { return this.port; @@ -173,6 +166,7 @@ public void stop() { @Override public void submitRequest(ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException { + super.submitRequest(request, pipelineID); ContainerProtos.ContainerCommandResponseProto response = storageContainer.dispatch(request, null); if (response.getResult() != ContainerProtos.Result.SUCCESS) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 68ee91c6c7..ea8a15f74a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadChunkResponseProto; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.hdds.security.token.TokenVerifier; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.server.storage.RaftStorage; @@ -126,6 +128,8 @@ public class ContainerStateMachine extends BaseStateMachine { private final Map applyTransactionCompletionMap; private long lastIndex; private final Cache stateMachineDataCache; + private final boolean isBlockTokenEnabled; + private final TokenVerifier tokenVerifier; /** * CSM metrics. */ @@ -133,7 +137,8 @@ public class ContainerStateMachine extends BaseStateMachine { public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer, - List executors, long expiryInterval) { + List executors, long expiryInterval, + boolean isBlockTokenEnabled, TokenVerifier tokenVerifier) { this.gid = gid; this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; @@ -149,6 +154,8 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, // set the limit on no of cached entries equal to no of max threads // executing writeStateMachineData .maximumSize(chunkExecutor.getCorePoolSize()).build(); + this.isBlockTokenEnabled = isBlockTokenEnabled; + this.tokenVerifier = tokenVerifier; } @Override @@ -289,8 +296,13 @@ private ContainerCommandRequestProto getRequestProto(ByteString request) private ContainerCommandResponseProto dispatchCommand( ContainerCommandRequestProto requestProto, - DispatcherContext context) { + DispatcherContext context) throws IOException { LOG.trace("dispatch {}", requestProto); + if(isBlockTokenEnabled) { + // ServerInterceptors intercepts incoming request and creates ugi. 
+ tokenVerifier.verify(UserGroupInformation.getCurrentUser() + .getShortUserName(), requestProto.getEncodedToken()); + } ContainerCommandResponseProto response = dispatcher.dispatch(requestProto, context); LOG.trace("response {}", response); @@ -298,7 +310,7 @@ private ContainerCommandResponseProto dispatchCommand( } private Message runCommand(ContainerCommandRequestProto requestProto, - DispatcherContext context) { + DispatcherContext context) throws IOException { return dispatchCommand(requestProto, context)::toByteString; } @@ -326,8 +338,15 @@ private CompletableFuture handleWriteChunk( .setLogIndex(entryIndex) .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) .build(); - CompletableFuture writeChunkFuture = CompletableFuture - .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor); + CompletableFuture writeChunkFuture; + try { + Message msg = runCommand(requestProto, context); + writeChunkFuture = CompletableFuture + .supplyAsync(() -> msg, chunkExecutor); + }catch(IOException ie) { + writeChunkFuture = completeExceptionally(ie); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData() @@ -386,7 +405,8 @@ public CompletableFuture query(Message request) { } private ByteString readStateMachineData( - ContainerCommandRequestProto requestProto, long term, long index) { + ContainerCommandRequestProto requestProto, long term, long index) + throws IOException { WriteChunkRequestProto writeChunkRequestProto = requestProto.getWriteChunk(); ContainerProtos.ChunkInfo chunkInfo = writeChunkRequestProto.getChunkData(); @@ -559,9 +579,14 @@ public CompletableFuture applyTransaction(TransactionContext trx) { builder .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA); } - future = CompletableFuture - .supplyAsync(() -> runCommand(requestProto, builder.build()), - getCommandExecutor(requestProto)); + try { + Message msg = runCommand(requestProto, builder.build()); + future = CompletableFuture.supplyAsync(() -> msg, + getCommandExecutor(requestProto)); + } catch (IOException ie) { + future = completeExceptionally(ie); + } + lastIndex = index; future.thenAccept(m -> { final Long previous = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index ad27de1849..18253197bf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -38,8 +38,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.transport.server - .XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClientConfigKeys; @@ -69,7 +68,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.HEAD; import java.io.File; import java.io.IOException; import 
java.net.InetSocketAddress; @@ -91,7 +89,7 @@ * Creates a ratis server endpoint that acts as the communication layer for * Ozone containers. */ -public final class XceiverServerRatis implements XceiverServerSpi { +public final class XceiverServerRatis extends XceiverServer { private static final Logger LOG = LoggerFactory .getLogger(XceiverServerRatis.class); private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); @@ -115,6 +113,7 @@ private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, Configuration conf, StateContext context, GrpcTlsConfig tlsConfig) throws IOException { + super(conf); Objects.requireNonNull(dd, "id == null"); this.port = port; RaftProperties serverProperties = newRaftProperties(conf); @@ -155,7 +154,8 @@ private XceiverServerRatis(DatanodeDetails dd, int port, private ContainerStateMachine getStateMachine(RaftGroupId gid) { return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this, - Collections.unmodifiableList(executors), cacheEntryExpiryInteval); + Collections.unmodifiableList(executors), cacheEntryExpiryInteval, + getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier()); } private RaftProperties newRaftProperties(Configuration conf) { @@ -479,6 +479,7 @@ private void processReply(RaftClientReply reply) throws IOException { @Override public void submitRequest(ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException { + super.submitRequest(request, pipelineID); RaftClientReply reply; RaftClientRequest raftClientRequest = createRaftClientRequest(request, pipelineID, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java new file mode 100644 index 0000000000..74be7bdd00 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.CertificateClientTestImpl; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests the containerStateMachine failure handling. + */ + +public class TestContainerStateMachine { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + private static String volumeName; + private static String bucketName; + private static String path; + + /** + * Create a MiniOzoneCluster for testing.
+ * + * @throws Exception + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + path = GenericTestUtils + .getTempPath(TestContainerStateMachine.class.getSimpleName()); + File baseDir = new File(path); + baseDir.mkdirs(); + + conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true); + // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setQuietMode(false); + OzoneManager.setTestSecureOmFlag(true); + // conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); + cluster = + MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) + .setHbInterval(200) + .setCertificateClient(new CertificateClientTestImpl(conf)) + .build(); + cluster.waitForClusterToBeReady(); + cluster.getOzoneManager().startSecretManager(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + volumeName = "testcontainerstatemachinefailures"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + * Shutdown MiniOzoneCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testContainerStateMachineFailures() throws Exception { + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis", 1024, ReplicationType.RATIS, + ReplicationFactor.ONE); + // First write and flush creates a container in the datanode + key.write("ratis".getBytes()); + key.flush(); + key.write("ratis".getBytes()); + + //get the name of a valid container + KeyOutputStream groupOutputStream = + (KeyOutputStream) key.getOutputStream(); + + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + + // delete the container dir + FileUtil.fullyDelete(new File( + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() + .getContainerPath())); + + key.close(); + // Make sure the container is marked unhealthy + Assert.assertTrue( + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()) + .getContainerState() + == ContainerProtos.ContainerDataProto.State.UNHEALTHY); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java index 16df75df87..c313d32522 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java @@ -86,7 +86,7 @@ public static void init() throws Exception { conf = new OzoneConfiguration(); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath()); conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); - conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true); + conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true); conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); CertificateClientTestImpl certificateClientTest = new CertificateClientTestImpl(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 82c3ab8862..026c22a117 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -23,8 +23,10 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; @@ -35,6 +37,7 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.security.token.Token; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -213,7 +216,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest( writeRequest.setChunkData(info.getProtoBufMessage()); writeRequest.setData(ByteString.copyFrom(data)); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.WriteChunk); request.setContainerID(blockID.getContainerID()); @@ -255,7 +258,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest( smallFileRequest.setData(ByteString.copyFrom(data)); smallFileRequest.setBlock(putRequest); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.PutSmallFile); request.setContainerID(blockID.getContainerID()); @@ -274,7 +277,7 @@ public static ContainerCommandRequestProto getReadSmallFileRequest( ContainerCommandRequestProto getKey = getBlockRequest(pipeline, putKey); smallFileRequest.setBlock(getKey.getGetBlock()); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.GetSmallFile); request.setContainerID(getKey.getGetBlock().getBlockID().getContainerID()); @@ -304,7 +307,7 @@ public static ContainerCommandRequestProto getReadChunkRequest( readRequest.setBlockID(request.getBlockID()); readRequest.setChunkData(request.getChunkData()); - ContainerCommandRequestProto.Builder newRequest = + Builder newRequest = ContainerCommandRequestProto.newBuilder(); newRequest.setCmdType(ContainerProtos.Type.ReadChunk); 
newRequest.setContainerID(readRequest.getBlockID().getContainerID()); @@ -337,7 +340,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest( deleteRequest.setChunkData(writeRequest.getChunkData()); deleteRequest.setBlockID(writeRequest.getBlockID()); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.DeleteChunk); request.setContainerID(writeRequest.getBlockID().getContainerID()); @@ -356,8 +359,12 @@ public static ContainerCommandRequestProto getDeleteChunkRequest( public static ContainerCommandRequestProto getCreateContainerRequest( long containerID, Pipeline pipeline) throws IOException { LOG.trace("addContainer: {}", containerID); + return getContainerCommandRequestBuilder(containerID, pipeline).build(); + } - ContainerCommandRequestProto.Builder request = + private static Builder getContainerCommandRequestBuilder(long containerID, + Pipeline pipeline) throws IOException { + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.CreateContainer); request.setContainerID(containerID); @@ -366,6 +373,24 @@ public static ContainerCommandRequestProto getCreateContainerRequest( request.setTraceID(UUID.randomUUID().toString()); request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); + return request; + } + + /** + * Returns a create container command for test purposes. There are a bunch of + * tests where we need to just send a request and get a reply. + * + * @return ContainerCommandRequestProto. + */ + public static ContainerCommandRequestProto getCreateContainerSecureRequest( + long containerID, Pipeline pipeline, + Token token) throws IOException { + LOG.trace("addContainer: {}", containerID); + + Builder request = getContainerCommandRequestBuilder(containerID, pipeline); + if(token != null){ + request.setEncodedToken(token.encodeToUrlString()); + } return request.build(); } @@ -393,7 +418,7 @@ public static ContainerCommandRequestProto getUpdateContainerRequest( Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.UpdateContainer); request.setContainerID(containerID); @@ -444,7 +469,7 @@ public static ContainerCommandRequestProto getPutBlockRequest( blockData.setBlockCommitSequenceId(0); putRequest.setBlockData(blockData.getProtoBufMessage()); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.PutBlock); request.setContainerID(blockData.getContainerID()); @@ -472,7 +497,7 @@ public static ContainerCommandRequestProto getBlockRequest( ContainerProtos.GetBlockRequestProto.newBuilder(); getRequest.setBlockID(blockID); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.GetBlock); request.setContainerID(blockID.getContainerID()); @@ -510,7 +535,7 @@ public static ContainerCommandRequestProto getDeleteBlockRequest( ContainerProtos.DeleteBlockRequestProto.Builder delRequest = ContainerProtos.DeleteBlockRequestProto.newBuilder(); delRequest.setBlockID(blockID); - ContainerCommandRequestProto.Builder request = + Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.DeleteBlock); 
request.setContainerID(blockID.getContainerID()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java index 74ce908405..b43570a577 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java @@ -120,7 +120,7 @@ public void testCreateOzoneContainer() throws Exception { LOG.info("Test case: requireBlockToken: {} hasBlockToken: {} " + "blockTokenExpired: {}.", requireBlockToken, hasBlockToken, blockTokeExpired); - conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, + conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, requireBlockToken); long containerID = ContainerTestHelper.getTestContainerID(); @@ -161,7 +161,7 @@ public void testCreateOzoneContainer() throws Exception { new InetSocketAddress(dn.getIpAddress(), port); Token token = - new Token(tokenId.getBytes(), new byte[2], tokenId.getKind(), + new Token(tokenId.getBytes(), new byte[50], tokenId.getKind(), SecurityUtil.buildTokenService(addr)); if (hasBlockToken) { ugi.addToken(token); @@ -173,9 +173,15 @@ public Void run() { try { XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf); client.connect(token.encodeToUrlString()); - createContainerForTesting(client, containerID); + if (hasBlockToken) { + createContainerForTesting(client, containerID, token); + } else { + createContainerForTesting(client, containerID, null); + } + } catch (Exception e) { if (requireBlockToken && hasBlockToken && !blockTokeExpired) { + LOG.error("Unexpected error. 
", e); fail("Client with BlockToken should succeed when block token is" + " required."); } @@ -185,7 +191,7 @@ public Void run() { } if (requireBlockToken && !hasBlockToken) { assertTrue("Receive expected exception", e instanceof - SCMSecurityException); + IOException); } } return null; @@ -199,11 +205,11 @@ public Void run() { } public static void createContainerForTesting(XceiverClientSpi client, - long containerID) throws Exception { + long containerID, Token token) throws Exception { // Create container ContainerProtos.ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest( - containerID, client.getPipeline()); + ContainerTestHelper.getCreateContainerSecureRequest( + containerID, client.getPipeline(), token); ContainerProtos.ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertNotNull(response); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index b4dff7c0dd..8540939a90 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -19,11 +19,14 @@ package org.apache.hadoop.ozone.container.server; import com.google.common.collect.Maps; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -54,14 +57,17 @@ import org.apache.ratis.rpc.RpcType; import org.apache.ratis.util.function.CheckedBiConsumer; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.apache.ratis.rpc.SupportedRpcType.GRPC; import static org.apache.ratis.rpc.SupportedRpcType.NETTY; @@ -72,8 +78,9 @@ */ @Ignore("Takes too long to run this test. 
Ignoring for time being.") public class TestContainerServer { - static final String TEST_DIR - = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; + static final String TEST_DIR = GenericTestUtils.getTestDir("dfs") + .getAbsolutePath() + File.separator; + private static final OzoneConfiguration CONF = new OzoneConfiguration(); private GrpcReplicationService createReplicationService( ContainerController containerController) { @@ -81,6 +88,11 @@ private GrpcReplicationService createReplicationService( new OnDemandContainerReplicationSource(containerController)); } + @BeforeClass + static public void setup() { + CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR); + } + @Test public void testClientServer() throws Exception { DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); @@ -151,17 +163,16 @@ static void runTestClientServer( try { final Pipeline pipeline = ContainerTestHelper.createPipeline(numDatanodes); - final OzoneConfiguration conf = new OzoneConfiguration(); - initConf.accept(pipeline, conf); + initConf.accept(pipeline, CONF); for (DatanodeDetails dn : pipeline.getNodes()) { - final XceiverServerSpi s = createServer.apply(dn, conf); + final XceiverServerSpi s = createServer.apply(dn, CONF); servers.add(s); s.start(); initServer.accept(dn, pipeline); } - client = createClient.apply(pipeline, conf); + client = createClient.apply(pipeline, CONF); client.connect(); final ContainerCommandRequestProto request = @@ -184,7 +195,7 @@ static void runTestClientServer( public void testClientServerWithContainerDispatcher() throws Exception { XceiverServerGrpc server = null; XceiverClientGrpc client = null; - + UUID scmId = UUID.randomUUID(); try { Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); OzoneConfiguration conf = new OzoneConfiguration(); @@ -196,16 +207,26 @@ public void testClientServerWithContainerDispatcher() throws Exception { VolumeSet volumeSet = mock(VolumeSet.class); ContainerMetrics metrics = ContainerMetrics.create(conf); Map handlers = Maps.newHashMap(); + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + DatanodeStateMachine stateMachine = Mockito.mock( + DatanodeStateMachine.class); + StateContext context = Mockito.mock(StateContext.class); + Mockito.when(stateMachine.getDatanodeDetails()) + .thenReturn(datanodeDetails); + Mockito.when(context.getParent()).thenReturn(stateMachine); + + for (ContainerProtos.ContainerType containerType : ContainerProtos.ContainerType.values()) { handlers.put(containerType, - Handler.getHandlerForContainerType( - containerType, conf, null, containerSet, volumeSet, metrics)); + Handler.getHandlerForContainerType(containerType, conf, context, + containerSet, volumeSet, metrics)); } HddsDispatcher dispatcher = new HddsDispatcher( - conf, containerSet, volumeSet, handlers, null, metrics); + conf, containerSet, volumeSet, handlers, context, metrics); + dispatcher.setScmId(scmId.toString()); dispatcher.init(); - DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, createReplicationService( new ContainerController(containerSet, null))); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java new file mode 100644 index 0000000000..140ca24ca8 --- /dev/null +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.server; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.RatisTestHelper; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; +import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.util.function.CheckedBiConsumer; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY; +import static org.apache.ratis.rpc.SupportedRpcType.GRPC; +import static org.apache.ratis.rpc.SupportedRpcType.NETTY; 
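Before the class body resumes: the tests below construct secured requests through the new ContainerTestHelper.getCreateContainerSecureRequest helper from earlier in this patch. A minimal sketch of that call pattern, assuming the helper names shown elsewhere in this diff (the empty Token mirrors what the tests pass; a real client would carry a token issued by the OM):

```java
import java.io.IOException;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.security.token.Token;

/** Sketch of the secured-request pattern the tests below rely on. */
final class SecureRequestSketch {
  static ContainerCommandRequestProto buildSecureCreateRequest()
      throws IOException {
    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
    long containerID = ContainerTestHelper.getTestContainerID();
    // An empty token, as the tests use; verification will reject it.
    Token<OzoneBlockTokenIdentifier> token = new Token<>();
    // The helper URL-encodes the token into the request's encodedToken field.
    return ContainerTestHelper.getCreateContainerSecureRequest(
        containerID, pipeline, token);
  }
}
```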
+ +/** + * Test Container servers when security is enabled. + */ +public class TestSecureContainerServer { + static final String TEST_DIR + = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; + private static final OzoneConfiguration CONF = new OzoneConfiguration(); + + private GrpcReplicationService createReplicationService( + ContainerController containerController) { + return new GrpcReplicationService( + new OnDemandContainerReplicationSource(containerController)); + } + + @BeforeClass + static public void setup() { + CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR); + CONF.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); + CONF.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true); + } + + @Test + public void testClientServer() throws Exception { + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + ContainerSet containerSet = new ContainerSet(); + ContainerController controller = new ContainerController( + containerSet, null); + runTestClientServer(1, (pipeline, conf) -> conf + .setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + pipeline.getFirstNode() + .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()), + XceiverClientGrpc::new, + (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf, + new TestContainerDispatcher(), + createReplicationService(controller)), (dn, p) -> { + }); + } + + @FunctionalInterface + interface CheckedBiFunction { + OUT apply(LEFT left, RIGHT right) throws THROWABLE; + } + + @Test + public void testClientServerRatisGrpc() throws Exception { + runTestClientServerRatis(GRPC, 1); + runTestClientServerRatis(GRPC, 3); + } + + @Test + @Ignore + public void testClientServerRatisNetty() throws Exception { + runTestClientServerRatis(NETTY, 1); + runTestClientServerRatis(NETTY, 3); + } + + static XceiverServerRatis newXceiverServerRatis( + DatanodeDetails dn, OzoneConfiguration conf) throws IOException { + conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, + dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue()); + final String dir = TEST_DIR + dn.getUuid(); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); + + final ContainerDispatcher dispatcher = new TestContainerDispatcher(); + return XceiverServerRatis + .newXceiverServerRatis(dn, conf, dispatcher, null); + } + + static void runTestClientServerRatis(RpcType rpc, int numNodes) + throws Exception { + runTestClientServer(numNodes, + (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf), + XceiverClientRatis::newXceiverClientRatis, + TestSecureContainerServer::newXceiverServerRatis, + (dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p)); + } + + static void runTestClientServer( + int numDatanodes, + CheckedBiConsumer initConf, + CheckedBiFunction createClient, + CheckedBiFunction createServer, + CheckedBiConsumer initServer) + throws Exception { + final List servers = new ArrayList<>(); + XceiverClientSpi client = null; + String containerName = OzoneUtils.getRequestID(); + try { + final Pipeline pipeline = ContainerTestHelper.createPipeline(numDatanodes); + + initConf.accept(pipeline, CONF); + + for (DatanodeDetails dn : pipeline.getNodes()) { + final XceiverServerSpi s = createServer.apply(dn, CONF); + servers.add(s); + s.start(); + initServer.accept(dn, pipeline); + } + + client = createClient.apply(pipeline, CONF); + client.connect(); + + // Test 1: Test failure in request without block token. 
+ final ContainerCommandRequestProto request = + ContainerTestHelper + .getCreateContainerRequest( + ContainerTestHelper.getTestContainerID(), pipeline); + Assert.assertNotNull(request.getTraceID()); + + XceiverClientSpi finalClient = client; + LambdaTestUtils.intercept(IOException.class, + () -> finalClient.sendCommand(request)); + + // Test 2: Test failure in request with an empty block token. + final ContainerCommandRequestProto request2 = + ContainerTestHelper + .getCreateContainerSecureRequest( + ContainerTestHelper.getTestContainerID(), pipeline, + new Token<>()); + Assert.assertNotNull(request2.getTraceID()); + + XceiverClientSpi finalClient2 = client; + LambdaTestUtils.intercept(IOException.class, "", + () -> finalClient2.sendCommand(request2)); + } finally { + if (client != null) { + client.close(); + } + servers.stream().forEach(XceiverServerSpi::stop); + } + } + + private static class TestContainerDispatcher implements ContainerDispatcher { + /** + * Dispatches commands to container layer. + * + * @param msg - Command Request + * @return Command Response + */ + @Override + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg, + DispatcherContext context) { + return ContainerTestHelper.getCreateContainerResponse(msg); + } + + @Override + public void init() { + } + + @Override + public void validateContainerCommand( + ContainerCommandRequestProto msg) throws StorageContainerException { + } + + @Override + public void shutdown() { + } + @Override + public Handler getHandler(ContainerProtos.ContainerType containerType) { + return null; + } + + @Override + public void setScmId(String scmId) { + + } + } + +} \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 53dd7c3a29..752d18d189 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -63,8 +63,8 @@ import com.google.common.base.Preconditions; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; @@ -119,8 +119,8 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, start(conf); this.secretManager = secretManager; this.grpcBlockTokenEnabled = conf.getBoolean( - HDDS_GRPC_BLOCK_TOKEN_ENABLED, - HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT); + HDDS_BLOCK_TOKEN_ENABLED, + HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 7fde94b4bf..a921a03d0b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -255,7 +255,7 @@ private
OzoneManager(OzoneConfiguration conf) throws IOException { omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration)); secConfig = new SecurityConfig(configuration); - if (secConfig.isGrpcBlockTokenEnabled()) { + if (secConfig.isBlockTokenEnabled()) { blockTokenMgr = createBlockTokenSecretManager(configuration); } if(secConfig.isSecurityEnabled()){ @@ -385,7 +385,7 @@ private OzoneBlockTokenSecretManager createBlockTokenSecretManager( if (testSecureOmFlag) { return new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1"); } - Objects.nonNull(certClient); + Objects.requireNonNull(certClient); return new OzoneBlockTokenSecretManager(secConfig, expiryTime, certClient.getCertificate(OM_DAEMON).getSerialNumber().toString()); } @@ -418,7 +418,7 @@ public void startSecretManager() { LOG.error("Unable to read key pair for OM.", e); throw new RuntimeException(e); } - if (secConfig.isGrpcBlockTokenEnabled() && blockTokenMgr != null) { + if (secConfig.isBlockTokenEnabled() && blockTokenMgr != null) { try { LOG.info("Starting OM block token secret manager"); blockTokenMgr.start(keyPair); @@ -982,11 +982,6 @@ private void startSecretManagerIfNecessary() { } } - private boolean shouldUseDelegationTokens() { - return UserGroupInformation.isSecurityEnabled(); - } - - /** * * @return true if delegation token operation is allowed @@ -1770,7 +1765,7 @@ private Map buildAuditMap(String volume){ @Override public AuditMessage buildAuditMessageForSuccess(AuditAction op, - Map auditMap) { + Map auditMap) { return new AuditMessage.Builder() .setUser((Server.getRemoteUser() == null) ? null : Server.getRemoteUser().getUserName())
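The renamed hdds.block.token.enabled key and the verifier wiring above reduce the datanode-side check to roughly the following flow; a minimal sketch, assuming a token string minted by the OM (encodedToken is a placeholder) and a null CertificateClient, matching the getCaClient() TODO above:

```java
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.security.UserGroupInformation;

/** Minimal sketch of the verification path this patch wires up. */
final class BlockTokenFlowSketch {
  static UserGroupInformation check(String user, String encodedToken)
      throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Renamed from hdds.grpc.block.token.enabled in this patch.
    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);

    // A null CertificateClient mirrors the getCaClient() TODO above.
    TokenVerifier verifier =
        new BlockTokenVerifier(new SecurityConfig(conf), null);

    // Throws BlockTokenException for a null, empty, or undecodable token;
    // otherwise returns the UGI extracted from the token identifier.
    return verifier.verify(user, encodedToken);
  }
}
```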