From 66cfa482c450320f7326b2568703bae0d4b39e3c Mon Sep 17 00:00:00 2001
From: Shashikant Banerjee
Date: Tue, 27 Aug 2019 23:38:43 +0530
Subject: [PATCH] HDDS-1610. applyTransaction failure should not be lost on
 restart. Contributed by Shashikant Banerjee(#1226).

---
 .../server/ratis/ContainerStateMachine.java   |  84 +++++++---
 .../server/ratis/XceiverServerRatis.java      |   9 +
 .../StorageContainerDatanodeProtocol.proto    |   1 +
 .../TestContainerStateMachineFailures.java    | 156 +++++++++++++-----
 .../ozone/container/ContainerTestHelper.java  |  16 ++
 .../TestFreonWithDatanodeFastRestart.java     |  17 +-
 6 files changed, 205 insertions(+), 78 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f4d4744d5a..aadec8dcd7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -83,6 +84,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -147,6 +149,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
+  private final AtomicBoolean isStateMachineHealthy;
   private final Semaphore applyTransactionSemaphore;
 
   /**
@@ -184,6 +187,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
+    isStateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -265,6 +269,14 @@ public void persistContainerSet(OutputStream out) throws IOException {
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
+    if (!isStateMachineHealthy.get()) {
+      String msg =
+          "Failed to take snapshot for " + gid + " as the stateMachine"
+              + " is unhealthy. The last applied index is at " + ti;
+      StateMachineException sme = new StateMachineException(msg);
+      LOG.error(msg);
+      throw sme;
+    }
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
@@ -275,12 +287,12 @@ public long takeSnapshot() throws IOException {
         // make sure the snapshot file is synced
         fos.getFD().sync();
       } catch (IOException ioe) {
-        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
+        LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
             snapshotFile);
         throw ioe;
       }
-      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
-          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
+          snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
@@ -385,17 +397,12 @@ private ContainerCommandResponseProto dispatchCommand(
     return response;
   }
 
-  private ContainerCommandResponseProto runCommandGetResponse(
+  private ContainerCommandResponseProto runCommand(
       ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
     return dispatchCommand(requestProto, context);
   }
 
-  private Message runCommand(ContainerCommandRequestProto requestProto,
-      DispatcherContext context) {
-    return runCommandGetResponse(requestProto, context)::toByteString;
-  }
-
   private ExecutorService getCommandExecutor(
       ContainerCommandRequestProto requestProto) {
     int executorId = (int)(requestProto.getContainerID() % executors.length);
@@ -425,7 +432,7 @@ private CompletableFuture<Message> handleWriteChunk(
     // thread.
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
         CompletableFuture.supplyAsync(() ->
-            runCommandGetResponse(requestProto, context), chunkExecutor);
+            runCommand(requestProto, context), chunkExecutor);
 
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
@@ -502,7 +509,8 @@ public CompletableFuture<Message> query(Message request) {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getContainerCommandRequestProto(request.getContent());
-      return CompletableFuture.completedFuture(runCommand(requestProto, null));
+      return CompletableFuture
+          .completedFuture(runCommand(requestProto, null)::toByteString);
     } catch (IOException e) {
       metrics.incNumQueryStateMachineFails();
       return completeExceptionally(e);
@@ -674,30 +682,58 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
       if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
         builder.setCreateContainerSet(createContainerSet);
       }
+      CompletableFuture<Message> applyTransactionFuture =
+          new CompletableFuture<>();
       // Ensure the command gets executed in a separate thread than
      // stateMachineUpdater thread which is calling applyTransaction here.
-      CompletableFuture<Message> future = CompletableFuture
-          .supplyAsync(() -> runCommand(requestProto, builder.build()),
+      CompletableFuture<ContainerCommandResponseProto> future =
+          CompletableFuture.supplyAsync(
+              () -> runCommand(requestProto, builder.build()),
               getCommandExecutor(requestProto));
-
-      future.thenAccept(m -> {
+      future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER) {
           long startTime = (long) trx.getStateMachineContext();
           metrics.incPipelineLatency(cmdType,
               Time.monotonicNowNanos() - startTime);
         }
-
-        final Long previous =
-            applyTransactionCompletionMap
+        if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+          StorageContainerException sce =
+              new StorageContainerException(r.getMessage(), r.getResult());
+          LOG.error(
+              "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
+                  + "{} Container Result: {}", gid, r.getCmdType(), index,
+              r.getMessage(), r.getResult());
+          metrics.incNumApplyTransactionsFails();
+          // Since applyTransaction is now completed exceptionally, the
+          // exception will be caught in the stateMachineUpdater in Ratis
+          // before any further snapshot is taken, and the Ratis server will
+          // shutdown.
+          applyTransactionFuture.completeExceptionally(sce);
+          isStateMachineHealthy.compareAndSet(true, false);
+          ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+        } else {
+          LOG.debug(
+              "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
+                  + "{} Container Result: {}", gid, r.getCmdType(), index,
+              r.getMessage(), r.getResult());
+          applyTransactionFuture.complete(r::toByteString);
+          if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+            metrics.incNumBytesCommittedCount(
+                requestProto.getWriteChunk().getChunkData().getLen());
+          }
+          // add the entry to the applyTransactionCompletionMap only if the
+          // stateMachine is healthy, i.e., there have been no applyTransaction
+          // failures before.
+          if (isStateMachineHealthy.get()) {
+            final Long previous = applyTransactionCompletionMap
                .put(index, trx.getLogEntry().getTerm());
-        Preconditions.checkState(previous == null);
-        if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
-          metrics.incNumBytesCommittedCount(
-              requestProto.getWriteChunk().getChunkData().getLen());
+            Preconditions.checkState(previous == null);
+            updateLastApplied();
+          }
         }
-        updateLastApplied();
+        return applyTransactionFuture;
       }).whenComplete((r, t) -> applyTransactionSemaphore.release());
-      return future;
+      return applyTransactionFuture;
     } catch (IOException | InterruptedException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index f0ed28baf5..b4021cf657 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -609,6 +609,15 @@ void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(groupId, roleInfoProto);
   }
 
+  void handleApplyTransactionFailure(RaftGroupId groupId,
+      RaftProtos.RaftPeerRole role) {
+    UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
+    String msg =
+        "Ratis Transaction failure in datanode " + dnId + " with role " + role
+            + ". Triggering pipeline close action.";
+    triggerPipelineClose(groupId, msg,
+        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
+  }
   /**
    * The fact that the snapshot contents cannot be used to actually catch up
    * the follower, it is the reason to initiate close pipeline and
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 500735a35c..1d09dfa902 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -214,6 +214,7 @@ message ClosePipelineInfo {
   enum Reason {
     PIPELINE_FAILED = 1;
     PIPELINE_LOG_FAILED = 2;
+    STATEMACHINE_TRANSACTION_FAILED = 3;
   }
   required PipelineID pipelineID = 1;
   optional Reason reason = 3;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 469eeb0ade..86621d6b16 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -22,23 +22,32 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -46,6 +55,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -54,7 +64,8 @@
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -77,7 +88,7 @@ public class TestContainerStateMachineFailures {
   private static String volumeName;
   private static String bucketName;
   private static String path;
-  private static int chunkSize;
+  private static XceiverClientManager xceiverClientManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -101,6 +112,11 @@ public static void init() throws Exception {
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
         TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -109,6 +125,7 @@ public static void init() throws Exception {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
     volumeName = "testcontainerstatemachinefailures";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
@@ -132,19 +149,10 @@ public void testContainerStateMachineFailures() throws Exception {
         .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
             new HashMap<>());
     byte[] testData = "ratis".getBytes();
-    long written = 0;
     // First write and flush creates a container in the datanode
     key.write(testData);
-    written += testData.length;
     key.flush();
     key.write(testData);
-    written += testData.length;
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
     KeyOutputStream groupOutputStream =
         (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
@@ -157,7 +165,14 @@ public void testContainerStateMachineFailures() throws Exception {
         .getContainer().getContainerSet()
         .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
         .getContainerPath()));
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be closed
+      // and allocation to a new pipeline will fail as there is no other dn in
+      // the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }
 
     long containerID = omKeyLocationInfo.getContainerID();
     // Make sure the container is marked unhealthy
@@ -179,22 +194,6 @@ public void testContainerStateMachineFailures() throws Exception {
         .getDatanodeStateMachine().getContainer();
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-
-    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
-        .getBucket(bucketName).getKey("ratis");
-
-    /**
-     * Ensure length of data stored in key is equal to number of bytes written.
-     */
-    Assert.assertTrue("Number of bytes stored in the key is not equal " +
-        "to number of bytes written.", keyDetails.getDataSize() == written);
-
-    /**
-     * Pending data from the second write should get written to a new container
-     * during key.close() because the first container is UNHEALTHY by that time
-     */
-    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
-        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -207,12 +206,6 @@ public void testUnhealthyContainer() throws Exception {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
            .createKey("ratis", 1024, ReplicationType.RATIS,
                ReplicationFactor.ONE, new HashMap<>());
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
     KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
@@ -228,8 +221,14 @@ public void testUnhealthyContainer() throws Exception {
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
     ContainerData containerData =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
            .getContainer().getContainerSet()
            .getContainer(omKeyLocationInfo.getContainerID()).getContainerData();
     Assert.assertTrue(containerData instanceof KeyValueContainerData);
     KeyValueContainerData keyValueContainerData =
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be closed
+      // and allocation to a new pipeline will fail as there is no other dn in
+      // the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }
 
     long containerID = omKeyLocationInfo.getContainerID();
 
@@ -270,4 +269,83 @@ public void testUnhealthyContainer() throws Exception {
     Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
         dispatcher.dispatch(request.build(), null).getResult());
   }
+
+  @Test
+  public void testApplyTransactionFailure() throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    KeyValueContainerData keyValueContainerData =
+        (KeyValueContainerData) containerData;
+    key.close();
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    // Since the snapshot threshold is set to 1 and applyTransactions have
+    // run, we should see snapshots
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    long containerID = omKeyLocationInfo.getContainerID();
+    // delete the container db file
+    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    // close container transaction will fail over Ratis and will initiate
+    // a pipeline close action
+
+    // Since the applyTransaction failure is propagated to Ratis,
+    // stateMachineUpdater will hit an exception while taking the next
+    // snapshot and should shut down the RaftServerImpl. The client request
+    // will fail with RaftRetryFailureException.
+    try {
+      xceiverClient.sendCommand(request.build());
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException e) {
+      Assert.assertTrue(HddsClientUtils
+          .checkForException(e) instanceof RaftRetryFailureException);
+    }
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    try {
+      // try to take a new snapshot; ideally it should just fail
+      stateMachine.takeSnapshot();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof StateMachineException);
+    }
+    // Make sure the latest snapshot is the same as the previous one
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 4da190762b..82d34d74e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -73,6 +73,10 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -866,4 +870,16 @@ public static void waitForContainerClose(MiniOzoneCluster cluster,
       index++;
     }
   }
+
+  public static StateMachine getStateMachine(MiniOzoneCluster cluster)
+      throws Exception {
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getWriteChannel();
+    RaftServerProxy proxy =
+        (RaftServerProxy) (((XceiverServerRatis) server).getServer());
+    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+    RaftServerImpl impl = proxy.getImpl(groupId);
+    return impl.getStateMachine();
+  }
 }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
index 8ed3960287..545f2b36ad 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java
@@ -22,13 +22,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.common.transport
-    .server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .XceiverServerRatis;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -127,13 +121,6 @@ private void startFreon() throws Exception {
   }
 
   private StateMachine getStateMachine() throws Exception {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getWriteChannel();
-    RaftServerProxy proxy =
-        (RaftServerProxy)(((XceiverServerRatis)server).getServer());
-    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
-    RaftServerImpl impl = proxy.getImpl(groupId);
-    return impl.getStateMachine();
+    return ContainerTestHelper.getStateMachine(cluster);
   }
 }
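
Reviewer note (not part of the patch): the control flow the ContainerStateMachine change introduces is easiest to see in isolation. Below is a minimal, self-contained Java sketch of that pattern; the class and member names (HealthGatedStateMachine, Result, the Supplier-based command) are hypothetical stand-ins rather than the HDDS/Ratis API. It models what the patch does: a failed applyTransaction completes the returned future exceptionally and flips a health flag, the applied index stops advancing, and takeSnapshot refuses to run while unhealthy, so a restart replays the failed entry instead of losing it.

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
 * Hypothetical model of the pattern above: never swallow an applyTransaction
 * failure, stop advancing the applied index once unhealthy, and refuse
 * snapshots so a restart replays the failed entry.
 */
final class HealthGatedStateMachine {

  /** Stand-in for ContainerCommandResponseProto: success flag plus message. */
  static final class Result {
    final boolean success;
    final String message;

    Result(boolean success, String message) {
      this.success = success;
      this.message = message;
    }
  }

  private final AtomicBoolean healthy = new AtomicBoolean(true);
  private final ConcurrentSkipListMap<Long, Long> appliedTermIndex =
      new ConcurrentSkipListMap<>();

  /** Mirrors applyTransaction: the returned future carries the failure. */
  CompletableFuture<Result> applyTransaction(
      long term, long index, Supplier<Result> command) {
    CompletableFuture<Result> applyFuture = new CompletableFuture<>();
    CompletableFuture.supplyAsync(command).thenAccept(r -> {
      if (!r.success) {
        // Flip the health flag first, then fail the future: the log entry is
        // not recorded as applied, so it is replayed after a restart instead
        // of being silently lost.
        healthy.compareAndSet(true, false);
        applyFuture.completeExceptionally(new IOException(
            "applyTransaction failed at index " + index + ": " + r.message));
      } else {
        // Advance the applied index only while healthy; snapshotting past a
        // failed entry would drop that transaction permanently.
        if (healthy.get()) {
          appliedTermIndex.put(index, term);
        }
        applyFuture.complete(r);
      }
    });
    return applyFuture;
  }

  /** Mirrors takeSnapshot: an unhealthy state machine must not snapshot. */
  long takeSnapshot() throws IOException {
    if (!healthy.get()) {
      throw new IOException("State machine is unhealthy; refusing snapshot");
    }
    // Persist state here; report the highest index the snapshot covers.
    return appliedTermIndex.isEmpty() ? -1 : appliedTermIndex.lastKey();
  }
}

The design choice mirrored here is the reason the patch returns a separate applyTransactionFuture instead of the supplyAsync future itself: the command's result is inspected before the future completes, so Ratis's StateMachineUpdater observes the failure before any subsequent snapshot can be taken.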