diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 7a6cb2db65..3da09f21c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -215,7 +215,8 @@ public ContainerReportsProto getContainerReport() throws IOException {
           .setReadBytes(containerData.getReadBytes())
           .setWriteBytes(containerData.getWriteBytes())
           .setUsed(containerData.getBytesUsed())
-          .setState(getState(containerData));
+          .setState(getState(containerData))
+          .setDeleteTransactionId(containerData.getDeleteTransactionId());
       crBuilder.addReports(ciBuilder.build());
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 8e1c2cc999..f3a111fa57 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -372,9 +372,7 @@ public void deleteBlocks(List<BlockID> blockIDs) throws IOException {
     }

     try {
-      Map<Long, Long> deleteTransactionsMap =
-          deletedBlockLog.addTransactions(containerBlocks);
-      containerManager.updateDeleteTransactionId(deleteTransactionsMap);
+      deletedBlockLog.addTransactions(containerBlocks);
     } catch (IOException e) {
       throw new IOException(
           "Skip writing the deleted blocks info to"
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 2bb568614b..db6c1c5dda 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -42,9 +42,10 @@ public interface DeletedBlockLog extends Closeable {
    * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
    * stop.
    * @param transactions a list into which the TXs will be set.
+   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
-  void getTransactions(DatanodeDeletedBlockTransactions transactions)
+  Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions)
       throws IOException;

   /**
@@ -101,10 +102,9 @@ void addTransaction(long containerID, List<Long> blocks)
    * number of containers) together (on success) or none (on failure).
    *
    * @param containerBlocksMap a map of containerBlocks.
-   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
-  Map<Long, Long> addTransactions(Map<Long, List<Long>> containerBlocksMap)
+  void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException;

   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index ca4e1d0bb5..df97c27733 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -26,6 +26,7 @@
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -45,13 +46,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;

+import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -239,21 +241,26 @@ public void commitTransactions(
         // set of dns which have successfully committed transaction txId.
         dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
         Long containerId = transactionResult.getContainerID();
-        if (dnsWithCommittedTxn == null || containerId == null) {
-          LOG.warn("Transaction txId={} commit by dnId={} failed."
-              + " Corresponding entry not found.", txID, dnID);
+        if (dnsWithCommittedTxn == null) {
+          LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
+                  + "failed. Corresponding entry not found.", txID, dnID,
+              containerId);
           return;
         }

         dnsWithCommittedTxn.add(dnID);
-        Collection<DatanodeDetails> containerDnsDetails =
+        Pipeline pipeline =
             containerManager.getContainerWithPipeline(containerId)
-                .getPipeline().getDatanodes().values();
+                .getPipeline();
+        Collection<DatanodeDetails> containerDnsDetails =
+            pipeline.getDatanodes().values();
         // The delete entry can be safely removed from the log if all the
-        // corresponding nodes commit the txn.
-        if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
+        // corresponding nodes commit the txn. It is required to check that
+        // the nodes returned in the pipeline match the replication factor.
+        if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
+            >= pipeline.getFactor().getNumber()) {
           List<UUID> containerDns = containerDnsDetails.stream()
-              .map(dnDetails -> dnDetails.getUuid())
+              .map(DatanodeDetails::getUuid)
               .collect(Collectors.toList());
           if (dnsWithCommittedTxn.containsAll(containerDns)) {
             transactionToDNsCommitMap.remove(txID);
@@ -338,15 +345,13 @@ public int getNumOfValidTransactions() throws IOException {
    * {@inheritDoc}
    *
    * @param containerBlocksMap a map of containerBlocks.
-   * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
   @Override
-  public Map<Long, Long> addTransactions(
+  public void addTransactions(
       Map<Long, List<Long>> containerBlocksMap)
       throws IOException {
     BatchOperation batch = new BatchOperation();
-    Map<Long, Long> deleteTransactionsMap = new HashMap<>();
     lock.lock();
     try {
       long currentLatestID = lastTxID;
@@ -356,13 +361,11 @@ public void addTransactions(
         byte[] key = Longs.toByteArray(currentLatestID);
         DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
             entry.getKey(), entry.getValue());
-        deleteTransactionsMap.put(entry.getKey(), currentLatestID);
         batch.put(key, tx.toByteArray());
       }
       lastTxID = currentLatestID;
       batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
       deletedStore.writeBatch(batch);
-      return deleteTransactionsMap;
     } finally {
       lock.unlock();
     }
@@ -376,10 +379,11 @@ public void close() throws IOException {
   }

   @Override
-  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
-      throws IOException {
+  public Map<Long, Long> getTransactions(
+      DatanodeDeletedBlockTransactions transactions) throws IOException {
     lock.lock();
     try {
+      Map<Long, Long> deleteTransactionMap = new HashMap<>();
       deletedStore.iterate(null, (key, value) -> {
         if (!Arrays.equals(LATEST_TXID, key)) {
           DeletedBlocksTransaction block = DeletedBlocksTransaction
@@ -388,6 +392,7 @@ public Map<Long, Long> getTransactions(
           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
             if (transactions.addTransaction(block,
                 transactionToDNsCommitMap.get(block.getTxID()))) {
+              deleteTransactionMap.put(block.getContainerID(), block.getTxID());
               transactionToDNsCommitMap
                   .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
             }
@@ -396,6 +401,7 @@ public Map<Long, Long> getTransactions(
         }
         return true;
       });
+      return deleteTransactionMap;
     } finally {
       lock.unlock();
     }
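Note on the commitTransactions change above: a transaction is now retired only when the pipeline reports at least as many datanodes as the replication factor demands and every one of them has committed. A minimal self-contained sketch of that rule follows; the class, method, and variable names are illustrative, not from the patch.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    import static java.lang.Math.min;

    // Sketch only: models the quorum rule used in commitTransactions.
    public final class CommitQuorumSketch {
      // 'factor' is the expected replication factor, e.g. 3.
      static boolean canRetire(Set<UUID> pipelineDns, Set<UUID> committedDns,
          int factor) {
        // Guard against a pipeline that temporarily reports fewer replicas
        // than the factor; a shrunken view must not retire the txn early.
        return min(pipelineDns.size(), committedDns.size()) >= factor
            && committedDns.containsAll(pipelineDns);
      }

      public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        UUID c = UUID.randomUUID();
        Set<UUID> two = new HashSet<>(Arrays.asList(a, b));
        Set<UUID> three = new HashSet<>(Arrays.asList(a, b, c));
        System.out.println(canRetire(two, two, 3));     // false: pipeline too small
        System.out.println(canRetire(three, three, 3)); // true: full quorum
      }
    }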
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
new file mode 100644
index 0000000000..736daac54c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+public class PendingDeleteHandler implements
+    EventHandler<PendingDeleteStatusList> {
+
+  private SCMBlockDeletingService scmBlockDeletingService;
+
+  public PendingDeleteHandler(
+      SCMBlockDeletingService scmBlockDeletingService) {
+    this.scmBlockDeletingService = scmBlockDeletingService;
+  }
+
+  @Override
+  public void onMessage(PendingDeleteStatusList pendingDeleteStatusList,
+      EventPublisher publisher) {
+    scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList);
+  }
+}
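PendingDeleteHandler is a thin relay: it exists only so the event queue can hand PendingDeleteStatusList payloads to the deleting service. A hedged wiring sketch, assuming the EventQueue API matches its use elsewhere in SCM (the wiring itself mirrors what StorageContainerManager does later in this patch):

    import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
    import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
    import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    // Sketch: how the relay is subscribed and exercised.
    public final class PendingDeleteWiringSketch {
      static void wireAndFire(SCMBlockDeletingService service,
          PendingDeleteStatusList statusList) {
        EventQueue queue = new EventQueue();
        // Subscribe: PENDING_DELETE_STATUS payloads now reach the service.
        queue.addHandler(SCMEvents.PENDING_DELETE_STATUS,
            new PendingDeleteHandler(service));
        // Publish: ContainerMapping does this from processContainerReports().
        queue.fireEvent(SCMEvents.PENDING_DELETE_STATUS, statusList);
      }
    }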
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
new file mode 100644
index 0000000000..904762db59
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class PendingDeleteStatusList {
+
+  private List<PendingDeleteStatus> pendingDeleteStatuses;
+  private DatanodeDetails datanodeDetails;
+
+  public PendingDeleteStatusList(DatanodeDetails datanodeDetails) {
+    this.datanodeDetails = datanodeDetails;
+    pendingDeleteStatuses = new LinkedList<>();
+  }
+
+  public void addPendingDeleteStatus(long dnDeleteTransactionId,
+      long scmDeleteTransactionId, long containerId) {
+    pendingDeleteStatuses.add(
+        new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId,
+            containerId));
+  }
+
+  public static class PendingDeleteStatus {
+    private long dnDeleteTransactionId;
+    private long scmDeleteTransactionId;
+    private long containerId;
+
+    public PendingDeleteStatus(long dnDeleteTransactionId,
+        long scmDeleteTransactionId, long containerId) {
+      this.dnDeleteTransactionId = dnDeleteTransactionId;
+      this.scmDeleteTransactionId = scmDeleteTransactionId;
+      this.containerId = containerId;
+    }
+
+    public long getDnDeleteTransactionId() {
+      return dnDeleteTransactionId;
+    }
+
+    public long getScmDeleteTransactionId() {
+      return scmDeleteTransactionId;
+    }
+
+    public long getContainerId() {
+      return containerId;
+    }
+
+  }
+
+  public List<PendingDeleteStatus> getPendingDeleteStatuses() {
+    return pendingDeleteStatuses;
+  }
+
+  public int getNumPendingDeletes() {
+    return pendingDeleteStatuses.size();
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+}
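PendingDeleteStatusList is a plain value carrier: one datanode, plus one entry per container whose datanode-side delete txnID lags the SCM-side one. A small usage sketch follows; the DatanodeDetails builder call is an assumption about that API, not part of this patch.

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;

    import java.util.UUID;

    // Sketch: build a status list the way ContainerMapping does, then read it back.
    public final class PendingDeleteStatusListSketch {
      public static void main(String[] args) {
        DatanodeDetails dn = DatanodeDetails.newBuilder()
            .setUuid(UUID.randomUUID().toString())  // assumed builder API
            .build();
        PendingDeleteStatusList list = new PendingDeleteStatusList(dn);
        // Container 42: datanode has applied txn 5, SCM has issued up to txn 9.
        list.addPendingDeleteStatus(5L, 9L, 42L);
        for (PendingDeleteStatusList.PendingDeleteStatus s
            : list.getPendingDeleteStatuses()) {
          System.out.println("container " + s.getContainerId()
              + " dnTxn=" + s.getDnDeleteTransactionId()
              + " scmTxn=" + s.getScmDeleteTransactionId());
        }
      }
    }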
+ + " Datanode delete txnID: {}, SCM txnID: {}", + dnDetails.getUuid(), deletionStatus.getContainerId(), + deletionStatus.getDnDeleteTransactionId(), + deletionStatus.getScmDeleteTransactionId()); + } + } + private class DeletedBlockTransactionScanner implements BackgroundTask { @@ -123,11 +137,12 @@ public EmptyTaskResult call() throws Exception { LOG.debug("Running DeletedBlockTransactionScanner"); DatanodeDeletedBlockTransactions transactions = null; List datanodes = nodeManager.getNodes(NodeState.HEALTHY); + Map transactionMap = null; if (datanodes != null) { transactions = new DatanodeDeletedBlockTransactions(mappingService, blockDeleteLimitSize, datanodes.size()); try { - deletedBlockLog.getTransactions(transactions); + transactionMap = deletedBlockLog.getTransactions(transactions); } catch (IOException e) { // We may tolerant a number of failures for sometime // but if it continues to fail, at some point we need to raise @@ -159,6 +174,7 @@ public EmptyTaskResult call() throws Exception { transactions.getTransactionIDList(dnId))); } } + mappingService.updateDeleteTransactionId(transactionMap); } if (dnTxCount > 0) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index d84551a0f7..863d6c5a24 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; +import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; @@ -43,6 +45,7 @@ import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.utils.BatchOperation; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; @@ -86,6 +89,7 @@ public class ContainerMapping implements Mapping { private final LeaseManager containerLeaseManager; private final float containerCloseThreshold; private final ContainerCloser closer; + private final EventPublisher eventPublisher; private final long size; /** @@ -128,6 +132,7 @@ public ContainerMapping( OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); + LOG.trace("Container State Manager created."); this.pipelineSelector = new PipelineSelector(nodeManager, containerStateManager, conf, eventPublisher); @@ -135,7 +140,7 @@ public ContainerMapping( this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, 
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); - LOG.trace("Container State Manager created."); + this.eventPublisher = eventPublisher; long containerCreationLeaseTimeout = conf.getTimeDuration( ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, @@ -398,8 +403,13 @@ public HddsProtos.LifeCycleState updateContainerState( */ public void updateDeleteTransactionId(Map deleteTransactionMap) throws IOException { + if (deleteTransactionMap == null) { + return; + } + lock.lock(); try { + BatchOperation batch = new BatchOperation(); for (Map.Entry entry : deleteTransactionMap.entrySet()) { long containerID = entry.getKey(); byte[] dbKey = Longs.toByteArray(containerID); @@ -413,10 +423,11 @@ public void updateDeleteTransactionId(Map deleteTransactionMap) ContainerInfo containerInfo = ContainerInfo.fromProtobuf( HddsProtos.SCMContainerInfo.parseFrom(containerBytes)); containerInfo.updateDeleteTransactionId(entry.getValue()); - containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray()); - containerStateManager - .updateDeleteTransactionId(containerID, entry.getValue()); + batch.put(dbKey, containerInfo.getProtobuf().toByteArray()); } + containerStore.writeBatch(batch); + containerStateManager + .updateDeleteTransactionId(deleteTransactionMap); } finally { lock.unlock(); } @@ -484,7 +495,8 @@ public void processContainerReports(DatanodeDetails datanodeDetails, throws IOException { List containerInfos = reports.getReportsList(); - + PendingDeleteStatusList pendingDeleteStatusList = + new PendingDeleteStatusList(datanodeDetails); for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); @@ -498,6 +510,14 @@ public void processContainerReports(DatanodeDetails datanodeDetails, HddsProtos.SCMContainerInfo newState = reconcileState(datanodeState, knownState, datanodeDetails); + if (knownState.getDeleteTransactionId() > datanodeState + .getDeleteTransactionId()) { + pendingDeleteStatusList + .addPendingDeleteStatus(datanodeState.getDeleteTransactionId(), + knownState.getDeleteTransactionId(), + knownState.getContainerID()); + } + // FIX ME: This can be optimized, we write twice to memory, where a // single write would work well. // @@ -529,6 +549,11 @@ public void processContainerReports(DatanodeDetails datanodeDetails, lock.unlock(); } } + if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { + eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, + pendingDeleteStatusList); + } + } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index f0ab213f11..6b983a6e87 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -360,13 +361,14 @@ public ContainerInfo updateContainerInfo(ContainerInfo info) /** * Update deleteTransactionId for a container. * - * @param containerID ContainerID of the container whose delete - * transactionId needs to be updated. 
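The report-processing path above implements a simple detection rule: if SCM's recorded deleteTransactionId for a container is ahead of the one the datanode reports, that container still has deletes pending on that node. A compact stand-alone sketch of the comparison (hypothetical names; the real check lives in ContainerMapping.processContainerReports above):

    // Sketch: the lag test applied per container report entry.
    public final class PendingDeleteCheckSketch {
      static boolean hasPendingDeletes(long scmDeleteTxnId, long dnDeleteTxnId) {
        // SCM ahead of the datanode means the datanode has not yet applied
        // every delete transaction SCM issued for this container.
        return scmDeleteTxnId > dnDeleteTxnId;
      }

      public static void main(String[] args) {
        System.out.println(hasPendingDeletes(9L, 5L)); // true: deletes pending
        System.out.println(hasPendingDeletes(7L, 7L)); // false: caught up
      }
    }

The switch from per-container puts to a single BatchOperation in updateDeleteTransactionId keeps the on-disk container records consistent: either every container's transactionId advances or none does.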
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index f0ab213f11..6b983a6e87 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -47,6 +47,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -360,13 +361,14 @@ public ContainerInfo updateContainerInfo(ContainerInfo info)
   /**
    * Update deleteTransactionId for a container.
    *
-   * @param containerID ContainerID of the container whose delete
-   *                    transactionId needs to be updated.
-   * @param transactionId latest transactionId to be updated for the container
+   * @param deleteTransactionMap maps containerId to its new
+   *                             deleteTransactionID
    */
-  public void updateDeleteTransactionId(Long containerID, long transactionId) {
-    containers.getContainerMap().get(ContainerID.valueof(containerID))
-        .updateDeleteTransactionId(transactionId);
+  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
+    for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
+      containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
+          .updateDeleteTransactionId(entry.getValue());
+    }
   }

   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 70b1e9635a..5911ce206b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.scm.events;

 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .CloseContainerStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
@@ -146,6 +147,14 @@ public final class SCMEvents {
       new TypedEvent<>(DeleteBlockCommandStatus.class,
           "DeleteBlockCommandStatus");

+  /**
+   * This event will be triggered while processing container reports from DN
+   * when the deleteTransactionID of a container in the report does not match
+   * the deleteTransactionID on SCM.
+   */
+  public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
+      new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
+
   /**
    * This is the command for ReplicationManager to handle under/over
    * replication. Sent by the ContainerReportHandler after processing the
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 47a9100b2b..178e2bde17 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
@@ -219,6 +220,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
+    PendingDeleteHandler pendingDeleteHandler =
+        new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());

     ContainerReportHandler containerReportHandler =
         new ContainerReportHandler(scmContainerManager, node2ContainerMap,
@@ -235,6 +238,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
+    eventQueue
+        .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);

     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 7049029d7a..2beb4e74d2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -106,31 +106,6 @@ public void testAllocateBlock() throws Exception {
     Assert.assertNotNull(block);
   }

-  @Test
-  public void testDeleteBlock() throws Exception {
-    AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
-    Assert.assertNotNull(block);
-    long transactionId =
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId();
-    Assert.assertEquals(0, transactionId);
-    blockManager.deleteBlocks(Collections.singletonList(
-        block.getBlockID()));
-    Assert.assertEquals(++transactionId,
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId());
-
-    block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
-    Assert.assertNotNull(block);
-    blockManager.deleteBlocks(Collections.singletonList(
-        block.getBlockID()));
-    Assert.assertEquals(++transactionId,
-        mapping.getContainer(block.getBlockID().getContainerID())
-            .getDeleteTransactionId());
-  }
-
   @Test
   public void testAllocateOversizedBlock() throws IOException {
     long size = 6 * GB;
diff --git
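The integration test below verifies the event end-to-end by capturing SCMBlockDeletingService's log output. For reference, the LogCapturer pattern in isolation (assuming Hadoop's GenericTestUtils on the classpath; the logger and message here are illustrative):

    import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: capture a logger, trigger the code under test, assert on output.
    public final class LogCapturerSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(LogCapturerSketch.class);

      public static void main(String[] args) {
        LogCapturer capturer = LogCapturer.captureLogs(LOG);
        LOG.info("Block deletion txnID mismatch for containerID 42.");
        // The test below greps for similar marker text per container.
        System.out.println(capturer.getOutput().contains("containerID 42"));
        capturer.clearOutput();
      }
    }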
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index ee9aed21a2..badd435c7f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -23,6 +23,11 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -43,6 +48,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.utils.MetadataStore;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -101,7 +107,7 @@ public void testBlockDeletion()
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();

-    String value = RandomStringUtils.random(1000000);
+    String value = RandomStringUtils.random(10000000);
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
@@ -111,7 +117,9 @@ public void testBlockDeletion()
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
         ReplicationFactor.ONE);
-    out.write(value.getBytes());
+    for (int i = 0; i < 100; i++) {
+      out.write(value.getBytes());
+    }
     out.close();

     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
@@ -144,6 +152,49 @@ public void testBlockDeletion()
     Assert.assertTrue(!containerIdsWithDeletedBlocks.isEmpty());
     // Containers in the DN and SCM should have same delete transactionIds
     matchContainerTransactionIds();
+    // Containers in the DN and SCM should have same delete transactionIds
+    // after DN restart. The assertion is just to verify that the state of
+    // containerInfos in dn and scm is consistent after dn restart.
+    cluster.restartHddsDatanode(0);
+    matchContainerTransactionIds();
+
+    // verify PENDING_DELETE_STATUS event is fired
+    verifyBlockDeletionEvent();
+  }
+
+  private void verifyBlockDeletionEvent()
+      throws IOException, InterruptedException {
+    LogCapturer logCapturer =
+        LogCapturer.captureLogs(SCMBlockDeletingService.LOG);
+    // Create dummy container reports with deleteTransactionId set to 0
+    ContainerReportsProto containerReport = dnContainerSet.getContainerReport();
+    ContainerReportsProto.Builder dummyReportsBuilder =
+        ContainerReportsProto.newBuilder();
+    for (ContainerInfo containerInfo : containerReport.getReportsList()) {
+      dummyReportsBuilder.addReports(
+          ContainerInfo.newBuilder(containerInfo).setDeleteTransactionId(0)
+              .build());
+    }
+    ContainerReportsProto dummyReport = dummyReportsBuilder.build();
+
+    logCapturer.clearOutput();
+    scm.getScmContainerManager().processContainerReports(
+        cluster.getHddsDatanodes().get(0).getDatanodeDetails(), dummyReport);
+    // wait for the event to be handled by the event handler
+    Thread.sleep(1000);
+    String output = logCapturer.getOutput();
+    for (ContainerInfo containerInfo : dummyReport.getReportsList()) {
+      long containerId = containerInfo.getContainerID();
+      // The event should be triggered only for containers that have
+      // deleted blocks.
+      if (containerIdsWithDeletedBlocks.contains(containerId)) {
+        Assert.assertTrue(output.contains(
+            "for containerID " + containerId + ". Datanode delete txnID"));
+      } else {
+        Assert.assertTrue(!output.contains(
+            "for containerID " + containerId + ". Datanode delete txnID"));
+      }
+    }
+    logCapturer.clearOutput();
+  }

   private void matchContainerTransactionIds() throws IOException {