diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 3ffe54e212..5c8ca26049 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -133,7 +133,7 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) .getIpAddress(), port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .intercept(new ClientCredentialInterceptor(userName, encodedToken)); - if (SecurityConfig.isGrpcTlsEnabled(config)) { + if (secConfig.isGrpcTlsEnabled()) { File trustCertCollectionFile = secConfig.getTrustStoreFile(); File privateKeyFile = secConfig.getClientPrivateKeyFile(); File clientCertChainFile = secConfig.getClientCertChainFile(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 1980b474ad..338a198ac3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.retry.RetryPolicy; @@ -69,9 +71,11 @@ public static XceiverClientRatis newXceiverClientRatis( final int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new + SecurityConfig(ozoneConf)); return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, - retryPolicy); + retryPolicy, tlsConfig); } private final Pipeline pipeline; @@ -79,6 +83,7 @@ public static XceiverClientRatis newXceiverClientRatis( private final AtomicReference client = new AtomicReference<>(); private final int maxOutstandingRequests; private final RetryPolicy retryPolicy; + private final GrpcTlsConfig tlsConfig; // Map to track commit index at every server private final ConcurrentHashMap commitInfoMap; @@ -90,7 +95,8 @@ public static XceiverClientRatis newXceiverClientRatis( * Constructs a client. */ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, - int maxOutStandingChunks, RetryPolicy retryPolicy) { + int maxOutStandingChunks, RetryPolicy retryPolicy, + GrpcTlsConfig tlsConfig) { super(); this.pipeline = pipeline; this.rpcType = rpcType; @@ -98,6 +104,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, this.retryPolicy = retryPolicy; commitInfoMap = new ConcurrentHashMap<>(); watchClient = null; + this.tlsConfig = tlsConfig; } private void updateCommitInfosMap( @@ -145,7 +152,8 @@ public void connect() throws Exception { // maxOutstandingRequests so as to set the upper bound on max no of async // requests to be handled by raft client if (!client.compareAndSet(null, - RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) { + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, + maxOutstandingRequests, tlsConfig))) { throw new IllegalStateException("Client is already connected."); } } @@ -211,7 +219,8 @@ public long watchForCommit(long index, long timeout) // create a new RaftClient instance for watch request if (watchClient == null) { watchClient = - RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, + maxOutstandingRequests, tlsConfig); } CompletableFuture replyFuture = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); @@ -229,7 +238,8 @@ public long watchForCommit(long index, long timeout) // here once the watch request bypassing sliding window in Raft Client // gets fixed. watchClient = - RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, + maxOutstandingRequests, tlsConfig); reply = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java index bf1cc4dede..10a63fdf0c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java @@ -98,16 +98,17 @@ public class SecurityConfig { private final String publicKeyFileName; private final Duration certDuration; private final String x509SignatureAlgo; - private final Boolean grpcBlockTokenEnabled; + private final boolean grpcBlockTokenEnabled; private final String certificateDir; private final String certificateFileName; - private final Boolean grpcTlsEnabled; - private Boolean grpcTlsUseTestCert; + private final boolean grpcTlsEnabled; + private boolean grpcTlsUseTestCert; private String trustStoreFileName; private String serverCertChainFileName; private String clientCertChainFileName; private final Duration defaultCertDuration; private final boolean isSecurityEnabled; + private boolean grpcMutualTlsRequired; /** * Constructs a SecurityConfig. @@ -152,7 +153,10 @@ public SecurityConfig(Configuration configuration) { this.grpcTlsEnabled = this.configuration.getBoolean(HDDS_GRPC_TLS_ENABLED, HDDS_GRPC_TLS_ENABLED_DEFAULT); + if (grpcTlsEnabled) { + this.grpcMutualTlsRequired = configuration.getBoolean( + HDDS_GRPC_MUTUAL_TLS_REQUIRED, HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT); this.trustStoreFileName = this.configuration.get( HDDS_TRUST_STORE_FILE_NAME, HDDS_TRUST_STORE_FILE_NAME_DEFAULT); @@ -353,27 +357,24 @@ public Duration getMaxCertificateDuration() { return this.certDuration; } - public Boolean isGrpcBlockTokenEnabled() { + public boolean isGrpcBlockTokenEnabled() { return this.grpcBlockTokenEnabled; } /** * Returns true if TLS is enabled for gRPC services. - * @param conf configuration * @return true if TLS is enabled for gRPC services. */ - public static Boolean isGrpcTlsEnabled(Configuration conf) { - return conf.getBoolean(HDDS_GRPC_TLS_ENABLED, - HDDS_GRPC_TLS_ENABLED_DEFAULT); + public boolean isGrpcTlsEnabled() { + return this.grpcTlsEnabled; } /** * Returns true if TLS mutual authentication is enabled for gRPC services. * @return true if TLS is enabled for gRPC services. */ - public Boolean isGrpcMutualTlsRequired() { - return configuration.getBoolean(HDDS_GRPC_MUTUAL_TLS_REQUIRED, - HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT); + public boolean isGrpcMutualTlsRequired() { + return this.grpcMutualTlsRequired; } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 9fbf9833e6..31b9beed68 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -21,11 +21,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcFactory; +import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; @@ -33,6 +36,7 @@ import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.SizeInBytes; @@ -128,38 +132,77 @@ static RaftGroup newRaftGroup(Pipeline pipeline) { } static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, - RetryPolicy retryPolicy) throws IOException { + RetryPolicy retryPolicy, int maxOutStandingRequest, + GrpcTlsConfig tlsConfig) throws IOException { return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()), newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), - pipeline.getNodes()), retryPolicy); + pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RetryPolicy retryPolicy) { + RetryPolicy retryPolicy, int maxOutstandingRequests, + GrpcTlsConfig tlsConfig) { return newRaftClient(rpcType, leader.getId(), - newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy); + newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, + maxOutstandingRequests, tlsConfig); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RaftGroup group, RetryPolicy retryPolicy) { - return newRaftClient(rpcType, leader.getId(), group, retryPolicy); + RetryPolicy retryPolicy, int maxOutstandingRequests) { + return newRaftClient(rpcType, leader.getId(), + newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, + maxOutstandingRequests, null); } static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, - RaftGroup group, RetryPolicy retryPolicy) { + RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest, + GrpcTlsConfig tlsConfig) { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); final RaftProperties properties = new RaftProperties(); RaftConfigKeys.Rpc.setType(properties, rpcType); GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)); - - return RaftClient.newBuilder() + GrpcConfigKeys.OutputStream.setOutstandingAppendsMax(properties, + maxOutStandingRequest); + RaftClient.Builder builder = RaftClient.newBuilder() .setRaftGroup(group) .setLeaderId(leader) .setProperties(properties) - .setRetryPolicy(retryPolicy) - .build(); + .setRetryPolicy(retryPolicy); + + // TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later. + if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) { + builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig)); + } + return builder.build(); + } + + static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf) { + if (conf.isGrpcTlsEnabled()) { + if (conf.isGrpcMutualTlsRequired()) { + return new GrpcTlsConfig( + null, null, conf.getTrustStoreFile(), false); + } else { + return new GrpcTlsConfig(conf.getClientPrivateKeyFile(), + conf.getClientCertChainFile(), conf.getTrustStoreFile(), true); + } + } + return null; + } + + static GrpcTlsConfig createTlsServerConfig(SecurityConfig conf) { + if (conf.isGrpcTlsEnabled()) { + if (conf.isGrpcMutualTlsRequired()) { + return new GrpcTlsConfig( + conf.getServerPrivateKeyFile(), conf.getServerCertChainFile(), null, + false); + } else { + return new GrpcTlsConfig(conf.getServerPrivateKeyFile(), + conf.getServerCertChainFile(), conf.getClientCertChainFile(), true); + } + } + return null; } static RetryPolicy createRetryPolicy(Configuration conf) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 3092e1f17b..30c7c26f23 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -115,7 +115,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, nettyServerBuilder.addService(service); } - if (SecurityConfig.isGrpcTlsEnabled(conf)) { + if (secConfig.isGrpcTlsEnabled()) { File privateKeyFilePath = secConfig.getServerPrivateKeyFile(); File serverCertChainFilePath = secConfig.getServerCertChainFile(); File clientCertChainFilePath = secConfig.getClientCertChainFile(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 37aade76b0..ad27de1849 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -33,6 +33,7 @@ .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -44,6 +45,8 @@ import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcFactory; +import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.Message; @@ -66,6 +69,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.HEAD; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -107,9 +111,9 @@ private static long nextCallId() { private long nodeFailureTimeoutMs; private final long cacheEntryExpiryInteval; - private XceiverServerRatis(DatanodeDetails dd, int port, - ContainerDispatcher dispatcher, Configuration conf, StateContext context) + ContainerDispatcher dispatcher, Configuration conf, StateContext + context, GrpcTlsConfig tlsConfig) throws IOException { Objects.requireNonNull(dd, "id == null"); this.port = port; @@ -139,12 +143,14 @@ private XceiverServerRatis(DatanodeDetails dd, int port, for (int i = 0; i < numContainerOpExecutors; i++) { executors.add(Executors.newSingleThreadExecutor()); } - - this.server = RaftServer.newBuilder() + RaftServer.Builder builder = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) .setProperties(serverProperties) - .setStateMachineRegistry(this::getStateMachine) - .build(); + .setStateMachineRegistry(this::getStateMachine); + if (tlsConfig != null) { + builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig)); + } + this.server = builder.build(); } private ContainerStateMachine getStateMachine(RaftGroupId gid) { @@ -405,10 +411,12 @@ public static XceiverServerRatis newXceiverServerRatis( + "fallback to use default port {}", localPort, e); } } + GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig( + new SecurityConfig(ozoneConf)); datanodeDetails.setPort( DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort)); return new XceiverServerRatis(datanodeDetails, localPort, - dispatcher, ozoneConf, context); + dispatcher, ozoneConf, context, tlsConfig); } @Override diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index fdd7af879b..e9683f7136 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -242,8 +242,9 @@ private Container createContainer(final Configuration conf, final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, Collections.singleton(datanodeDetails)); - final RaftClient client = RatisHelper.newRaftClient( - SupportedRpcType.GRPC, peer, retryPolicy); + final int maxOutstandingRequests = 100; + final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC, + peer, retryPolicy, maxOutstandingRequests, null); Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess()); Thread.sleep(2000); final ContainerID containerId = ContainerID.valueof( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index e58b3ff9aa..58ca1fdbbd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -20,9 +20,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; @@ -135,9 +138,13 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); final RaftPeer p = RatisHelper.toRaftPeer(dn); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig( + new SecurityConfig(ozoneConf)); RaftClient client = RatisHelper .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy); + retryPolicy, maxOutstandingRequests, tlsConfig); client .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId()); } @@ -156,12 +163,16 @@ private static void callRatisRpc(List datanodes, final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); final List exceptions = Collections.synchronizedList(new ArrayList<>()); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new + SecurityConfig(ozoneConf)); datanodes.parallelStream().forEach(d -> { final RaftPeer p = RatisHelper.toRaftPeer(d); try (RaftClient client = RatisHelper .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy)) { + retryPolicy, maxOutstandingRequests, tlsConfig)) { rpc.accept(client, p); } catch (IOException ioe) { exceptions.add( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 0197304fe4..322339b7ea 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.rpc.RpcClient; @@ -41,6 +42,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.ratis.RatisHelper.newRaftClient; /** * Helpers for Ratis tests. @@ -122,8 +124,11 @@ static void initXceiverServerRatis( RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(dd); final OzoneConfiguration conf = new OzoneConfiguration(); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(conf); final RaftClient client = - RatisHelper.newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf)); + newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), + maxOutstandingRequests); client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId()); } }