HDDS-676. Enable Read from open Containers via Standalone Protocol.
Contributed by Shashikant Banerjee.
This commit is contained in:
parent
e98a506227
commit
e3cca12048
@ -22,6 +22,7 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
|
||||
@ -40,6 +41,9 @@
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.UUID;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
||||
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
|
||||
private final Pipeline pipeline;
|
||||
private final Configuration config;
|
||||
private XceiverClientProtocolServiceStub asyncStub;
|
||||
private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
|
||||
private XceiverClientMetrics metrics;
|
||||
private ManagedChannel channel;
|
||||
private Map<UUID, ManagedChannel> channels;
|
||||
private final Semaphore semaphore;
|
||||
private boolean closed = false;
|
||||
|
||||
@ -72,46 +76,62 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
|
||||
this.semaphore =
|
||||
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
|
||||
this.metrics = XceiverClientManager.getXceiverClientMetrics();
|
||||
this.channels = new HashMap<>();
|
||||
this.asyncStubs = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect() throws Exception {
|
||||
DatanodeDetails leader = this.pipeline.getLeader();
|
||||
|
||||
// leader by default is the 1st datanode in the datanode list of pipleline
|
||||
DatanodeDetails leader = this.pipeline.getLeader();
|
||||
// just make a connection to the 1st datanode at the beginning
|
||||
connectToDatanode(leader);
|
||||
}
|
||||
|
||||
private void connectToDatanode(DatanodeDetails dn) {
|
||||
// read port from the data node, on failure use default configured
|
||||
// port.
|
||||
int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
|
||||
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
|
||||
if (port == 0) {
|
||||
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
||||
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
||||
}
|
||||
LOG.debug("Connecting to server Port : " + leader.getIpAddress());
|
||||
channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
|
||||
.usePlaintext()
|
||||
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
|
||||
.build();
|
||||
asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
|
||||
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
|
||||
ManagedChannel channel =
|
||||
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
|
||||
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
|
||||
.build();
|
||||
XceiverClientProtocolServiceStub asyncStub =
|
||||
XceiverClientProtocolServiceGrpc.newStub(channel);
|
||||
asyncStubs.put(dn.getUuid(), asyncStub);
|
||||
channels.put(dn.getUuid(), channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns if the xceiver client connects to a server.
|
||||
* Returns if the xceiver client connects to all servers in the pipeline.
|
||||
*
|
||||
* @return True if the connection is alive, false otherwise.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public boolean isConnected() {
|
||||
return !channel.isTerminated() && !channel.isShutdown();
|
||||
public boolean isConnected(DatanodeDetails details) {
|
||||
return isConnected(channels.get(details.getUuid()));
|
||||
}
|
||||
|
||||
private boolean isConnected(ManagedChannel channel) {
|
||||
return channel != null && !channel.isTerminated() && !channel.isShutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closed = true;
|
||||
channel.shutdownNow();
|
||||
try {
|
||||
channel.awaitTermination(60, TimeUnit.MINUTES);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unexpected exception while waiting for channel termination",
|
||||
e);
|
||||
for (ManagedChannel channel : channels.values()) {
|
||||
channel.shutdownNow();
|
||||
try {
|
||||
channel.awaitTermination(60, TimeUnit.MINUTES);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unexpected exception while waiting for channel termination",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,6 +140,56 @@ public Pipeline getPipeline() {
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerCommandResponseProto sendCommand(
|
||||
ContainerCommandRequestProto request) throws IOException {
|
||||
return sendCommandWithRetry(request);
|
||||
}
|
||||
|
||||
public ContainerCommandResponseProto sendCommandWithRetry(
|
||||
ContainerCommandRequestProto request) throws IOException {
|
||||
int size = pipeline.getMachines().size();
|
||||
ContainerCommandResponseProto responseProto = null;
|
||||
DatanodeDetails dn = null;
|
||||
|
||||
// In case of an exception or an error, we will try to read from the
|
||||
// datanodes in the pipeline in a round robin fashion.
|
||||
|
||||
// TODO: cache the correct leader info in here, so that any subsequent calls
|
||||
// should first go to leader
|
||||
for (int dnIndex = 0; dnIndex < size; dnIndex++) {
|
||||
try {
|
||||
dn = pipeline.getMachines().get(dnIndex);
|
||||
LOG.debug("Executing command " + request + " on datanode " + dn);
|
||||
// In case the command gets retried on a 2nd datanode,
|
||||
// sendCommandAsyncCall will create a new channel and async stub
|
||||
// in case these don't exist for the specific datanode.
|
||||
responseProto = sendCommandAsync(request, dn).get();
|
||||
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
|
||||
break;
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
LOG.warn("Failed to execute command " + request + " on datanode " + dn
|
||||
.getUuidString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
if (responseProto != null) {
|
||||
return responseProto;
|
||||
} else {
|
||||
throw new IOException(
|
||||
"Failed to execute command " + request + " on the pipeline "
|
||||
+ pipeline.getId());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: for a true async API, once the waitable future while executing
|
||||
// the command on one channel fails, it should be retried asynchronously
|
||||
// on the future Task for all the remaining datanodes.
|
||||
|
||||
// Note: this Async api is not used currently used in any active I/O path.
|
||||
// In case it gets used, the asynchronous retry logic needs to be plugged
|
||||
// in here.
|
||||
/**
|
||||
* Sends a given command to server gets a waitable future back.
|
||||
*
|
||||
@ -128,15 +198,25 @@ public Pipeline getPipeline() {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<ContainerCommandResponseProto>
|
||||
sendCommandAsync(ContainerCommandRequestProto request)
|
||||
public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
||||
ContainerCommandRequestProto request)
|
||||
throws IOException, ExecutionException, InterruptedException {
|
||||
if(closed){
|
||||
return sendCommandAsync(request, pipeline.getLeader());
|
||||
}
|
||||
|
||||
private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
||||
ContainerCommandRequestProto request, DatanodeDetails dn)
|
||||
throws IOException, ExecutionException, InterruptedException {
|
||||
if (closed) {
|
||||
throw new IOException("This channel is not connected.");
|
||||
}
|
||||
|
||||
if(channel == null || !isConnected()) {
|
||||
reconnect();
|
||||
UUID dnId = dn.getUuid();
|
||||
ManagedChannel channel = channels.get(dnId);
|
||||
// If the channel doesn't exist for this specific datanode or the channel
|
||||
// is closed, just reconnect
|
||||
if (!isConnected(channel)) {
|
||||
reconnect(dn);
|
||||
}
|
||||
|
||||
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
|
||||
@ -145,48 +225,54 @@ public Pipeline getPipeline() {
|
||||
long requestTime = Time.monotonicNowNanos();
|
||||
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
|
||||
// create a new grpc stream for each non-async call.
|
||||
final StreamObserver<ContainerCommandRequestProto> requestObserver =
|
||||
asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
|
||||
@Override
|
||||
public void onNext(ContainerCommandResponseProto value) {
|
||||
replyFuture.complete(value);
|
||||
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
||||
metrics.addContainerOpsLatency(request.getCmdType(),
|
||||
Time.monotonicNowNanos() - requestTime);
|
||||
semaphore.release();
|
||||
}
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
replyFuture.completeExceptionally(t);
|
||||
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
||||
metrics.addContainerOpsLatency(request.getCmdType(),
|
||||
Time.monotonicNowNanos() - requestTime);
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!replyFuture.isDone()) {
|
||||
replyFuture.completeExceptionally(
|
||||
new IOException("Stream completed but no reply for request "
|
||||
+ request));
|
||||
}
|
||||
}
|
||||
});
|
||||
// TODO: for async calls, we should reuse StreamObserver resources.
|
||||
final StreamObserver<ContainerCommandRequestProto> requestObserver =
|
||||
asyncStubs.get(dnId)
|
||||
.send(new StreamObserver<ContainerCommandResponseProto>() {
|
||||
@Override
|
||||
public void onNext(ContainerCommandResponseProto value) {
|
||||
replyFuture.complete(value);
|
||||
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
||||
metrics.addContainerOpsLatency(request.getCmdType(),
|
||||
Time.monotonicNowNanos() - requestTime);
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
replyFuture.completeExceptionally(t);
|
||||
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
||||
metrics.addContainerOpsLatency(request.getCmdType(),
|
||||
Time.monotonicNowNanos() - requestTime);
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!replyFuture.isDone()) {
|
||||
replyFuture.completeExceptionally(new IOException(
|
||||
"Stream completed but no reply for request " + request));
|
||||
}
|
||||
}
|
||||
});
|
||||
requestObserver.onNext(request);
|
||||
requestObserver.onCompleted();
|
||||
return replyFuture;
|
||||
}
|
||||
|
||||
private void reconnect() throws IOException {
|
||||
private void reconnect(DatanodeDetails dn)
|
||||
throws IOException {
|
||||
ManagedChannel channel;
|
||||
try {
|
||||
connect();
|
||||
connectToDatanode(dn);
|
||||
channel = channels.get(dn.getUuid());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error while connecting: ", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (channel == null || !isConnected()) {
|
||||
if (channel == null || !isConnected(channel)) {
|
||||
throw new IOException("This channel is not connected.");
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
@ -59,7 +58,7 @@ public class XceiverClientManager implements Closeable {
|
||||
|
||||
//TODO : change this to SCM configuration class
|
||||
private final Configuration conf;
|
||||
private final Cache<PipelineID, XceiverClientSpi> clientCache;
|
||||
private final Cache<String, XceiverClientSpi> clientCache;
|
||||
private final boolean useRatis;
|
||||
|
||||
private static XceiverClientMetrics metrics;
|
||||
@ -83,10 +82,10 @@ public XceiverClientManager(Configuration conf) {
|
||||
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
|
||||
.maximumSize(maxSize)
|
||||
.removalListener(
|
||||
new RemovalListener<PipelineID, XceiverClientSpi>() {
|
||||
new RemovalListener<String, XceiverClientSpi>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<PipelineID, XceiverClientSpi>
|
||||
RemovalNotification<String, XceiverClientSpi>
|
||||
removalNotification) {
|
||||
synchronized (clientCache) {
|
||||
// Mark the entry as evicted
|
||||
@ -98,7 +97,7 @@ public void onRemoval(
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Cache<PipelineID, XceiverClientSpi> getClientCache() {
|
||||
public Cache<String, XceiverClientSpi> getClientCache() {
|
||||
return clientCache;
|
||||
}
|
||||
|
||||
@ -140,13 +139,14 @@ public void releaseClient(XceiverClientSpi client) {
|
||||
|
||||
private XceiverClientSpi getClient(Pipeline pipeline)
|
||||
throws IOException {
|
||||
HddsProtos.ReplicationType type = pipeline.getType();
|
||||
try {
|
||||
return clientCache.get(pipeline.getId(),
|
||||
return clientCache.get(pipeline.getId().getId().toString() + type,
|
||||
new Callable<XceiverClientSpi>() {
|
||||
@Override
|
||||
public XceiverClientSpi call() throws Exception {
|
||||
XceiverClientSpi client = null;
|
||||
switch (pipeline.getType()) {
|
||||
switch (type) {
|
||||
case RATIS:
|
||||
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
|
||||
break;
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.ratis.proto.RaftProtos;
|
||||
import org.apache.ratis.retry.RetryPolicy;
|
||||
import org.apache.ratis.thirdparty.com.google.protobuf
|
||||
.InvalidProtocolBufferException;
|
||||
@ -52,6 +53,7 @@
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
@ -209,6 +211,10 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
|
||||
getClient().sendAsync(() -> byteString);
|
||||
}
|
||||
|
||||
public void watchForCommit(long index, long timeout) throws Exception {
|
||||
getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
|
||||
.get(timeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
/**
|
||||
* Sends a given command to server gets a waitable future back.
|
||||
*
|
||||
|
@ -299,6 +299,10 @@ public String toString() {
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
public void setType(HddsProtos.ReplicationType type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a JSON string of this object.
|
||||
*
|
||||
|
@ -22,7 +22,9 @@
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
@ -276,8 +278,13 @@ public static LengthInputStream getFromOmKeyInfo(
|
||||
long containerID = blockID.getContainerID();
|
||||
ContainerWithPipeline containerWithPipeline =
|
||||
storageContainerLocationClient.getContainerWithPipeline(containerID);
|
||||
Pipeline pipeline = containerWithPipeline.getPipeline();
|
||||
|
||||
// irrespective of the container state, we will always read via Standalone
|
||||
// protocol.
|
||||
pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
|
||||
XceiverClientSpi xceiverClient = xceiverClientManager
|
||||
.acquireClient(containerWithPipeline.getPipeline());
|
||||
.acquireClient(pipeline);
|
||||
boolean success = false;
|
||||
containerKey = omKeyLocationInfo.getLocalID();
|
||||
try {
|
||||
|
@ -116,6 +116,10 @@ public void addStream(OutputStream outputStream, long length) {
|
||||
public List<ChunkOutputStreamEntry> getStreamEntries() {
|
||||
return streamEntries;
|
||||
}
|
||||
@VisibleForTesting
|
||||
public XceiverClientManager getXceiverClientManager() {
|
||||
return xceiverClientManager;
|
||||
}
|
||||
|
||||
public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
|
||||
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
|
||||
|
@ -102,7 +102,7 @@ public void testStartMultipleDatanodes() throws Exception {
|
||||
// Verify client is able to connect to the container
|
||||
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
|
||||
client.connect();
|
||||
assertTrue(client.isConnected());
|
||||
assertTrue(client.isConnected(pipeline.getLeader()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,11 @@
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.*;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
@ -32,6 +37,7 @@
|
||||
import org.apache.hadoop.hdds.client.OzoneQuota;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||
@ -597,6 +603,108 @@ public void testPutKeyRatisThreeNodes()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutKeyAndGetKeyThreeNodes()
|
||||
throws Exception {
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
String bucketName = UUID.randomUUID().toString();
|
||||
|
||||
String value = "sample value";
|
||||
store.createVolume(volumeName);
|
||||
OzoneVolume volume = store.getVolume(volumeName);
|
||||
volume.createBucket(bucketName);
|
||||
OzoneBucket bucket = volume.getBucket(bucketName);
|
||||
|
||||
String keyName = UUID.randomUUID().toString();
|
||||
|
||||
OzoneOutputStream out = bucket
|
||||
.createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
|
||||
ReplicationFactor.THREE);
|
||||
ChunkGroupOutputStream groupOutputStream =
|
||||
(ChunkGroupOutputStream) out.getOutputStream();
|
||||
XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
|
||||
out.write(value.getBytes());
|
||||
out.close();
|
||||
// First, confirm the key info from the client matches the info in OM.
|
||||
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
|
||||
builder.setVolumeName(volumeName).setBucketName(bucketName)
|
||||
.setKeyName(keyName);
|
||||
OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
|
||||
getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
|
||||
long containerID = keyInfo.getContainerID();
|
||||
long localID = keyInfo.getLocalID();
|
||||
OzoneKeyDetails keyDetails = bucket.getKey(keyName);
|
||||
Assert.assertEquals(keyName, keyDetails.getName());
|
||||
|
||||
List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
|
||||
Assert.assertEquals(1, keyLocations.size());
|
||||
Assert.assertEquals(containerID, keyLocations.get(0).getContainerID());
|
||||
Assert.assertEquals(localID, keyLocations.get(0).getLocalID());
|
||||
|
||||
// Make sure that the data size matched.
|
||||
Assert
|
||||
.assertEquals(value.getBytes().length, keyLocations.get(0).getLength());
|
||||
|
||||
ContainerWithPipeline container =
|
||||
cluster.getStorageContainerManager().getContainerManager()
|
||||
.getContainerWithPipeline(new ContainerID(containerID));
|
||||
Pipeline pipeline = container.getPipeline();
|
||||
List<DatanodeDetails> datanodes = pipeline.getMachines();
|
||||
|
||||
DatanodeDetails datanodeDetails = datanodes.get(0);
|
||||
Assert.assertNotNull(datanodeDetails);
|
||||
|
||||
XceiverClientSpi clientSpi = manager.acquireClient(pipeline);
|
||||
Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
|
||||
XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
|
||||
|
||||
ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
|
||||
// shutdown the datanode
|
||||
cluster.shutdownHddsDatanode(datanodeDetails);
|
||||
|
||||
Assert.assertTrue(container.getContainerInfo().getState()
|
||||
== HddsProtos.LifeCycleState.OPEN);
|
||||
// try to read, this shouls be successful
|
||||
readKey(bucket, keyName, value);
|
||||
|
||||
Assert.assertTrue(container.getContainerInfo().getState()
|
||||
== HddsProtos.LifeCycleState.OPEN);
|
||||
// shutdown the second datanode
|
||||
datanodeDetails = datanodes.get(1);
|
||||
cluster.shutdownHddsDatanode(datanodeDetails);
|
||||
Assert.assertTrue(container.getContainerInfo().getState()
|
||||
== HddsProtos.LifeCycleState.OPEN);
|
||||
|
||||
// the container is open and with loss of 2 nodes we still should be able
|
||||
// to read via Standalone protocol
|
||||
// try to read
|
||||
readKey(bucket, keyName, value);
|
||||
|
||||
// shutdown the 3rd datanode
|
||||
datanodeDetails = datanodes.get(2);
|
||||
cluster.shutdownHddsDatanode(datanodeDetails);
|
||||
try {
|
||||
// try to read
|
||||
readKey(bucket, keyName, value);
|
||||
Assert.fail("Expected exception not thrown");
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("Failed to execute command"));
|
||||
Assert.assertTrue(
|
||||
e.getMessage().contains("on the pipeline " + pipeline.getId()));
|
||||
}
|
||||
manager.releaseClient(clientSpi);
|
||||
}
|
||||
|
||||
private void readKey(OzoneBucket bucket, String keyName, String data)
|
||||
throws IOException {
|
||||
OzoneKey key = bucket.getKey(keyName);
|
||||
Assert.assertEquals(keyName, key.getName());
|
||||
OzoneInputStream is = bucket.readKey(keyName);
|
||||
byte[] fileContent = new byte[data.getBytes().length];
|
||||
is.read(fileContent);
|
||||
is.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyDetails() throws IOException, OzoneException {
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
|
@ -20,7 +20,6 @@
|
||||
import com.google.common.cache.Cache;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
@ -107,7 +106,7 @@ public void testFreeByReference() throws IOException {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
Cache<PipelineID, XceiverClientSpi> cache =
|
||||
Cache<String, XceiverClientSpi> cache =
|
||||
clientManager.getClientCache();
|
||||
|
||||
ContainerWithPipeline container1 =
|
||||
@ -130,8 +129,9 @@ public void testFreeByReference() throws IOException {
|
||||
Assert.assertNotEquals(client1, client2);
|
||||
|
||||
// least recent container (i.e containerName1) is evicted
|
||||
XceiverClientSpi nonExistent1 = cache
|
||||
.getIfPresent(container1.getContainerInfo().getPipelineID());
|
||||
XceiverClientSpi nonExistent1 = cache.getIfPresent(
|
||||
container1.getContainerInfo().getPipelineID().getId().toString()
|
||||
+ container1.getContainerInfo().getReplicationType());
|
||||
Assert.assertEquals(null, nonExistent1);
|
||||
// However container call should succeed because of refcount on the client.
|
||||
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
|
||||
@ -160,7 +160,7 @@ public void testFreeByEviction() throws IOException {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
Cache<PipelineID, XceiverClientSpi> cache =
|
||||
Cache<String, XceiverClientSpi> cache =
|
||||
clientManager.getClientCache();
|
||||
|
||||
ContainerWithPipeline container1 =
|
||||
@ -183,8 +183,9 @@ public void testFreeByEviction() throws IOException {
|
||||
Assert.assertNotEquals(client1, client2);
|
||||
|
||||
// now client 1 should be evicted
|
||||
XceiverClientSpi nonExistent = cache
|
||||
.getIfPresent(container1.getContainerInfo().getPipelineID());
|
||||
XceiverClientSpi nonExistent = cache.getIfPresent(
|
||||
container1.getContainerInfo().getPipelineID().getId().toString()
|
||||
+ container1.getContainerInfo().getReplicationType());
|
||||
Assert.assertEquals(null, nonExistent);
|
||||
|
||||
// Any container operation should now fail
|
||||
|
Loading…
Reference in New Issue
Block a user