From 7f9073132dcc9db157a6792635d2ed099f2ef0d2 Mon Sep 17 00:00:00 2001 From: bshashikant Date: Tue, 17 Sep 2019 16:49:25 +0530 Subject: [PATCH] HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430) --- .../container/common/impl/HddsDispatcher.java | 9 ++++-- .../server/ratis/ContainerStateMachine.java | 31 ++++++++++++++----- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index e95d899db4..37e19bc28b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -236,9 +236,7 @@ private ContainerCommandResponseProto dispatchRequest( if (container2BCSIDMap != null) { // adds this container to list of containers created in the pipeline // with initial BCSID recorded as 0. - Preconditions - .checkArgument(!container2BCSIDMap.containsKey(containerID)); - container2BCSIDMap.put(containerID, Long.valueOf(0)); + container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0)); } container = getContainer(containerID); } @@ -290,6 +288,11 @@ private ContainerCommandResponseProto dispatchRequest( // state here. Result result = responseProto.getResult(); + if (cmdType == ContainerProtos.Type.CreateContainer + && result == Result.SUCCESS && dispatcherContext != null) { + Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap()); + container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0)); + } if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) { // If the container is open/closing and the container operation // has failed, it should be first marked unhealthy and the initiate the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index cee9741c4f..c6ab0a1090 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -435,13 +435,20 @@ private CompletableFuture handleWriteChunk( .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) .setContainer2BCSIDMap(container2BCSIDMap) .build(); + CompletableFuture raftFuture = new CompletableFuture<>(); // ensure the write chunk happens asynchronously in writeChunkExecutor pool // thread. CompletableFuture writeChunkFuture = - CompletableFuture.supplyAsync(() -> - runCommand(requestProto, context), chunkExecutor); - - CompletableFuture raftFuture = new CompletableFuture<>(); + CompletableFuture.supplyAsync(() -> { + try { + return runCommand(requestProto, context); + } catch (Exception e) { + LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + + write.getChunkData().getChunkName() + e); + raftFuture.completeExceptionally(e); + throw e; + }}, chunkExecutor); writeChunkFutureMap.put(entryIndex, writeChunkFuture); LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " + @@ -698,7 +705,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA); } if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile - || cmdType == Type.PutBlock) { + || cmdType == Type.PutBlock || cmdType == Type.CreateContainer) { builder.setContainer2BCSIDMap(container2BCSIDMap); } CompletableFuture applyTransactionFuture = @@ -706,9 +713,17 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // Ensure the command gets executed in a separate thread than // stateMachineUpdater thread which is calling applyTransaction here. CompletableFuture future = - CompletableFuture.supplyAsync( - () -> runCommand(requestProto, builder.build()), - getCommandExecutor(requestProto)); + CompletableFuture.supplyAsync(() -> { + try { + return runCommand(requestProto, builder.build()); + } catch (Exception e) { + LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex " + + "{} exception {}", gid, requestProto.getCmdType(), + index, e); + applyTransactionFuture.completeExceptionally(e); + throw e; + } + }, getCommandExecutor(requestProto)); future.thenApply(r -> { if (trx.getServerRole() == RaftPeerRole.LEADER) { long startTime = (long) trx.getStateMachineContext();