diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index 5d652197cd..0513b2e097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -20,12 +20,12 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.ratis.RatisHelper; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; @@ -37,11 +37,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * An abstract implementation of {@link XceiverClientSpi} using Ratis. @@ -75,8 +77,9 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { */ public void createPipeline(String clusterId, List datanodes) throws IOException { - final RaftPeer[] newPeers = datanodes.stream().map(RatisHelper::toRaftPeer) - .toArray(RaftPeer[]::new); + final List newPeers = datanodes.stream() + .map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers); reinitialize(datanodes, newPeers); } @@ -90,7 +93,8 @@ public OzoneProtos.ReplicationType getPipelineType() { return OzoneProtos.ReplicationType.RATIS; } - private void reinitialize(List datanodes, RaftPeer[] newPeers) + private void reinitialize( + List datanodes, Collection newPeers) throws IOException { if (datanodes.isEmpty()) { return; @@ -120,11 +124,11 @@ private void reinitialize(List datanodes, RaftPeer[] newPeers) * @param newPeers - Raft machines * @throws IOException - on Failure. */ - private void reinitialize(DatanodeID datanode, RaftPeer[] newPeers) + private void reinitialize(DatanodeID datanode, Collection newPeers) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(datanode); try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { - client.reinitialize(newPeers, p.getId()); + client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId()); } catch (IOException ioe) { LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", p, datanode, ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java index a2acdff6e3..0638b6c354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java @@ -23,15 +23,15 @@ import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; /** @@ -41,6 +41,10 @@ public interface RatisHelper { Logger LOG = LoggerFactory.getLogger(RatisHelper.class); static String toRaftPeerIdString(DatanodeID id) { + return id.getIpAddr() + "_" + id.getRatisPort(); + } + + static String toRaftPeerAddressString(DatanodeID id) { return id.getIpAddr() + ":" + id.getRatisPort(); } @@ -48,44 +52,59 @@ static RaftPeerId toRaftPeerId(DatanodeID id) { return RaftPeerId.valueOf(toRaftPeerIdString(id)); } - static RaftPeer toRaftPeer(String id) { - return new RaftPeer(RaftPeerId.valueOf(id), id); - } - static RaftPeer toRaftPeer(DatanodeID id) { - return toRaftPeer(toRaftPeerIdString(id)); + return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); } static List toRaftPeers(Pipeline pipeline) { - return pipeline.getMachines().stream() - .map(RatisHelper::toRaftPeer) + return toRaftPeers(pipeline.getMachines()); + } + + static List toRaftPeers(List datanodes) { + return datanodes.stream().map(RatisHelper::toRaftPeer) .collect(Collectors.toList()); } - static RaftPeer[] toRaftPeerArray(Pipeline pipeline) { - return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS); + /* TODO: use a dummy id for all groups for the moment. + * It should be changed to a unique id for each group. + */ + RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId(); + + RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, + Collections.emptyList()); + + static RaftGroup emptyRaftGroup() { + return EMPTY_GROUP; + } + + static RaftGroup newRaftGroup(Collection peers) { + return peers.isEmpty()? emptyRaftGroup() + : new RaftGroup(DUMMY_GROUP_ID, peers); + } + + static RaftGroup newRaftGroup(Pipeline pipeline) { + return newRaftGroup(toRaftPeers(pipeline)); } static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) { return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), - toRaftPeers(pipeline)); + newRaftGroup(pipeline)); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) { return newRaftClient(rpcType, leader.getId(), - new ArrayList<>(Arrays.asList(leader))); + newRaftGroup(new ArrayList<>(Arrays.asList(leader)))); } static RaftClient newRaftClient( - RpcType rpcType, RaftPeerId leader, List peers) { - LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers); + RpcType rpcType, RaftPeerId leader, RaftGroup group) { + LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); final RaftProperties properties = new RaftProperties(); - final ClientFactory factory = ClientFactory.cast(rpcType.newFactory( - properties, null)); + final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(null)); return RaftClient.newBuilder() .setClientRpc(factory.newRaftClientRpc()) - .setServers(peers) + .setRaftGroup(group) .setLeaderId(leader) .setProperties(properties) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0bf25a3a0b..eb9247fe09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -80,7 +80,7 @@ public CompletableFuture query(RaftClientRequest request) { @Override public CompletableFuture applyTransaction(TransactionContext trx) { - final SMLogEntryProto logEntry = trx.getSMLogEntry().get(); + final SMLogEntryProto logEntry = trx.getSMLogEntry(); return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()), response -> () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index cbae4376f4..860090cb5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -45,7 +45,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; -import java.util.Collections; import java.util.Objects; /** @@ -64,7 +63,7 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir, this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(id)) - .setPeers(Collections.emptyList()) + .setGroup(RatisHelper.emptyRaftGroup()) .setProperties(newRaftProperties(rpcType, port, storageDir)) .setStateMachine(new ContainerStateMachine(dispatcher)) .build(); @@ -73,7 +72,7 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir, static RaftProperties newRaftProperties( RpcType rpc, int port, String storageDir) { final RaftProperties properties = new RaftProperties(); - RaftServerConfigKeys.setStorageDir(properties, storageDir); + RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); RaftConfigKeys.Rpc.setType(properties, rpc); if (rpc == SupportedRpcType.GRPC) { GrpcConfigKeys.Server.setPort(properties, port); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java index abe27ab662..123de5f52b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -129,9 +129,8 @@ static XceiverServerRatis newXceiverServerRatis( static void initXceiverServerRatis( RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(id); - final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline); final RaftClient client = RatisHelper.newRaftClient(rpc, p); - client.reinitialize(peers, p.getId()); + client.reinitialize(RatisHelper.newRaftGroup(pipeline), p.getId()); }