HDFS-12720. Ozone: Ratis options are not passed from KSM Client protobuf helper correctly. Contributed by Mukul Kumar Singh.

This commit is contained in:
Mukul Kumar Singh 2017-11-03 09:59:25 +05:30 committed by Owen O'Malley
parent 0559265822
commit 7ebe79e879
15 changed files with 278 additions and 57 deletions

View File

@ -20,6 +20,8 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@ -100,7 +102,8 @@ public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager, OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient, StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
KeySpaceManagerProtocolClientSideTranslatorPB ksmClient, KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
int chunkSize, String requestId) throws IOException { int chunkSize, String requestId, ReplicationFactor factor,
ReplicationType type) throws IOException {
this.streamEntries = new ArrayList<>(); this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0; this.currentStreamIndex = 0;
this.byteOffset = 0; this.byteOffset = 0;
@ -111,6 +114,8 @@ public ChunkGroupOutputStream(
.setVolumeName(info.getVolumeName()) .setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName()) .setBucketName(info.getBucketName())
.setKeyName(info.getKeyName()) .setKeyName(info.getKeyName())
.setType(type)
.setFactor(factor)
.setDataSize(info.getDataSize()).build(); .setDataSize(info.getDataSize()).build();
this.openID = handler.getId(); this.openID = handler.getId();
this.xceiverClientManager = xceiverClientManager; this.xceiverClientManager = xceiverClientManager;
@ -292,6 +297,8 @@ public static class Builder {
private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient; private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
private int chunkSize; private int chunkSize;
private String requestID; private String requestID;
private ReplicationType type;
private ReplicationFactor factor;
public Builder setHandler(OpenKeySession handler) { public Builder setHandler(OpenKeySession handler) {
this.openHandler = handler; this.openHandler = handler;
@ -325,9 +332,19 @@ public Builder setRequestID(String id) {
return this; return this;
} }
public Builder setType(ReplicationType type) {
this.type = type;
return this;
}
public Builder setFactor(ReplicationFactor replicationFactor) {
this.factor = replicationFactor;
return this;
}
public ChunkGroupOutputStream build() throws IOException { public ChunkGroupOutputStream build() throws IOException {
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
ksmClient, chunkSize, requestID); ksmClient, chunkSize, requestID, factor, type);
} }
} }

View File

@ -461,6 +461,8 @@ public OzoneOutputStream createKey(
.setKsmClient(keySpaceManagerClient) .setKsmClient(keySpaceManagerClient)
.setChunkSize(chunkSize) .setChunkSize(chunkSize)
.setRequestID(requestId) .setRequestID(requestId)
.setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
.setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
.build(); .build();
return new OzoneOutputStream(groupOutputStream); return new OzoneOutputStream(groupOutputStream);
} }

View File

@ -101,13 +101,13 @@ public Builder setDataSize(long size) {
return this; return this;
} }
public Builder setType(ReplicationType type) { public Builder setType(ReplicationType replicationType) {
this.type = type; this.type = replicationType;
return this; return this;
} }
public Builder setFactor(ReplicationFactor factor) { public Builder setFactor(ReplicationFactor replicationFactor) {
this.factor = factor; this.factor = replicationFactor;
return this; return this;
} }

View File

@ -520,6 +520,8 @@ public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
KeyArgs.Builder keyArgs = KeyArgs.newBuilder() KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName()) .setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName()) .setBucketName(args.getBucketName())
.setFactor(args.getFactor())
.setType(args.getType())
.setKeyName(args.getKeyName()); .setKeyName(args.getKeyName());
if (args.getDataSize() > 0) { if (args.getDataSize() > 0) {
keyArgs.setDataSize(args.getDataSize()); keyArgs.setDataSize(args.getDataSize());
@ -547,6 +549,8 @@ public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
.setVolumeName(args.getVolumeName()) .setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName()) .setBucketName(args.getBucketName())
.setKeyName(args.getKeyName()) .setKeyName(args.getKeyName())
.setFactor(args.getFactor())
.setType(args.getType())
.setDataSize(args.getDataSize()).build(); .setDataSize(args.getDataSize()).build();
req.setKeyArgs(keyArgs); req.setKeyArgs(keyArgs);
req.setClientID(clientID); req.setClientID(clientID);

View File

@ -21,7 +21,7 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
@ -39,7 +39,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.List; import java.util.List;
/** /**
@ -49,7 +48,7 @@ public class XceiverClient extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline; private final Pipeline pipeline;
private final Configuration config; private final Configuration config;
private ChannelFuture channelFuture; private Channel channel;
private Bootstrap b; private Bootstrap b;
private EventLoopGroup group; private EventLoopGroup group;
@ -70,9 +69,7 @@ public XceiverClient(Pipeline pipeline, Configuration config) {
@Override @Override
public void connect() throws Exception { public void connect() throws Exception {
if (channelFuture != null if (channel != null && channel.isActive()) {
&& channelFuture.channel() != null
&& channelFuture.channel().isActive()) {
throw new IOException("This client is already connected to a host."); throw new IOException("This client is already connected to a host.");
} }
@ -92,7 +89,7 @@ public void connect() throws Exception {
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
} }
LOG.debug("Connecting to server Port : " + port); LOG.debug("Connecting to server Port : " + port);
channelFuture = b.connect(leader.getHostName(), port).sync(); channel = b.connect(leader.getHostName(), port).sync().channel();
} }
/** /**
@ -102,17 +99,13 @@ public void connect() throws Exception {
*/ */
@VisibleForTesting @VisibleForTesting
public boolean isConnected() { public boolean isConnected() {
return channelFuture.channel().isActive(); return channel.isActive();
} }
@Override @Override
public void close() { public void close() {
if (group != null) { if (group != null) {
group.shutdownGracefully(0, 0, TimeUnit.SECONDS); group.shutdownGracefully().awaitUninterruptibly();
}
if (channelFuture != null) {
channelFuture.channel().close();
} }
} }
@ -126,11 +119,11 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(
ContainerProtos.ContainerCommandRequestProto request) ContainerProtos.ContainerCommandRequestProto request)
throws IOException { throws IOException {
try { try {
if ((channelFuture == null) || (!channelFuture.channel().isActive())) { if ((channel == null) || (!channel.isActive())) {
throw new IOException("This channel is not connected."); throw new IOException("This channel is not connected.");
} }
XceiverClientHandler handler = XceiverClientHandler handler =
channelFuture.channel().pipeline().get(XceiverClientHandler.class); channel.pipeline().get(XceiverClientHandler.class);
return handler.sendCommand(request); return handler.sendCommand(request);
} catch (ExecutionException | InterruptedException e) { } catch (ExecutionException | InterruptedException e) {
@ -149,11 +142,11 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException { throws IOException, ExecutionException, InterruptedException {
if ((channelFuture == null) || (!channelFuture.channel().isActive())) { if ((channel == null) || (!channel.isActive())) {
throw new IOException("This channel is not connected."); throw new IOException("This channel is not connected.");
} }
XceiverClientHandler handler = XceiverClientHandler handler =
channelFuture.channel().pipeline().get(XceiverClientHandler.class); channel.pipeline().get(XceiverClientHandler.class);
return handler.sendCommandAsync(request); return handler.sendCommandAsync(request);
} }

View File

@ -199,6 +199,7 @@ public OzoneProtos.ReplicationFactor getFactor() {
*/ */
public OzoneProtos.ReplicationType getType() { public OzoneProtos.ReplicationType getType() {
// TODO : Fix me and make Ratis default before release. // TODO : Fix me and make Ratis default before release.
// TODO: Remove this as replication factor and type are pipeline properties
if(isUseRatis()) { if(isUseRatis()) {
return OzoneProtos.ReplicationType.RATIS; return OzoneProtos.ReplicationType.RATIS;
} }

View File

@ -28,6 +28,7 @@
import org.apache.ratis.RatisHelper; import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
@ -37,13 +38,11 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/** /**
* An abstract implementation of {@link XceiverClientSpi} using Ratis. * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@ -77,11 +76,10 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
*/ */
public void createPipeline(String clusterId, List<DatanodeID> datanodes) public void createPipeline(String clusterId, List<DatanodeID> datanodes)
throws IOException { throws IOException {
final List<RaftPeer> newPeers = datanodes.stream() RaftGroup group = RatisHelper.newRaftGroup(datanodes);
.map(RatisHelper::toRaftPeer) LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
.collect(Collectors.toList()); group.getPeers());
LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers); reinitialize(datanodes, group);
reinitialize(datanodes, newPeers);
} }
/** /**
@ -94,7 +92,7 @@ public OzoneProtos.ReplicationType getPipelineType() {
} }
private void reinitialize( private void reinitialize(
List<DatanodeID> datanodes, Collection<RaftPeer> newPeers) List<DatanodeID> datanodes, RaftGroup group)
throws IOException { throws IOException {
if (datanodes.isEmpty()) { if (datanodes.isEmpty()) {
return; return;
@ -103,7 +101,7 @@ private void reinitialize(
IOException exception = null; IOException exception = null;
for (DatanodeID d : datanodes) { for (DatanodeID d : datanodes) {
try { try {
reinitialize(d, newPeers); reinitialize(d, group);
} catch (IOException ioe) { } catch (IOException ioe) {
if (exception == null) { if (exception == null) {
exception = new IOException( exception = new IOException(
@ -121,14 +119,14 @@ private void reinitialize(
/** /**
* Adds a new peers to the Ratis Ring. * Adds a new peers to the Ratis Ring.
* @param datanode - new datanode * @param datanode - new datanode
* @param newPeers - Raft machines * @param group - Raft group
* @throws IOException - on Failure. * @throws IOException - on Failure.
*/ */
private void reinitialize(DatanodeID datanode, Collection<RaftPeer> newPeers) private void reinitialize(DatanodeID datanode, RaftGroup group)
throws IOException { throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(datanode); final RaftPeer p = RatisHelper.toRaftPeer(datanode);
try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId()); client.reinitialize(group, p.getId());
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
p, datanode, ioe); p, datanode, ioe);

View File

@ -20,6 +20,7 @@
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import java.io.Serializable; import java.io.Serializable;
@ -136,4 +137,11 @@ public String toString() {
public int compareTo(BlockContainerInfo o) { public int compareTo(BlockContainerInfo o) {
return this.compare(this, o); return this.compare(this, o);
} }
public boolean canAllocate(long size, long containerSize) {
//TODO: move container size inside Container Info
return ((getState() == OzoneProtos.LifeCycleState.ALLOCATED ||
getState() == OzoneProtos.LifeCycleState.OPEN) &&
(getAllocated() + size <= containerSize));
}
} }

View File

@ -28,10 +28,15 @@
import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.List;
import java.util.Collections;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -68,7 +73,8 @@ static <E extends DatanodeID> List<RaftPeer> toRaftPeers(List<E> datanodes) {
/* TODO: use a dummy id for all groups for the moment. /* TODO: use a dummy id for all groups for the moment.
* It should be changed to a unique id for each group. * It should be changed to a unique id for each group.
*/ */
RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId(); RaftGroupId DUMMY_GROUP_ID =
RaftGroupId.valueOf(ByteString.copyFromUtf8("AOZONERATISGROUP"));
RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
Collections.emptyList()); Collections.emptyList());
@ -77,6 +83,13 @@ static RaftGroup emptyRaftGroup() {
return EMPTY_GROUP; return EMPTY_GROUP;
} }
static RaftGroup newRaftGroup(List<DatanodeID> datanodes) {
final List<RaftPeer> newPeers = datanodes.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
return RatisHelper.newRaftGroup(newPeers);
}
static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
return peers.isEmpty()? emptyRaftGroup() return peers.isEmpty()? emptyRaftGroup()
: new RaftGroup(DUMMY_GROUP_ID, peers); : new RaftGroup(DUMMY_GROUP_ID, peers);

View File

@ -410,8 +410,7 @@ public BlockContainerInfo getMatchingContainer(final long size,
while (iter.hasNext()) { while (iter.hasNext()) {
BlockContainerInfo info = iter.next(); BlockContainerInfo info = iter.next();
if (info.getAllocated() + size <= this.containerSize) { if (info.canAllocate(size, this.containerSize)) {
queue.remove(info); queue.remove(info);
info.addAllocated(size); info.addAllocated(size);
info.setLastUsed(Time.monotonicNow()); info.setLastUsed(Time.monotonicNow());
@ -419,10 +418,14 @@ public BlockContainerInfo getMatchingContainer(final long size,
return info; return info;
} else { } else {
// We should close this container. if (info.getState() != LifeCycleState.CLOSED) {
LOG.info("Moving {} to containerCloseQueue.", info.toString()); // We should close this container.
containerCloseQueue.add(info); LOG.info("Moving {} to containerCloseQueue.", info.toString());
//TODO: Next JIRA will handle these containers to close. info.setState(LifeCycleState.CLOSED);
containerCloseQueue.add(info);
//TODO: Next JIRA will handle these containers to close.
//TODO: move container to right queue
}
} }
} }

View File

@ -38,8 +38,6 @@
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.LifeCycleState.ALLOCATED;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.LifeCycleState.OPEN; .LifeCycleState.OPEN;
@ -121,7 +119,12 @@ public synchronized Pipeline getPipeline(String containerName,
.createPipeline(pipeline.getPipelineName(), pipeline.getMachines()); .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
} }
} else { } else {
pipeline = findOpenPipeline(); Pipeline openPipeline = findOpenPipeline(replicationFactor);
if (openPipeline != null) {
// if an open pipeline is found use the same machines
pipeline = allocateRatisPipeline(openPipeline.getMachines(),
containerName, replicationFactor);
}
} }
if (pipeline == null) { if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find free nodes" + LOG.error("Get pipeline call failed. We are not able to find free nodes" +
@ -135,7 +138,7 @@ public synchronized Pipeline getPipeline(String containerName,
* *
* @return - Pipeline or null * @return - Pipeline or null
*/ */
Pipeline findOpenPipeline() { Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) {
Pipeline pipeline = null; Pipeline pipeline = null;
final int sentinal = -1; final int sentinal = -1;
if (activePipelines.size() == 0) { if (activePipelines.size() == 0) {
@ -149,7 +152,7 @@ Pipeline findOpenPipeline() {
Pipeline temp = Pipeline temp =
activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex); activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
// if we find an operational pipeline just return that. // if we find an operational pipeline just return that.
if (temp.getLifeCycleState() == OPEN) { if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) {
pipeline = temp; pipeline = temp;
break; break;
} }
@ -173,7 +176,7 @@ Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName,
String pipelineName = PREFIX + String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length()); UUID.randomUUID().toString().substring(PREFIX.length());
pipeline.setType(OzoneProtos.ReplicationType.RATIS); pipeline.setType(OzoneProtos.ReplicationType.RATIS);
pipeline.setLifeCycleState(ALLOCATED); pipeline.setLifeCycleState(OPEN);
pipeline.setFactor(factor); pipeline.setFactor(factor);
pipeline.setPipelineName(pipelineName); pipeline.setPipelineName(pipelineName);
pipeline.setContainerName(containerName); pipeline.setContainerName(containerName);
@ -210,10 +213,10 @@ private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
Preconditions.checkNotNull(datanode); Preconditions.checkNotNull(datanode);
if (!ratisMembers.contains(datanode)) { if (!ratisMembers.contains(datanode)) {
newNodesList.add(datanode); newNodesList.add(datanode);
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.add(datanode);
if (newNodesList.size() == count) { if (newNodesList.size() == count) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new pipeline of size: {}", count); LOG.info("Allocating a new pipeline of size: {}", count);
return newNodesList; return newNodesList;
} }

View File

@ -416,6 +416,8 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
.setKsmClient(keySpaceManagerClient) .setKsmClient(keySpaceManagerClient)
.setChunkSize(chunkSize) .setChunkSize(chunkSize)
.setRequestID(args.getRequestID()) .setRequestID(args.getRequestID())
.setType(xceiverClientManager.getType())
.setFactor(xceiverClientManager.getFactor())
.build(); .build();
return new OzoneOutputStream(groupOutputStream); return new OzoneOutputStream(groupOutputStream);
} }

View File

@ -26,13 +26,18 @@
import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common import org.apache.hadoop.ozone.container.common
.statemachine.DatanodeStateMachine; .statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager; import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.web.client.OzoneRestClient; import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB import org.apache.hadoop.scm.protocolPB
@ -252,6 +257,30 @@ public OzoneRestClient createOzoneRestClient() throws OzoneException {
Client.getRpcTimeout(conf))); Client.getRpcTimeout(conf)));
} }
/**
* Creates an RPC proxy connected to this cluster's KeySpaceManager
* for accessing Key Space Manager information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing Key Space Manager information
* @throws IOException if there is an I/O error
*/
public KeySpaceManagerProtocolClientSideTranslatorPB
createKeySpaceManagerClient() throws IOException {
long ksmVersion = RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils
.getKsmAddressForClients(conf);
LOG.info("Creating KeySpaceManager RPC client with address {}",
ksmAddress);
RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
ProtobufRpcEngine.class);
return new KeySpaceManagerProtocolClientSideTranslatorPB(
RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
ksmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/** /**
* Waits for the Ozone cluster to be ready for processing requests. * Waits for the Ozone cluster to be ready for processing requests.
*/ */

View File

@ -38,7 +38,16 @@
import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.protocolPB.
KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.
StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -64,6 +73,10 @@ public class TestOzoneRpcClient {
private static MiniOzoneCluster cluster = null; private static MiniOzoneCluster cluster = null;
private static OzoneClient ozClient = null; private static OzoneClient ozClient = null;
private static ObjectStore store = null; private static ObjectStore store = null;
private static KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
/** /**
* Create a MiniOzoneCluster for testing. * Create a MiniOzoneCluster for testing.
@ -78,13 +91,16 @@ public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED); OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf) cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(5)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
conf.set("ozone.client.protocol", conf.set("ozone.client.protocol",
"org.apache.hadoop.ozone.client.rpc.RpcClient"); "org.apache.hadoop.ozone.client.rpc.RpcClient");
OzoneClientFactory.setConfiguration(conf); OzoneClientFactory.setConfiguration(conf);
ozClient = OzoneClientFactory.getClient(); ozClient = OzoneClientFactory.getClient();
store = ozClient.getObjectStore(); store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
keySpaceManagerClient = cluster.createKeySpaceManagerClient();
} }
@Test @Test
@ -360,6 +376,29 @@ public void testDeleteBucket()
volume.getBucket(bucketName); volume.getBucket(bucketName);
} }
private boolean verifyRatisReplication(String volumeName, String bucketName,
String keyName, ReplicationType type, ReplicationFactor factor)
throws IOException {
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.build();
OzoneProtos.ReplicationType replicationType =
OzoneProtos.ReplicationType.valueOf(type.toString());
OzoneProtos.ReplicationFactor replicationFactor =
OzoneProtos.ReplicationFactor.valueOf(factor.getValue());
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) {
Pipeline pipeline =
storageContainerLocationClient.getContainer(info.getContainerName());
if ((pipeline.getFactor() != replicationFactor) ||
(pipeline.getType() != replicationType)) {
return false;
}
}
return true;
}
@Test @Test
public void testPutKey() public void testPutKey()
@ -387,6 +426,80 @@ public void testPutKey()
OzoneInputStream is = bucket.readKey(keyName); OzoneInputStream is = bucket.readKey(keyName);
byte[] fileContent = new byte[value.getBytes().length]; byte[] fileContent = new byte[value.getBytes().length];
is.read(fileContent); is.read(fileContent);
Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
keyName, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE));
Assert.assertEquals(value, new String(fileContent));
Assert.assertTrue(key.getCreationTime() >= currentTime);
Assert.assertTrue(key.getModificationTime() >= currentTime);
}
}
@Test
public void testPutKeyRatisOneNode()
throws IOException, OzoneException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
long currentTime = Time.now();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
ReplicationFactor.ONE);
out.write(value.getBytes());
out.close();
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
OzoneInputStream is = bucket.readKey(keyName);
byte[] fileContent = new byte[value.getBytes().length];
is.read(fileContent);
is.close();
Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
keyName, ReplicationType.RATIS, ReplicationFactor.ONE));
Assert.assertEquals(value, new String(fileContent));
Assert.assertTrue(key.getCreationTime() >= currentTime);
Assert.assertTrue(key.getModificationTime() >= currentTime);
}
}
@Test
public void testPutKeyRatisThreeNodes()
throws IOException, OzoneException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
long currentTime = Time.now();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
ReplicationFactor.THREE);
out.write(value.getBytes());
out.close();
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
OzoneInputStream is = bucket.readKey(keyName);
byte[] fileContent = new byte[value.getBytes().length];
is.read(fileContent);
is.close();
Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
keyName, ReplicationType.RATIS,
ReplicationFactor.THREE));
Assert.assertEquals(value, new String(fileContent)); Assert.assertEquals(value, new String(fileContent));
Assert.assertTrue(key.getCreationTime() >= currentTime); Assert.assertTrue(key.getCreationTime() >= currentTime);
Assert.assertTrue(key.getModificationTime() >= currentTime); Assert.assertTrue(key.getModificationTime() >= currentTime);
@ -691,6 +804,15 @@ public static void shutdown() throws IOException {
if(ozClient != null) { if(ozClient != null) {
ozClient.close(); ozClient.close();
} }
if (storageContainerLocationClient != null) {
storageContainerLocationClient.close();
}
if (keySpaceManagerClient != null) {
keySpaceManagerClient.close();
}
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -117,22 +117,48 @@ public void validateWriteTest() throws Exception {
} }
@Test @Test
public void ratisTest() throws Exception { public void multiThread() throws Exception {
List<String> args = new ArrayList<>(); List<String> args = new ArrayList<>();
args.add("-numOfVolumes"); args.add("-numOfVolumes");
args.add("10");
args.add("-numOfBuckets");
args.add("1"); args.add("1");
args.add("-numOfKeys");
args.add("10");
args.add("-numOfThread");
args.add("10");
args.add("-keySize");
args.add("10240");
Corona corona = new Corona(conf);
int res = ToolRunner.run(conf, corona,
args.toArray(new String[0]));
Assert.assertEquals(10, corona.getNumberOfVolumesCreated());
Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
Assert.assertEquals(100, corona.getNumberOfKeysAdded());
Assert.assertEquals(0, res);
}
@Test
public void ratisTest3() throws Exception {
List<String> args = new ArrayList<>();
args.add("-numOfVolumes");
args.add("10");
args.add("-numOfBuckets"); args.add("-numOfBuckets");
args.add("1"); args.add("1");
args.add("-numOfKeys"); args.add("-numOfKeys");
args.add("10"); args.add("10");
args.add("-ratis"); args.add("-ratis");
args.add("3"); args.add("3");
args.add("-numOfThread");
args.add("10");
args.add("-keySize");
args.add("10240");
Corona corona = new Corona(conf); Corona corona = new Corona(conf);
int res = ToolRunner.run(conf, corona, int res = ToolRunner.run(conf, corona,
args.toArray(new String[0])); args.toArray(new String[0]));
Assert.assertEquals(1, corona.getNumberOfVolumesCreated()); Assert.assertEquals(10, corona.getNumberOfVolumesCreated());
Assert.assertEquals(1, corona.getNumberOfBucketsCreated()); Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
Assert.assertEquals(10, corona.getNumberOfKeysAdded()); Assert.assertEquals(100, corona.getNumberOfKeysAdded());
Assert.assertEquals(0, res); Assert.assertEquals(0, res);
} }
} }