diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 7e4d481735..5c3fa859a6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -22,11 +22,13 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; @@ -82,6 +84,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; import java.io.FileOutputStream; import java.io.FileInputStream; import java.io.OutputStream; @@ -139,7 +142,6 @@ public class ContainerStateMachine extends BaseStateMachine { // keeps track of the containers created per pipeline private final Set createContainerSet; private ExecutorService[] executors; - private final int numExecutors; private final Map applyTransactionCompletionMap; private final Cache stateMachineDataCache; private final boolean isBlockTokenEnabled; @@ -152,15 +154,13 @@ public class ContainerStateMachine extends BaseStateMachine { @SuppressWarnings("parameternumber") public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer, - List executors, long expiryInterval, - boolean isBlockTokenEnabled, TokenVerifier tokenVerifier) { + long expiryInterval, boolean isBlockTokenEnabled, + TokenVerifier tokenVerifier, Configuration conf) { this.gid = gid; this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; this.ratisServer = ratisServer; metrics = CSMMetrics.create(gid); - this.numExecutors = executors.size(); - this.executors = executors.toArray(new ExecutorService[numExecutors]); this.writeChunkFutureMap = new ConcurrentHashMap<>(); applyTransactionCompletionMap = new ConcurrentHashMap<>(); stateMachineDataCache = CacheBuilder.newBuilder() @@ -171,6 +171,19 @@ public class ContainerStateMachine extends BaseStateMachine { this.isBlockTokenEnabled = isBlockTokenEnabled; this.tokenVerifier = tokenVerifier; this.createContainerSet = new ConcurrentSkipListSet<>(); + + final int numContainerOpExecutors = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT); + this.executors = new ExecutorService[numContainerOpExecutors]; + for (int i = 0; i < numContainerOpExecutors; i++) { + final int index = i; + this.executors[index] = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r); + t.setName("RatisApplyTransactionExecutor " + index); + return t; + }); + } } @Override @@ -367,7 +380,7 @@ public class ContainerStateMachine extends BaseStateMachine { private ExecutorService getCommandExecutor( ContainerCommandRequestProto requestProto) { - int executorId = (int)(requestProto.getContainerID() % numExecutors); + int executorId = (int)(requestProto.getContainerID() % executors.length); return executors[executorId]; } @@ -700,5 +713,8 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { evictStateMachineCache(); + for (ExecutorService executor : executors) { + executor.shutdown(); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 72f6ab4887..1ae545657c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -81,8 +81,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -102,7 +100,6 @@ public final class XceiverServerRatis extends XceiverServer { private int port; private final RaftServer server; private ThreadPoolExecutor chunkExecutor; - private final List executors; private final ContainerDispatcher dispatcher; private ClientId clientId = ClientId.randomId(); private final StateContext context; @@ -111,16 +108,18 @@ public final class XceiverServerRatis extends XceiverServer { private final long cacheEntryExpiryInteval; private boolean isStarted = false; private DatanodeDetails datanodeDetails; + private final Configuration conf; private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, Configuration conf, StateContext context, GrpcTlsConfig tlsConfig, CertificateClient caClient) throws IOException { super(conf, caClient); + this.conf = conf; Objects.requireNonNull(dd, "id == null"); datanodeDetails = dd; this.port = port; - RaftProperties serverProperties = newRaftProperties(conf); + RaftProperties serverProperties = newRaftProperties(); final int numWriteChunkThreads = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); @@ -129,23 +128,16 @@ public final class XceiverServerRatis extends XceiverServer { 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy()); - final int numContainerOpExecutors = conf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT); this.context = context; this.replicationLevel = conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); - this.executors = new ArrayList<>(); cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys. DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL, OzoneConfigKeys. DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); this.dispatcher = dispatcher; - for (int i = 0; i < numContainerOpExecutors; i++) { - executors.add(Executors.newSingleThreadExecutor()); - } RaftServer.Builder builder = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) @@ -159,22 +151,22 @@ public final class XceiverServerRatis extends XceiverServer { private ContainerStateMachine getStateMachine(RaftGroupId gid) { return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this, - Collections.unmodifiableList(executors), cacheEntryExpiryInteval, - getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier()); + cacheEntryExpiryInteval, getSecurityConfig().isBlockTokenEnabled(), + getBlockTokenVerifier(), conf); } - private RaftProperties newRaftProperties(Configuration conf) { + private RaftProperties newRaftProperties() { final RaftProperties properties = new RaftProperties(); // Set rpc type - final RpcType rpc = setRpcType(conf, properties); + final RpcType rpc = setRpcType(properties); // set raft segment size - setRaftSegmentSize(conf, properties); + setRaftSegmentSize(properties); // set raft segment pre-allocated size final int raftSegmentPreallocatedSize = - setRaftSegmentPreallocatedSize(conf, properties); + setRaftSegmentPreallocatedSize(properties); // Set max write buffer size, which is the scm chunk size final int maxChunkSize = setMaxWriteBuffer(properties); @@ -196,19 +188,19 @@ public final class XceiverServerRatis extends XceiverServer { .setSyncTimeout(properties, dataSyncTimeout); // Set the server Request timeout - setServerRequestTimeout(conf, properties); + setServerRequestTimeout(properties); // set timeout for a retry cache entry - setTimeoutForRetryCache(conf, properties); + setTimeoutForRetryCache(properties); // Set the ratis leader election timeout - setRatisLeaderElectionTimeout(conf, properties); + setRatisLeaderElectionTimeout(properties); // Set the maximum cache segments RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); // set the node failure timeout - setNodeFailureTimeout(conf, properties); + setNodeFailureTimeout(properties); // Set the ratis storage directory String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf); @@ -266,8 +258,7 @@ public final class XceiverServerRatis extends XceiverServer { return properties; } - private void setNodeFailureTimeout(Configuration conf, - RaftProperties properties) { + private void setNodeFailureTimeout(RaftProperties properties) { TimeUnit timeUnit; long duration; timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT @@ -285,8 +276,7 @@ public final class XceiverServerRatis extends XceiverServer { nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS); } - private void setRatisLeaderElectionTimeout(Configuration conf, - RaftProperties properties) { + private void setRatisLeaderElectionTimeout(RaftProperties properties) { long duration; TimeUnit leaderElectionMinTimeoutUnit = OzoneConfigKeys. @@ -307,8 +297,7 @@ public final class XceiverServerRatis extends XceiverServer { TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); } - private void setTimeoutForRetryCache(Configuration conf, - RaftProperties properties) { + private void setTimeoutForRetryCache(RaftProperties properties) { TimeUnit timeUnit; long duration; timeUnit = @@ -324,8 +313,7 @@ public final class XceiverServerRatis extends XceiverServer { .setExpiryTime(properties, retryCacheTimeout); } - private void setServerRequestTimeout(Configuration conf, - RaftProperties properties) { + private void setServerRequestTimeout(RaftProperties properties) { TimeUnit timeUnit; long duration; timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT @@ -347,8 +335,7 @@ public final class XceiverServerRatis extends XceiverServer { return maxChunkSize; } - private int setRaftSegmentPreallocatedSize(Configuration conf, - RaftProperties properties) { + private int setRaftSegmentPreallocatedSize(RaftProperties properties) { final int raftSegmentPreallocatedSize = (int) conf.getStorageSize( OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, @@ -371,8 +358,7 @@ public final class XceiverServerRatis extends XceiverServer { return raftSegmentPreallocatedSize; } - private void setRaftSegmentSize(Configuration conf, - RaftProperties properties) { + private void setRaftSegmentSize(RaftProperties properties) { final int raftSegmentSize = (int)conf.getStorageSize( OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT, @@ -381,7 +367,7 @@ public final class XceiverServerRatis extends XceiverServer { SizeInBytes.valueOf(raftSegmentSize)); } - private RpcType setRpcType(Configuration conf, RaftProperties properties) { + private RpcType setRpcType(RaftProperties properties) { final String rpcType = conf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); @@ -447,7 +433,6 @@ public final class XceiverServerRatis extends XceiverServer { // some of the tasks would be executed using the executors. server.close(); chunkExecutor.shutdown(); - executors.forEach(ExecutorService::shutdown); isStarted = false; } catch (IOException e) { throw new RuntimeException(e);