diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2f118727cc..d1e7e8c516 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -40,6 +41,9 @@
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private XceiverClientProtocolServiceStub asyncStub;
+  private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
   private XceiverClientMetrics metrics;
-  private ManagedChannel channel;
+  private Map<UUID, ManagedChannel> channels;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -72,46 +76,62 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
+    this.channels = new HashMap<>();
+    this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
-    DatanodeDetails leader = this.pipeline.getLeader();
+    // leader by default is the 1st datanode in the datanode list of pipeline
+    DatanodeDetails leader = this.pipeline.getLeader();
+    // just make a connection to the 1st datanode at the beginning
+    connectToDatanode(leader);
+  }
 
+  private void connectToDatanode(DatanodeDetails dn) {
     // read port from the data node, on failure use default configured
     // port.
-    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+    int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
-    LOG.debug("Connecting to server Port : " + leader.getIpAddress());
-    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-        .usePlaintext()
-        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-        .build();
-    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+    LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+    ManagedChannel channel =
+        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+            .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+            .build();
+    XceiverClientProtocolServiceStub asyncStub =
+        XceiverClientProtocolServiceGrpc.newStub(channel);
+    asyncStubs.put(dn.getUuid(), asyncStub);
+    channels.put(dn.getUuid(), channel);
   }
 
-  /**
-   * Returns if the xceiver client connects to a server.
+  /**
+   * Returns if the xceiver client connects to all servers in the pipeline.
    *
    * @return True if the connection is alive, false otherwise.
    */
   @VisibleForTesting
-  public boolean isConnected() {
-    return !channel.isTerminated() && !channel.isShutdown();
+  public boolean isConnected(DatanodeDetails details) {
+    return isConnected(channels.get(details.getUuid()));
+  }
+
+  private boolean isConnected(ManagedChannel channel) {
+    return channel != null && !channel.isTerminated() && !channel.isShutdown();
   }
 
   @Override
   public void close() {
     closed = true;
-    channel.shutdownNow();
-    try {
-      channel.awaitTermination(60, TimeUnit.MINUTES);
-    } catch (Exception e) {
-      LOG.error("Unexpected exception while waiting for channel termination",
-          e);
+    for (ManagedChannel channel : channels.values()) {
+      channel.shutdownNow();
+      try {
+        channel.awaitTermination(60, TimeUnit.MINUTES);
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while waiting for channel termination",
+            e);
+      }
     }
   }
 
@@ -120,6 +140,51 @@ public Pipeline getPipeline() {
     return pipeline;
   }
 
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    int size = pipeline.getMachines().size();
+    ContainerCommandResponseProto responseProto = null;
+    int dnIndex = 0;
+
+    // In case of an exception or an error, we will try to read from the
+    // datanodes in the pipeline in a round robin fashion.
+
+    // TODO: cache the correct leader info in here, so that any subsequent
+    // calls should first go to the leader
+    for (DatanodeDetails dn : pipeline.getMachines()) {
+      try {
+
+        // In case the command gets retried on a 2nd datanode, the
+        // sendCommandAsync call will create a new channel and async stub
+        // in case these don't exist for the specific datanode.
+        responseProto =
+            sendCommandAsync(request, dn).get();
+        dnIndex++;
+        if (responseProto.getResult() == ContainerProtos.Result.SUCCESS
+            || dnIndex == size) {
+          return responseProto;
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        if (dnIndex < size) {
+          LOG.warn(
+              "Failed to execute command " + request + " on datanode " + dn
                  .getUuidString() + ". Retrying", e);
+        } else {
+          throw new IOException("Failed to execute command " + request, e);
+        }
+      }
+    }
+    return responseProto;
+  }
+
+  // TODO: for a true async API, once the waitable future while executing
+  // the command on one channel fails, it should be retried asynchronously
+  // on the future Task for all the remaining datanodes.
+
+  // Note: this async API is not currently used in any active I/O path.
+  // In case it gets used, the asynchronous retry logic needs to be plugged
+  // in here.
   /**
    * Sends a given command to server and gets a waitable future back.
    *
@@ -128,15 +193,25 @@ public Pipeline getPipeline() {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto>
-      sendCommandAsync(ContainerCommandRequestProto request)
+  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    if(closed){
+    return sendCommandAsync(request, pipeline.getLeader());
+  }
+
+  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerCommandRequestProto request, DatanodeDetails dn)
+      throws IOException, ExecutionException, InterruptedException {
+    if (closed) {
       throw new IOException("This channel is not connected.");
     }
-    if(channel == null || !isConnected()) {
-      reconnect();
+    UUID dnId = dn.getUuid();
+    ManagedChannel channel = channels.get(dnId);
+    // If the channel doesn't exist for this specific datanode or the channel
+    // is closed, just reconnect
+    if (!isConnected(channel)) {
+      reconnect(dn);
     }
 
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
@@ -145,48 +220,54 @@ public Pipeline getPipeline() {
     long requestTime = Time.monotonicNowNanos();
     metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     // create a new grpc stream for each non-async call.
-    final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
-          @Override
-          public void onNext(ContainerCommandResponseProto value) {
-            replyFuture.complete(value);
-            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-            metrics.addContainerOpsLatency(request.getCmdType(),
-                Time.monotonicNowNanos() - requestTime);
-            semaphore.release();
-          }
-          @Override
-          public void onError(Throwable t) {
-            replyFuture.completeExceptionally(t);
-            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-            metrics.addContainerOpsLatency(request.getCmdType(),
-                Time.monotonicNowNanos() - requestTime);
-            semaphore.release();
-          }
-          @Override
-          public void onCompleted() {
-            if (!replyFuture.isDone()) {
-              replyFuture.completeExceptionally(
-                  new IOException("Stream completed but no reply for request "
-                      + request));
-            }
-          }
-        });
+    // TODO: for async calls, we should reuse StreamObserver resources.
+    final StreamObserver<ContainerCommandRequestProto> requestObserver =
+        asyncStubs.get(dnId)
+            .send(new StreamObserver<ContainerCommandResponseProto>() {
+              @Override
+              public void onNext(ContainerCommandResponseProto value) {
+                replyFuture.complete(value);
+                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+                metrics.addContainerOpsLatency(request.getCmdType(),
+                    Time.monotonicNowNanos() - requestTime);
+                semaphore.release();
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                replyFuture.completeExceptionally(t);
+                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+                metrics.addContainerOpsLatency(request.getCmdType(),
+                    Time.monotonicNowNanos() - requestTime);
+                semaphore.release();
+              }
+
+              @Override
+              public void onCompleted() {
+                if (!replyFuture.isDone()) {
+                  replyFuture.completeExceptionally(new IOException(
+                      "Stream completed but no reply for request " + request));
+                }
+              }
+            });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
     return replyFuture;
   }
 
-  private void reconnect() throws IOException {
+  private void reconnect(DatanodeDetails dn)
+      throws IOException {
+    ManagedChannel channel;
     try {
-      connect();
+      connectToDatanode(dn);
+      channel = channels.get(dn.getUuid());
     } catch (Exception e) {
       LOG.error("Error while connecting: ", e);
       throw new IOException(e);
     }
 
-    if (channel == null || !isConnected()) {
+    if (channel == null || !isConnected(channel)) {
       throw new IOException("This channel is not connected.");
     }
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index d542abc9b2..83b5a4c0e4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -59,7 +58,7 @@ public class XceiverClientManager implements Closeable {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<PipelineID, XceiverClientSpi> clientCache;
+  private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   private static XceiverClientMetrics metrics;
@@ -83,10 +82,10 @@ public XceiverClientManager(Configuration conf) {
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
         .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<PipelineID, XceiverClientSpi>() {
+            new RemovalListener<String, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<PipelineID, XceiverClientSpi>
+                RemovalNotification<String, XceiverClientSpi>
                     removalNotification) {
               synchronized (clientCache) {
                 // Mark the entry as evicted
@@ -98,7 +97,7 @@ public void onRemoval(
   }
 
   @VisibleForTesting
-  public Cache<PipelineID, XceiverClientSpi> getClientCache() {
+  public Cache<String, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
 
@@ -140,13 +139,14 @@ public void releaseClient(XceiverClientSpi client) {
 
   private XceiverClientSpi getClient(Pipeline pipeline)
       throws IOException {
+    HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      return clientCache.get(pipeline.getId(),
+      return clientCache.get(pipeline.getId().getId().toString() + type,
          new Callable<XceiverClientSpi>() {
          @Override
          public XceiverClientSpi call() throws Exception {
            XceiverClientSpi client = null;
-           switch (pipeline.getType()) {
+           switch (type) {
            case RATIS:
              client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
              break;
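
An aside before the next file: the client cache above is now keyed by the pipeline UUID string concatenated with the replication type. Because reads are forced onto STAND_ALONE while writes still use RATIS on the same pipeline, one pipeline can legitimately need two clients at once. Below is a minimal, self-contained sketch of why the composite key avoids a collision — my illustration only, not code from this patch; the Client class and IDs are hypothetical:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class CompositeKeyCacheSketch {
  // Hypothetical stand-in for XceiverClientSpi.
  static final class Client {
    final String protocol;
    Client(String protocol) { this.protocol = protocol; }
  }

  public static void main(String[] args) throws ExecutionException {
    Cache<String, Client> cache =
        CacheBuilder.newBuilder().maximumSize(256).build();

    String pipelineId = UUID.randomUUID().toString();
    // Same pipeline, two protocols -> two distinct cache entries.
    Client ratis = cache.get(pipelineId + "RATIS",
        () -> new Client("RATIS"));
    Client standalone = cache.get(pipelineId + "STAND_ALONE",
        () -> new Client("STAND_ALONE"));
    System.out.println(ratis != standalone); // true: no collision
  }
}
```

Had the key stayed a bare PipelineID, acquiring a STAND_ALONE reader would have returned (or evicted) the cached RATIS writer for the same pipeline.
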
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 45e9d6eda3..fa98142fd5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -52,6 +54,7 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -209,6 +212,11 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
         getClient().sendAsync(() -> byteString);
   }
 
+  @VisibleForTesting
+  public void watchForCommit(long index, long timeout) throws Exception {
+    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
+        .get(timeout, TimeUnit.MILLISECONDS);
+  }
   /**
    * Sends a given command to server and gets a waitable future back.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c36ca1f934..b0817f70ea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -299,6 +299,10 @@ public String toString() {
     return b.toString();
   }
 
+  public void setType(HddsProtos.ReplicationType type) {
+    this.type = type;
+  }
+
   /**
    * Returns a JSON string of this object.
    *
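
A note on the new watchForCommit hook above: it wraps Ratis' sendWatchAsync so a test can block until a given log index is replicated on ALL datanodes before it starts killing nodes. A rough, self-contained sketch of the pattern — the async watch below is simulated, not the real Ratis API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class WatchForCommitSketch {
  // Simulated stand-in for RaftClient.sendWatchAsync(index, ALL_COMMITTED).
  static CompletableFuture<Long> sendWatchAsync(long index) {
    // Pretend replication to all followers completes immediately.
    return CompletableFuture.supplyAsync(() -> index);
  }

  // Mirrors the test hook: block until `index` is committed everywhere,
  // or fail with TimeoutException after `timeoutMs`.
  public static void watchForCommit(long index, long timeoutMs)
      throws Exception {
    sendWatchAsync(index).get(timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    watchForCommit(42L, 5000);
    System.out.println("index 42 applied on all replicas; safe to kill nodes");
  }
}
```

The `.get(timeout, ...)` turns the asynchronous watch into a bounded blocking wait, which is all the integration test needs.
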
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 278b129d96..27f41f804a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -81,14 +81,17 @@ private ContainerProtocolCalls() {
    * @param xceiverClient client to perform call
    * @param datanodeBlockID blockID to identify container
    * @param traceID container protocol call args
+   * @param blockCommitSequenceId latest commit Id of the block
    * @return container protocol get block response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
-      DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
+      DatanodeBlockID datanodeBlockID, String traceID,
+      long blockCommitSequenceId) throws IOException {
     GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
         .newBuilder()
-        .setBlockID(datanodeBlockID);
+        .setBlockID(datanodeBlockID)
+        .setBlockCommitSequenceId(blockCommitSequenceId);
     String id = xceiverClient.getPipeline().getLeader().getUuidString();
 
     ContainerCommandRequestProto request = ContainerCommandRequestProto
@@ -388,7 +391,9 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
       BlockID blockID, String traceID) throws IOException {
     GetBlockRequestProto.Builder getBlock = GetBlockRequestProto
         .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
+        // by default, set the bcsId to be 0
+        .setBlockCommitSequenceId(0);
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
         GetSmallFileRequestProto
             .newBuilder().setBlock(getBlock)
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index da55db3e22..8421ef2dd5 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -140,6 +140,8 @@ enum Result {
   UNKNOWN_CONTAINER_TYPE = 34;
   BLOCK_NOT_COMMITTED = 35;
   CONTAINER_UNHEALTHY = 36;
+  UNKNOWN_BCSID = 37;
+  BCSID_MISMATCH = 38;
 }
 
 /**
@@ -315,6 +317,7 @@ message PutBlockResponseProto {
 
 message GetBlockRequestProto {
   required DatanodeBlockID blockID = 1;
+  required uint64 blockCommitSequenceId = 2;
 }
 
 message GetBlockResponseProto {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index da77f1c5cd..004d643be0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -483,7 +483,8 @@ ContainerCommandResponseProto handleGetBlock(
     try {
       BlockID blockID = BlockID.getFromProtobuf(
           request.getGetBlock().getBlockID());
-      responseData = blockManager.getBlock(kvContainer, blockID);
+      responseData = blockManager.getBlock(kvContainer, blockID,
+          request.getGetBlock().getBlockCommitSequenceId());
       long numBytes = responseData.getProtoBufMessage().toByteArray().length;
       metrics.incContainerBytesStats(Type.GetBlock, numBytes);
@@ -755,7 +756,8 @@ ContainerCommandResponseProto handleGetSmallFile(
     try {
       BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
          .getBlockID());
-      BlockData responseData = blockManager.getBlock(kvContainer, blockID);
+      BlockData responseData = blockManager.getBlock(kvContainer, blockID,
+          getSmallFileReq.getBlock().getBlockCommitSequenceId());
       ContainerProtos.ChunkInfo chunkInfo = null;
       ByteString dataBuf = ByteString.EMPTY;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 67cda9f5ac..082ed677fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -45,7 +45,8 @@
 import java.util.Map;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
-
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 /**
  * This class is for performing block related operations on the KeyValue
  * Container.
@@ -68,6 +69,12 @@ public BlockManagerImpl(Configuration conf) {
     this.config = conf;
   }
 
+  private long getBlockCommitSequenceId(MetadataStore db)
+      throws IOException {
+    byte[] bcsId = db.get(blockCommitSequenceIdKey);
+    return bcsId == null ? 0 : Longs.fromByteArray(bcsId);
+  }
+
   /**
    * Puts or overwrites a block.
    *
@@ -91,21 +98,19 @@ public long putBlock(Container container, BlockData data) throws IOException {
     Preconditions.checkNotNull(db, "DB cannot be null here");
 
     long blockCommitSequenceId = data.getBlockCommitSequenceId();
-    byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey);
+    long blockCommitSequenceIdValue = getBlockCommitSequenceId(db);
 
     // default blockCommitSequenceId for any block is 0. If the putBlock
     // request is not coming via Ratis (for test scenarios), it will be 0.
     // In such cases, we should overwrite the block as well
-    if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) {
-      if (blockCommitSequenceId <= Longs
-          .fromByteArray(blockCommitSequenceIdValue)) {
+    if (blockCommitSequenceId != 0) {
+      if (blockCommitSequenceId <= blockCommitSequenceIdValue) {
         // Since the blockCommitSequenceId stored in the db is greater than
         // or equal to the blockCommitSequenceId to be updated, it means the
         // putBlock transaction is reapplied in the ContainerStateMachine on
         // restart. It also implies that the given block must already exist
         // in the db. just log and return
-        LOG.warn("blockCommitSequenceId " + Longs
-            .fromByteArray(blockCommitSequenceIdValue)
+        LOG.warn("blockCommitSequenceId " + blockCommitSequenceIdValue
            + " in the Container Db is greater than"
            + " the supplied value " + blockCommitSequenceId
            + ". Ignoring it");
         return data.getSize();
@@ -129,10 +134,12 @@ public long putBlock(Container container, BlockData data) throws IOException {
    *
    * @param container - Container from which block need to be fetched.
    * @param blockID - BlockID of the block.
+   * @param bcsId latest commit Id of the block
    * @return Key Data.
    * @throws IOException
    */
-  public BlockData getBlock(Container container, BlockID blockID)
+  @Override
+  public BlockData getBlock(Container container, BlockID blockID, long bcsId)
       throws IOException {
     Preconditions.checkNotNull(blockID,
         "BlockID cannot be null in GetBlock request");
@@ -145,6 +152,14 @@ public BlockData getBlock(Container container, BlockID blockID)
     // This is a post condition that acts as a hint to the user.
     // Should never fail.
     Preconditions.checkNotNull(db, "DB cannot be null here");
+
+    long containerBCSId = getBlockCommitSequenceId(db);
+    if (containerBCSId < bcsId) {
+      throw new StorageContainerException(
+          "Unable to find the block with bcsID " + bcsId + ". Container "
+              + container.getContainerData().getContainerID() + " bcsId is "
+              + containerBCSId + ".", UNKNOWN_BCSID);
+    }
     byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
     if (kData == null) {
       throw new StorageContainerException("Unable to find the block.",
@@ -152,6 +167,12 @@ public BlockData getBlock(Container container, BlockID blockID)
     }
     ContainerProtos.BlockData blockData =
         ContainerProtos.BlockData.parseFrom(kData);
+    long id = blockData.getBlockCommitSequenceId();
+    if (id != bcsId) {
+      throw new StorageContainerException(
+          "bcsId " + bcsId + " mismatches with existing block Id "
+              + id + " for block " + blockID + ".", BCSID_MISMATCH);
+    }
     return BlockData.getFromProtoBuf(blockData);
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 35ed22a6c4..8c865835b4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -45,10 +45,12 @@ public interface BlockManager {
    *
    * @param container - Container from which block need to be get.
    * @param blockID - BlockID of the Block.
+   * @param bcsId latest commit id of the block
    * @return Block Data.
    * @throws IOException
    */
-  BlockData getBlock(Container container, BlockID blockID) throws IOException;
+  BlockData getBlock(Container container, BlockID blockID, long bcsId)
+      throws IOException;
 
   /**
    * Deletes an existing block.
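
To summarize the two bcsId checks getBlock now performs, here is a simplified, self-contained sketch of the validation flow — my own illustration of the logic above, with `Db` as a hypothetical stand-in for the container's MetadataStore:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class BcsIdCheckSketch {
  static final class Db {
    long containerBcsId;                                   // container-wide high watermark
    final Map<Long, Long> blockBcsIds = new HashMap<>();   // localId -> block's bcsId
  }

  static long getBlock(Db db, long localId, long requestedBcsId)
      throws IOException {
    // Check 1: the container has not yet committed up to the requested id
    // -> UNKNOWN_BCSID (this replica is lagging; the client can retry on
    // another datanode in the pipeline).
    if (db.containerBcsId < requestedBcsId) {
      throw new IOException("UNKNOWN_BCSID: container is at "
          + db.containerBcsId + ", requested " + requestedBcsId);
    }
    Long stored = db.blockBcsIds.get(localId);
    if (stored == null) {
      throw new IOException("NO_SUCH_BLOCK");
    }
    // Check 2: the block exists but was committed at a different id
    // -> BCSID_MISMATCH (stale or divergent replica).
    if (stored != requestedBcsId) {
      throw new IOException("BCSID_MISMATCH: stored " + stored
          + ", requested " + requestedBcsId);
    }
    return localId;
  }

  public static void main(String[] args) throws IOException {
    Db db = new Db();
    db.containerBcsId = 7;
    db.blockBcsIds.put(1L, 7L);
    getBlock(db, 1L, 7L);  // ok: container caught up and block id matches
    // getBlock(db, 1L, 9L) would throw UNKNOWN_BCSID (container only at 7).
  }
}
```

Blocks written outside Ratis carry bcsId 0, which is why the tests below pass 0 for the new parameter.
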
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java
index 6fe6d81ee4..65477d8135 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java
@@ -113,7 +113,7 @@ public void testPutAndGetBlock() throws Exception {
     assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
     //Get Block
     BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer,
-        blockData.getBlockID());
+        blockData.getBlockID(), 0);
 
     assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID());
     assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID());
@@ -139,7 +139,7 @@ public void testDeleteBlock() throws Exception {
     assertEquals(0, keyValueContainer.getContainerData().getKeyCount());
     try {
-      blockManager.getBlock(keyValueContainer, blockID);
+      blockManager.getBlock(keyValueContainer, blockID, 0);
       fail("testDeleteBlock");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains(
@@ -197,7 +197,7 @@ public void testGetNoSuchBlock() throws Exception {
         keyValueContainer.getContainerData().getKeyCount());
     try {
       //Since the block has been deleted, we should not be able to find it
-      blockManager.getBlock(keyValueContainer, blockID);
+      blockManager.getBlock(keyValueContainer, blockID, 0);
       fail("testGetNoSuchBlock failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains(
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 125784c46c..3772c59418 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -22,7 +22,9 @@
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -276,8 +278,13 @@ public static LengthInputStream getFromOmKeyInfo(
       long containerID = blockID.getContainerID();
       ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.getContainerWithPipeline(containerID);
+      Pipeline pipeline = containerWithPipeline.getPipeline();
+
+      // irrespective of the container state, we will always read via
+      // Standalone protocol.
+      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
       XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(containerWithPipeline.getPipeline());
+          .acquireClient(pipeline);
       boolean success = false;
       containerKey = omKeyLocationInfo.getLocalID();
       try {
@@ -287,7 +294,8 @@ public static LengthInputStream getFromOmKeyInfo(
         ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
             .getDatanodeBlockIDProtobuf();
         ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
+            .getBlock(xceiverClient, datanodeBlockID, requestId,
+                omKeyLocationInfo.getBlockCommitSequenceId());
         List<ContainerProtos.ChunkInfo> chunks =
             response.getBlockData().getChunksList();
         for (ContainerProtos.ChunkInfo chunk : chunks) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 9f46b2d009..6d13bb2b96 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -116,6 +116,10 @@ public void addStream(OutputStream outputStream, long length) {
   public List<ChunkOutputStreamEntry> getStreamEntries() {
     return streamEntries;
   }
+  @VisibleForTesting
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
 
   public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 302ea465f0..bf6a18947a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -102,7 +102,7 @@ public void testStartMultipleDatanodes() throws Exception {
       // Verify client is able to connect to the container
       try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
-        assertTrue(client.isConnected());
+        assertTrue(client.isConnected(pipeline.getLeader()));
       }
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 881c827432..deb55b4318 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -25,6 +25,11 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.*;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -32,6 +37,7 @@
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -597,6 +603,106 @@ public void testPutKeyRatisThreeNodes()
     }
   }
 
+  @Test
+  public void testPutKeyAndGetKeyThreeNodes()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneOutputStream out = bucket
+        .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
+            ReplicationFactor.THREE);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) out.getOutputStream();
+    XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
+    out.write(value.getBytes());
+    out.close();
+    // First, confirm the key info from the client matches the info in OM.
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+    builder.setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName);
+    OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
+        getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
+    long containerID = keyInfo.getContainerID();
+    long localID = keyInfo.getLocalID();
+    OzoneKeyDetails keyDetails = (OzoneKeyDetails) bucket.getKey(keyName);
+    Assert.assertEquals(keyName, keyDetails.getName());
+
+    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
+    Assert.assertEquals(1, keyLocations.size());
+    Assert.assertEquals(containerID, keyLocations.get(0).getContainerID());
+    Assert.assertEquals(localID, keyLocations.get(0).getLocalID());
+
+    // Make sure that the data size matches.
+    Assert
+        .assertEquals(value.getBytes().length, keyLocations.get(0).getLength());
+
+    ContainerWithPipeline container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainerWithPipeline(new ContainerID(containerID));
+    Pipeline pipeline = container.getPipeline();
+    List<DatanodeDetails> datanodes = pipeline.getMachines();
+
+    DatanodeDetails datanodeDetails = datanodes.get(0);
+    Assert.assertNotNull(datanodeDetails);
+
+    XceiverClientSpi clientSpi = manager.acquireClient(pipeline);
+    Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
+
+    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
+    // shutdown the datanode
+    cluster.shutdownHddsDatanode(datanodeDetails);
+
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+    // try to read, this should be successful
+    readKey(bucket, keyName, value);
+
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+    // shutdown the second datanode
+    datanodeDetails = datanodes.get(1);
+    cluster.shutdownHddsDatanode(datanodeDetails);
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+
+    // the container is open and with loss of 2 nodes we still should be able
+    // to read via Standalone protocol
+    // try to read
+    readKey(bucket, keyName, value);
+
+    // shutdown the 3rd datanode
+    datanodeDetails = datanodes.get(2);
+    cluster.shutdownHddsDatanode(datanodeDetails);
+    try {
+      // try to read
+      readKey(bucket, keyName, value);
+      Assert.fail("Expected exception not thrown");
+    } catch (Exception e) {
+    }
+    manager.releaseClient(clientSpi);
+  }
+
+  private void readKey(OzoneBucket bucket, String keyName, String data)
+      throws IOException {
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    OzoneInputStream is = bucket.readKey(keyName);
+    byte[] fileContent = new byte[data.getBytes().length];
+    is.read(fileContent);
+    is.close();
+  }
+
   @Test
   public void testGetKeyDetails() throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 324187c3a2..5886dc2ae0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -468,6 +468,7 @@ public static ContainerCommandRequestProto getBlockRequest(
     ContainerProtos.GetBlockRequestProto.Builder getRequest =
         ContainerProtos.GetBlockRequestProto.newBuilder();
     getRequest.setBlockID(blockID);
+    getRequest.setBlockCommitSequenceId(0);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
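
The TestOzoneRpcClient test above exercises exactly the failover path added to XceiverClientGrpc.sendCommand: reads keep succeeding while replicas disappear, until no replica is left. A simplified, self-contained sketch of that behavior — my illustration only; the datanode names and failure simulation are made up:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RoundRobinReadSketch {
  // Walk the pipeline's datanode list; first successful reply wins.
  static String sendCommand(List<String> datanodes,
      Function<String, String> sendToDatanode) throws IOException {
    IOException last = null;
    for (String dn : datanodes) {          // round robin over replicas
      try {
        return sendToDatanode.apply(dn);   // success: stop retrying
      } catch (RuntimeException e) {
        last = new IOException("failed on " + dn, e);
      }
    }
    throw last != null ? last : new IOException("empty pipeline");
  }

  public static void main(String[] args) throws IOException {
    List<String> pipeline = Arrays.asList("dn-0", "dn-1", "dn-2");
    // dn-0 and dn-1 are down; the read still succeeds on dn-2.
    String reply = sendCommand(pipeline, dn ->
        dn.equals("dn-2") ? "block-data" : failDown(dn));
    System.out.println(reply);
  }

  static String failDown(String dn) {
    throw new RuntimeException(dn + " is shut down");
  }
}
```

Only when every datanode has been tried does the error propagate, which is why the test expects an exception solely after the third shutdown.
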
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 52cebb329b..d8a7d5304b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -151,7 +151,7 @@ public void testContainerReplication() throws Exception {
         .getHandler(ContainerType.KeyValueContainer);
 
     BlockData key = handler.getBlockManager()
-        .getBlock(container, BlockID.getFromProtobuf(blockID));
+        .getBlock(container, BlockID.getFromProtobuf(blockID), 0);
 
     Assert.assertNotNull(key);
     Assert.assertEquals(1, key.getChunks().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
index 92bad270f0..85148e18d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
@@ -256,6 +256,6 @@ public void testCloseContainer() throws Exception {
         openContainerBlockMap.getBlockDataMap(testContainerID));
     // Make sure the key got committed
     Assert.assertNotNull(handler.getBlockManager()
-        .getBlock(container, blockID));
+        .getBlock(container, blockID, 0));
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index bea00fecaf..35f8286633 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -556,7 +556,7 @@ public void testPutBlock() throws IOException, NoSuchAlgorithmException {
     blockData.setChunks(chunkList);
     blockManager.putBlock(container, blockData);
     BlockData readBlockData = blockManager.
-        getBlock(container, blockData.getBlockID());
+        getBlock(container, blockData.getBlockID(), 0);
     ChunkInfo readChunk =
         ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
     Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
@@ -608,7 +608,7 @@ public void testPutBlockWithLotsOfChunks() throws IOException,
     blockData.setChunks(chunkProtoList);
     blockManager.putBlock(container, blockData);
     BlockData readBlockData = blockManager.
-        getBlock(container, blockData.getBlockID());
+        getBlock(container, blockData.getBlockID(), 0);
     ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
     ChunkInfo readChunk =
         ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
@@ -636,7 +636,7 @@ public void testDeleteBlock() throws IOException, NoSuchAlgorithmException {
     blockManager.deleteBlock(container, blockID);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the block.");
-    blockManager.getBlock(container, blockData.getBlockID());
+    blockManager.getBlock(container, blockData.getBlockID(), 0);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index da445bfa9d..8b35bbbb18 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -20,7 +20,6 @@
 import com.google.common.cache.Cache;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -107,7 +106,7 @@ public void testFreeByReference() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<PipelineID, XceiverClientSpi> cache =
+    Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -130,8 +129,9 @@ public void testFreeByReference() throws IOException {
     Assert.assertNotEquals(client1, client2);
 
     // least recent container (i.e containerName1) is evicted
-    XceiverClientSpi nonExistent1 = cache
-        .getIfPresent(container1.getContainerInfo().getPipelineID());
+    XceiverClientSpi nonExistent1 = cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType());
     Assert.assertEquals(null, nonExistent1);
     // However container call should succeed because of refcount on the client.
     String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@@ -160,7 +160,7 @@ public void testFreeByEviction() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<PipelineID, XceiverClientSpi> cache =
+    Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -183,8 +183,9 @@ public void testFreeByEviction() throws IOException {
     Assert.assertNotEquals(client1, client2);
 
     // now client 1 should be evicted
-    XceiverClientSpi nonExistent = cache
-        .getIfPresent(container1.getContainerInfo().getPipelineID());
+    XceiverClientSpi nonExistent = cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType());
     Assert.assertEquals(null, nonExistent);
 
     // Any container operation should now fail
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 7eb2ec2e2e..1ecedcc617 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -699,8 +699,8 @@ public void testDeleteKey() throws Exception {
           .KeyValueContainer);
       KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
           .getContainer(location.getBlockID().getContainerID());
-      BlockData blockInfo = keyValueHandler
-          .getBlockManager().getBlock(container, location.getBlockID());
+      BlockData blockInfo = keyValueHandler.getBlockManager()
+          .getBlock(container, location.getBlockID(), 0);
       KeyValueContainerData containerData =
           (KeyValueContainerData) container.getContainerData();
       File dataDir = new File(containerData.getChunksPath());
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
index 1c31dd41f3..453cecce13 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
@@ -199,11 +199,11 @@ public Response delete(@PathParam("bucket") String bucketName)
     } catch (IOException ex) {
       if (ex.getMessage().contains("BUCKET_NOT_EMPTY")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .BUCKET_NOT_EMPTY, S3ErrorTable.Resource.BUCKET);
+            .BUCKET_NOT_EMPTY, bucketName);
         throw os3Exception;
       } else if (ex.getMessage().contains("BUCKET_NOT_FOUND")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_BUCKET, S3ErrorTable.Resource.BUCKET);
+            .NO_SUCH_BUCKET, bucketName);
         throw os3Exception;
       } else {
         throw ex;
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 61f066c6fc..c6b4e6672c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -28,7 +28,6 @@
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
-import org.apache.hadoop.ozone.s3.exception.S3ErrorTable.Resource;
 import org.apache.hadoop.ozone.s3.header.AuthorizationHeaderV2;
 import org.apache.hadoop.ozone.s3.header.AuthorizationHeaderV4;
 
@@ -61,7 +60,7 @@ protected OzoneBucket getBucket(OzoneVolume volume, String bucketName)
       LOG.error("Error occurred is {}", ex);
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception oex =
-            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, Resource.BUCKET);
+            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName);
         throw oex;
       } else {
         throw ex;
@@ -80,7 +79,7 @@ protected OzoneBucket getBucket(String bucketName)
       LOG.error("Error occurred is {}", ex);
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception oex =
-            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, Resource.BUCKET);
+            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName);
         throw oex;
       } else {
         throw ex;
@@ -187,7 +186,7 @@ public String parseUsername(
 
     if (auth == null) {
       throw S3ErrorTable
-          .newError(S3ErrorTable.MALFORMED_HEADER, Resource.HEADER);
+          .newError(S3ErrorTable.MALFORMED_HEADER, auth);
     }
 
     String userName;
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 15ad2c496b..3f88af964e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -150,7 +150,7 @@ public Response get(
     } catch (IOException ex) {
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_OBJECT, S3ErrorTable.Resource.OBJECT);
+            .NO_SUCH_KEY, keyPath);
         throw os3Exception;
       } else {
         throw ex;
@@ -176,9 +176,8 @@ public Response head(
     } catch (IOException ex) {
       LOG.error("Exception occurred in HeadObject", ex);
       if (ex.getMessage().contains("KEY_NOT_FOUND")) {
-        OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_OBJECT, S3ErrorTable.Resource.OBJECT);
-        throw os3Exception;
+        // Just return 404 with no content
+        return Response.status(Status.NOT_FOUND).build();
       } else {
         throw ex;
       }
@@ -215,7 +214,7 @@ public Response delete(
     } catch (IOException ex) {
       if (ex.getMessage().contains("BUCKET_NOT_FOUND")) {
         throw S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_BUCKET, S3ErrorTable.Resource.BUCKET);
+            .NO_SUCH_BUCKET, bucketName);
       } else if (!ex.getMessage().contains("NOT_FOUND")) {
         throw ex;
       }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
index f5adb717da..9d0f76742c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
@@ -45,52 +45,23 @@ private S3ErrorTable() {
       "BucketNotEmpty", "The bucket you tried to delete is not empty.",
       HTTP_CONFLICT);
 
-  public static final OS3Exception NO_SUCH_OBJECT = new OS3Exception(
-      "NoSuchObject", "The specified object does not exist", HTTP_NOT_FOUND);
-
   public static final OS3Exception MALFORMED_HEADER = new OS3Exception(
       "AuthorizationHeaderMalformed", "The authorization header you provided "
          + "is invalid.", HTTP_NOT_FOUND);
 
+  public static final OS3Exception NO_SUCH_KEY = new OS3Exception(
+      "NoSuchObject", "The specified key does not exist", HTTP_NOT_FOUND);
+
   /**
    * Create a new instance of Error.
    * @param e Error Template
    * @param resource Resource associated with this exception
    * @return creates a new instance of error based on the template
    */
-  public static OS3Exception newError(OS3Exception e, Resource resource) {
+  public static OS3Exception newError(OS3Exception e, String resource) {
     OS3Exception err = new OS3Exception(e.getCode(), e.getErrorMessage(),
         e.getHttpCode());
-    err.setResource(resource.getResource());
+    err.setResource(resource);
     return err;
   }
-
-  /**
-   * Resources, which can be defined in OS3Exception.
-   */
-  public enum Resource {
-    BUCKET("Bucket"),
-    OBJECT("Object"),
-    HEADER("header"),
-    VOLUME("Volume");
-
-    private final String resource;
-
-    /**
-     * Constructs resource.
-     * @param value
-     */
-    Resource(String value) {
-      this.resource = value;
-    }
-
-    /**
-     * Get resource.
-     * @return string
-     */
-    public String getResource() {
-      return this.resource;
-    }
-  }
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV2.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV2.java
index e08931bc04..8e745f27ed 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV2.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV2.java
@@ -52,28 +52,24 @@ public AuthorizationHeaderV2(String auth) throws OS3Exception {
   public void parseHeader() throws OS3Exception {
     String[] split = authHeader.split(" ");
     if (split.length != 2) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     identifier = split[0];
     if (!IDENTIFIER.equals(identifier)) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     String[] remainingSplit = split[1].split(":");
 
     if (remainingSplit.length != 2) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     accessKeyID = remainingSplit[0];
     signature = remainingSplit[1];
     if (isBlank(accessKeyID) || isBlank(signature)) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
   }
 
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV4.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV4.java
index b6e2bf16fb..88c64ca4ef 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV4.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/AuthorizationHeaderV4.java
@@ -64,8 +64,7 @@ public void parseAuthHeader() throws OS3Exception {
     String[] split = authHeader.split(" ");
 
     if (split.length != 4) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     algorithm = split[0];
@@ -78,24 +77,21 @@ public void parseAuthHeader() throws OS3Exception {
       credential = credential.substring(CREDENTIAL.length(), credential
          .length() - 1);
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     if (signedHeaders.startsWith(SIGNEDHEADERS)) {
       signedHeaders = signedHeaders.substring(SIGNEDHEADERS.length(),
          signedHeaders.length() - 1);
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
     }
 
     if (signature.startsWith(SIGNATURE)) {
       signature = signature.substring(SIGNATURE.length(), signature
          .length());
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
    }
 
    // Parse credential. Other parts of header are not validated yet. When
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/Credential.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/Credential.java
index b8c519b827..19699a0163 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/Credential.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/header/Credential.java
@@ -63,8 +63,7 @@ public void parseCredential() throws OS3Exception {
       awsService = split[3];
       awsRequest = split[4];
    } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
-          .Resource.HEADER);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, credential);
    }
  }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/exception/TestOS3Exception.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/exception/TestOS3Exception.java
index 3611123e7f..fa6e2c7dfa 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/exception/TestOS3Exception.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/exception/TestOS3Exception.java
@@ -32,7 +32,7 @@ public void testOS3Exception() {
     OS3Exception ex = new OS3Exception("AccessDenied", "Access Denied",
         403);
     String requestId = OzoneUtils.getRequestID();
-    ex = S3ErrorTable.newError(ex, S3ErrorTable.Resource.BUCKET);
+    ex = S3ErrorTable.newError(ex, "bucket");
     ex.setRequestId(requestId);
     String val = ex.toXml();
     String formatString = "\n" +