From 68da45a789262f143c393a101c00e8349a45c035 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 3 Apr 2017 12:40:06 -0700 Subject: [PATCH] HDFS-11519. Ozone: Implement XceiverServerSpi and XceiverClientSpi using Ratis. Contributed by Tsz Wo Nicholas Sze. --- .../hadoop-hdfs-client/pom.xml | 25 ++ .../org/apache/hadoop/scm/ScmConfigKeys.java | 9 ++ .../hadoop/scm/XceiverClientManager.java | 8 +- .../apache/hadoop/scm/XceiverClientRatis.java | 111 ++++++++++++++++ .../container/common/helpers/Pipeline.java | 10 ++ .../com/google/protobuf/ShadedProtoUtil.java | 38 ++++++ hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 + .../apache/hadoop/ozone/OzoneConfigKeys.java | 16 +++ .../server/ratis/ContainerStateMachine.java | 107 ++++++++++++++++ .../server/ratis/XceiverServerRatis.java | 119 ++++++++++++++++++ .../container/ozoneimpl/OzoneContainer.java | 9 +- .../apache/hadoop/hdfs/MiniDFSCluster.java | 38 +++--- .../hdfs/MiniDFSClusterWithNodeGroup.java | 2 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 28 +++++ .../ozone/container/ContainerTestHelper.java | 64 +++++++++- .../ozoneimpl/TestOzoneContainer.java | 89 +++++++++++-- .../transport/server/TestContainerServer.java | 106 +++++++++++++--- 17 files changed, 729 insertions(+), 55 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml index a5ed7a376e..b33851cde3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml @@ -113,6 +113,31 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.apache.ratis</groupId> + <artifactId>ratis-proto-shaded</artifactId> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 5f3dbd552c..fed4459729 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -35,6 +35,15 @@ public final class ScmConfigKeys { public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT = 10000; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY + = "dfs.container.ratis.enabled"; + public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT + = false; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY + = "dfs.container.ratis.rpc.type"; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT + = "GRPC"; + // TODO : this is copied from OzoneConsts, may need to move to a better place public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
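For reference, the two keys added above are read back as ordinary Configuration settings; a minimal lookup sketch in the style of the client code later in this patch (the surrounding setup is illustrative, not part of the patch):

    Configuration conf = new OzoneConfiguration();
    // "dfs.container.ratis.enabled", default false
    final boolean useRatis = conf.getBoolean(
        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
    // "dfs.container.ratis.rpc.type", default "GRPC"; "NETTY" is the other supported value
    final String rpcType = conf.get(
        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);

diff --git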
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index 82e7e2aa90..3d2a913053 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -52,6 +52,7 @@ public class XceiverClientManager { private final Configuration conf; private Cache openClient; private final long staleThresholdMs; + private final boolean useRatis; /** * Creates a new XceiverClientManager. @@ -63,6 +64,9 @@ public XceiverClientManager(Configuration conf) { this.staleThresholdMs = conf.getTimeDuration( SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); + this.useRatis = conf.getBoolean( + ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); this.conf = conf; this.openClient = CacheBuilder.newBuilder() .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS) @@ -109,7 +113,9 @@ public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { return info.getXceiverClient(); } else { // connection not found, create new, add reference and return - XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf); + final XceiverClientSpi xceiverClient = useRatis? + XceiverClientRatis.newXceiverClientRatis(pipeline, conf) + : new XceiverClient(pipeline, conf); try { xceiverClient.connect(); } catch (Exception e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java new file mode 100644 index 0000000000..7b17f333fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.scm; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * An implementation of {@link XceiverClientSpi} using Ratis. + * The underlying RPC mechanism can be chosen via the constructor. + */ +public final class XceiverClientRatis implements XceiverClientSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); + + public static XceiverClientRatis newXceiverClientRatis( + Pipeline pipeline, Configuration ozoneConf) { + final String rpcType = ozoneConf.get( + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + return new XceiverClientRatis(pipeline, + SupportedRpcType.valueOfIgnoreCase(rpcType)); + } + + private final Pipeline pipeline; + private final RaftClient client; + + /** Constructs a client. */ + XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { + this.pipeline = pipeline; + final List<RaftPeer> peers = pipeline.getMachines().stream() + .map(dn -> dn.getXferAddr()) + .map(addr -> new RaftPeer(new RaftPeerId(addr), addr)) + .collect(Collectors.toList()); + + final RaftProperties properties = new RaftProperties(); + final ClientFactory factory = ClientFactory.cast(rpcType.newFactory( + properties, null)); + + client = RaftClient.newBuilder() + .setClientRpc(factory.newRaftClientRpc()) + .setServers(peers) + .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr())) + .setProperties(properties) + .build(); + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public void connect() throws Exception { + // do nothing.
+ } + + @Override + public void close() { + try { + client.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + LOG.debug("sendCommand {}", request); + final RaftClientReply reply = client.send( + () -> ShadedProtoUtil.asShadedByteString(request.toByteArray())); + LOG.debug("reply {}", reply); + Preconditions.checkState(reply.isSuccess()); + return ContainerCommandResponseProto.parseFrom( + ShadedProtoUtil.asByteString(reply.getMessage().getContent())); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java index 433c94ca59..4456138b2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java @@ -155,4 +155,14 @@ public byte[] getData() { return null; } } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(getClass().getSimpleName()) + .append("["); + datanodes.keySet().stream() + .forEach(id -> b.append(id.endsWith(leaderID)? "*" + id : id)); + b.append("] container:").append(containerName); + return b.toString(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java new file mode 100644 index 0000000000..29242ad99f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.shaded.com.google.protobuf; + +/** Utilities for the shaded protobuf in Ratis. */ +public interface ShadedProtoUtil { + /** + * @param bytes the bytes to wrap + * @return the wrapped shaded {@link ByteString} (no copying). + */ + static ByteString asShadedByteString(byte[] bytes) { + return ByteString.wrap(bytes); + } + + /** + * @param shaded the shaded {@link ByteString} + * @return a {@link com.google.protobuf.ByteString} (requires copying).
+ */ + static com.google.protobuf.ByteString asByteString(ByteString shaded) { + return com.google.protobuf.ByteString.copyFrom( + shaded.asReadOnlyByteBuffer()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 9b8f21ff0f..070c3421c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -212,6 +212,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>assertj-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <optional>true</optional> + </dependency> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 9d242d6deb..9a71cd0fd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.scm.ScmConfigKeys; /** * This class contains constants for configuration keys used in Ozone. @@ -67,6 +68,21 @@ public final class OzoneConfigKeys { "ozone.container.task.wait.seconds"; public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; + public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY; + public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT; + public static final String DFS_CONTAINER_RATIS_CONF = + "dfs.container.ratis.conf"; + public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS = + "dfs.container.ratis.datanode.address"; + public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = + "dfs.container.ratis.datanode.storage.dir"; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java new file mode 100644 index 0000000000..fd9e3df38a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.statemachine.BaseStateMachine; +import org.apache.ratis.statemachine.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */ +public class ContainerStateMachine extends BaseStateMachine { + static final Logger LOG = LoggerFactory.getLogger( + ContainerStateMachine.class); + private final SimpleStateMachineStorage storage + = new SimpleStateMachineStorage(); + private final ContainerDispatcher dispatcher; + + ContainerStateMachine(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + @Override + public StateMachineStorage getStateMachineStorage() { + return storage; + } + + @Override + public void initialize( + RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) + throws IOException { + super.initialize(id, properties, raftStorage); + storage.init(raftStorage); +// TODO handle snapshots + } + + @Override + public CompletableFuture<RaftClientReply> query(RaftClientRequest request) { + return dispatch(ShadedProtoUtil.asByteString( + request.getMessage().getContent()), + response -> new RaftClientReply(request, + () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()))); + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + final SMLogEntryProto logEntry = trx.getSMLogEntry().get(); + return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()), + response -> + () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()) + ); + } + + private <T> CompletableFuture<T> dispatch( + ByteString requestBytes, Function<ContainerCommandResponseProto, T> f) { + final ContainerCommandResponseProto response; + try { + final ContainerCommandRequestProto request + = ContainerCommandRequestProto.parseFrom(requestBytes); + LOG.trace("dispatch {}", request); + response = dispatcher.dispatch(request); + LOG.trace("response {}", response); + } catch (IOException e) { + return completeExceptionally(e); + } + return CompletableFuture.completedFuture(f.apply(response)); + } + + static <T> CompletableFuture<T> completeExceptionally(Exception e) { + final CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } +}
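The XceiverClientRatis added earlier and the ContainerStateMachine above cross the Ratis shaded-protobuf boundary in opposite directions; a round-trip sketch using only helpers from this patch (test scope; the container name is illustrative):

    ContainerCommandRequestProto request =
        ContainerTestHelper.getCreateContainerRequest("c123");
    // Client side: wrap the serialized proto as a shaded ByteString (no copy).
    final org.apache.ratis.shaded.com.google.protobuf.ByteString shaded =
        ShadedProtoUtil.asShadedByteString(request.toByteArray());
    // Server side: copy back across the shading boundary and re-parse.
    ContainerCommandRequestProto parsed = ContainerCommandRequestProto.parseFrom(
        ShadedProtoUtil.asByteString(shaded));
    assert parsed.getTraceID().equals(request.getTraceID());

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java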
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java new file mode 100644 index 0000000000..4c82ac2db2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Creates a ratis server endpoint that acts as the communication layer for + * Ozone containers. 
+ */ +public final class XceiverServerRatis implements XceiverServerSpi { + static RaftProperties newRaftProperties( + RpcType rpc, int port, String storageDir) { + final RaftProperties properties = new RaftProperties(); + RaftServerConfigKeys.setStorageDir(properties, storageDir); + RaftConfigKeys.Rpc.setType(properties, rpc); + if (rpc == SupportedRpcType.GRPC) { + GrpcConfigKeys.Server.setPort(properties, port); + } else if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); + } + return properties; + } + + public static XceiverServerRatis newXceiverServerRatis( + Configuration ozoneConf, ContainerDispatcher dispatcher) + throws IOException { + final String id = ozoneConf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS); + final Collection<String> servers = ozoneConf.getStringCollection( + OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); + final String storageDir = ozoneConf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); + final String rpcType = ozoneConf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc); + } + + private final int port; + private final RaftServer server; + + private XceiverServerRatis( + String id, Collection<String> servers, String storageDir, + ContainerDispatcher dispatcher, RpcType rpcType) throws IOException { + Preconditions.checkArgument(servers.contains(id), + "%s is not one of %s specified in %s", + id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); + + final List<RaftPeer> peers = servers.stream() + .map(addr -> new RaftPeer(new RaftPeerId(addr), addr)) + .collect(Collectors.toList()); + + this.port = NetUtils.createSocketAddr(id).getPort(); + + this.server = RaftServer.newBuilder() + .setServerId(new RaftPeerId(id)) + .setPeers(peers) + .setProperties(newRaftProperties(rpcType, port, storageDir)) + .setStateMachine(new ContainerStateMachine(dispatcher)) + .build(); + } + + @Override + public void start() throws IOException { + server.start(); + } + + @Override + public void stop() { + try { + server.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getIPCPort() { + return port; + } +}
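The Preconditions check in the constructor above implies a three-key contract between this server and its configuration; a conforming sketch (addresses and directory are illustrative, and dispatcher is assumed to be an existing ContainerDispatcher):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF,
        "127.0.0.1:9858,127.0.0.1:9859,127.0.0.1:9860");  // all Raft peers
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS,
        "127.0.0.1:9858");  // must be one of the peers listed above
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
        "/tmp/ratis-dn0");  // Raft log and storage location
    XceiverServerRatis server =
        XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
    server.start();  // getIPCPort() reports the port parsed from the address

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index de68a99435..1a7297d8c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -86,7 +87,13 @@ public OzoneContainer( manager.setKeyManager(this.keyManager); this.dispatcher = new Dispatcher(manager, this.ozoneConfig); - server = new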
XceiverServer(this.ozoneConfig, this.dispatcher); + + final boolean useRatis = ozoneConfig.getBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + server = useRatis? + XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher) + : new XceiverServer(this.ozoneConfig, this.dispatcher); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index cda7b0f3c6..247804c7ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1609,7 +1609,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, dnConf.addResource(dnConfOverlays[i]); } // Set up datanode address - setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); + setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig); if (manageDfsDirs) { String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i - curDatanodesNum]); @@ -2911,16 +2911,19 @@ public File getProvidedStorageDir(int dnIndex, int dirIndex) { /** * Get a storage directory for a datanode. + * For example, * <ol> - * <li><base directory>/data/data<2*dnIndex + 1></li> - * <li><base directory>/data/data<2*dnIndex + 2></li> + * <li><base directory>/data/dn0_data0</li> + * <li><base directory>/data/dn0_data1</li> + * <li><base directory>/data/dn1_data0</li> + * <li><base directory>/data/dn1_data1</li> * </ol>
* * @param dnIndex datanode index (starts from 0) * @param dirIndex directory index. * @return Storage directory */ - public File getStorageDir(int dnIndex, int dirIndex) { + public static File getStorageDir(int dnIndex, int dirIndex) { return new File(getBaseDirectory(), getStorageDirPath(dnIndex, dirIndex)); } @@ -2931,8 +2934,8 @@ public File getStorageDir(int dnIndex, int dirIndex) { * @param dirIndex directory index. * @return storage directory path */ - private String getStorageDirPath(int dnIndex, int dirIndex) { - return "data/data" + (storagesPerDatanode * dnIndex + 1 + dirIndex); + private static String getStorageDirPath(int dnIndex, int dirIndex) { + return "data/dn" + dnIndex + "_data" + dirIndex; } /** @@ -3197,35 +3200,36 @@ public void setBlockRecoveryTimeout(long timeout) { } } - protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile, - boolean checkDataNodeAddrConfig) throws IOException { + protected void setupDatanodeAddress( + int i, Configuration dnConf, boolean setupHostsFile, + boolean checkDataNodeAddrConfig) throws IOException { if (setupHostsFile) { - String hostsFile = conf.get(DFS_HOSTS, "").trim(); + String hostsFile = dnConf.get(DFS_HOSTS, "").trim(); if (hostsFile.length() == 0) { throw new IOException("Parameter dfs.hosts is not setup in conf"); } // Setup datanode in the include file, if it is defined in the conf String address = "127.0.0.1:" + NetUtils.getFreeSocketPort(); if (checkDataNodeAddrConfig) { - conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address); + dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address); } else { - conf.set(DFS_DATANODE_ADDRESS_KEY, address); + dnConf.set(DFS_DATANODE_ADDRESS_KEY, address); } addToFile(hostsFile, address); LOG.info("Adding datanode " + address + " to hosts file " + hostsFile); } else { if (checkDataNodeAddrConfig) { - conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0"); } else { - conf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0"); } } if (checkDataNodeAddrConfig) { - conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); - conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0"); } else { - conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + dnConf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java index 5c011e31ba..898ec68886 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java @@ -117,7 +117,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { Configuration dnConf = new HdfsConfiguration(conf); // Set up datanode address - setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); + setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig); if (manageDfsDirs) { String dirs = makeDataNodeDirs(i, 
storageTypes == null ? null : storageTypes[i]); dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 4bd9acca58..8f338b90c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import org.apache.commons.io.FileUtils; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; @@ -85,6 +86,33 @@ private MiniOzoneCluster(Builder builder, StorageContainerManager scm) tempPath = Paths.get(builder.getPath(), builder.getRunID()); } + @Override + protected void setupDatanodeAddress( + int i, Configuration dnConf, boolean setupHostsFile, + boolean checkDnAddrConf) throws IOException { + super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf); + + final boolean useRatis = dnConf.getBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + if (!useRatis) { + return; + } + final String[] ids = dnConf.getStrings( + OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); + // TODO: use the i-th raft server as the i-th datanode address + // this only works for one Raft cluster + setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, + ids[i]); + setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, + getInstanceStorageDir(i, -1).getCanonicalPath()); + } + + static void setConf(int i, Configuration conf, String key, String value) { + conf.set(key, value); + LOG.info("dn{}: set {} = {}", i, key, value); + } + @Override public void close() { shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 076ab825ab..7e8623ae1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -21,17 +21,22 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.ratis.rpc.RpcType; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; @@ -42,11 +47,14 @@ import java.util.Random;
import java.util.UUID; import java.util.Map; +import java.util.stream.Collectors; /** * Helpers for container tests. */ public final class ContainerTestHelper { + public static final Logger LOG = LoggerFactory.getLogger( + ContainerTestHelper.class); private static Random r = new Random(); /** @@ -64,19 +72,50 @@ private ContainerTestHelper() { */ public static Pipeline createSingleNodePipeline(String containerName) throws IOException { + return createPipeline(containerName, 1); + } + + public static DatanodeID createDatanodeID() throws IOException { ServerSocket socket = new ServerSocket(0); int port = socket.getLocalPort(); DatanodeID datanodeID = new DatanodeID(socket.getInetAddress() .getHostAddress(), socket.getInetAddress().getHostName(), UUID.randomUUID().toString(), port, port, port, port); datanodeID.setContainerPort(port); - Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid()); - pipeline.addMember(datanodeID); - pipeline.setContainerName(containerName); socket.close(); + return datanodeID; + } + + /** + * Create a pipeline with the given number of datanodes. + * + * @return Pipeline with numNodes datanodes in it. + * @throws IOException + */ + public static Pipeline createPipeline(String containerName, int numNodes) + throws IOException { + Preconditions.checkArgument(numNodes >= 1); + final DatanodeID leader = createDatanodeID(); + Pipeline pipeline = new Pipeline(leader.getDatanodeUuid()); + pipeline.setContainerName(containerName); + pipeline.addMember(leader); + + for(int i = 1; i < numNodes; i++) { + pipeline.addMember(createDatanodeID()); + } return pipeline; } + public static void initRatisConf( + RpcType rpc, Pipeline pipeline, Configuration conf) { + conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name()); + conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, + pipeline.getMachines().stream() + .map(dn -> dn.getXferAddr()) + .collect(Collectors.joining(","))); + }
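Taken together, the helpers above let a test stand up a multi-node Ratis pipeline in a few calls; a usage sketch (container name and node count are illustrative):

    Pipeline pipeline = ContainerTestHelper.createPipeline("container1", 3);
    OzoneConfiguration conf = new OzoneConfiguration();
    ContainerTestHelper.initRatisConf(SupportedRpcType.GRPC, pipeline, conf);
    // conf now enables Ratis, selects GRPC, and lists all three datanode addresses.

+ /** * Creates a ChunkInfo for testing.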
* * @@ -133,6 +172,8 @@ public static void setDataChecksum(ChunkInfo info, byte[] data) public static ContainerCommandRequestProto getWriteChunkRequest( Pipeline pipeline, String containerName, String keyName, int datalen) throws IOException, NoSuchAlgorithmException { + LOG.trace("writeChunk {} (key={}) to pipeline={}", + datalen, keyName, pipeline); ContainerProtos.WriteChunkRequestProto.Builder writeRequest = ContainerProtos.WriteChunkRequestProto .newBuilder(); @@ -225,6 +266,9 @@ public static ContainerCommandRequestProto getReadSmallFileRequest( public static ContainerCommandRequestProto getReadChunkRequest( ContainerProtos.WriteChunkRequestProto request) throws IOException, NoSuchAlgorithmException { + LOG.trace("readChunk key={} from pipeline={}", + request.getKeyName(), request.getPipeline()); + ContainerProtos.ReadChunkRequestProto.Builder readRequest = ContainerProtos.ReadChunkRequestProto.newBuilder(); @@ -252,6 +296,9 @@ public static ContainerCommandRequestProto getDeleteChunkRequest( ContainerProtos.WriteChunkRequestProto writeRequest) throws IOException, NoSuchAlgorithmException { + LOG.trace("deleteChunk key={} from pipeline={}", + writeRequest.getKeyName(), writeRequest.getPipeline()); + ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest = ContainerProtos.DeleteChunkRequestProto .newBuilder(); @@ -275,6 +322,8 @@ public static ContainerCommandRequestProto getDeleteChunkRequest( */ public static ContainerCommandRequestProto getCreateContainerRequest( String containerName) throws IOException { + LOG.trace("createContainer: {}", containerName); + ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto .newBuilder(); @@ -358,6 +407,9 @@ public static ContainerCommandRequestProto getUpdateContainerRequest( */ public static ContainerCommandRequestProto getPutKeyRequest( ContainerProtos.WriteChunkRequestProto writeRequest) { + LOG.trace("putKey: {} to pipeline={}", + writeRequest.getKeyName(), writeRequest.getPipeline()); + ContainerProtos.PutKeyRequestProto.Builder putRequest = ContainerProtos.PutKeyRequestProto.newBuilder(); @@ -384,6 +436,9 @@ public static ContainerCommandRequestProto getPutKeyRequest( */ public static ContainerCommandRequestProto getKeyRequest( ContainerProtos.PutKeyRequestProto putKeyRequest) { + LOG.trace("getKey: name={} from pipeline={}", + putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline()); + ContainerProtos.GetKeyRequestProto.Builder getRequest = ContainerProtos.GetKeyRequestProto.newBuilder(); ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData @@ -422,6 +477,9 @@ public static void verifyGetKey(ContainerCommandRequestProto request, */ public static ContainerCommandRequestProto getDeleteKeyRequest( ContainerProtos.PutKeyRequestProto putKeyRequest) { + LOG.trace("deleteKey: name={} from pipeline={}", + putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline()); + ContainerProtos.DeleteKeyRequestProto.Builder delRequest = ContainerProtos.DeleteKeyRequestProto.newBuilder(); delRequest.setPipeline(putKeyRequest.getPipeline()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 09d1a81790..c0e33a75b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -25,7 +25,11 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.XceiverClientRatis; +import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -88,20 +92,69 @@ public void testCreateOzoneContainer() throws Exception { } } + @Test + public void testOzoneContainerViaDataNodeRatisGrpc() throws Exception { + runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 1); + runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 3); + } + + @Test + public void testOzoneContainerViaDataNodeRatisNetty() throws Exception { + runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 1); + runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 3); + } + + private static void runTestOzoneContainerViaDataNodeRatis( + RpcType rpc, int numNodes) throws Exception { + ContainerTestHelper.LOG.info("runTestOzoneContainerViaDataNodeRatis(rpc=" + + rpc + ", numNodes=" + numNodes + ")"); + + final String containerName = OzoneUtils.getRequestID(); + final Pipeline pipeline = ContainerTestHelper.createPipeline( + containerName, numNodes); + final OzoneConfiguration conf = initOzoneConfiguration(pipeline); + ContainerTestHelper.initRatisConf(rpc, pipeline, conf); + + final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf) + .setHandlerType("local") + .numDataNodes(pipeline.getMachines().size()) + .build(); + cluster.waitOzoneReady(); + final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis( + pipeline, conf); + + try { + runTestOzoneContainerViaDataNode(containerName, pipeline, client); + } finally { + cluster.shutdown(); + } + } + + private static OzoneConfiguration initOzoneConfiguration(Pipeline pipeline) { + final OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + pipeline.getLeader().getContainerPort()); + + setOzoneLocalStorageRoot(conf); + return conf; + } + + private static void setOzoneLocalStorageRoot(OzoneConfiguration conf) { + URL p = conf.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + path += conf.getTrimmed( + OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, + OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT); + conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path); + } + @Test public void testOzoneContainerViaDataNode() throws Exception { MiniOzoneCluster cluster = null; - XceiverClient client = null; try { - String keyName = OzoneUtils.getRequestID(); String containerName = OzoneUtils.getRequestID(); OzoneConfiguration conf = new OzoneConfiguration(); - URL p = conf.getClass().getResource(""); - String path = p.getPath().concat( - TestOzoneContainer.class.getSimpleName()); - path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, - OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT); - conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path); + setOzoneLocalStorageRoot(conf); // Start ozone container Via Datanode create.
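For context, these tests construct clients directly, but the same standalone-vs-Ratis choice is made centrally by the XceiverClientManager change earlier in this patch; a sketch of that path (pipeline and conf as in the surrounding tests):

    XceiverClientManager manager = new XceiverClientManager(conf);
    // Returns an XceiverClientRatis when dfs.container.ratis.enabled is true,
    // otherwise the original standalone XceiverClient.
    XceiverClientSpi client = manager.acquireClient(pipeline);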
@@ -115,19 +168,32 @@ public void testOzoneContainerViaDataNode() throws Exception { .setHandlerType("distributed").build(); // This client talks to ozone container via datanode. - client = new XceiverClient(pipeline, conf); + XceiverClient client = new XceiverClient(pipeline, conf); + + runTestOzoneContainerViaDataNode(containerName, pipeline, client); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + static void runTestOzoneContainerViaDataNode( + String containerName, Pipeline pipeline, XceiverClientSpi client) + throws Exception { + try { client.connect(); // Create container ContainerProtos.ContainerCommandRequestProto request = ContainerTestHelper.getCreateContainerRequest(containerName); - pipeline.setContainerName(containerName); ContainerProtos.ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertNotNull(response); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); // Write Chunk + final String keyName = OzoneUtils.getRequestID(); ContainerProtos.ContainerCommandRequestProto writeChunkRequest = ContainerTestHelper.getWriteChunkRequest(pipeline, containerName, keyName, 1024); @@ -204,9 +270,6 @@ public void testOzoneContainerViaDataNode() throws Exception { if (client != null) { client.close(); } - if (cluster != null) { - cluster.shutdown(); - } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java index 8606915aeb..e7ae679729 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -22,29 +22,43 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.impl.Dispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.ozone.container.common.impl.Dispatcher; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler; - +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.XceiverClientRatis; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.rpc.RpcType; import org.junit.Assert; import org.junit.Test; +import 
java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; import static org.apache.ratis.rpc.SupportedRpcType.GRPC; import static org.apache.ratis.rpc.SupportedRpcType.NETTY; import static org.mockito.Mockito.mock; /** * Test Containers. */ public class TestContainerServer { + static final String TEST_DIR + = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; @Test public void testPipeline() throws IOException { @@ -68,33 +82,87 @@ public void testPipeline() throws IOException { @Test public void testClientServer() throws Exception { - XceiverServer server = null; - XceiverClient client = null; + runTestClientServer(1, + (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + pipeline.getLeader().getContainerPort()), + XceiverClient::new, + (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher())); + } + + @FunctionalInterface + interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> { + OUT apply(LEFT left, RIGHT right) throws THROWABLE; + } + + @Test + public void testClientServerRatisNetty() throws Exception { + runTestClientServerRatis(NETTY, 1); + runTestClientServerRatis(NETTY, 3); + } + + @Test + public void testClientServerRatisGrpc() throws Exception { + runTestClientServerRatis(GRPC, 1); + runTestClientServerRatis(GRPC, 3); + } + + static XceiverServerRatis newXceiverServerRatis( + DatanodeID dn, OzoneConfiguration conf) throws IOException { + final String id = dn.getXferAddr(); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id); + final String dir = TEST_DIR + id.replace(':', '_'); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); + + final ContainerDispatcher dispatcher = new TestContainerDispatcher(); + return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher); + } + + static void runTestClientServerRatis(RpcType rpc, int numNodes) + throws Exception { + runTestClientServer(numNodes, + (pipeline, conf) -> ContainerTestHelper.initRatisConf( + rpc, pipeline, conf), + XceiverClientRatis::newXceiverClientRatis, + TestContainerServer::newXceiverServerRatis); + } + + static void runTestClientServer( + int numDatanodes, + BiConsumer<Pipeline, OzoneConfiguration> initConf, + CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi, IOException> createClient, + CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi, IOException> createServer) + throws Exception { + final List<XceiverServerSpi> servers = new ArrayList<>(); + XceiverClientSpi client = null; String containerName = OzoneUtils.getRequestID(); try { - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline( - containerName); - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - pipeline.getLeader().getContainerPort()); + final Pipeline pipeline = ContainerTestHelper.createPipeline( + containerName, numDatanodes); + final OzoneConfiguration conf = new OzoneConfiguration(); + initConf.accept(pipeline, conf); - server = new XceiverServer(conf, new TestContainerDispatcher()); - client = new XceiverClient(pipeline, conf); + for(DatanodeID dn : pipeline.getMachines()) { + final XceiverServerSpi s = createServer.apply(dn, conf); + servers.add(s); + s.start(); + } - server.start(); + client = createClient.apply(pipeline, conf); client.connect(); - ContainerCommandRequestProto request = + final ContainerCommandRequestProto request = ContainerTestHelper.getCreateContainerRequest(containerName); + Assert.assertNotNull(request.getTraceID()); + ContainerCommandResponseProto response = client.sendCommand(request); -
Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + Assert.assertEquals(request.getTraceID(), response.getTraceID()); } finally { if (client != null) { client.close(); } - if (server != null) { - server.stop(); - } + servers.stream().forEach(XceiverServerSpi::stop); } }
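For reference, the harness above is transport-agnostic; a sketch of how a further client/server pair would plug in (this mirrors testClientServer(); the factories are illustrative):

    runTestClientServer(1,
        (pipeline, conf) -> { /* transport-specific configuration */ },
        (pipeline, conf) -> new XceiverClient(pipeline, conf),  // client factory
        (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()));  // server factory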