diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index ddcf9667c5..c068046aac 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -214,14 +214,14 @@ public ContainerCommandResponseProto sendCommand(
 
   @Override
   public XceiverClientReply sendCommand(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     Preconditions.checkState(HddsUtils.isReadOnly(request));
     return sendCommandWithRetry(request, excludeDns);
   }
 
   private XceiverClientReply sendCommandWithRetry(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     ContainerCommandResponseProto responseProto = null;
 
@@ -231,24 +231,24 @@ private XceiverClientReply sendCommandWithRetry(
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     List<DatanodeDetails> dns = pipeline.getNodes();
-    DatanodeDetails datanode = null;
     List<DatanodeDetails> healthyDns =
         excludeDns != null ? dns.stream().filter(dnId -> {
-          for (UUID excludeId : excludeDns) {
-            if (dnId.getUuid().equals(excludeId)) {
+          for (DatanodeDetails excludeId : excludeDns) {
+            if (dnId.equals(excludeId)) {
              return false;
            }
          }
          return true;
        }).collect(Collectors.toList()) : dns;
+    XceiverClientReply reply = new XceiverClientReply(null);
     for (DatanodeDetails dn : healthyDns) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
+        reply.addDatanode(dn);
         responseProto = sendCommandAsync(request, dn).getResponse().get();
-        datanode = dn;
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -264,8 +264,8 @@ private XceiverClientReply sendCommandWithRetry(
     }
 
     if (responseProto != null) {
-      return new XceiverClientReply(
-          CompletableFuture.completedFuture(responseProto), datanode.getUuid());
+      reply.setResponse(CompletableFuture.completedFuture(responseProto));
+      return reply;
     } else {
       throw new IOException(
           "Failed to execute command " + request + " on the pipeline "
@@ -382,11 +382,11 @@ private void reconnect(DatanodeDetails dn, String encodedToken)
   }
 
   @Override
-  public long watchForCommit(long index, long timeout)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     // there is no notion of watch for commit index in standalone pipeline
-    return 0;
+    return null;
   };
 
   public long getReplicatedMinCommitIndex() {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 0caf10d2af..673a82bbd8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hdds.scm;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 
@@ -59,6 +59,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -91,7 +92,7 @@ public static XceiverClientRatis newXceiverClientRatis(
   private final GrpcTlsConfig tlsConfig;
 
   // Map to track commit index at every server
-  private final ConcurrentHashMap<String, Long> commitInfoMap;
+  private final ConcurrentHashMap<UUID, Long> commitInfoMap;
 
   // create a separate RaftClient for watchForCommit API
   private RaftClient watchClient;
@@ -118,7 +119,8 @@ private void updateCommitInfosMap(
     // of the servers
     if (commitInfoMap.isEmpty()) {
       commitInfoProtos.forEach(proto -> commitInfoMap
-          .put(proto.getServer().getAddress(), proto.getCommitIndex()));
+          .put(RatisHelper.toDatanodeId(proto.getServer()),
+              proto.getCommitIndex()));
       // In case the commit is happening 2 way, just update the commitIndex
       // for the servers which have been successfully updating the commit
       // indexes. This is important because getReplicatedMinCommitIndex()
@@ -126,7 +128,7 @@ private void updateCommitInfosMap(
       // been replicating data successfully.
     } else {
       commitInfoProtos.forEach(proto -> commitInfoMap
-          .computeIfPresent(proto.getServer().getAddress(),
+          .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
              (address, index) -> {
                index = proto.getCommitIndex();
                return index;
@@ -218,15 +220,23 @@ public long getReplicatedMinCommitIndex() {
     return minIndex.isPresent() ?
        minIndex.getAsLong() : 0;
   }
 
+  private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(address.toString());
+    reply.addDatanode(builder.build());
+  }
+
   @Override
-  public long watchForCommit(long index, long timeout)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     long commitIndex = getReplicatedMinCommitIndex();
+    XceiverClientReply clientReply = new XceiverClientReply(null);
     if (commitIndex >= index) {
       // return the min commit index till which the log has been replicated to
       // all servers
-      return commitIndex;
+      clientReply.setLogIndex(commitIndex);
+      return clientReply;
     }
     LOG.debug("commit index : {} watch timeout : {}", index, timeout);
     // create a new RaftClient instance for watch request
@@ -250,26 +260,30 @@ public long watchForCommit(long index, long timeout)
       // TODO : need to remove the code to create the new RaftClient instance
       // here once the watch request bypassing sliding window in Raft Client
       // gets fixed.
-      watchClient =
-          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
+      watchClient = RatisHelper
+          .newRaftClient(rpcType, getPipeline(), retryPolicy,
              maxOutstandingRequests, tlsConfig);
       reply = watchClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
-      Optional<RaftProtos.CommitInfoProto>
-          proto = reply.getCommitInfos().stream().min(Comparator.comparing(
-          RaftProtos.CommitInfoProto :: getCommitIndex));
-      Preconditions.checkState(proto.isPresent());
-      String address = proto.get().getServer().getAddress();
-      // since 3 way commit has failed, the updated map from now on will
-      // only store entries for those datanodes which have had successful
-      // replication.
-      commitInfoMap.remove(address);
-      LOG.info(
-          "Could not commit " + index + " to all the nodes. Server " + address
-              + " has failed." + " Committed by majority.");
+      List<RaftProtos.CommitInfoProto> commitInfoProtoList =
+          reply.getCommitInfos().stream()
+              .filter(i -> i.getCommitIndex() < index)
+              .collect(Collectors.toList());
+      commitInfoProtoList.parallelStream().forEach(proto -> {
+        UUID address = RatisHelper.toDatanodeId(proto.getServer());
+        addDatanodetoReply(address, clientReply);
+        // since 3 way commit has failed, the updated map from now on will
+        // only store entries for those datanodes which have had successful
+        // replication.
+        commitInfoMap.remove(address);
+        LOG.info(
+            "Could not commit " + index + " to all the nodes. Server " + address
+                + " has failed." + " Committed by majority.");
+      });
     }
-    return index;
+    clientReply.setLogIndex(index);
+    return clientReply;
   }
 
   /**
@@ -296,17 +310,28 @@ public XceiverClientReply sendCommandAsync(
           RaftRetryFailureException raftRetryFailureException =
              reply.getRetryFailureException();
           if (raftRetryFailureException != null) {
+            // In case of a raft retry failure, the raft client is not able
+            // to connect to the leader, hence the pipeline can not be used,
+            // but this instance of RaftClient will be closed and refreshed
+            // again. In case the client cannot connect to the leader, the
+            // getClient call will fail.
+
+            // No need to set the failed server ID here. The Ozone client
+            // will directly exclude this pipeline in the next allocateBlock
+            // call to SCM since, in this case, it is the raft client which
+            // is not able to connect to the leader in the pipeline, though
+            // the pipeline can still be functional.
throw new CompletionException(raftRetryFailureException); } ContainerCommandResponseProto response = ContainerCommandResponseProto .parseFrom(reply.getMessage().getContent()); + UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId()); if (response.getResult() == ContainerProtos.Result.SUCCESS) { updateCommitInfosMap(reply.getCommitInfos()); - asyncReply.setLogIndex(reply.getLogIndex()); - asyncReply.setDatanode( - RatisHelper.toDatanodeId(reply.getReplierId())); } + asyncReply.setLogIndex(reply.getLogIndex()); + addDatanodetoReply(serverId, asyncReply); return response; } catch (InvalidProtocolBufferException e) { throw new CompletionException(e); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 3ea36d45dc..66671637bf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -42,7 +42,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutionException; /** @@ -290,7 +289,7 @@ private synchronized void readChunkFromContainer() throws IOException { XceiverClientReply reply; ReadChunkResponseProto readChunkResponse = null; final ChunkInfo chunkInfo = chunks.get(chunkIndex); - List excludeDns = null; + List excludeDns = null; ByteString byteString; List dnList = xceiverClient.getPipeline().getNodes(); while (true) { @@ -334,7 +333,7 @@ private synchronized void readChunkFromContainer() throws IOException { if (excludeDns == null) { excludeDns = new ArrayList<>(); } - excludeDns.add(reply.getDatanode()); + excludeDns.addAll(reply.getDatanodes()); if (excludeDns.size() == dnList.size()) { throw ioe; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index f3ba656426..2e156b3f5b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -41,6 +42,7 @@ import java.io.OutputStream; import java.nio.Buffer; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.UUID; import java.util.List; import java.util.ArrayList; @@ -102,14 +104,17 @@ public class BlockOutputStream extends OutputStream { // by all servers private long totalAckDataLength; - // list to hold up all putBlock futures - private List> - futureList; + // future Map to hold up all putBlock futures + private ConcurrentHashMap> + futureMap; // map containing mapping for putBlock logIndex to to flushedDataLength Map. private ConcurrentHashMap commitIndex2flushedDataMap; private int currentBufferIndex; + private List failedServers; + /** * Creates a new BlockOutputStream. 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index f3ba656426..2e156b3f5b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -41,6 +42,7 @@
 import java.io.OutputStream;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
@@ -102,14 +104,17 @@ public class BlockOutputStream extends OutputStream {
   // by all servers
   private long totalAckDataLength;
 
-  // list to hold up all putBlock futures
-  private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureList;
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureMap;
   // map containing mapping for putBlock logIndex to to flushedDataLength Map.
   private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
 
   private int currentBufferIndex;
 
+  private List<DatanodeDetails> failedServers;
+
   /**
    * Creates a new BlockOutputStream.
    *
@@ -157,10 +162,11 @@ public BlockOutputStream(BlockID blockID, String key,
     responseExecutor = Executors.newSingleThreadExecutor();
     commitIndex2flushedDataMap = new ConcurrentHashMap<>();
     totalAckDataLength = 0;
-    futureList = new ArrayList<>();
+    futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
     currentBufferIndex = 0;
     writtenDataLength = 0;
+    failedServers = Collections.emptyList();
   }
 
   public BlockID getBlockID() {
@@ -182,6 +188,9 @@ private long computeBufferData() {
     return dataLength;
   }
 
+  public List<DatanodeDetails> getFailedServers() {
+    return failedServers;
+  }
 
   @Override
   public void write(int b) throws IOException {
@@ -299,7 +308,7 @@ private void updateFlushIndex(long index) {
     Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
     totalAckDataLength = commitIndex2flushedDataMap.remove(index);
     LOG.debug("Total data successfully replicated: " + totalAckDataLength);
-    futureList.remove(0);
+    futureMap.remove(totalAckDataLength);
     // Flush has been committed to required servers successful.
     // just swap the bufferList head and tail after clearing.
     ByteBuffer currentBuffer = bufferList.remove(0);
@@ -320,7 +329,7 @@ private void updateFlushIndex(long index) {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!futureList.isEmpty()) {
+      if (!futureMap.isEmpty()) {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
@@ -362,9 +371,22 @@ private void adjustBuffersOnException() {
   private void watchForCommit(long commitIndex) throws IOException {
     checkOpen();
     Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
+    long index;
     try {
-      long index =
+      XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      if (reply == null) {
+        index = 0;
+      } else {
+        List<DatanodeDetails> dnList = reply.getDatanodes();
+        if (!dnList.isEmpty()) {
+          if (failedServers.isEmpty()) {
+            failedServers = new ArrayList<>();
+          }
+          failedServers.addAll(dnList);
+        }
+        index = reply.getLogIndex();
+      }
       adjustBuffers(index);
     } catch (TimeoutException | InterruptedException | ExecutionException e) {
       LOG.warn("watchForCommit failed for index " + commitIndex, e);
@@ -392,8 +414,7 @@ ContainerCommandResponseProto> handlePartialFlush()
           try {
             validateResponse(e);
           } catch (IOException sce) {
-            future.completeExceptionally(sce);
-            return e;
+            throw new CompletionException(sce);
           }
           // if the ioException is not set, putBlock is successful
           if (ioException == null) {
@@ -422,7 +443,7 @@ ContainerCommandResponseProto> handlePartialFlush()
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    futureList.add(flushFuture);
+    futureMap.put(flushPos, flushFuture);
     return flushFuture;
   }
 
@@ -516,8 +537,8 @@ public void close() throws IOException {
 
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture
-        .allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+        futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
     // wait for all the transactions to complete
     combinedFuture.get();
   }
@@ -553,10 +574,10 @@ public void cleanup(boolean invalidateClient) {
     }
     xceiverClientManager = null;
     xceiverClient = null;
-    if (futureList != null) {
-      futureList.clear();
+    if (futureMap != null) {
+      futureMap.clear();
     }
-    futureList = null;
+    futureMap = null;
     if (commitIndex2flushedDataMap != null) {
       commitIndex2flushedDataMap.clear();
     }
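Note: with watchForCommit now returning an XceiverClientReply (and null for standalone pipelines), the writer becomes responsible for harvesting failed servers from the reply, as BlockOutputStream#watchForCommit does above. A minimal sketch of that consumption logic, using hypothetical stand-in types for XceiverClientReply and DatanodeDetails:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatchForCommitSketch {

  static class Node { final String id; Node(String id) { this.id = id; } }

  static class Reply {
    final long logIndex;
    final List<Node> datanodes; // servers that missed the watched commit
    Reply(long logIndex, List<Node> datanodes) {
      this.logIndex = logIndex;
      this.datanodes = datanodes;
    }
  }

  private List<Node> failedServers = Collections.emptyList();

  long onWatchReply(Reply reply) {
    if (reply == null) {
      return 0; // standalone pipeline: no commit-index notion to watch
    }
    if (!reply.datanodes.isEmpty()) {
      if (failedServers.isEmpty()) {
        failedServers = new ArrayList<>();
      }
      // these servers fell behind a majority commit; remember them so the
      // key-level stream can hand them to SCM as an exclude list later
      failedServers.addAll(reply.datanodes);
    }
    return reply.logIndex;
  }
}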
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
index 5678555608..bae0758fdd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
@@ -19,20 +19,28 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * This class represents the Async reply from XceiverClient.
+ * This class represents the reply from XceiverClient.
 */
 public class XceiverClientReply {
 
   private CompletableFuture<ContainerCommandResponseProto> response;
   private Long logIndex;
-  private UUID dnId;
+
+  /**
+   * List of datanodes where the command got executed and a reply was
+   * received. In case of an exception in the reply, these are the
+   * datanodes on which the failure occurred.
+   */
+  private List<DatanodeDetails> datanodes;
 
   public XceiverClientReply(
       CompletableFuture<ContainerCommandResponseProto> response) {
@@ -40,10 +48,11 @@ public XceiverClientReply(
   }
 
   public XceiverClientReply(
-      CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) {
+      CompletableFuture<ContainerCommandResponseProto> response,
+      List<DatanodeDetails> datanodes) {
     this.logIndex = (long) 0;
     this.response = response;
-    this.dnId = dnId;
+    this.datanodes = datanodes == null ? new ArrayList<>() : datanodes;
   }
 
   public CompletableFuture<ContainerCommandResponseProto> getResponse() {
@@ -58,12 +67,12 @@ public void setLogIndex(Long logIndex) {
     this.logIndex = logIndex;
   }
 
-  public UUID getDatanode() {
-    return dnId;
+  public List<DatanodeDetails> getDatanodes() {
+    return datanodes;
   }
 
-  public void setDatanode(UUID datanodeId) {
-    this.dnId = datanodeId;
+  public void addDatanode(DatanodeDetails dn) {
+    datanodes.add(dn);
   }
 
   public void setResponse(
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index aee2afd4ea..1a183664a1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -21,11 +21,11 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -123,7 +123,7 @@ public ContainerCommandResponseProto sendCommand(
    * @throws IOException
    */
   public XceiverClientReply sendCommand(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     try {
       XceiverClientReply reply;
@@ -157,14 +157,14 @@ public XceiverClientReply sendCommand(
    * Check if a specific commitIndex is replicated to majority/all servers.
    * @param index index to watch for
    * @param timeout timeout provided for the watch operation to complete
-   * @return the min commit index replicated to all or majority servers
-   * in case of a failure
+   * @return reply containing the min commit index replicated to all or
+   * majority servers in case of a failure
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
    * @throws IOException
    */
-  public abstract long watchForCommit(long index, long timeout)
+  public abstract XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException;
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
new file mode 100644
index 0000000000..ed4ac54532
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+import java.util.*;
+
+/**
+ * This class contains the set of datanodes and containers which the Ozone
+ * client hands over to SCM with each block allocation request.
+ */
+public class ExcludeList {
+
+  private final List<DatanodeDetails> datanodes;
+  private final List<ContainerID> containerIds;
+  private final List<PipelineID> pipelineIds;
+
+
+  public ExcludeList() {
+    datanodes = new ArrayList<>();
+    containerIds = new ArrayList<>();
+    pipelineIds = new ArrayList<>();
+  }
+
+  public List<ContainerID> getContainerIds() {
+    return containerIds;
+  }
+
+  public List<DatanodeDetails> getDatanodes() {
+    return datanodes;
+  }
+
+  public void addDatanodes(Collection<DatanodeDetails> dns) {
+    datanodes.addAll(dns);
+  }
+
+  public void addDatanode(DatanodeDetails dn) {
+    datanodes.add(dn);
+  }
+
+  public void addConatinerId(ContainerID containerId) {
+    containerIds.add(containerId);
+  }
+
+  public void addPipeline(PipelineID pipelineId) {
+    pipelineIds.add(pipelineId);
+  }
+
+  public List<PipelineID> getPipelineIds() {
+    return pipelineIds;
+  }
+
+  public HddsProtos.ExcludeListProto getProtoBuf() {
+    HddsProtos.ExcludeListProto.Builder builder =
+        HddsProtos.ExcludeListProto.newBuilder();
+    containerIds.parallelStream()
+        .forEach(id -> builder.addContainerIds(id.getId()));
+    datanodes.parallelStream().forEach(dn -> {
+      builder.addDatanodes(dn.getUuidString());
+    });
+    pipelineIds.parallelStream().forEach(pipelineID -> {
+      builder.addPipelineIds(pipelineID.getProtobuf());
+    });
+    return builder.build();
+  }
+
+  public static ExcludeList getFromProtoBuf(
+      HddsProtos.ExcludeListProto excludeListProto) {
+    ExcludeList excludeList = new ExcludeList();
+    excludeListProto.getContainerIdsList().parallelStream().forEach(id -> {
+      excludeList.addConatinerId(ContainerID.valueof(id));
+    });
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    excludeListProto.getDatanodesList().forEach(dn -> {
+      builder.setUuid(dn);
+      excludeList.addDatanode(builder.build());
+    });
+    excludeListProto.getPipelineIdsList().forEach(pipelineID -> {
+      excludeList.addPipeline(PipelineID.getFromProtobuf(pipelineID));
+    });
+    return excludeList;
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index 6e95e7027c..1a32672617 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.protocol;
 
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -47,11 +48,14 @@ public interface ScmBlockLocationProtocol extends Closeable {
    * Asks SCM where a block should be allocated. SCM responds with the
    * set of datanodes that should be used creating this block.
    * @param size - size of the block.
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return allocated block accessing info (key, pipeline).
    * @throws IOException
    */
   AllocatedBlock allocateBlock(long size, ReplicationType type,
-      ReplicationFactor factor, String owner) throws IOException;
+      ReplicationFactor factor, String owner, ExcludeList excludeList)
+      throws IOException;
 
   /**
    * Delete blocks for a set of object keys.
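Note: a short usage sketch of the new ExcludeList helper and its protobuf round trip, using only APIs introduced in this patch plus PipelineID.randomId(), which is assumed to exist in the surrounding pipeline code:

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

import java.util.UUID;

public class ExcludeListRoundTrip {
  public static void main(String[] args) {
    ExcludeList exclude = new ExcludeList();
    exclude.addDatanode(DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID().toString()).build());
    exclude.addConatinerId(ContainerID.valueof(1L)); // (sic) method name
    exclude.addPipeline(PipelineID.randomId());      // assumed helper

    // serialize as carried by AllocateScmBlockRequestProto, then read back
    HddsProtos.ExcludeListProto proto = exclude.getProtoBuf();
    ExcludeList copy = ExcludeList.getFromProtoBuf(proto);
    System.out.println(copy.getDatanodes().size() + " datanode(s), "
        + copy.getContainerIds().size() + " container(s), "
        + copy.getPipelineIds().size() + " pipeline(s) excluded");
  }
}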
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 184dd2b2c8..96276f501c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
@@ -80,7 +81,7 @@ public ScmBlockLocationProtocolClientSideTranslatorPB(
   @Override
   public AllocatedBlock allocateBlock(long size,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
+      String owner, ExcludeList excludeList) throws IOException {
     Preconditions.checkArgument(size > 0, "block size must be greater than 0");
 
     AllocateScmBlockRequestProto request =
@@ -90,6 +91,7 @@ public AllocatedBlock allocateBlock(long size,
             .setFactor(factor)
             .setOwner(owner)
             .setTraceID(TracingUtil.exportCurrentSpan())
+            .setExcludeList(excludeList.getProtoBuf())
             .build();
     final AllocateScmBlockResponseProto response;
     try {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 62968314df..73e402523a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
 import org.apache.hadoop.io.Text;
@@ -71,7 +73,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -232,7 +233,8 @@ public static XceiverClientReply putBlockAsync(
    * @throws IOException if there is an I/O error while performing the call
    */
   public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
-      ChunkInfo chunk, BlockID blockID, String traceID, List<UUID> excludeDns)
+      ChunkInfo chunk, BlockID blockID, String traceID,
+      List<DatanodeDetails> excludeDns)
       throws IOException {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
@@ -563,6 +565,9 @@ public static void validateContainerResponse(
     } else if (response.getResult()
         == ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
       throw new BlockNotCommittedException(response.getMessage());
+    } else if (response.getResult()
+        == ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+      throw new ContainerNotOpenException(response.getMessage());
     }
     throw new StorageContainerException(
         response.getMessage(), response.getResult());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 956218ab35..0d434c172a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -77,7 +78,8 @@ public AllocateScmBlockResponseProto allocateScmBlock(
             request.getTraceID())) {
       AllocatedBlock allocatedBlock =
           impl.allocateBlock(request.getSize(), request.getType(),
-              request.getFactor(), request.getOwner());
+              request.getFactor(), request.getOwner(),
+              ExcludeList.getFromProtoBuf(request.getExcludeList()));
 
       if (allocatedBlock != null) {
         return AllocateScmBlockResponseProto.newBuilder()
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
index 08a0359c40..85fbca600c 100644
--- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -42,6 +42,7 @@ message AllocateScmBlockRequestProto {
   required hadoop.hdds.ReplicationFactor factor = 3;
   required string owner = 4;
   optional string traceID = 5;
+  optional ExcludeListProto excludeList = 6;
 }
 
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index cc8b3179e5..a7d7704352 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -199,6 +199,12 @@ enum ScmOps {
     queryNode = 11;
 }
 
+message ExcludeListProto {
+    repeated string datanodes = 1;
+    repeated int64 containerIds = 2;
+    repeated PipelineID pipelineIds = 3;
+}
+
 /**
  * Block ID that uniquely identify a block by SCM.
 */
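Note: with the proto plumbing above in place, every block allocation can carry the accumulated exclude list to SCM. A minimal caller-side sketch against the new ScmBlockLocationProtocol signature; the 256 MB size and the owner string are illustrative only, and obtaining the RPC proxy is out of scope here:

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

import java.io.IOException;

public class AllocateWithExclusion {
  // every allocation now ships the exclude list to SCM alongside the
  // usual size/type/factor/owner arguments
  static AllocatedBlock allocate(ScmBlockLocationProtocol client,
      ExcludeList exclude) throws IOException {
    return client.allocateBlock(256L * 1024 * 1024, ReplicationType.RATIS,
        ReplicationFactor.THREE, "ozone", exclude);
  }
}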
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
index f9aa0cd4f7..a43b18c426 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -36,11 +37,14 @@ public interface BlockManager extends Closeable {
    * @param size - Block Size
    * @param type Replication Type
    * @param factor - Replication Factor
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return AllocatedBlock
    * @throws IOException
    */
   AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor factor, String owner) throws IOException;
+      HddsProtos.ReplicationFactor factor, String owner,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Deletes a list of blocks in an atomic operation. Internally, SCM
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index dea7c020f6..0e4bb50b6d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -35,9 +35,11 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -60,6 +62,8 @@
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import java.util.function.Predicate;
+
 
 /** Block Manager manages the block access for SCM. */
 public class BlockManagerImpl implements EventHandler<Boolean>,
@@ -145,12 +149,14 @@ public void stop() throws IOException {
    * @param size - Block Size
    * @param type Replication Type
    * @param factor - Replication Factor
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return Allocated block
    * @throws IOException on failure.
    */
   @Override
-  public AllocatedBlock allocateBlock(final long size,
-      ReplicationType type, ReplicationFactor factor, String owner)
+  public AllocatedBlock allocateBlock(final long size, ReplicationType type,
+      ReplicationFactor factor, String owner, ExcludeList excludeList)
       throws IOException {
     LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
     ScmUtils.preCheck(ScmOps.allocateBlock, chillModePrecheck);
@@ -177,8 +183,10 @@ public AllocatedBlock allocateBlock(final long size,
 
     ContainerInfo containerInfo;
     while (true) {
-      List<Pipeline> availablePipelines = pipelineManager
-          .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
+      List<Pipeline> availablePipelines =
+          pipelineManager
+              .getPipelines(type, factor, Pipeline.PipelineState.OPEN,
+                  excludeList.getDatanodes(), excludeList.getPipelineIds());
       Pipeline pipeline;
       if (availablePipelines.size() == 0) {
         try {
@@ -197,7 +205,13 @@ public AllocatedBlock allocateBlock(final long size,
       // look for OPEN containers that match the criteria.
       containerInfo = containerManager
           .getMatchingContainer(size, owner, pipeline);
-      if (containerInfo != null) {
+
+      // TODO: if getMatchingContainer results in containers which are in
+      // the exclude list, we may end up in this loop forever. This case
+      // needs to be addressed.
+      if (containerInfo != null && (excludeList.getContainerIds() == null
+          || !discardContainer(containerInfo.containerID(),
+              excludeList.getContainerIds()))) {
         return newBlock(containerInfo);
       }
     }
@@ -210,6 +224,11 @@ public AllocatedBlock allocateBlock(final long size,
     return null;
   }
 
+  private boolean discardContainer(ContainerID containerId,
+      List<ContainerID> containers) {
+    Predicate<ContainerID> predicate = p -> p.equals(containerId);
+    return containers.parallelStream().anyMatch(predicate);
+  }
 
  /**
   * newBlock - returns a new block assigned to a container.
  *
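Note: discardContainer above routes a simple membership test through parallelStream/anyMatch. A behaviorally equivalent alternative (a hypothetical simplification, not part of the patch) is a plain contains() call, since the element type relies on a value-based equals():

import java.util.Arrays;
import java.util.List;

public class MembershipSketch {
  // equivalent to discardContainer's parallelStream/anyMatch membership
  // test, assuming a value-based equals() on the element type (ContainerID
  // qualifies)
  static <T> boolean discard(T candidate, List<T> excluded) {
    return excluded.contains(candidate);
  }

  public static void main(String[] args) {
    System.out.println(discard(42L, Arrays.asList(7L, 42L))); // true
  }
}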
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index e360e7b064..11ba2c3b0a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -25,6 +25,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -51,6 +52,10 @@ List<Pipeline> getPipelines(ReplicationType type,
   List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines);
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index a0ef964fb2..6be747b7e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -27,6 +28,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -81,6 +83,13 @@ List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
     return pipelineStateMap.getPipelines(type, factor, state);
   }
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    return pipelineStateMap
+        .getPipelines(type, factor, state, excludeDns, excludePipelines);
+  }
+
   List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 8e0b62bacc..6fc27a6cdb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -30,6 +31,7 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -217,6 +219,58 @@ List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
         .collect(Collectors.toList());
   }
 
+  /**
+   * Get list of pipelines corresponding to the specified replication type,
+   * replication factor and pipeline state.
+   *
+   * @param type - ReplicationType
+   * @param factor - ReplicationFactor
+   * @param state - Required PipelineState
+   * @param excludeDns list of dns to exclude
+   * @param excludePipelines pipelines to exclude
+   * @return List of pipelines with specified replication type,
+   * replication factor and pipeline state
+   */
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(factor, "Replication factor cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+    Preconditions
+        .checkNotNull(excludeDns, "Datanode exclude list cannot be null");
+    Preconditions
+        .checkNotNull(excludePipelines, "Pipeline exclude list cannot be null");
+    return getPipelines(type, factor, state).stream().filter(
+        pipeline -> !discardPipeline(pipeline, excludePipelines)
+            && !discardDatanode(pipeline, excludeDns))
+        .collect(Collectors.toList());
+  }
+
+  private boolean discardPipeline(Pipeline pipeline,
+      Collection<PipelineID> excludePipelines) {
+    if (excludePipelines.isEmpty()) {
+      return false;
+    }
+    Predicate<PipelineID> predicate = p -> p.equals(pipeline.getId());
+    return excludePipelines.parallelStream().anyMatch(predicate);
+  }
+
+  private boolean discardDatanode(Pipeline pipeline,
+      Collection<DatanodeDetails> excludeDns) {
+    if (excludeDns.isEmpty()) {
+      return false;
+    }
+    boolean discard = false;
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      Predicate<DatanodeDetails> predicate = p -> p.equals(dn);
+      discard = excludeDns.parallelStream().anyMatch(predicate);
+      if (discard) {
+        break;
+      }
+    }
+    return discard;
+  }
 
   /**
    * Get set of containerIDs corresponding to a pipeline.
   *
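Note: the semantics of the new getPipelines overload above reduce to "drop a pipeline if its id is excluded or if any of its member datanodes is excluded". A toy, self-contained sketch (hypothetical types, plain strings for ids) of the same filter:

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineFilterSketch {
  // a pipeline here is just its id plus its member node ids
  static class P {
    final String id; final List<String> nodes;
    P(String id, List<String> nodes) { this.id = id; this.nodes = nodes; }
  }

  static List<P> filter(List<P> all, Collection<String> excludeDns,
      Collection<String> excludePipelines) {
    return all.stream()
        .filter(p -> !excludePipelines.contains(p.id))
        .filter(p -> p.nodes.stream().noneMatch(excludeDns::contains))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<P> all = Arrays.asList(
        new P("p1", Arrays.asList("dn1", "dn2", "dn3")),
        new P("p2", Arrays.asList("dn4", "dn5", "dn6")));
    // excluding dn5 removes p2, leaving only p1
    System.out.println(filter(all, Arrays.asList("dn5"),
        Collections.emptyList()).size()); // prints 1
  }
}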
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 371660742a..90facca18c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -45,6 +45,7 @@
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -205,6 +206,20 @@ public List<Pipeline> getPipelines(ReplicationType type,
     }
   }
 
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state,
+      Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    lock.readLock().lock();
+    try {
+      return stateManager
+          .getPipelines(type, factor, state, excludeDns, excludePipelines);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 1d7b59d8ac..52cb0454dd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -155,9 +156,9 @@ public void join() throws InterruptedException {
   }
 
   @Override
-  public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
-      type, HddsProtos.ReplicationFactor factor, String owner) throws
-      IOException {
+  public AllocatedBlock allocateBlock(long size,
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      String owner, ExcludeList excludeList) throws IOException {
     Map<String, String> auditMap = Maps.newHashMap();
     auditMap.put("size", String.valueOf(size));
     auditMap.put("type", type.name());
@@ -165,7 +166,8 @@ public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
     auditMap.put("owner", owner);
     boolean auditSuccess = true;
     try {
-      return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+      return scm.getScmBlockManager()
+          .allocateBlock(size, type, factor, owner, excludeList);
     } catch (Exception ex) {
       auditSuccess = false;
       AUDIT.logWriteFailure(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 625e561e87..856e06b65a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -127,7 +128,7 @@ public void testAllocateBlock() throws Exception {
       return !blockManager.isScmInChillMode();
     }, 10, 1000 * 5);
     AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
     Assert.assertNotNull(block);
   }
 
@@ -140,7 +141,7 @@ public void testAllocateOversizedBlock() throws Exception {
     long size = 6 * GB;
     thrown.expectMessage("Unsupported block size");
     AllocatedBlock block = blockManager.allocateBlock(size,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
   }
 
 
@@ -154,7 +155,7 @@ public void testAllocateBlockFailureInChillMode() throws Exception {
     thrown.expectMessage("ChillModePrecheck failed for "
         + "allocateBlock");
     blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
   }
 
   @Test
@@ -165,7 +166,7 @@ public void testAllocateBlockSucInChillMode() throws Exception {
       return !blockManager.isScmInChillMode();
     }, 10, 1000 * 5);
     Assert.assertNotNull(blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner));
+        type, factor, containerOwner, new ExcludeList()));
   }
 
   @Test(timeout = 10000)
@@ -179,12 +180,14 @@ public void testMultipleBlockAllocation()
     pipelineManager.createPipeline(type, factor);
 
     AllocatedBlock allocatedBlock = blockManager
-        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+            new ExcludeList());
     // block should be allocated in different pipelines
     GenericTestUtils.waitFor(() -> {
       try {
         AllocatedBlock block = blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
         return !block.getPipeline().getId()
             .equals(allocatedBlock.getPipeline().getId());
       } catch (IOException e) {
@@ -227,7 +230,8 @@ public void testMultipleBlockAllocationWithClosedContainer()
     GenericTestUtils.waitFor(() -> {
       try {
         blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
       } catch (IOException e) {
       }
       return verifyNumberOfContainersInPipelines(
@@ -250,7 +254,8 @@ public void testMultipleBlockAllocationWithClosedContainer()
     GenericTestUtils.waitFor(() -> {
       try {
         blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
       } catch (IOException e) {
       }
       return verifyNumberOfContainersInPipelines(
@@ -271,7 +276,8 @@ public void testBlockAllocationWithNoAvailablePipelines()
     }
     Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
     Assert.assertNotNull(blockManager
-        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner));
+        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+            new ExcludeList()));
     Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
   }
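Note: the test changes above only thread the new parameter through; none of them asserts the exclusion behavior itself. A hedged sketch of such a check, using only signatures introduced in this patch (it assumes at least two open pipelines are available so the second allocation can land elsewhere):

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;

import java.io.IOException;

public class ExclusionCheck {
  // allocate twice, excluding the first block's pipeline the second time;
  // with more than one open pipeline the two allocations should differ
  static boolean landsElsewhere(BlockManager blockManager, long blockSize,
      String owner) throws IOException {
    AllocatedBlock first = blockManager.allocateBlock(blockSize,
        ReplicationType.RATIS, ReplicationFactor.THREE, owner,
        new ExcludeList());
    ExcludeList exclude = new ExcludeList();
    exclude.addPipeline(first.getPipeline().getId());
    AllocatedBlock second = blockManager.allocateBlock(blockSize,
        ReplicationType.RATIS, ReplicationFactor.THREE, owner, exclude);
    return !second.getPipeline().getId()
        .equals(first.getPipeline().getId());
  }
}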
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 9239db8a19..9651518887 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -19,17 +19,28 @@
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
   private OzoneClientUtils() {}
 
+  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
+      new ArrayList<Class<? extends Exception>>() {{
+        add(TimeoutException.class);
+        add(ContainerNotOpenException.class);
+        add(RaftRetryFailureException.class);
+        add(AlreadyClosedException.class);
+      }};
   /**
    * Returns a BucketInfo object constructed using fields of the input
    * OzoneBucket object.
@@ -110,4 +121,8 @@ public static KeyInfoDetails asKeyInfoDetails(OzoneKeyDetails key) {
     keyInfo.setFileEncryptionInfo(key.getFileEncryptionInfo());
     return keyInfo;
   }
+
+  public static List<Class<? extends Exception>> getExceptionList() {
+    return EXCEPTION_LIST;
+  }
 }
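Note: EXCEPTION_LIST above uses the double-brace idiom, which creates an anonymous ArrayList subclass. A behaviorally equivalent, hypothetical alternative for the same contents (the HDDS/Ratis exception classes are commented out so the sketch compiles standalone):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class ExceptionListSketch {
  // same contents as OzoneClientUtils#EXCEPTION_LIST, built without the
  // anonymous-subclass (double-brace) idiom, and made unmodifiable
  private static final List<Class<? extends Exception>> EXCEPTIONS =
      Collections.unmodifiableList(Arrays.asList(
          TimeoutException.class
          // , ContainerNotOpenException.class
          // , RaftRetryFailureException.class
          // , AlreadyClosedException.class
      ));

  public static void main(String[] args) {
    System.out.println(EXCEPTIONS);
  }
}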
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 5ae7e8bf7e..16a825ff0a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -32,6 +33,8 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import java.util.Collection;
+
 /**
  * Helper class used inside {@link BlockOutputStream}.
  * */
@@ -159,6 +162,14 @@ long getTotalSuccessfulFlushedData() throws IOException {
     }
   }
 
+  Collection<DatanodeDetails> getFailedServers() throws IOException {
+    if (outputStream != null) {
+      BlockOutputStream out = (BlockOutputStream) this.outputStream;
+      return out.getFailedServers();
+    }
+    return null;
+  }
+
   long getWrittenDataLength() throws IOException {
     if (outputStream != null) {
       BlockOutputStream out = (BlockOutputStream) this.outputStream;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3b0e51b4eb..7035c73ce9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -21,20 +21,22 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -46,7 +48,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
+import java.util.Collection;
 import java.util.ListIterator;
 import java.util.concurrent.TimeoutException;
 
@@ -84,7 +86,7 @@ public class KeyOutputStream extends OutputStream {
   private List<ByteBuffer> bufferList;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
-
+  private ExcludeList excludeList;
   /**
    * A constructor for testing purpose only.
    */
@@ -181,6 +183,7 @@ public KeyOutputStream(OpenKeySession handler,
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
     this.bufferList = new ArrayList<>();
+    this.excludeList = new ExcludeList();
   }
 
   /**
@@ -307,9 +310,8 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
         current.write(b, off, writeLen);
       }
     } catch (IOException ioe) {
-      boolean retryFailure = checkForRetryFailure(ioe);
-      if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
-          || retryFailure) {
+      Throwable t = checkForException(ioe);
+      if (t != null) {
         // for the current iteration, totalDataWritten - currentPos gives the
         // amount of data already written to the buffer
@@ -321,7 +323,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
         writeLen = retry ? (int) len :
             (int) (current.getWrittenDataLength() - currentPos);
         LOG.debug("writeLen {}, total len {}", writeLen, len);
-        handleException(current, currentStreamIndex, retryFailure);
+        handleException(current, currentStreamIndex, t);
       } else {
         throw ioe;
       }
@@ -340,8 +342,10 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
    * @param containerID id of the closed container
+   * @param pipelineId id of the associated pipeline
    */
-  private void discardPreallocatedBlocks(long containerID) {
+  private void discardPreallocatedBlocks(long containerID,
+      PipelineID pipelineId) {
     // currentStreamIndex < streamEntries.size() signifies that, there are still
     // pre allocated blocks available.
     if (currentStreamIndex < streamEntries.size()) {
@@ -349,8 +353,10 @@ private void discardPreallocatedBlocks(long containerID) {
           streamEntries.listIterator(currentStreamIndex);
       while (streamEntryIterator.hasNext()) {
         BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
-        if (streamEntry.getBlockID().getContainerID()
-            == containerID && streamEntry.getCurrentPosition() == 0) {
+        if (((pipelineId != null && streamEntry.getPipeline().getId()
+            .equals(pipelineId)) || (containerID != -1
+            && streamEntry.getBlockID().getContainerID() == containerID))
+            && streamEntry.getCurrentPosition() == 0) {
           streamEntryIterator.remove();
         }
       }
@@ -382,17 +388,39 @@ private void removeEmptyBlocks() {
    *
    * @param streamEntry StreamEntry
    * @param streamIndex Index of the entry
-   * @param retryFailure if true the xceiverClient needs to be invalidated in
-   *                     the client cache.
+   * @param exception actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
-      int streamIndex, boolean retryFailure) throws IOException {
+      int streamIndex, Throwable exception) throws IOException {
+    boolean retryFailure = checkForRetryFailure(exception);
+    boolean closedContainerException = false;
+    if (!retryFailure) {
+      closedContainerException = checkIfContainerIsClosed(exception);
+    }
+    PipelineID pipelineId = null;
     long totalSuccessfulFlushedData =
         streamEntry.getTotalSuccessfulFlushedData();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
+    LOG.warn("Encountered exception {}", exception);
+    LOG.info(
+        "The last committed block length is {}, uncommitted data length is {}",
+        totalSuccessfulFlushedData, bufferedDataLen);
+    Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
+    long containerId = streamEntry.getBlockID().getContainerID();
+    Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
+    Preconditions.checkNotNull(failedServers);
+    if (!failedServers.isEmpty()) {
+      excludeList.addDatanodes(failedServers);
+    }
+    if (checkIfContainerIsClosed(exception)) {
+      excludeList.addConatinerId(ContainerID.valueof(containerId));
+    } else if (retryFailure || exception instanceof TimeoutException) {
+      pipelineId = streamEntry.getPipeline().getId();
+      excludeList.addPipeline(pipelineId);
+    }
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
     if (bufferedDataLen > 0) {
@@ -405,21 +433,21 @@ private void handleException(BlockOutputStreamEntry streamEntry,
       streamEntries.remove(streamIndex);
       currentStreamIndex -= 1;
     }
-    // discard subsequent pre allocated blocks from the streamEntries list
-    // from the closed container
-    discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID());
-  }
-
-  private boolean checkIfContainerIsClosed(IOException ioe) {
-    if (ioe.getCause() != null) {
-      return checkForException(ioe, ContainerNotOpenException.class) || Optional
-          .of(ioe.getCause())
-          .filter(e -> e instanceof StorageContainerException)
-          .map(e -> (StorageContainerException) e)
-          .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
-          .isPresent();
+    if (closedContainerException) {
+      // discard subsequent pre allocated blocks from the streamEntries list
+      // from the closed container
+      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+          null);
+    } else {
+      // In case there is a timeout exception, or watch-for-commit has fallen
+      // back to a majority commit, or the client failed to connect to the
+      // leader in the pipeline, just discard all the preallocated blocks on
+      // this pipeline. The next block allocation will exclude this specific
+      // pipeline, which ensures that a 2-way commit cannot span over
+      // multiple blocks.
+      discardPreallocatedBlocks(-1, pipelineId);
     }
-    return false;
   }
 
   /**
@@ -427,31 +455,27 @@ private boolean checkIfContainerIsClosed(IOException ioe) {
    * In case of retry failure, ratis client throws RaftRetryFailureException
    * and all succeeding operations are failed with AlreadyClosedException.
 
   /**
@@ -427,31 +455,27 @@ private boolean checkIfContainerIsClosed(IOException ioe) {
    * In case of retry failure, ratis client throws RaftRetryFailureException
    * and all succeeding operations are failed with AlreadyClosedException.
    */
-  private boolean checkForRetryFailure(IOException ioe) {
-    return checkForException(ioe, RaftRetryFailureException.class,
-        AlreadyClosedException.class);
+  private boolean checkForRetryFailure(Throwable t) {
+    return t instanceof RaftRetryFailureException
+        || t instanceof AlreadyClosedException;
   }
 
-  private boolean checkForException(IOException ioe, Class... classes) {
+  private boolean checkIfContainerIsClosed(Throwable t) {
+    return t instanceof ContainerNotOpenException;
+  }
+
+  private Throwable checkForException(IOException ioe) {
     Throwable t = ioe.getCause();
     while (t != null) {
-      for (Class cls : classes) {
+      for (Class cls : OzoneClientUtils
+          .getExceptionList()) {
         if (cls.isInstance(t)) {
-          return true;
+          return t;
         }
       }
       t = t.getCause();
     }
-    return false;
-  }
-
-  private boolean checkIfTimeoutException(IOException ioe) {
-    if (ioe.getCause() != null) {
-      return Optional.of(ioe.getCause())
-          .filter(e -> e instanceof TimeoutException).isPresent();
-    } else {
-      return false;
-    }
+    return null;
   }
 
   private long getKeyLength() {
@@ -469,7 +493,8 @@ private long getKeyLength() {
    * @throws IOException
    */
   private void allocateNewBlock(int index) throws IOException {
-    OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID);
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
     addKeyLocationInfo(subKeyInfo);
   }
 
@@ -495,19 +520,25 @@ private void handleFlushOrClose(boolean close) throws IOException {
       BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
       if (entry != null) {
         try {
+          Collection failedServers = entry.getFailedServers();
+
+          // failed servers can be null in case there is no data written in the
+          // stream
+          if (failedServers != null && !failedServers.isEmpty()) {
+            excludeList.addDatanodes(failedServers);
+          }
           if (close) {
             entry.close();
           } else {
             entry.flush();
           }
         } catch (IOException ioe) {
-          boolean retryFailure = checkForRetryFailure(ioe);
-          if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
-              || retryFailure) {
+          Throwable t = checkForException(ioe);
+          if (t != null) {
            // This call will allocate a new streamEntry and write the Data.
            // Close needs to be retried on the newly allocated streamEntry as
            // well.
-            handleException(entry, streamIndex, retryFailure);
+            handleException(entry, streamIndex, t);
            handleFlushOrClose(close);
          } else {
            throw ioe;
@@ -564,6 +595,11 @@ public FileEncryptionInfo getFileEncryptionInfo() {
     return feInfo;
   }
 
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return excludeList;
+  }
+
   /**
    * Builder class of KeyOutputStream.
    */
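checkForException above replaces the per-type checks with a single walk of the IOException's cause chain, returning the first throwable whose class appears in OzoneClientUtils.getExceptionList(). A self-contained sketch of that traversal follows; the FATAL list here is a stand-in for that exception list, which in the patch would carry ContainerNotOpenException, RaftRetryFailureException, AlreadyClosedException and TimeoutException.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

public final class CauseChainDemo {

  // Stand-in for OzoneClientUtils.getExceptionList().
  private static final List<Class<? extends Throwable>> FATAL =
      Arrays.<Class<? extends Throwable>>asList(
          TimeoutException.class, IllegalStateException.class);

  // Walk the cause chain once and return the first throwable that matches a
  // known-fatal type, or null if the failure is not one the client handles.
  static Throwable checkForException(IOException ioe) {
    for (Throwable t = ioe.getCause(); t != null; t = t.getCause()) {
      for (Class<? extends Throwable> cls : FATAL) {
        if (cls.isInstance(t)) {
          return t;
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // A TimeoutException buried two levels deep is still detected.
    IOException ioe = new IOException("write failed",
        new RuntimeException("wrapper",
            new TimeoutException("watch for commit timed out")));
    System.out.println(checkForException(ioe));
  }
}

Returning the throwable itself (instead of a boolean per type) lets handleException decide between container, pipeline and datanode exclusion with a single classification pass.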
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 9bb83a04b1..54f4e82862 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.protocol;
 
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -38,6 +39,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -175,11 +177,13 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName)
    *
    * @param args the key to append
    * @param clientID the client identification
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation
    * @return an allocated block
    * @throws IOException
    */
-  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
-      throws IOException;
+  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Look up for the container of an existing key.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 0d266c6588..ff7a1d89ff 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -674,8 +675,8 @@ private OMResponse handleError(OMResponse resp) throws OMException {
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId)
-      throws IOException {
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
+      ExcludeList excludeList) throws IOException {
     AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
     KeyArgs keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
@@ -684,6 +685,8 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId)
         .setDataSize(args.getDataSize()).build();
     req.setKeyArgs(keyArgs);
     req.setClientID(clientId);
+    req.setExcludeList(excludeList.getProtoBuf());
+
     OMRequest omRequest = createOMRequest(Type.AllocateBlock)
         .setAllocateBlockRequest(req)
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 28a802ce7c..b1168260c8 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -611,6 +611,7 @@ message CommitKeyResponse {
 message AllocateBlockRequest {
     required KeyArgs keyArgs = 1;
     required uint64 clientID = 2;
+    optional hadoop.hdds.ExcludeListProto excludeList = 3;
 }
 
 message AllocateBlockResponse {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 2415335290..bf4f4d4486 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -24,9 +24,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -43,8 +40,6 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -56,7 +51,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -232,22 +226,32 @@ public void testMultiBlockWrites() throws Exception {
   public void testMultiBlockWrites2() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
     KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    // With the initial size provided, it should have pre allocated 4 blocks
-    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
+    // With the initial size provided, it should have pre allocated 2 blocks
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     String dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize));
     byte[] data = dataString.getBytes(UTF_8);
     key.write(data);
-    // 3 block are completely written to the DataNode in 3 blocks.
+    // 2 blocks are now completely written to the DataNodes.
     // Data of length half of chunkSize resides in the chunkOutput stream buffer
     String dataString2 =
-        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize);
     key.write(dataString2.getBytes(UTF_8));
+    key.flush();
+
+    String dataString3 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize);
+    key.write(dataString3.getBytes(UTF_8));
+    key.flush();
+
+    String dataString4 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
+    key.write(dataString4.getBytes(UTF_8));
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
@@ -260,15 +264,16 @@ public void testMultiBlockWrites2() throws Exception {
     // read the key from OM again and match the length.The length will still
     // be the equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
     // Though we have written only block initially, the close will hit
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(dataString.concat(dataString2).getBytes(UTF_8).length,
+
+    String dataCommitted =
+        dataString.concat(dataString2).concat(dataString3).concat(dataString4);
+    Assert.assertEquals(dataCommitted.getBytes(UTF_8).length,
         keyInfo.getDataSize());
-    validateData(keyName, dataString.concat(dataString2).getBytes(UTF_8));
+    validateData(keyName, dataCommitted.getBytes(UTF_8));
   }
 
   @Test
@@ -337,55 +342,8 @@ private void waitForContainerClose(String keyName,
       containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
-    waitForContainerClose(containerIdList.toArray(new Long[0]));
-  }
-
-  private void waitForContainerClose(Long... containerIdList)
-      throws ContainerNotFoundException, PipelineNotFoundException,
-      TimeoutException, InterruptedException {
-    List pipelineList = new ArrayList<>();
-    for (long containerID : containerIdList) {
-      cluster.getStorageContainerManager().getEventQueue()
-          .fireEvent(SCMEvents.CLOSE_CONTAINER,
-              ContainerID.valueof(containerID));
-      ContainerInfo container =
-          cluster.getStorageContainerManager().getContainerManager()
-              .getContainer(ContainerID.valueof(containerID));
-      Pipeline pipeline =
-          cluster.getStorageContainerManager().getPipelineManager()
-              .getPipeline(container.getPipelineID());
-      pipelineList.add(pipeline);
-      List datanodes = pipeline.getNodes();
-      for (DatanodeDetails details : datanodes) {
-        Assert.assertFalse(ContainerTestHelper
-            .isContainerClosed(cluster, containerID, details));
-        // send the order to close the container
-        cluster.getStorageContainerManager().getScmNodeManager()
-            .addDatanodeCommand(details.getUuid(),
-                new CloseContainerCommand(containerID, pipeline.getId()));
-      }
-    }
-    int index = 0;
-    for (long containerID : containerIdList) {
-      Pipeline pipeline = pipelineList.get(index);
-      List datanodes = pipeline.getNodes();
-      // Below condition avoids the case where container has been allocated
-      // but not yet been used by the client. In such a case container is never
-      // created.
-      if (datanodes.stream().anyMatch(dn -> ContainerTestHelper
-          .isContainerPresent(cluster, containerID, dn))) {
-        for (DatanodeDetails datanodeDetails : datanodes) {
-          GenericTestUtils.waitFor(() -> ContainerTestHelper
-                  .isContainerClosed(cluster, containerID, datanodeDetails),
-              500, 15 * 1000);
-          //double check if it's really closed
-          // (waitFor also throws an exception)
-          Assert.assertTrue(ContainerTestHelper
-              .isContainerClosed(cluster, containerID, datanodeDetails));
-        }
-      }
-      index++;
-    }
+    ContainerTestHelper
+        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
   }
 
   @Ignore // test needs to be fixed after close container is handled for
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index f94257cacb..092c56f368 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -36,12 +37,11 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -54,16 +54,16 @@
  */
 public class TestFailureHandlingByClient {
 
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf;
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
-  private static int maxRetries;
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+  private int maxRetries;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -72,8 +71,7 @@ public class TestFailureHandlingByClient {
    *
    * @throws IOException
    */
-  @Before
-  public void init() throws Exception {
+  private void init() throws Exception {
     conf = new OzoneConfiguration();
     maxRetries = 100;
     chunkSize = (int) OzoneConsts.MB;
@@ -101,11 +100,14 @@ public void init() throws Exception {
     objectStore.getVolume(volumeName).createBucket(bucketName);
   }
 
+  private void startCluster() throws Exception {
+    init();
+  }
+
   /**
    * Shutdown MiniDFSCluster.
   */
-  @After
-  public void shutdown() {
+  private void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -113,6 +115,7 @@ public void shutdown() {
 
   @Test
   public void testBlockWritesWithDnFailures() throws Exception {
+    startCluster();
     String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     byte[] data =
@@ -148,10 +151,12 @@ public void testBlockWritesWithDnFailures() throws Exception {
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(data.length, keyInfo.getDataSize());
     validateData(keyName, data);
+    shutdown();
   }
 
   @Test
   public void testMultiBlockWritesWithDnFailures() throws Exception {
+    startCluster();
     String keyName = "ratis3";
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     String data =
@@ -188,11 +193,13 @@ public void testMultiBlockWritesWithDnFailures() throws Exception {
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
     validateData(keyName, data.concat(data).getBytes());
+    shutdown();
   }
 
   @Test
   public void testMultiBlockWritesWithIntermittentDnFailures()
       throws Exception {
+    startCluster();
     String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 6 * blockSize);
@@ -232,8 +239,235 @@ public void testMultiBlockWritesWithIntermittentDnFailures()
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
     validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
   }
 
+  @Test
+  public void testWriteSmallFile() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 0);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize / 2);
+    key.write(data.getBytes());
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List locationInfoList =
+        keyOutputStream.getLocationInfoList();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List datanodes = pipeline.getNodes();
+
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+    key.close();
+    // this will throw AlreadyClosedException and the current stream
+    // will be discarded and a new block will be written
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.getBytes());
+    shutdown();
+  }
+
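Since the tests in this file now shut datanodes down, a shared @Before/@After cluster would leak killed nodes from one test into the next; each test therefore starts and stops its own MiniOzoneCluster. A sketch of the resulting pattern is below. The patch itself calls shutdown() as the last statement of each test; a try/finally, as sketched here, would additionally release the cluster when an assertion fails.

import org.junit.Test;

// Assumes JUnit 4 on the classpath, as in the test module.
public class PerTestClusterPattern {

  private Object cluster; // stand-in for MiniOzoneCluster

  private void startCluster() {
    cluster = new Object(); // the real init() builds and waits for the cluster
  }

  private void shutdown() {
    cluster = null; // the real method calls cluster.shutdown()
  }

  @Test
  public void testWithDatanodeFailure() throws Exception {
    startCluster();
    try {
      // write data, shut datanodes down, assert on the exclude list ...
    } finally {
      shutdown();
    }
  }
}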
+
+  @Test
+  public void testContainerExclusionWithClosedContainerException()
+      throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    List containerIdList = new ArrayList<>();
+    containerIdList.add(containerId);
+
+    // the below check will fail if the container does not get closed
+    ContainerTestHelper
+        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+
+    // This write will hit ClosedContainerException and this container
+    // will be added to the exclude list
+    key.write(data.getBytes());
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.getExcludeList().getContainerIds()
+        .contains(ContainerID.valueof(containerId)));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
+
+    // The close will just write to the buffer
+    key.close();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).getBytes());
+    shutdown();
+  }
+
+  @Test
+  public void testDatanodeExclusionWithMajorityCommit() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List datanodes = pipeline.getNodes();
+
+    // shutdown 1 datanode. This will make sure the 2-way commit happens for
+    // next write ops.
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+
+    key.write(data.getBytes());
+    key.write(data.getBytes());
+    // The close will just write to the buffer
+    key.close();
+    Assert.assertTrue(keyOutputStream.getExcludeList().getDatanodes()
+        .contains(datanodes.get(0)));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
+  }
+
+  @Test
+  public void testPipelineExclusionWithPipelineFailure() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List datanodes = pipeline.getNodes();
+
+    // With two nodes down, the next write will hit AlreadyClosedException;
+    // the pipeline will be added to the exclude list
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+
+    key.write(data.getBytes());
+    key.write(data.getBytes());
+    // The close will just write to the buffer
+    key.close();
+    Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
+        .contains(pipeline.getId()));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
+  }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
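Taken together, the three tests above pin down the client's exclusion policy: a closed container excludes only that container, a 2-way (majority) commit excludes only the dead datanode, and a full pipeline failure excludes only the pipeline. A condensed sketch of that mapping, mirroring handleException in KeyOutputStream; the local exception types are stand-ins for ContainerNotOpenException and the Ratis retry failures.

import java.util.concurrent.TimeoutException;

public final class ExclusionPolicySketch {

  // Stand-ins for ContainerNotOpenException and the Ratis failures
  // (RaftRetryFailureException / AlreadyClosedException).
  static class ContainerNotOpen extends Exception { }
  static class RetryFailure extends Exception { }

  enum Target { CONTAINER, PIPELINE, NONE }

  // Independently of the result below, any failed servers reported by the
  // stream are also excluded as datanodes.
  static Target exclusionTarget(Throwable t) {
    if (t instanceof ContainerNotOpen) {
      return Target.CONTAINER;
    } else if (t instanceof RetryFailure || t instanceof TimeoutException) {
      return Target.PIPELINE;
    }
    return Target.NONE;
  }

  public static void main(String[] args) {
    System.out.println(exclusionTarget(new ContainerNotOpen())); // CONTAINER
    System.out.println(exclusionTarget(new RetryFailure()));     // PIPELINE
  }
}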
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index ad5adc7efa..d06cee7bc4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -31,6 +31,7 @@
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -44,8 +45,13 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -62,6 +68,7 @@
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -697,4 +704,57 @@ public static void validateData(String keyName, byte[] data,
   public static String getFixedLengthString(String string, int length) {
     return String.format("%1$" + length + "s", string);
   }
+
+  public static void waitForContainerClose(MiniOzoneCluster cluster,
+      Long... containerIdList)
+      throws ContainerNotFoundException, PipelineNotFoundException,
+      TimeoutException, InterruptedException {
+    List pipelineList = new ArrayList<>();
+    for (long containerID : containerIdList) {
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      pipelineList.add(pipeline);
+      List datanodes = pipeline.getNodes();
+
+      for (DatanodeDetails details : datanodes) {
+        // Client will issue write chunk and it will create the container on
+        // datanodes.
+        // wait for the container to be created
+        GenericTestUtils
+            .waitFor(() -> isContainerPresent(cluster, containerID, details),
+                500, 100 * 1000);
+        Assert.assertTrue(isContainerPresent(cluster, containerID, details));
+
+        // make sure the container gets created first
+        Assert.assertFalse(ContainerTestHelper
+            .isContainerClosed(cluster, containerID, details));
+        // send the order to close the container
+        cluster.getStorageContainerManager().getEventQueue()
+            .fireEvent(SCMEvents.CLOSE_CONTAINER,
+                ContainerID.valueof(containerID));
+      }
+    }
+    int index = 0;
+    for (long containerID : containerIdList) {
+      Pipeline pipeline = pipelineList.get(index);
+      List datanodes = pipeline.getNodes();
+      // The container is now guaranteed to exist on the datanodes, so wait
+      // for every replica to reach the closed state.
+      for (DatanodeDetails datanodeDetails : datanodes) {
+        GenericTestUtils.waitFor(() -> ContainerTestHelper
+                .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+            15 * 1000);
+        //double check if it's really closed
+        // (waitFor also throws an exception)
+        Assert.assertTrue(ContainerTestHelper
+            .isContainerClosed(cluster, containerID, datanodeDetails));
+      }
+      index++;
+    }
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index 8c08c8fe11..5361745de4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -19,6 +19,7 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -148,7 +149,7 @@ public void testAllocateCommit() throws Exception {
 
     // this block will be appended to the latest version of version 2.
     OmKeyLocationInfo locationInfo =
-        ozoneManager.allocateBlock(keyArgs, openKey.getId());
+        ozoneManager.allocateBlock(keyArgs, openKey.getId(), new ExcludeList());
     List locationInfoList =
         openKey.getKeyInfo().getLatestVersionLocations()
             .getBlocksLatestVersionOnly();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index c571c36201..122312daa8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -67,11 +68,13 @@ public interface KeyManager {
    *
    * @param args the key to append
    * @param clientID the client requesting block.
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return the reference to the new block.
    * @throws IOException
    */
-  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
-      throws IOException;
+  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Given the args of a key to put, write an open key entry to meta data.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 25b335372e..5caeaeaa2e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -220,7 +221,8 @@ private void validateS3Bucket(String volumeName, String bucketName)
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList)
       throws IOException {
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
@@ -242,7 +244,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
     try {
       allocatedBlock =
           scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
-              keyInfo.getFactor(), omId);
+              keyInfo.getFactor(), omId, excludeList);
     } catch (SCMException ex) {
       if (ex.getResult()
           .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
@@ -390,7 +392,8 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
       AllocatedBlock allocatedBlock;
       try {
         allocatedBlock = scmBlockClient
-            .allocateBlock(allocateSize, type, factor, omId);
+            .allocateBlock(allocateSize, type, factor, omId,
+                new ExcludeList());
       } catch (IOException ex) {
         if (ex instanceof SCMException) {
           if (((SCMException) ex).getResult()
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index d2ade56681..22e9cb3320 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
@@ -1867,7 +1868,8 @@ public void commitKey(OmKeyArgs args, long clientID)
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList)
       throws IOException {
     if(isAclEnabled) {
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
@@ -1879,7 +1881,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
     try {
       metrics.incNumBlockAllocateCalls();
-      return keyManager.allocateBlock(args, clientID);
+      return keyManager.allocateBlock(args, clientID, excludeList);
     } catch (Exception ex) {
       metrics.incNumBlockAllocateCallFails();
       auditSuccess = false;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 831873d9cb..8d76842124 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -24,6 +24,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -626,8 +627,9 @@ private AllocateBlockResponse allocateBlock(AllocateBlockRequest request)
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .build();
-    OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
-        request.getClientID());
+    OmKeyLocationInfo newLocation =
+        impl.allocateBlock(omKeyArgs, request.getClientID(),
+            ExcludeList.getFromProtoBuf(request.getExcludeList()));
 
     resp.setKeyLocation(newLocation.getProtobuf());
     return resp.build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
index 3b9c232776..6ff927bf59 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -108,13 +109,14 @@ public ScmBlockLocationTestIngClient(String clusterID, String scmId,
    * @param type Replication Type
    * @param factor - Replication factor
    * @param owner - String owner.
+   * @param excludeList list of datanodes/pipelines to exclude
    * @return
    * @throws IOException
    */
   @Override
   public AllocatedBlock allocateBlock(long size,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
+      String owner, ExcludeList excludeList) throws IOException {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
     Pipeline pipeline = createPipeline(datanodeDetails);
     long containerID = Time.monotonicNow();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index c86c12a881..ab846b85f8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -196,7 +197,8 @@ private void createAndDeleteKeys(KeyManager keyManager, int keyCount,
       //Open, Commit and Delete the Keys in the Key Manager.
       OpenKeySession session = keyManager.openKey(arg);
       for (int i = 0; i < numBlocks; i++) {
-        arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId()));
+        arg.addLocationInfo(
+            keyManager.allocateBlock(arg, session.getId(), new ExcludeList()));
       }
       keyManager.commitKey(arg, session.getId());
       keyManager.deleteKey(arg);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 4531dbbfa3..992ccafba2 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -91,7 +92,8 @@ public void setUp() throws Exception {
   private void setupMocks() throws Exception {
     Mockito.when(scmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
-            Mockito.any(ReplicationFactor.class), Mockito.anyString()))
+            Mockito.any(ReplicationFactor.class), Mockito.anyString(),
+            Mockito.any(ExcludeList.class)))
         .thenThrow(
             new SCMException("ChillModePrecheck failed for allocateBlock",
                 ResultCodes.CHILL_MODE_EXCEPTION));
@@ -180,7 +182,7 @@ public void allocateBlockFailureInChillMode() throws Exception {
         .setVolumeName(VOLUME_NAME).build();
     LambdaTestUtils.intercept(OMException.class,
         "ChillModePrecheck failed for allocateBlock", () -> {
-          keyManager.allocateBlock(keyArgs, 1);
+          keyManager.allocateBlock(keyArgs, 1, new ExcludeList());
        });
   }
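As the signature changes above and below show, the ExcludeList travels from KeyManagerImpl through ScmBlockLocationProtocol into the SCM's BlockManager#allocateBlock; how SCM actually consumes it is not part of this excerpt. A purely illustrative sketch of one plausible use is a filter over candidate pipelines before a block is placed (string ids again stand in for the real Pipeline/DatanodeDetails types).

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class PipelineFilterSketch {

  // Stand-in for a Ratis pipeline: an id plus its member datanode ids.
  static final class P {
    final String id;
    final List<String> nodes;

    P(String id, List<String> nodes) {
      this.id = id;
      this.nodes = nodes;
    }
  }

  // Keep only pipelines that are not excluded themselves and contain no
  // excluded datanode; excluded containers would be honoured later, when a
  // container is picked within the surviving pipelines.
  static List<P> eligible(List<P> candidates, Set<String> excludedPipelines,
      Set<String> excludedNodes) {
    return candidates.stream()
        .filter(p -> !excludedPipelines.contains(p.id))
        .filter(p -> p.nodes.stream().noneMatch(excludedNodes::contains))
        .collect(Collectors.toList());
  }
}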
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java
index cc08709046..edfa397b30 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -162,6 +163,6 @@ public void allocateBlockBenchMark(BenchMarkBlockManager state,
       Blackhole bh) throws IOException {
     state.blockManager
         .allocateBlock(50, ReplicationType.RATIS, ReplicationFactor.THREE,
-            "Genesis");
+            "Genesis", new ExcludeList());
   }
 }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index cf25693db1..bba59956ef 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -149,7 +150,7 @@ public void setup() throws Exception {
     }
     assertEquals(2, nodeManager.getAllNodes().size());
     AllocatedBlock ab1 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, type,
-        factor, CONTAINER_OWNER);
+        factor, CONTAINER_OWNER, new ExcludeList());
     blockContainerMap.put(ab1.getBlockID().getLocalID(),
         ab1.getBlockID().getContainerID());
 
@@ -162,7 +163,8 @@ public void setup() throws Exception {
     // the size of blockContainerMap will vary each time the test is run.
     while (true) {
       ab2 = blockManager
-          .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, CONTAINER_OWNER);
+          .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, CONTAINER_OWNER,
+              new ExcludeList());
       blockContainerMap.put(ab2.getBlockID().getLocalID(),
           ab2.getBlockID().getContainerID());
       if (ab1.getBlockID().getContainerID() !=