HDDS-279. DeleteBlocks command should not be sent for open containers. Contributed by Lokesh Jain.
This commit is contained in:
parent
7631e0adae
commit
b28bdc7e8b
@ -28,6 +28,8 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
|
||||
/**
|
||||
@ -53,21 +55,26 @@ public class DatanodeDeletedBlockTransactions {
|
||||
this.nodeNum = nodeNum;
|
||||
}
|
||||
|
||||
public void addTransaction(DeletedBlocksTransaction tx,
|
||||
Set<UUID> dnsWithTransactionCommitted) throws IOException {
|
||||
public boolean addTransaction(DeletedBlocksTransaction tx,
|
||||
Set<UUID> dnsWithTransactionCommitted) {
|
||||
Pipeline pipeline = null;
|
||||
try {
|
||||
pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
|
||||
.getPipeline();
|
||||
ContainerWithPipeline containerWithPipeline =
|
||||
mappingService.getContainerWithPipeline(tx.getContainerID());
|
||||
if (containerWithPipeline.getContainerInfo().isContainerOpen()) {
|
||||
return false;
|
||||
}
|
||||
pipeline = containerWithPipeline.getPipeline();
|
||||
} catch (IOException e) {
|
||||
SCMBlockDeletingService.LOG.warn("Got container info error.", e);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
SCMBlockDeletingService.LOG.warn(
|
||||
"Container {} not found, continue to process next",
|
||||
tx.getContainerID());
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
for (DatanodeDetails dd : pipeline.getMachines()) {
|
||||
@ -78,6 +85,7 @@ public class DatanodeDeletedBlockTransactions {
|
||||
addTransactionToDN(dnID, tx);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
|
||||
|
@ -386,9 +386,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
||||
.parseFrom(value);
|
||||
|
||||
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
|
||||
Set<UUID> dnsWithTransactionCommitted = transactionToDNsCommitMap
|
||||
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
|
||||
transactions.addTransaction(block, dnsWithTransactionCommitted);
|
||||
if (transactions.addTransaction(block,
|
||||
transactionToDNsCommitMap.get(block.getTxID()))) {
|
||||
transactionToDNsCommitMap
|
||||
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
|
||||
}
|
||||
}
|
||||
return !transactions.isFull();
|
||||
}
|
||||
|
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class OzoneTestUtils {
|
||||
|
||||
/**
|
||||
* Close containers which contain the blocks listed in
|
||||
* omKeyLocationInfoGroups.
|
||||
*
|
||||
* @param omKeyLocationInfoGroups locationInfos for a key.
|
||||
* @param scm StorageContainerManager instance.
|
||||
* @return true if close containers is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean closeContainers(
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups,
|
||||
StorageContainerManager scm) throws IOException {
|
||||
return performOperationOnKeyContainers((blockID) -> {
|
||||
try {
|
||||
scm.getScmContainerManager()
|
||||
.updateContainerState(blockID.getContainerID(),
|
||||
HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
scm.getScmContainerManager()
|
||||
.updateContainerState(blockID.getContainerID(),
|
||||
HddsProtos.LifeCycleEvent.CLOSE);
|
||||
Assert.assertFalse(scm.getScmContainerManager()
|
||||
.getContainerWithPipeline(blockID.getContainerID())
|
||||
.getContainerInfo().isContainerOpen());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}, omKeyLocationInfoGroups);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the provided consumer on containers which contain the blocks
|
||||
* listed in omKeyLocationInfoGroups.
|
||||
*
|
||||
* @param consumer Consumer which accepts BlockID as argument.
|
||||
* @param omKeyLocationInfoGroups locationInfos for a key.
|
||||
* @return true if consumer is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean performOperationOnKeyContainers(
|
||||
Consumer<BlockID> consumer,
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups) throws IOException {
|
||||
|
||||
try {
|
||||
for (OmKeyLocationInfoGroup omKeyLocationInfoGroup :
|
||||
omKeyLocationInfoGroups) {
|
||||
List<OmKeyLocationInfo> omKeyLocationInfos =
|
||||
omKeyLocationInfoGroup.getLocationList();
|
||||
for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfos) {
|
||||
BlockID blockID = omKeyLocationInfo.getBlockID();
|
||||
consumer.accept(blockID);
|
||||
}
|
||||
}
|
||||
} catch (Error e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -212,6 +212,10 @@ public class TestStorageContainerManager {
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
|
||||
for (OmKeyInfo keyInfo : keyLocations.values()) {
|
||||
OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
|
||||
cluster.getStorageContainerManager());
|
||||
}
|
||||
|
||||
Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
|
||||
keyLocations, helper);
|
||||
@ -294,6 +298,10 @@ public class TestStorageContainerManager {
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
|
||||
for (OmKeyInfo keyInfo : keyLocations.values()) {
|
||||
OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
|
||||
cluster.getStorageContainerManager());
|
||||
}
|
||||
|
||||
createDeleteTXLog(delLog, keyLocations, helper);
|
||||
// Verify a few TX gets created in the TX log.
|
||||
|
@ -61,6 +61,7 @@ import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
@ -356,10 +357,18 @@ public class TestBlockDeletingService {
|
||||
// 1st interval processes 1 container 1 block and 10 chunks
|
||||
deleteAndWait(service, 1);
|
||||
Assert.assertEquals(10, getNumberOfChunksInContainers(containerSet));
|
||||
deleteAndWait(service, 2);
|
||||
deleteAndWait(service, 3);
|
||||
deleteAndWait(service, 4);
|
||||
deleteAndWait(service, 5);
|
||||
|
||||
AtomicInteger timesToProcess = new AtomicInteger(1);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
timesToProcess.incrementAndGet();
|
||||
deleteAndWait(service, timesToProcess.get());
|
||||
if (getNumberOfChunksInContainers(containerSet) == 0) {
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {}
|
||||
return false;
|
||||
}, 100, 100000);
|
||||
Assert.assertEquals(0, getNumberOfChunksInContainers(containerSet));
|
||||
} finally {
|
||||
service.shutdown();
|
||||
|
@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
@ -29,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneTestUtils;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
@ -40,7 +40,6 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -53,13 +52,15 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
|
||||
import static org.apache.hadoop.ozone
|
||||
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
|
||||
public class TestBlockDeletion {
|
||||
private static OzoneConfiguration conf = null;
|
||||
private static ObjectStore store;
|
||||
private static MiniOzoneCluster cluster = null;
|
||||
private static ContainerSet dnContainerSet = null;
|
||||
private static StorageContainerManager scm = null;
|
||||
private static OzoneManager om = null;
|
||||
@ -81,9 +82,10 @@ public class TestBlockDeletion {
|
||||
conf.setQuietMode(false);
|
||||
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
|
||||
TimeUnit.MILLISECONDS);
|
||||
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 200,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
MiniOzoneCluster cluster =
|
||||
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
|
||||
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
store = OzoneClientFactory.getRpcClient(conf).getObjectStore();
|
||||
dnContainerSet = cluster.getHddsDatanodes().get(0)
|
||||
@ -127,6 +129,14 @@ public class TestBlockDeletion {
|
||||
matchContainerTransactionIds();
|
||||
om.deleteKey(keyArgs);
|
||||
Thread.sleep(5000);
|
||||
// The blocks should not be deleted in the DN as the container is open
|
||||
Assert.assertTrue(!verifyBlocksDeleted(omKeyLocationInfoGroupList));
|
||||
|
||||
// close the containers which hold the blocks for the key
|
||||
Assert
|
||||
.assertTrue(
|
||||
OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm));
|
||||
Thread.sleep(5000);
|
||||
// The blocks should be deleted in the DN.
|
||||
Assert.assertTrue(verifyBlocksDeleted(omKeyLocationInfoGroupList));
|
||||
|
||||
@ -157,7 +167,7 @@ public class TestBlockDeletion {
|
||||
private boolean verifyBlocksCreated(
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups)
|
||||
throws IOException {
|
||||
return performOperationOnKeyContainers((blockID) -> {
|
||||
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
|
||||
try {
|
||||
MetadataStore db = KeyUtils.getDB((KeyValueContainerData)
|
||||
dnContainerSet.getContainer(blockID.getContainerID())
|
||||
@ -172,7 +182,7 @@ public class TestBlockDeletion {
|
||||
private boolean verifyBlocksDeleted(
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups)
|
||||
throws IOException {
|
||||
return performOperationOnKeyContainers((blockID) -> {
|
||||
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
|
||||
try {
|
||||
MetadataStore db = KeyUtils.getDB((KeyValueContainerData)
|
||||
dnContainerSet.getContainer(blockID.getContainerID())
|
||||
@ -188,25 +198,4 @@ public class TestBlockDeletion {
|
||||
}
|
||||
}, omKeyLocationInfoGroups);
|
||||
}
|
||||
|
||||
private boolean performOperationOnKeyContainers(Consumer<BlockID> consumer,
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups)
|
||||
throws IOException {
|
||||
|
||||
try {
|
||||
for (OmKeyLocationInfoGroup omKeyLocationInfoGroup :
|
||||
omKeyLocationInfoGroups) {
|
||||
List<OmKeyLocationInfo> omKeyLocationInfos =
|
||||
omKeyLocationInfoGroup.getLocationList();
|
||||
for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfos) {
|
||||
BlockID blockID = omKeyLocationInfo.getBlockID();
|
||||
consumer.accept(blockID);
|
||||
}
|
||||
}
|
||||
} catch (Error e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.OzoneTestUtils;
|
||||
import org.apache.hadoop.ozone.client.BucketArgs;
|
||||
import org.apache.hadoop.ozone.client.VolumeArgs;
|
||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||
@ -698,6 +699,8 @@ public class TestKeys {
|
||||
for (OmKeyInfo keyInfo : createdKeys) {
|
||||
List<OmKeyLocationInfo> locations =
|
||||
keyInfo.getLatestVersionLocations().getLocationList();
|
||||
OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
|
||||
ozoneCluster.getStorageContainerManager());
|
||||
for (OmKeyLocationInfo location : locations) {
|
||||
KeyValueHandler keyValueHandler = (KeyValueHandler) cm
|
||||
.getDispatcher().getHandler(ContainerProtos.ContainerType
|
||||
|
Loading…
x
Reference in New Issue
Block a user