diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index d6c7937fab..3551ceb700 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -20,14 +20,17 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; import org.apache.ratis.RatisHelper; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,13 +97,48 @@ private RaftClient getClient() { return Objects.requireNonNull(client.get(), "client is null"); } + private boolean isReadOnly(ContainerCommandRequestProto proto) { + switch (proto.getCmdType()) { + case ReadContainer: + case ReadChunk: + case ListKey: + case GetKey: + case GetSmallFile: + case ListContainer: + case ListChunk: + return true; + case CloseContainer: + case WriteChunk: + case UpdateContainer: + case CompactChunk: + case CreateContainer: + case DeleteChunk: + case DeleteContainer: + case DeleteKey: + case PutKey: + case PutSmallFile: + default: + return false; + } + } + + private RaftClientReply sendRequest(ContainerCommandRequestProto request) + throws IOException { + boolean isReadOnlyRequest = isReadOnly(request); + ByteString byteString = + ShadedProtoUtil.asShadedByteString(request.toByteArray()); + LOG.debug("sendCommand {} {}", isReadOnlyRequest, request); + final RaftClientReply reply = isReadOnlyRequest ? + getClient().sendReadOnly(() -> byteString) : + getClient().send(() -> byteString); + LOG.debug("reply {} {}", isReadOnlyRequest, reply); + return reply; + } + @Override public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { - LOG.debug("sendCommand {}", request); - final RaftClientReply reply = getClient().send( - () -> ShadedProtoUtil.asShadedByteString(request.toByteArray())); - LOG.debug("reply {}", reply); + final RaftClientReply reply = sendRequest(request); Preconditions.checkState(reply.isSuccess()); return ContainerCommandResponseProto.parseFrom( ShadedProtoUtil.asByteString(reply.getMessage().getContent())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b60cf91937..68ffb4440f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1581,6 +1581,15 @@ public OzoneContainer getOzoneContainerManager() { return this.datanodeStateMachine.getContainer(); } + @VisibleForTesting + public DatanodeStateMachine.DatanodeStates getOzoneStateMachineState() { + if (this.datanodeStateMachine != null) { + return this.datanodeStateMachine.getContext().getState(); + } + // if the state machine doesn't exist then DN initialization is in progress + return DatanodeStateMachine.DatanodeStates.INIT; + } + /** * After the block pool has contacted the NN, registers that block pool * with the secret manager, updating it with the secrets provided by the NN. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index e0b6ccb3f3..3e90a343f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -27,6 +27,8 @@ import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.container.common + .statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KeySpaceManager; @@ -168,8 +170,7 @@ public boolean restartDataNode(int i, boolean keepPort) throws IOException { try { this.waitActive(); - this.waitForHeartbeatProcessed(); - this.waitOzoneReady(); + waitDatanodeOzoneReady(i); } catch (TimeoutException | InterruptedException e) { Thread.interrupted(); } @@ -263,6 +264,21 @@ public void waitOzoneReady() throws TimeoutException, InterruptedException { }, 1000, 60 * 1000); //wait for 1 min. } + /** + * Waits for a particular Datanode to be ready for processing ozone requests. + */ + public void waitDatanodeOzoneReady(int dnIndex) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + DatanodeStateMachine.DatanodeStates state = + dataNodes.get(dnIndex).getDatanode().getOzoneStateMachineState(); + final boolean rebootComplete = + (state == DatanodeStateMachine.DatanodeStates.RUNNING); + LOG.info("{} Current state:{}", rebootComplete, state); + return rebootComplete; + }, 1000, 60 * 1000); //wait for 1 min. + } + /** * Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out * of Chill mode.