diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index aadec8dcd7..f4d4744d5a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -34,7 +34,6 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.protocol.TermIndex; @@ -84,7 +83,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -149,7 +147,6 @@ public class ContainerStateMachine extends BaseStateMachine { private final Cache stateMachineDataCache; private final boolean isBlockTokenEnabled; private final TokenVerifier tokenVerifier; - private final AtomicBoolean isStateMachineHealthy; private final Semaphore applyTransactionSemaphore; /** @@ -187,7 +184,6 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, ScmConfigKeys. DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT); applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions); - isStateMachineHealthy = new AtomicBoolean(true); this.executors = new ExecutorService[numContainerOpExecutors]; for (int i = 0; i < numContainerOpExecutors; i++) { final int index = i; @@ -269,14 +265,6 @@ public void persistContainerSet(OutputStream out) throws IOException { public long takeSnapshot() throws IOException { TermIndex ti = getLastAppliedTermIndex(); long startTime = Time.monotonicNow(); - if (!isStateMachineHealthy.get()) { - String msg = - "Failed to take snapshot " + " for " + gid + " as the stateMachine" - + " is unhealthy. The last applied index is at " + ti; - StateMachineException sme = new StateMachineException(msg); - LOG.error(msg); - throw sme; - } if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) { final File snapshotFile = storage.getSnapshotFile(ti.getTerm(), ti.getIndex()); @@ -287,12 +275,12 @@ public long takeSnapshot() throws IOException { // make sure the snapshot file is synced fos.getFD().sync(); } catch (IOException ioe) { - LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti, + LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti, snapshotFile); throw ioe; } - LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti, - snapshotFile, (Time.monotonicNow() - startTime)); + LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", + gid, ti, snapshotFile, (Time.monotonicNow() - startTime)); return ti.getIndex(); } return -1; @@ -397,12 +385,17 @@ private ContainerCommandResponseProto dispatchCommand( return response; } - private ContainerCommandResponseProto runCommand( + private ContainerCommandResponseProto runCommandGetResponse( ContainerCommandRequestProto requestProto, DispatcherContext context) { return dispatchCommand(requestProto, context); } + private Message runCommand(ContainerCommandRequestProto requestProto, + DispatcherContext context) { + return runCommandGetResponse(requestProto, context)::toByteString; + } + private ExecutorService getCommandExecutor( ContainerCommandRequestProto requestProto) { int executorId = (int)(requestProto.getContainerID() % executors.length); @@ -432,7 +425,7 @@ private CompletableFuture handleWriteChunk( // thread. CompletableFuture writeChunkFuture = CompletableFuture.supplyAsync(() -> - runCommand(requestProto, context), chunkExecutor); + runCommandGetResponse(requestProto, context), chunkExecutor); CompletableFuture raftFuture = new CompletableFuture<>(); @@ -509,8 +502,7 @@ public CompletableFuture query(Message request) { metrics.incNumQueryStateMachineOps(); final ContainerCommandRequestProto requestProto = getContainerCommandRequestProto(request.getContent()); - return CompletableFuture - .completedFuture(runCommand(requestProto, null)::toByteString); + return CompletableFuture.completedFuture(runCommand(requestProto, null)); } catch (IOException e) { metrics.incNumQueryStateMachineFails(); return completeExceptionally(e); @@ -682,58 +674,30 @@ public CompletableFuture applyTransaction(TransactionContext trx) { if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) { builder.setCreateContainerSet(createContainerSet); } - CompletableFuture applyTransactionFuture = - new CompletableFuture<>(); // Ensure the command gets executed in a separate thread than // stateMachineUpdater thread which is calling applyTransaction here. - CompletableFuture future = - CompletableFuture.supplyAsync( - () -> runCommand(requestProto, builder.build()), + CompletableFuture future = CompletableFuture + .supplyAsync(() -> runCommand(requestProto, builder.build()), getCommandExecutor(requestProto)); - future.thenApply(r -> { + + future.thenAccept(m -> { if (trx.getServerRole() == RaftPeerRole.LEADER) { long startTime = (long) trx.getStateMachineContext(); metrics.incPipelineLatency(cmdType, Time.monotonicNowNanos() - startTime); } - if (r.getResult() != ContainerProtos.Result.SUCCESS) { - StorageContainerException sce = - new StorageContainerException(r.getMessage(), r.getResult()); - LOG.error( - "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : " - + "{} Container Result: {}", gid, r.getCmdType(), index, - r.getMessage(), r.getResult()); - metrics.incNumApplyTransactionsFails(); - // Since the applyTransaction now is completed exceptionally, - // before any further snapshot is taken , the exception will be - // caught in stateMachineUpdater in Ratis and ratis server will - // shutdown. - applyTransactionFuture.completeExceptionally(sce); - isStateMachineHealthy.compareAndSet(true, false); - ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole()); - } else { - LOG.debug( - "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : " - + "{} Container Result: {}", gid, r.getCmdType(), index, - r.getMessage(), r.getResult()); - applyTransactionFuture.complete(r::toByteString); - if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) { - metrics.incNumBytesCommittedCount( - requestProto.getWriteChunk().getChunkData().getLen()); - } - // add the entry to the applyTransactionCompletionMap only if the - // stateMachine is healthy i.e, there has been no applyTransaction - // failures before. - if (isStateMachineHealthy.get()) { - final Long previous = applyTransactionCompletionMap + + final Long previous = + applyTransactionCompletionMap .put(index, trx.getLogEntry().getTerm()); - Preconditions.checkState(previous == null); - updateLastApplied(); - } + Preconditions.checkState(previous == null); + if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) { + metrics.incNumBytesCommittedCount( + requestProto.getWriteChunk().getChunkData().getLen()); } - return applyTransactionFuture; + updateLastApplied(); }).whenComplete((r, t) -> applyTransactionSemaphore.release()); - return applyTransactionFuture; + return future; } catch (IOException | InterruptedException e) { metrics.incNumApplyTransactionsFails(); return completeExceptionally(e); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index b4021cf657..f0ed28baf5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -609,15 +609,6 @@ void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) { handlePipelineFailure(groupId, roleInfoProto); } - void handleApplyTransactionFailure(RaftGroupId groupId, - RaftProtos.RaftPeerRole role) { - UUID dnId = RatisHelper.toDatanodeId(getServer().getId()); - String msg = - "Ratis Transaction failure in datanode " + dnId + " with role " + role - + " .Triggering pipeline close action."; - triggerPipelineClose(groupId, msg, - ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true); - } /** * The fact that the snapshot contents cannot be used to actually catch up * the follower, it is the reason to initiate close pipeline and diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 1d09dfa902..500735a35c 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -214,7 +214,6 @@ message ClosePipelineInfo { enum Reason { PIPELINE_FAILED = 1; PIPELINE_LOG_FAILED = 2; - STATEMACHINE_TRANSACTION_FAILED = 3; } required PipelineID pipelineID = 1; optional Reason reason = 3; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java index ffbc174f27..7fb639c6a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java @@ -742,8 +742,8 @@ public void cleanDirectory(INode.ReclaimContext reclaimContext, if (currentINode.isLastReference()) { // if this is the last reference, the created list can be // destroyed. - // priorDiff.getChildrenDiff().destroyCreatedList( - // reclaimContext, currentINode); + priorDiff.getChildrenDiff().destroyCreatedList( + reclaimContext, currentINode); } else { // we only check the node originally in prior's created list for (INode cNode : priorDiff.diff.getCreatedUnmodifiable()) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 86621d6b16..469eeb0ade 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -22,32 +22,23 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.protocol.RaftRetryFailureException; -import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.server.storage.FileInfo; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -55,7 +46,6 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; @@ -64,8 +54,7 @@ HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys. HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - ContainerDataProto.State.UNHEALTHY; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys. HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys. @@ -88,7 +77,7 @@ public class TestContainerStateMachineFailures { private static String volumeName; private static String bucketName; private static String path; - private static XceiverClientManager xceiverClientManager; + private static int chunkSize; /** * Create a MiniDFSCluster for testing. @@ -112,11 +101,6 @@ public static void init() throws Exception { conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10, TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - conf.setTimeDuration( - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, - 1, TimeUnit.SECONDS); - conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200) @@ -125,7 +109,6 @@ public static void init() throws Exception { //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); objectStore = client.getObjectStore(); - xceiverClientManager = new XceiverClientManager(conf); volumeName = "testcontainerstatemachinefailures"; bucketName = volumeName; objectStore.createVolume(volumeName); @@ -149,10 +132,19 @@ public void testContainerStateMachineFailures() throws Exception { .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>()); byte[] testData = "ratis".getBytes(); + long written = 0; // First write and flush creates a container in the datanode key.write(testData); + written += testData.length; key.flush(); key.write(testData); + written += testData.length; + + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). + setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis") + .build(); KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); List locationInfoList = @@ -165,14 +157,7 @@ public void testContainerStateMachineFailures() throws Exception { .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainerPath())); - try { - // there is only 1 datanode in the pipeline, the pipeline will be closed - // and allocation to new pipeline will fail as there is no other dn in - // the cluster - key.close(); - } catch(IOException ioe) { - Assert.assertTrue(ioe instanceof OMException); - } + key.close(); long containerID = omKeyLocationInfo.getContainerID(); // Make sure the container is marked unhealthy @@ -194,6 +179,22 @@ public void testContainerStateMachineFailures() throws Exception { .getDatanodeStateMachine().getContainer(); Assert .assertNull(ozoneContainer.getContainerSet().getContainer(containerID)); + + OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName) + .getBucket(bucketName).getKey("ratis"); + + /** + * Ensure length of data stored in key is equal to number of bytes written. + */ + Assert.assertTrue("Number of bytes stored in the key is not equal " + + "to number of bytes written.", keyDetails.getDataSize() == written); + + /** + * Pending data from the second write should get written to a new container + * during key.close() because the first container is UNHEALTHY by that time + */ + Assert.assertTrue("Expect Key to be stored in 2 separate containers", + keyDetails.getOzoneKeyLocations().size() == 2); } @Test @@ -206,6 +207,12 @@ public void testUnhealthyContainer() throws Exception { key.write("ratis".getBytes()); key.flush(); key.write("ratis".getBytes()); + + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). + setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis") + .build(); KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); List locationInfoList = groupOutputStream.getLocationInfoList(); @@ -221,14 +228,8 @@ public void testUnhealthyContainer() throws Exception { (KeyValueContainerData) containerData; // delete the container db file FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath())); - try { - // there is only 1 datanode in the pipeline, the pipeline will be closed - // and allocation to new pipeline will fail as there is no other dn in - // the cluster - key.close(); - } catch(IOException ioe) { - Assert.assertTrue(ioe instanceof OMException); - } + + key.close(); long containerID = omKeyLocationInfo.getContainerID(); @@ -269,83 +270,4 @@ public void testUnhealthyContainer() throws Exception { Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, dispatcher.dispatch(request.build(), null).getResult()); } - - @Test - public void testApplyTransactionFailure() throws Exception { - OzoneOutputStream key = - objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey("ratis", 1024, ReplicationType.RATIS, - ReplicationFactor.ONE, new HashMap<>()); - // First write and flush creates a container in the datanode - key.write("ratis".getBytes()); - key.flush(); - key.write("ratis".getBytes()); - KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); - List locationInfoList = - groupOutputStream.getLocationInfoList(); - Assert.assertEquals(1, locationInfoList.size()); - OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); - ContainerData containerData = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet() - .getContainer(omKeyLocationInfo.getContainerID()) - .getContainerData(); - Assert.assertTrue(containerData instanceof KeyValueContainerData); - KeyValueContainerData keyValueContainerData = - (KeyValueContainerData) containerData; - key.close(); - ContainerStateMachine stateMachine = - (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster); - SimpleStateMachineStorage storage = - (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); - Path parentPath = storage.findLatestSnapshot().getFile().getPath(); - // Since the snapshot threshold is set to 1, since there are - // applyTransactions, we should see snapshots - Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0); - FileInfo snapshot = storage.findLatestSnapshot().getFile(); - Assert.assertNotNull(snapshot); - long containerID = omKeyLocationInfo.getContainerID(); - // delete the container db file - FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath())); - Pipeline pipeline = cluster.getStorageContainerLocationClient() - .getContainerWithPipeline(containerID).getPipeline(); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); - request.setCmdType(ContainerProtos.Type.CloseContainer); - request.setContainerID(containerID); - request.setCloseContainer( - ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - // close container transaction will fail over Ratis and will initiate - // a pipeline close action - - // Since the applyTransaction failure is propagated to Ratis, - // stateMachineUpdater will it exception while taking the next snapshot - // and should shutdown the RaftServerImpl. The client request will fail - // with RaftRetryFailureException. - try { - xceiverClient.sendCommand(request.build()); - Assert.fail("Expected exception not thrown"); - } catch (IOException e) { - Assert.assertTrue(HddsClientUtils - .checkForException(e) instanceof RaftRetryFailureException); - } - // Make sure the container is marked unhealthy - Assert.assertTrue( - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet().getContainer(containerID) - .getContainerState() - == ContainerProtos.ContainerDataProto.State.UNHEALTHY); - try { - // try to take a new snapshot, ideally it should just fail - stateMachine.takeSnapshot(); - } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StateMachineException); - } - // Make sure the latest snapshot is same as the previous one - FileInfo latestSnapshot = storage.findLatestSnapshot().getFile(); - Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath())); - } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 82d34d74e1..4da190762b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -73,10 +73,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; -import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.Assert; import org.slf4j.Logger; @@ -870,16 +866,4 @@ public static void waitForContainerClose(MiniOzoneCluster cluster, index++; } } - - public static StateMachine getStateMachine(MiniOzoneCluster cluster) - throws Exception { - XceiverServerSpi server = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). - getContainer().getWriteChannel(); - RaftServerProxy proxy = - (RaftServerProxy) (((XceiverServerRatis) server).getServer()); - RaftGroupId groupId = proxy.getGroupIds().iterator().next(); - RaftServerImpl impl = proxy.getImpl(groupId); - return impl.getStateMachine(); - } } diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java index 545f2b36ad..8ed3960287 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java @@ -22,7 +22,13 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.transport + .server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .XceiverServerRatis; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; @@ -121,6 +127,13 @@ private void startFreon() throws Exception { } private StateMachine getStateMachine() throws Exception { - return ContainerTestHelper.getStateMachine(cluster); + XceiverServerSpi server = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). + getContainer().getWriteChannel(); + RaftServerProxy proxy = + (RaftServerProxy)(((XceiverServerRatis)server).getServer()); + RaftGroupId groupId = proxy.getGroupIds().iterator().next(); + RaftServerImpl impl = proxy.getImpl(groupId); + return impl.getStateMachine(); } }