HDFS-12329. Ozone: Ratis: Readonly calls in XceiverClientRatis should use sendReadOnly. Contributed by Mukul Kumar Singh.

This commit is contained in:
Weiwei Yang 2017-09-20 10:08:15 +08:00 committed by Owen O'Malley
parent 607d1fcd31
commit c329d3b4b4
3 changed files with 71 additions and 8 deletions

View File

@ -20,14 +20,17 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; .ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.ratis.RatisHelper; import org.apache.ratis.RatisHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -94,13 +97,48 @@ private RaftClient getClient() {
return Objects.requireNonNull(client.get(), "client is null"); return Objects.requireNonNull(client.get(), "client is null");
} }
private boolean isReadOnly(ContainerCommandRequestProto proto) {
switch (proto.getCmdType()) {
case ReadContainer:
case ReadChunk:
case ListKey:
case GetKey:
case GetSmallFile:
case ListContainer:
case ListChunk:
return true;
case CloseContainer:
case WriteChunk:
case UpdateContainer:
case CompactChunk:
case CreateContainer:
case DeleteChunk:
case DeleteContainer:
case DeleteKey:
case PutKey:
case PutSmallFile:
default:
return false;
}
}
private RaftClientReply sendRequest(ContainerCommandRequestProto request)
throws IOException {
boolean isReadOnlyRequest = isReadOnly(request);
ByteString byteString =
ShadedProtoUtil.asShadedByteString(request.toByteArray());
LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
final RaftClientReply reply = isReadOnlyRequest ?
getClient().sendReadOnly(() -> byteString) :
getClient().send(() -> byteString);
LOG.debug("reply {} {}", isReadOnlyRequest, reply);
return reply;
}
@Override @Override
public ContainerCommandResponseProto sendCommand( public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException { ContainerCommandRequestProto request) throws IOException {
LOG.debug("sendCommand {}", request); final RaftClientReply reply = sendRequest(request);
final RaftClientReply reply = getClient().send(
() -> ShadedProtoUtil.asShadedByteString(request.toByteArray()));
LOG.debug("reply {}", reply);
Preconditions.checkState(reply.isSuccess()); Preconditions.checkState(reply.isSuccess());
return ContainerCommandResponseProto.parseFrom( return ContainerCommandResponseProto.parseFrom(
ShadedProtoUtil.asByteString(reply.getMessage().getContent())); ShadedProtoUtil.asByteString(reply.getMessage().getContent()));

View File

@ -1581,6 +1581,15 @@ public OzoneContainer getOzoneContainerManager() {
return this.datanodeStateMachine.getContainer(); return this.datanodeStateMachine.getContainer();
} }
@VisibleForTesting
public DatanodeStateMachine.DatanodeStates getOzoneStateMachineState() {
if (this.datanodeStateMachine != null) {
return this.datanodeStateMachine.getContext().getState();
}
// if the state machine doesn't exist then DN initialization is in progress
return DatanodeStateMachine.DatanodeStates.INIT;
}
/** /**
* After the block pool has contacted the NN, registers that block pool * After the block pool has contacted the NN, registers that block pool
* with the secret manager, updating it with the secrets provided by the NN. * with the secret manager, updating it with the secrets provided by the NN.

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.container.common
.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager; import org.apache.hadoop.ozone.ksm.KeySpaceManager;
@ -168,8 +170,7 @@ public boolean restartDataNode(int i, boolean keepPort) throws IOException {
try { try {
this.waitActive(); this.waitActive();
this.waitForHeartbeatProcessed(); waitDatanodeOzoneReady(i);
this.waitOzoneReady();
} catch (TimeoutException | InterruptedException e) { } catch (TimeoutException | InterruptedException e) {
Thread.interrupted(); Thread.interrupted();
} }
@ -263,6 +264,21 @@ public void waitOzoneReady() throws TimeoutException, InterruptedException {
}, 1000, 60 * 1000); //wait for 1 min. }, 1000, 60 * 1000); //wait for 1 min.
} }
/**
* Waits for a particular Datanode to be ready for processing ozone requests.
*/
public void waitDatanodeOzoneReady(int dnIndex)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
DatanodeStateMachine.DatanodeStates state =
dataNodes.get(dnIndex).getDatanode().getOzoneStateMachineState();
final boolean rebootComplete =
(state == DatanodeStateMachine.DatanodeStates.RUNNING);
LOG.info("{} Current state:{}", rebootComplete, state);
return rebootComplete;
}, 1000, 60 * 1000); //wait for 1 min.
}
/** /**
* Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out * Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out
* of Chill mode. * of Chill mode.